Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/apache/jk/common/ChannelSocket.java


1   /*
2    *  Copyright 1999-2005 The Apache Software Foundation
3    *
4    *  Licensed under the Apache License, Version 2.0 (the "License");
5    *  you may not use this file except in compliance with the License.
6    *  You may obtain a copy of the License at
7    *
8    *      http://www.apache.org/licenses/LICENSE-2.0
9    *
10   *  Unless required by applicable law or agreed to in writing, software
11   *  distributed under the License is distributed on an "AS IS" BASIS,
12   *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   *  See the License for the specific language governing permissions and
14   *  limitations under the License.
15   */
16  
17  package org.apache.jk.common;
18  
19  import java.io.BufferedInputStream;
20  import java.io.BufferedOutputStream;
21  import java.io.IOException;
22  import java.io.InputStream;
23  import java.io.OutputStream;
24  import java.net.URLEncoder;
25  import java.net.InetAddress;
26  import java.net.ServerSocket;
27  import java.net.Socket;
28  import java.net.SocketException;
29  
30  import javax.management.ListenerNotFoundException;
31  import javax.management.MBeanNotificationInfo;
32  import javax.management.Notification;
33  import javax.management.NotificationBroadcaster;
34  import javax.management.NotificationBroadcasterSupport;
35  import javax.management.NotificationFilter;
36  import javax.management.NotificationListener;
37  import javax.management.ObjectName;
38  
39  import org.apache.commons.modeler.Registry;
40  import org.apache.jk.core.JkHandler;
41  import org.apache.jk.core.Msg;
42  import org.apache.jk.core.MsgContext;
43  import org.apache.jk.core.JkChannel;
44  import org.apache.jk.core.WorkerEnv;
45  import org.apache.coyote.Request;
46  import org.apache.coyote.RequestGroupInfo;
47  import org.apache.coyote.RequestInfo;
48  import org.apache.tomcat.util.threads.ThreadPool;
49  import org.apache.tomcat.util.threads.ThreadPoolRunnable;
50  
51  /** 
52   * Accept ( and send ) TCP messages.
53   *
54   * @author Costin Manolache
55   * @author Bill Barker
56   * jmx:mbean name="jk:service=ChannelNioSocket"
57   *            description="Accept socket connections"
58   * jmx:notification name="org.apache.coyote.INVOKE
59   * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
60   * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
61   * jmx:notification-handler name="org.apache.jk.JK_FLUSH
62   *
63   * Jk can use multiple protocols/transports.
64   * Various container adapters should load this object ( as a bean ),
65   * set configurations and use it. Note that the connector will handle
66   * all incoming protocols - it's not specific to ajp1x. The protocol
67   * is abstracted by MsgContext/Message/Channel.
68   *
69   * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
70   * TCP, Ajp14 API etc.
71   * As we add other protocols/transports/APIs this will change, the current goal
72   * is to get the same level of functionality as in the original jk connector.
73   *
74   * XXX Make the 'message type' pluggable
75   */
76  public class ChannelSocket extends JkHandler
77      implements NotificationBroadcaster, JkChannel {
78      private static org.apache.commons.logging.Log log =
79          org.apache.commons.logging.LogFactory.getLog( ChannelSocket.class );
80  
81      private int startPort=8009;
82      private int maxPort=8019; // 0 for backward compat.
83      private int port=startPort;
84      private InetAddress inet;
85      private int serverTimeout;
86      private boolean tcpNoDelay=true; // nodelay to true by default
87      private int linger=100;
88      private int socketTimeout;
89      private int bufferSize = -1;
90  
91      private long requestCount=0;
92      
93      ThreadPool tp=ThreadPool.createThreadPool(true);
94  
95      /* ==================== Tcp socket options ==================== */
96  
97      /**
98       * jmx:managed-constructor description="default constructor"
99       */
100     public ChannelSocket() {
101         // This should be integrated with the  domain setup
102     }
103     
104     public ThreadPool getThreadPool() {
105         return tp;
106     }
107 
108     public long getRequestCount() {
109         return requestCount;
110     }
111     
112     /** Set the port for the ajp13 channel.
113      *  To support seemless load balancing and jni, we treat this
114      *  as the 'base' port - we'll try up until we find one that is not
115      *  used. We'll also provide the 'difference' to the main coyote
116      *  handler - that will be our 'sessionID' and the position in
117      *  the scoreboard and the suffix for the unix domain socket.
118      *
119      * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
120      */
121     public void setPort( int port ) {
122         this.startPort=port;
123         this.port=port;
124         this.maxPort=port+10;
125     }
126 
127     public int getPort() {
128         return port;
129     }
130 
131     public void setAddress(InetAddress inet) {
132         this.inet=inet;
133     }
134 
135     /**
136      * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
137      */
138     public void setAddress(String inet) {
139         try {
140             this.inet= InetAddress.getByName( inet );
141         } catch( Exception ex ) {
142             log.error("Error parsing "+inet,ex);
143         }
144     }
145 
146     public String getAddress() {
147         if( inet!=null)
148             return inet.toString();
149         return "/0.0.0.0";
150     }
151 
152     /**
153      * Sets the timeout in ms of the server sockets created by this
154      * server. This method allows the developer to make servers
155      * more or less responsive to having their server sockets
156      * shut down.
157      *
158      * <p>By default this value is 1000ms.
159      */
160     public void setServerTimeout(int timeout) {
161   this.serverTimeout = timeout;
162     }
163     public int getServerTimeout() {
164         return serverTimeout;
165     }
166 
167     public void setTcpNoDelay( boolean b ) {
168   tcpNoDelay=b;
169     }
170 
171     public boolean getTcpNoDelay() {
172         return tcpNoDelay;
173     }
174     
175     public void setSoLinger( int i ) {
176   linger=i;
177     }
178 
179     public int getSoLinger() {
180         return linger;
181     }
182     
183     public void setSoTimeout( int i ) {
184   socketTimeout=i;
185     }
186 
187     public int getSoTimeout() {
188   return socketTimeout;
189     }
190 
191     public void setMaxPort( int i ) {
192         maxPort=i;
193     }
194 
195     public int getMaxPort() {
196         return maxPort;
197     }
198 
199     public void setBufferSize(int bs) {
200         bufferSize = bs;
201     }
202 
203     public int getBufferSize() {
204         return bufferSize;
205     }
206 
207     /** At startup we'll look for the first free port in the range.
208         The difference between this port and the beggining of the range
209         is the 'id'.
210         This is usefull for lb cases ( less config ).
211     */
212     public int getInstanceId() {
213         return port-startPort;
214     }
215 
216     /** If set to false, the thread pool will be created in
217      *  non-daemon mode, and will prevent main from exiting
218      */
219     public void setDaemon( boolean b ) {
220         tp.setDaemon( b );
221     }
222 
223     public boolean getDaemon() {
224         return tp.getDaemon();
225     }
226 
227 
228     public void setMaxThreads( int i ) {
229         if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
230         tp.setMaxThreads(i);
231     }
232     
233     public void setMinSpareThreads( int i ) {
234         if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
235         tp.setMinSpareThreads(i);
236     }
237     
238     public void setMaxSpareThreads( int i ) {
239         if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
240         tp.setMaxSpareThreads(i);
241     }
242 
243     public int getMaxThreads() {
244         return tp.getMaxThreads();   
245     }
246     
247     public int getMinSpareThreads() {
248         return tp.getMinSpareThreads();   
249     }
250 
251     public int getMaxSpareThreads() {
252         return tp.getMaxSpareThreads();   
253     }
254 
255     public void setBacklog(int i) {
256     }
257     
258     
259     /* ==================== ==================== */
260     ServerSocket sSocket;
261     final int socketNote=1;
262     final int isNote=2;
263     final int osNote=3;
264     final int notifNote=4;
265     boolean paused = false;
266 
267     public void pause() throws Exception {
268         synchronized(this) {
269             paused = true;
270             unLockSocket();
271         }
272     }
273 
274     public void resume() throws Exception {
275         synchronized(this) {
276             paused = false;
277             notify();
278         }
279     }
280 
281 
282     public void accept( MsgContext ep ) throws IOException {
283         if( sSocket==null ) return;
284         synchronized(this) {
285             while(paused) {
286                 try{ 
287                     wait();
288                 } catch(InterruptedException ie) {
289                     //Ignore, since can't happen
290                 }
291             }
292         }
293         Socket s=sSocket.accept();
294         ep.setNote( socketNote, s );
295         if(log.isDebugEnabled() )
296             log.debug("Accepted socket " + s );
297 
298         try {
299             setSocketOptions(s);
300         } catch(SocketException sex) {
301             log.debug("Error initializing Socket Options", sex);
302         }
303         
304         requestCount++;
305 
306         InputStream is=new BufferedInputStream(s.getInputStream());
307         OutputStream os;
308         if( bufferSize > 0 )
309             os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
310         else
311             os = s.getOutputStream();
312         ep.setNote( isNote, is );
313         ep.setNote( osNote, os );
314         ep.setControl( tp );
315     }
316 
317     private void setSocketOptions(Socket s) throws SocketException {
318         if( socketTimeout > 0 ) 
319             s.setSoTimeout( socketTimeout );
320         
321         s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
322 
323         if( linger > 0 )
324             s.setSoLinger( true, linger);
325     }
326 
327     public void resetCounters() {
328         requestCount=0;
329     }
330 
331     /** Called after you change some fields at runtime using jmx.
332         Experimental for now.
333     */
334     public void reinit() throws IOException {
335         destroy();
336         init();
337     }
338 
339     /**
340      * jmx:managed-operation
341      */
342     public void init() throws IOException {
343         // Find a port.
344         if (startPort == 0) {
345             port = 0;
346             if(log.isInfoEnabled())
347                 log.info("JK: ajp13 disabling channelSocket");
348             running = true;
349             return;
350         }
351         if (maxPort < startPort)
352             maxPort = startPort;
353         for( int i=startPort; i<=maxPort; i++ ) {
354             try {
355                 if( inet == null ) {
356                     sSocket = new ServerSocket( i, 0 );
357                 } else {
358                     sSocket=new ServerSocket( i, 0, inet );
359                 }
360                 port=i;
361                 break;
362             } catch( IOException ex ) {
363                 if(log.isInfoEnabled())
364                     log.info("Port busy " + i + " " + ex.toString());
365                 continue;
366             }
367         }
368 
369         if( sSocket==null ) {
370             log.error("Can't find free port " + startPort + " " + maxPort );
371             return;
372         }
373         if(log.isInfoEnabled())
374             log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
375 
376         // If this is not the base port and we are the 'main' channleSocket and
377         // SHM didn't already set the localId - we'll set the instance id
378         if( "channelSocket".equals( name ) &&
379             port != startPort &&
380             (wEnv.getLocalId()==0) ) {
381             wEnv.setLocalId(  port - startPort );
382         }
383         if( serverTimeout > 0 )
384             sSocket.setSoTimeout( serverTimeout );
385 
386         // XXX Reverse it -> this is a notification generator !!
387         if( next==null && wEnv!=null ) {
388             if( nextName!=null )
389                 setNext( wEnv.getHandler( nextName ) );
390             if( next==null )
391                 next=wEnv.getHandler( "dispatch" );
392             if( next==null )
393                 next=wEnv.getHandler( "request" );
394         }
395         JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
396         running = true;
397 
398         // Run a thread that will accept connections.
399         // XXX Try to find a thread first - not sure how...
400         if( this.domain != null ) {
401             try {
402                 tpOName=new ObjectName(domain + ":type=ThreadPool,name=" + 
403                                        getChannelName());
404 
405                 Registry.getRegistry(null, null)
406                     .registerComponent(tp, tpOName, null);
407 
408                 rgOName = new ObjectName
409                     (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
410                 Registry.getRegistry(null, null)
411                     .registerComponent(global, rgOName, null);
412             } catch (Exception e) {
413                 log.error("Can't register threadpool" );
414             }
415         }
416 
417         tp.start();
418         SocketAcceptor acceptAjp=new SocketAcceptor(  this );
419         tp.runIt( acceptAjp);
420 
421     }
422 
423     ObjectName tpOName;
424     ObjectName rgOName;
425     RequestGroupInfo global=new RequestGroupInfo();
426     int JMXRequestNote;
427 
428     public void start() throws IOException{
429         if( sSocket==null )
430             init();
431     }
432 
433     public void stop() throws IOException {
434         destroy();
435     }
436 
437     public void registerRequest(Request req, MsgContext ep, int count) {
438         if(this.domain != null) {
439             try {
440                 RequestInfo rp=req.getRequestProcessor();
441                 rp.setGlobalProcessor(global);
442                 ObjectName roname = new ObjectName
443                     (getDomain() + ":type=RequestProcessor,worker="+
444                      getChannelName()+",name=JkRequest" +count);
445                 ep.setNote(JMXRequestNote, roname);
446                         
447                 Registry.getRegistry(null, null).registerComponent( rp, roname, null);
448             } catch( Exception ex ) {
449                 log.warn("Error registering request");
450             }
451         }
452     }
453 
454     public void open(MsgContext ep) throws IOException {
455     }
456 
457     
458     public void close(MsgContext ep) throws IOException {
459         Socket s=(Socket)ep.getNote( socketNote );
460         s.close();
461     }
462 
463     private void unLockSocket() throws IOException {
464         // Need to create a connection to unlock the accept();
465         Socket s;
466         InetAddress ladr = inet;
467 
468         if(port == 0)
469             return;
470         if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
471             ladr = InetAddress.getLocalHost();
472         }
473         s=new Socket(ladr, port );
474         // setting soLinger to a small value will help shutdown the
475         // connection quicker
476         s.setSoLinger(true, 0);
477 
478   s.close();
479     }
480 
481     public void destroy() throws IOException {
482         running = false;
483         try {
484             /* If we disabled the channel return */
485             if (port == 0)
486                 return;
487             tp.shutdown();
488 
489       if(!paused) {
490     unLockSocket();
491       }
492 
493             sSocket.close(); // XXX?
494             
495             if( tpOName != null )  {
496                 Registry.getRegistry(null, null).unregisterComponent(tpOName);
497             }
498             if( rgOName != null ) {
499                 Registry.getRegistry(null, null).unregisterComponent(rgOName);
500             }
501         } catch(Exception e) {
502             log.info("Error shutting down the channel " + port + " " +
503                     e.toString());
504             if( log.isDebugEnabled() ) log.debug("Trace", e);
505         }
506     }
507 
508     public int send( Msg msg, MsgContext ep)
509         throws IOException    {
510         msg.end(); // Write the packet header
511         byte buf[]=msg.getBuffer();
512         int len=msg.getLen();
513         
514         if(log.isTraceEnabled() )
515             log.trace("send() " + len + " " + buf[4] );
516 
517         OutputStream os=(OutputStream)ep.getNote( osNote );
518         os.write( buf, 0, len );
519         return len;
520     }
521 
522     public int flush( Msg msg, MsgContext ep)
523         throws IOException    {
524         if( bufferSize > 0 ) {
525             OutputStream os=(OutputStream)ep.getNote( osNote );
526             os.flush();
527         }
528         return 0;
529     }
530 
531     public int receive( Msg msg, MsgContext ep )
532         throws IOException    {
533         if (log.isDebugEnabled()) {
534             log.debug("receive() ");
535         }
536 
537         byte buf[]=msg.getBuffer();
538         int hlen=msg.getHeaderLength();
539         
540   // XXX If the length in the packet header doesn't agree with the
541   // actual number of bytes read, it should probably return an error
542   // value.  Also, callers of this method never use the length
543   // returned -- should probably return true/false instead.
544 
545         int rd = this.read(ep, buf, 0, hlen );
546         
547         if(rd < 0) {
548             // Most likely normal apache restart.
549             // log.warn("Wrong message " + rd );
550             return rd;
551         }
552 
553         msg.processHeader();
554 
555         /* After processing the header we know the body
556            length
557         */
558         int blen=msg.getLen();
559         
560   // XXX check if enough space - it's assert()-ed !!!
561         
562    int total_read = 0;
563         
564         total_read = this.read(ep, buf, hlen, blen);
565         
566         if ((total_read <= 0) && (blen > 0)) {
567             log.warn("can't read body, waited #" + blen);
568             return  -1;
569         }
570         
571         if (total_read != blen) {
572              log.warn( "incomplete read, waited #" + blen +
573                         " got only " + total_read);
574             return -2;
575         }
576         
577   return total_read;
578     }
579     
580     /**
581      * Read N bytes from the InputStream, and ensure we got them all
582      * Under heavy load we could experience many fragmented packets
583      * just read Unix Network Programming to recall that a call to
584      * read didn't ensure you got all the data you want
585      *
586      * from read() Linux manual
587      *
588      * On success, the number of bytes read is returned (zero indicates end
589      * of file),and the file position is advanced by this number.
590      * It is not an error if this number is smaller than the number of bytes
591      * requested; this may happen for example because fewer bytes
592      * are actually available right now (maybe because we were close to
593      * end-of-file, or because we are reading from a pipe, or  from  a
594      * terminal),  or  because  read()  was interrupted by a signal.
595      * On error, -1 is returned, and errno is set appropriately. In this
596      * case it is left unspecified whether the file position (if any) changes.
597      *
598      **/
599     public int read( MsgContext ep, byte[] b, int offset, int len)
600         throws IOException    {
601         InputStream is=(InputStream)ep.getNote( isNote );
602         int pos = 0;
603         int got;
604 
605         while(pos < len) {
606             try {
607                 got = is.read(b, pos + offset, len - pos);
608             } catch(SocketException sex) {
609                 if(pos > 0) {
610                     log.info("Error reading data after "+pos+"bytes",sex);
611                 } else {
612                     log.debug("Error reading data", sex);
613                 }
614                 got = -1;
615             }
616             if (log.isTraceEnabled()) {
617                 log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
618                           offset + " " + len + " = " + got );
619             }
620 
621             // connection just closed by remote. 
622             if (got <= 0) {
623                 // This happens periodically, as apache restarts
624                 // periodically.
625                 // It should be more gracefull ! - another feature for Ajp14
626                 // log.warn( "server has closed the current connection (-1)" );
627                 return -3;
628             }
629 
630             pos += got;
631         }
632         return pos;
633     }
634     
635     protected boolean running=true;
636     
637     /** Accept incoming connections, dispatch to the thread pool
638      */
639     void acceptConnections() {
640         if( log.isDebugEnabled() )
641             log.debug("Accepting ajp connections on " + port);
642         while( running ) {
643       try{
644                 MsgContext ep=createMsgContext();
645                 ep.setSource(this);
646                 ep.setWorkerEnv( wEnv );
647                 this.accept(ep);
648 
649                 if( !running ) break;
650                 
651                 // Since this is a long-running connection, we don't care
652                 // about the small GC
653                 SocketConnection ajpConn=
654                     new SocketConnection(this, ep);
655                 tp.runIt( ajpConn );
656       }catch(Exception ex) {
657                 if (running)
658                     log.warn("Exception executing accept" ,ex);
659       }
660         }
661     }
662 
663     /** Process a single ajp connection.
664      */
665     void processConnection(MsgContext ep) {
666         try {
667             MsgAjp recv=new MsgAjp();
668             while( running ) {
669                 if(paused) { // Drop the connection on pause
670                     break;
671                 }
672                 int status= this.receive( recv, ep );
673                 if( status <= 0 ) {
674                     if( status==-3)
675                         log.debug( "server has been restarted or reset this connection" );
676                     else 
677                         log.warn("Closing ajp connection " + status );
678                     break;
679                 }
680                 ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
681                 
682                 ep.setType( 0 );
683                 // Will call next
684                 status= this.invoke( recv, ep );
685                 if( status!= JkHandler.OK ) {
686                     log.warn("processCallbacks status " + status );
687                     break;
688                 }
689             }
690         } catch( Exception ex ) {
691             String msg = ex.getMessage();
692             if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
693                 log.debug( "Server has been restarted or reset this connection");
694             else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
695                 log.debug( "connection timeout reached");            
696             else
697                 log.error( "Error, processing connection", ex);
698         } finally {
699         /*
700          * Whatever happened to this connection (remote closed it, timeout, read error)
701          * the socket SHOULD be closed, or we may be in situation where the webserver
702          * will continue to think the socket is still open and will forward request
703          * to tomcat without receiving ever a reply
704          */
705             try {
706                 this.close( ep );
707             }
708             catch( Exception e) {
709                 log.error( "Error, closing connection", e);
710             }
711             try{
712                 Request req = (Request)ep.getRequest();
713                 if( req != null ) {
714                     ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
715                     if( roname != null ) {
716                         Registry.getRegistry(null, null).unregisterComponent(roname);
717                     }
718                     req.getRequestProcessor().setGlobalProcessor(null);
719                 }
720             } catch( Exception ee) {
721                 log.error( "Error, releasing connection",ee);
722             }
723         }
724     }
725 
726     // XXX This should become handleNotification
727     public int invoke( Msg msg, MsgContext ep ) throws IOException {
728         int type=ep.getType();
729 
730         switch( type ) {
731         case JkHandler.HANDLE_RECEIVE_PACKET:
732             if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
733             return receive( msg, ep );
734         case JkHandler.HANDLE_SEND_PACKET:
735             return send( msg, ep );
736         case JkHandler.HANDLE_FLUSH:
737             return flush( msg, ep );
738         }
739 
740         if( log.isDebugEnabled() )
741             log.debug("Call next " + type + " " + next);
742 
743         // Send notification
744         if( nSupport!=null ) {
745             Notification notif=(Notification)ep.getNote(notifNote);
746             if( notif==null ) {
747                 notif=new Notification("channelSocket.message", ep, requestCount );
748                 ep.setNote( notifNote, notif);
749             }
750             nSupport.sendNotification(notif);
751         }
752 
753         if( next != null ) {
754             return next.invoke( msg, ep );
755         } else {
756             log.info("No next ");
757         }
758 
759         return OK;
760     }
761     
762     public boolean isSameAddress(MsgContext ep) {
763         Socket s=(Socket)ep.getNote( socketNote );
764         return isSameAddress( s.getLocalAddress(), s.getInetAddress());
765     }
766     
767     public String getChannelName() {
768         String encodedAddr = "";
769         if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
770             encodedAddr = getAddress();
771             if (encodedAddr.startsWith("/"))
772                 encodedAddr = encodedAddr.substring(1);
773       encodedAddr = URLEncoder.encode(encodedAddr) + "-";
774         }
775         return ("jk-" + encodedAddr + port);
776     }
777     
778     /**
779      * Return <code>true</code> if the specified client and server addresses
780      * are the same.  This method works around a bug in the IBM 1.1.8 JVM on
781      * Linux, where the address bytes are returned reversed in some
782      * circumstances.
783      *
784      * @param server The server's InetAddress
785      * @param client The client's InetAddress
786      */
787     public static boolean isSameAddress(InetAddress server, InetAddress client)
788     {
789   // Compare the byte array versions of the two addresses
790   byte serverAddr[] = server.getAddress();
791   byte clientAddr[] = client.getAddress();
792   if (serverAddr.length != clientAddr.length)
793       return (false);
794   boolean match = true;
795   for (int i = 0; i < serverAddr.length; i++) {
796       if (serverAddr[i] != clientAddr[i]) {
797     match = false;
798     break;
799       }
800   }
801   if (match)
802       return (true);
803 
804   // Compare the reversed form of the two addresses
805   for (int i = 0; i < serverAddr.length; i++) {
806       if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
807     return (false);
808   }
809   return (true);
810     }
811 
812     public void sendNewMessageNotification(Notification notification) {
813         if( nSupport!= null )
814             nSupport.sendNotification(notification);
815     }
816 
817     private NotificationBroadcasterSupport nSupport= null;
818 
819     public void addNotificationListener(NotificationListener listener,
820                                         NotificationFilter filter,
821                                         Object handback)
822             throws IllegalArgumentException
823     {
824         if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
825         nSupport.addNotificationListener(listener, filter, handback);
826     }
827 
828     public void removeNotificationListener(NotificationListener listener)
829             throws ListenerNotFoundException
830     {
831         if( nSupport!=null)
832             nSupport.removeNotificationListener(listener);
833     }
834 
835     MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
836 
837     public void setNotificationInfo( MBeanNotificationInfo info[]) {
838         this.notifInfo=info;
839     }
840 
841     public MBeanNotificationInfo[] getNotificationInfo() {
842         return notifInfo;
843     }
844 
845     static class SocketAcceptor implements ThreadPoolRunnable {
846   ChannelSocket wajp;
847     
848   SocketAcceptor(ChannelSocket wajp ) {
849       this.wajp=wajp;
850   }
851   
852   public Object[] getInitData() {
853       return null;
854   }
855   
856   public void runIt(Object thD[]) {
857       wajp.acceptConnections();
858   }
859     }
860 
861     static class SocketConnection implements ThreadPoolRunnable {
862   ChannelSocket wajp;
863   MsgContext ep;
864 
865   SocketConnection(ChannelSocket wajp, MsgContext ep) {
866       this.wajp=wajp;
867       this.ep=ep;
868   }
869 
870 
871   public Object[] getInitData() {
872       return null;
873   }
874   
875   public void runIt(Object perTh[]) {
876       wajp.processConnection(ep);
877       ep = null;
878   }
879     }
880 
881 }
882