1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.jk.common;
19
20 import java.io.BufferedInputStream;
21 import java.io.BufferedOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.net.URLEncoder;
26 import java.net.InetAddress;
27 import java.net.ServerSocket;
28 import java.net.Socket;
29 import java.net.SocketException;
30
31 import javax.management.ListenerNotFoundException;
32 import javax.management.MBeanNotificationInfo;
33 import javax.management.Notification;
34 import javax.management.NotificationBroadcaster;
35 import javax.management.NotificationBroadcasterSupport;
36 import javax.management.NotificationFilter;
37 import javax.management.NotificationListener;
38 import javax.management.ObjectName;
39
40 import org.apache.jk.core.JkHandler;
41 import org.apache.jk.core.Msg;
42 import org.apache.jk.core.MsgContext;
43 import org.apache.jk.core.JkChannel;
44 import org.apache.jk.core.WorkerEnv;
45 import org.apache.coyote.Request;
46 import org.apache.coyote.RequestGroupInfo;
47 import org.apache.coyote.RequestInfo;
48 import org.apache.coyote.ActionCode;
49 import org.apache.tomcat.util.modeler.Registry;
50 import org.apache.tomcat.util.threads.ThreadPool;
51 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
52
53 /**
54 * Accept ( and send ) TCP messages.
55 *
56 * @author Costin Manolache
57 * @author Bill Barker
58 * jmx:mbean name="jk:service=ChannelNioSocket"
59 * description="Accept socket connections"
60 * jmx:notification name="org.apache.coyote.INVOKE
61 * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
62 * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
63 * jmx:notification-handler name="org.apache.jk.JK_FLUSH
64 *
65 * Jk can use multiple protocols/transports.
66 * Various container adapters should load this object ( as a bean ),
67 * set configurations and use it. Note that the connector will handle
68 * all incoming protocols - it's not specific to ajp1x. The protocol
69 * is abstracted by MsgContext/Message/Channel.
70 *
71 * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
72 * TCP, Ajp14 API etc.
73 * As we add other protocols/transports/APIs this will change, the current goal
74 * is to get the same level of functionality as in the original jk connector.
75 *
76 * XXX Make the 'message type' pluggable
77 */
78 public class ChannelSocket extends JkHandler
79 implements NotificationBroadcaster, JkChannel {
80 private static org.apache.juli.logging.Log log =
81 org.apache.juli.logging.LogFactory.getLog( ChannelSocket.class );
82
83 private int startPort=8009;
84 private int maxPort=8019; // 0 for backward compat.
85 private int port=startPort;
86 private InetAddress inet;
87 private int serverTimeout;
88 private boolean tcpNoDelay=true; // nodelay to true by default
89 private int linger=100;
90 private int socketTimeout;
91 private int bufferSize = -1;
92 private int packetSize = AjpConstants.MAX_PACKET_SIZE;
93
94 private long requestCount=0;
95
96 ThreadPool tp=ThreadPool.createThreadPool(true);
97
98 /* ==================== Tcp socket options ==================== */
99
100 /**
101 * jmx:managed-constructor description="default constructor"
102 */
103 public ChannelSocket() {
104 // This should be integrated with the domain setup
105 }
106
107 public ThreadPool getThreadPool() {
108 return tp;
109 }
110
111 public long getRequestCount() {
112 return requestCount;
113 }
114
115 /** Set the port for the ajp13 channel.
116 * To support seemless load balancing and jni, we treat this
117 * as the 'base' port - we'll try up until we find one that is not
118 * used. We'll also provide the 'difference' to the main coyote
119 * handler - that will be our 'sessionID' and the position in
120 * the scoreboard and the suffix for the unix domain socket.
121 *
122 * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
123 */
124 public void setPort( int port ) {
125 this.startPort=port;
126 this.port=port;
127 this.maxPort=port+10;
128 }
129
130 public int getPort() {
131 return port;
132 }
133
134 public void setAddress(InetAddress inet) {
135 this.inet=inet;
136 }
137
138 /**
139 * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
140 */
141 public void setAddress(String inet) {
142 try {
143 this.inet= InetAddress.getByName( inet );
144 } catch( Exception ex ) {
145 log.error("Error parsing "+inet,ex);
146 }
147 }
148
149 public String getAddress() {
150 if( inet!=null)
151 return inet.toString();
152 return "/0.0.0.0";
153 }
154
155 /**
156 * Sets the timeout in ms of the server sockets created by this
157 * server. This method allows the developer to make servers
158 * more or less responsive to having their server sockets
159 * shut down.
160 *
161 * <p>By default this value is 1000ms.
162 */
163 public void setServerTimeout(int timeout) {
164 this.serverTimeout = timeout;
165 }
166 public int getServerTimeout() {
167 return serverTimeout;
168 }
169
170 public void setTcpNoDelay( boolean b ) {
171 tcpNoDelay=b;
172 }
173
174 public boolean getTcpNoDelay() {
175 return tcpNoDelay;
176 }
177
178 public void setSoLinger( int i ) {
179 linger=i;
180 }
181
182 public int getSoLinger() {
183 return linger;
184 }
185
186 public void setSoTimeout( int i ) {
187 socketTimeout=i;
188 }
189
190 public int getSoTimeout() {
191 return socketTimeout;
192 }
193
194 public void setMaxPort( int i ) {
195 maxPort=i;
196 }
197
198 public int getMaxPort() {
199 return maxPort;
200 }
201
202 public void setBufferSize(int bs) {
203 bufferSize = bs;
204 }
205
206 public int getBufferSize() {
207 return bufferSize;
208 }
209
210 public void setPacketSize(int ps) {
211 if(ps < AjpConstants.MAX_PACKET_SIZE) {
212 ps = AjpConstants.MAX_PACKET_SIZE;
213 }
214 packetSize = ps;
215 }
216
217 public int getPacketSize() {
218 return packetSize;
219 }
220
221 /** At startup we'll look for the first free port in the range.
222 The difference between this port and the beggining of the range
223 is the 'id'.
224 This is usefull for lb cases ( less config ).
225 */
226 public int getInstanceId() {
227 return port-startPort;
228 }
229
230 /** If set to false, the thread pool will be created in
231 * non-daemon mode, and will prevent main from exiting
232 */
233 public void setDaemon( boolean b ) {
234 tp.setDaemon( b );
235 }
236
237 public boolean getDaemon() {
238 return tp.getDaemon();
239 }
240
241
242 public void setMaxThreads( int i ) {
243 if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
244 tp.setMaxThreads(i);
245 }
246
247 public void setMinSpareThreads( int i ) {
248 if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
249 tp.setMinSpareThreads(i);
250 }
251
252 public void setMaxSpareThreads( int i ) {
253 if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
254 tp.setMaxSpareThreads(i);
255 }
256
257 public int getMaxThreads() {
258 return tp.getMaxThreads();
259 }
260
261 public int getMinSpareThreads() {
262 return tp.getMinSpareThreads();
263 }
264
265 public int getMaxSpareThreads() {
266 return tp.getMaxSpareThreads();
267 }
268
269 public void setBacklog(int i) {
270 }
271
272
273 /* ==================== ==================== */
274 ServerSocket sSocket;
275 final int socketNote=1;
276 final int isNote=2;
277 final int osNote=3;
278 final int notifNote=4;
279 boolean paused = false;
280
281 public void pause() throws Exception {
282 synchronized(this) {
283 paused = true;
284 unLockSocket();
285 }
286 }
287
288 public void resume() throws Exception {
289 synchronized(this) {
290 paused = false;
291 notify();
292 }
293 }
294
295
296 public void accept( MsgContext ep ) throws IOException {
297 if( sSocket==null ) return;
298 synchronized(this) {
299 while(paused) {
300 try{
301 wait();
302 } catch(InterruptedException ie) {
303 //Ignore, since can't happen
304 }
305 }
306 }
307 Socket s=sSocket.accept();
308 ep.setNote( socketNote, s );
309 if(log.isDebugEnabled() )
310 log.debug("Accepted socket " + s );
311
312 try {
313 setSocketOptions(s);
314 } catch(SocketException sex) {
315 log.debug("Error initializing Socket Options", sex);
316 }
317
318 requestCount++;
319
320 InputStream is=new BufferedInputStream(s.getInputStream());
321 OutputStream os;
322 if( bufferSize > 0 )
323 os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
324 else
325 os = s.getOutputStream();
326 ep.setNote( isNote, is );
327 ep.setNote( osNote, os );
328 ep.setControl( tp );
329 }
330
331 private void setSocketOptions(Socket s) throws SocketException {
332 if( socketTimeout > 0 )
333 s.setSoTimeout( socketTimeout );
334
335 s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
336
337 if( linger > 0 )
338 s.setSoLinger( true, linger);
339 }
340
341 public void resetCounters() {
342 requestCount=0;
343 }
344
345 /** Called after you change some fields at runtime using jmx.
346 Experimental for now.
347 */
348 public void reinit() throws IOException {
349 destroy();
350 init();
351 }
352
353 /**
354 * jmx:managed-operation
355 */
356 public void init() throws IOException {
357 // Find a port.
358 if (startPort == 0) {
359 port = 0;
360 if(log.isInfoEnabled())
361 log.info("JK: ajp13 disabling channelSocket");
362 running = true;
363 return;
364 }
365 if (maxPort < startPort)
366 maxPort = startPort;
367 for( int i=startPort; i<=maxPort; i++ ) {
368 try {
369 if( inet == null ) {
370 sSocket = new ServerSocket( i, 0 );
371 } else {
372 sSocket=new ServerSocket( i, 0, inet );
373 }
374 port=i;
375 break;
376 } catch( IOException ex ) {
377 if(log.isInfoEnabled())
378 log.info("Port busy " + i + " " + ex.toString());
379 continue;
380 }
381 }
382
383 if( sSocket==null ) {
384 log.error("Can't find free port " + startPort + " " + maxPort );
385 return;
386 }
387 if(log.isInfoEnabled())
388 log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
389
390 // If this is not the base port and we are the 'main' channleSocket and
391 // SHM didn't already set the localId - we'll set the instance id
392 if( "channelSocket".equals( name ) &&
393 port != startPort &&
394 (wEnv.getLocalId()==0) ) {
395 wEnv.setLocalId( port - startPort );
396 }
397 if( serverTimeout > 0 )
398 sSocket.setSoTimeout( serverTimeout );
399
400 // XXX Reverse it -> this is a notification generator !!
401 if( next==null && wEnv!=null ) {
402 if( nextName!=null )
403 setNext( wEnv.getHandler( nextName ) );
404 if( next==null )
405 next=wEnv.getHandler( "dispatch" );
406 if( next==null )
407 next=wEnv.getHandler( "request" );
408 }
409 JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
410 running = true;
411
412 // Run a thread that will accept connections.
413 // XXX Try to find a thread first - not sure how...
414 if( this.domain != null ) {
415 try {
416 tpOName=new ObjectName(domain + ":type=ThreadPool,name=" +
417 getChannelName());
418
419 Registry.getRegistry(null, null)
420 .registerComponent(tp, tpOName, null);
421
422 rgOName = new ObjectName
423 (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
424 Registry.getRegistry(null, null)
425 .registerComponent(global, rgOName, null);
426 } catch (Exception e) {
427 log.error("Can't register threadpool" );
428 }
429 }
430
431 tp.start();
432 SocketAcceptor acceptAjp=new SocketAcceptor( this );
433 tp.runIt( acceptAjp);
434
435 }
436
437 ObjectName tpOName;
438 ObjectName rgOName;
439 RequestGroupInfo global=new RequestGroupInfo();
440 int JMXRequestNote;
441
442 public void start() throws IOException{
443 if( sSocket==null )
444 init();
445 }
446
447 public void stop() throws IOException {
448 destroy();
449 }
450
451 public void registerRequest(Request req, MsgContext ep, int count) {
452 if(this.domain != null) {
453 try {
454 RequestInfo rp=req.getRequestProcessor();
455 rp.setGlobalProcessor(global);
456 ObjectName roname = new ObjectName
457 (getDomain() + ":type=RequestProcessor,worker="+
458 getChannelName()+",name=JkRequest" +count);
459 ep.setNote(JMXRequestNote, roname);
460
461 Registry.getRegistry(null, null).registerComponent( rp, roname, null);
462 } catch( Exception ex ) {
463 log.warn("Error registering request");
464 }
465 }
466 }
467
468 public void open(MsgContext ep) throws IOException {
469 }
470
471
472 public void close(MsgContext ep) throws IOException {
473 Socket s=(Socket)ep.getNote( socketNote );
474 s.close();
475 }
476
477 private void unLockSocket() throws IOException {
478 // Need to create a connection to unlock the accept();
479 Socket s;
480 InetAddress ladr = inet;
481
482 if(port == 0)
483 return;
484 if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
485 ladr = InetAddress.getLocalHost();
486 }
487 s=new Socket(ladr, port );
488 // setting soLinger to a small value will help shutdown the
489 // connection quicker
490 s.setSoLinger(true, 0);
491
492 s.close();
493 }
494
495 public void destroy() throws IOException {
496 running = false;
497 try {
498 /* If we disabled the channel return */
499 if (port == 0)
500 return;
501 tp.shutdown();
502
503 if(!paused) {
504 unLockSocket();
505 }
506
507 sSocket.close(); // XXX?
508
509 if( tpOName != null ) {
510 Registry.getRegistry(null, null).unregisterComponent(tpOName);
511 }
512 if( rgOName != null ) {
513 Registry.getRegistry(null, null).unregisterComponent(rgOName);
514 }
515 } catch(Exception e) {
516 log.info("Error shutting down the channel " + port + " " +
517 e.toString());
518 if( log.isDebugEnabled() ) log.debug("Trace", e);
519 }
520 }
521
522 public int send( Msg msg, MsgContext ep)
523 throws IOException {
524 msg.end(); // Write the packet header
525 byte buf[]=msg.getBuffer();
526 int len=msg.getLen();
527
528 if(log.isTraceEnabled() )
529 log.trace("send() " + len + " " + buf[4] );
530
531 OutputStream os=(OutputStream)ep.getNote( osNote );
532 os.write( buf, 0, len );
533 return len;
534 }
535
536 public int flush( Msg msg, MsgContext ep)
537 throws IOException {
538 if( bufferSize > 0 ) {
539 OutputStream os=(OutputStream)ep.getNote( osNote );
540 os.flush();
541 }
542 return 0;
543 }
544
545 public int receive( Msg msg, MsgContext ep )
546 throws IOException {
547 if (log.isDebugEnabled()) {
548 log.debug("receive() ");
549 }
550
551 byte buf[]=msg.getBuffer();
552 int hlen=msg.getHeaderLength();
553
554 // XXX If the length in the packet header doesn't agree with the
555 // actual number of bytes read, it should probably return an error
556 // value. Also, callers of this method never use the length
557 // returned -- should probably return true/false instead.
558
559 int rd = this.read(ep, buf, 0, hlen );
560
561 if(rd < 0) {
562 // Most likely normal apache restart.
563 // log.warn("Wrong message " + rd );
564 return rd;
565 }
566
567 msg.processHeader();
568
569 /* After processing the header we know the body
570 length
571 */
572 int blen=msg.getLen();
573
574 // XXX check if enough space - it's assert()-ed !!!
575
576 int total_read = 0;
577
578 total_read = this.read(ep, buf, hlen, blen);
579
580 if ((total_read <= 0) && (blen > 0)) {
581 log.warn("can't read body, waited #" + blen);
582 return -1;
583 }
584
585 if (total_read != blen) {
586 log.warn( "incomplete read, waited #" + blen +
587 " got only " + total_read);
588 return -2;
589 }
590
591 return total_read;
592 }
593
594 /**
595 * Read N bytes from the InputStream, and ensure we got them all
596 * Under heavy load we could experience many fragmented packets
597 * just read Unix Network Programming to recall that a call to
598 * read didn't ensure you got all the data you want
599 *
600 * from read() Linux manual
601 *
602 * On success, the number of bytes read is returned (zero indicates end
603 * of file),and the file position is advanced by this number.
604 * It is not an error if this number is smaller than the number of bytes
605 * requested; this may happen for example because fewer bytes
606 * are actually available right now (maybe because we were close to
607 * end-of-file, or because we are reading from a pipe, or from a
608 * terminal), or because read() was interrupted by a signal.
609 * On error, -1 is returned, and errno is set appropriately. In this
610 * case it is left unspecified whether the file position (if any) changes.
611 *
612 **/
613 public int read( MsgContext ep, byte[] b, int offset, int len)
614 throws IOException {
615 InputStream is=(InputStream)ep.getNote( isNote );
616 int pos = 0;
617 int got;
618
619 while(pos < len) {
620 try {
621 got = is.read(b, pos + offset, len - pos);
622 } catch(SocketException sex) {
623 if(pos > 0) {
624 log.info("Error reading data after "+pos+"bytes",sex);
625 } else {
626 log.debug("Error reading data", sex);
627 }
628 got = -1;
629 }
630 if (log.isTraceEnabled()) {
631 log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
632 offset + " " + len + " = " + got );
633 }
634
635 // connection just closed by remote.
636 if (got <= 0) {
637 // This happens periodically, as apache restarts
638 // periodically.
639 // It should be more gracefull ! - another feature for Ajp14
640 // log.warn( "server has closed the current connection (-1)" );
641 return -3;
642 }
643
644 pos += got;
645 }
646 return pos;
647 }
648
649 protected boolean running=true;
650
651 /** Accept incoming connections, dispatch to the thread pool
652 */
653 void acceptConnections() {
654 if( log.isDebugEnabled() )
655 log.debug("Accepting ajp connections on " + port);
656 while( running ) {
657 try{
658 MsgContext ep=createMsgContext(packetSize);
659 ep.setSource(this);
660 ep.setWorkerEnv( wEnv );
661 this.accept(ep);
662
663 if( !running ) break;
664
665 // Since this is a long-running connection, we don't care
666 // about the small GC
667 SocketConnection ajpConn=
668 new SocketConnection(this, ep);
669 tp.runIt( ajpConn );
670 }catch(Exception ex) {
671 if (running)
672 log.warn("Exception executing accept" ,ex);
673 }
674 }
675 }
676
677 /** Process a single ajp connection.
678 */
679 void processConnection(MsgContext ep) {
680 try {
681 MsgAjp recv=new MsgAjp(packetSize);
682 while( running ) {
683 if(paused) { // Drop the connection on pause
684 break;
685 }
686 int status= this.receive( recv, ep );
687 if( status <= 0 ) {
688 if( status==-3)
689 log.debug( "server has been restarted or reset this connection" );
690 else
691 log.warn("Closing ajp connection " + status );
692 break;
693 }
694 ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
695
696 ep.setType( 0 );
697 // Will call next
698 status= this.invoke( recv, ep );
699 if( status!= JkHandler.OK ) {
700 log.warn("processCallbacks status " + status );
701 ep.action(ActionCode.ACTION_CLOSE, ep.getRequest().getResponse());
702 break;
703 }
704 }
705 } catch( Exception ex ) {
706 String msg = ex.getMessage();
707 if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
708 log.debug( "Server has been restarted or reset this connection");
709 else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
710 log.debug( "connection timeout reached");
711 else
712 log.error( "Error, processing connection", ex);
713 } finally {
714 /*
715 * Whatever happened to this connection (remote closed it, timeout, read error)
716 * the socket SHOULD be closed, or we may be in situation where the webserver
717 * will continue to think the socket is still open and will forward request
718 * to tomcat without receiving ever a reply
719 */
720 try {
721 this.close( ep );
722 }
723 catch( Exception e) {
724 log.error( "Error, closing connection", e);
725 }
726 try{
727 Request req = (Request)ep.getRequest();
728 if( req != null ) {
729 ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
730 if( roname != null ) {
731 Registry.getRegistry(null, null).unregisterComponent(roname);
732 }
733 req.getRequestProcessor().setGlobalProcessor(null);
734 }
735 } catch( Exception ee) {
736 log.error( "Error, releasing connection",ee);
737 }
738 }
739 }
740
741 // XXX This should become handleNotification
742 public int invoke( Msg msg, MsgContext ep ) throws IOException {
743 int type=ep.getType();
744
745 switch( type ) {
746 case JkHandler.HANDLE_RECEIVE_PACKET:
747 if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
748 return receive( msg, ep );
749 case JkHandler.HANDLE_SEND_PACKET:
750 return send( msg, ep );
751 case JkHandler.HANDLE_FLUSH:
752 return flush( msg, ep );
753 }
754
755 if( log.isDebugEnabled() )
756 log.debug("Call next " + type + " " + next);
757
758 // Send notification
759 if( nSupport!=null ) {
760 Notification notif=(Notification)ep.getNote(notifNote);
761 if( notif==null ) {
762 notif=new Notification("channelSocket.message", ep, requestCount );
763 ep.setNote( notifNote, notif);
764 }
765 nSupport.sendNotification(notif);
766 }
767
768 if( next != null ) {
769 return next.invoke( msg, ep );
770 } else {
771 log.info("No next ");
772 }
773
774 return OK;
775 }
776
777 public boolean isSameAddress(MsgContext ep) {
778 Socket s=(Socket)ep.getNote( socketNote );
779 return isSameAddress( s.getLocalAddress(), s.getInetAddress());
780 }
781
782 public String getChannelName() {
783 String encodedAddr = "";
784 if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
785 encodedAddr = getAddress();
786 if (encodedAddr.startsWith("/"))
787 encodedAddr = encodedAddr.substring(1);
788 encodedAddr = URLEncoder.encode(encodedAddr) + "-";
789 }
790 return ("jk-" + encodedAddr + port);
791 }
792
793 /**
794 * Return <code>true</code> if the specified client and server addresses
795 * are the same. This method works around a bug in the IBM 1.1.8 JVM on
796 * Linux, where the address bytes are returned reversed in some
797 * circumstances.
798 *
799 * @param server The server's InetAddress
800 * @param client The client's InetAddress
801 */
802 public static boolean isSameAddress(InetAddress server, InetAddress client)
803 {
804 // Compare the byte array versions of the two addresses
805 byte serverAddr[] = server.getAddress();
806 byte clientAddr[] = client.getAddress();
807 if (serverAddr.length != clientAddr.length)
808 return (false);
809 boolean match = true;
810 for (int i = 0; i < serverAddr.length; i++) {
811 if (serverAddr[i] != clientAddr[i]) {
812 match = false;
813 break;
814 }
815 }
816 if (match)
817 return (true);
818
819 // Compare the reversed form of the two addresses
820 for (int i = 0; i < serverAddr.length; i++) {
821 if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
822 return (false);
823 }
824 return (true);
825 }
826
827 public void sendNewMessageNotification(Notification notification) {
828 if( nSupport!= null )
829 nSupport.sendNotification(notification);
830 }
831
832 private NotificationBroadcasterSupport nSupport= null;
833
834 public void addNotificationListener(NotificationListener listener,
835 NotificationFilter filter,
836 Object handback)
837 throws IllegalArgumentException
838 {
839 if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
840 nSupport.addNotificationListener(listener, filter, handback);
841 }
842
843 public void removeNotificationListener(NotificationListener listener)
844 throws ListenerNotFoundException
845 {
846 if( nSupport!=null)
847 nSupport.removeNotificationListener(listener);
848 }
849
850 MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
851
852 public void setNotificationInfo( MBeanNotificationInfo info[]) {
853 this.notifInfo=info;
854 }
855
856 public MBeanNotificationInfo[] getNotificationInfo() {
857 return notifInfo;
858 }
859
860 static class SocketAcceptor implements ThreadPoolRunnable {
861 ChannelSocket wajp;
862
863 SocketAcceptor(ChannelSocket wajp ) {
864 this.wajp=wajp;
865 }
866
867 public Object[] getInitData() {
868 return null;
869 }
870
871 public void runIt(Object thD[]) {
872 wajp.acceptConnections();
873 }
874 }
875
876 static class SocketConnection implements ThreadPoolRunnable {
877 ChannelSocket wajp;
878 MsgContext ep;
879
880 SocketConnection(ChannelSocket wajp, MsgContext ep) {
881 this.wajp=wajp;
882 this.ep=ep;
883 }
884
885
886 public Object[] getInitData() {
887 return null;
888 }
889
890 public void runIt(Object perTh[]) {
891 wajp.processConnection(ep);
892 ep = null;
893 }
894 }
895
896 }
897