Source code: org/apache/jk/common/ChannelSocket.java
1 /*
2 * Copyright 1999-2005 The Apache Software Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.jk.common;
18
19 import java.io.BufferedInputStream;
20 import java.io.BufferedOutputStream;
21 import java.io.IOException;
22 import java.io.InputStream;
23 import java.io.OutputStream;
24 import java.net.URLEncoder;
25 import java.net.InetAddress;
26 import java.net.ServerSocket;
27 import java.net.Socket;
28 import java.net.SocketException;
29
30 import javax.management.ListenerNotFoundException;
31 import javax.management.MBeanNotificationInfo;
32 import javax.management.Notification;
33 import javax.management.NotificationBroadcaster;
34 import javax.management.NotificationBroadcasterSupport;
35 import javax.management.NotificationFilter;
36 import javax.management.NotificationListener;
37 import javax.management.ObjectName;
38
39 import org.apache.commons.modeler.Registry;
40 import org.apache.jk.core.JkHandler;
41 import org.apache.jk.core.Msg;
42 import org.apache.jk.core.MsgContext;
43 import org.apache.jk.core.JkChannel;
44 import org.apache.jk.core.WorkerEnv;
45 import org.apache.coyote.Request;
46 import org.apache.coyote.RequestGroupInfo;
47 import org.apache.coyote.RequestInfo;
48 import org.apache.tomcat.util.threads.ThreadPool;
49 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
50
51 /**
52 * Accept ( and send ) TCP messages.
53 *
54 * @author Costin Manolache
55 * @author Bill Barker
 * jmx:mbean name="jk:service=ChannelNioSocket"
 *            description="Accept socket connections"
 * jmx:notification name="org.apache.coyote.INVOKE"
 * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET"
 * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET"
 * jmx:notification-handler name="org.apache.jk.JK_FLUSH"
62 *
63 * Jk can use multiple protocols/transports.
64 * Various container adapters should load this object ( as a bean ),
65 * set configurations and use it. Note that the connector will handle
66 * all incoming protocols - it's not specific to ajp1x. The protocol
67 * is abstracted by MsgContext/Message/Channel.
68 *
69 * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
70 * TCP, Ajp14 API etc.
71 * As we add other protocols/transports/APIs this will change, the current goal
72 * is to get the same level of functionality as in the original jk connector.
73 *
74 * XXX Make the 'message type' pluggable
75 */
76 public class ChannelSocket extends JkHandler
77 implements NotificationBroadcaster, JkChannel {
/** Shared commons-logging logger for this channel. */
private static org.apache.commons.logging.Log log =
    org.apache.commons.logging.LogFactory.getLog( ChannelSocket.class );

// First port tried by init(); also the base used by getInstanceId().
private int startPort=8009;
// Last port tried by init() while scanning for a free one.
private int maxPort=8019; // 0 for backward compat.
// Port in use; init() overwrites it with the port it managed to bind.
private int port=startPort;
// Local bind address; when null init() creates the ServerSocket without one.
private InetAddress inet;
// Timeout for the listening socket; init() only applies it when > 0.
private int serverTimeout;
private boolean tcpNoDelay=true; // nodelay to true by default
// SO_LINGER value for accepted sockets; only applied when > 0.
private int linger=100;
// SO_TIMEOUT for accepted sockets; only applied when > 0.
private int socketTimeout;
// Output buffer size per connection; output is left unbuffered unless > 0.
private int bufferSize = -1;

// Incremented once per socket returned by accept(); cleared by resetCounters().
private long requestCount=0;

// Pool that runs the acceptor task and one task per accepted connection.
// NOTE(review): the meaning of the 'true' argument is not visible here - confirm.
ThreadPool tp=ThreadPool.createThreadPool(true);
94
95 /* ==================== Tcp socket options ==================== */
96
/**
 * Creates a channel that keeps the defaults declared on its fields.
 * No socket is opened here; that happens in {@link #init()}.
 *
 * jmx:managed-constructor description="default constructor"
 */
public ChannelSocket() {
    // This should be integrated with the domain setup
}
103
104 public ThreadPool getThreadPool() {
105 return tp;
106 }
107
108 public long getRequestCount() {
109 return requestCount;
110 }
111
112 /** Set the port for the ajp13 channel.
113 * To support seemless load balancing and jni, we treat this
114 * as the 'base' port - we'll try up until we find one that is not
115 * used. We'll also provide the 'difference' to the main coyote
116 * handler - that will be our 'sessionID' and the position in
117 * the scoreboard and the suffix for the unix domain socket.
118 *
119 * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
120 */
121 public void setPort( int port ) {
122 this.startPort=port;
123 this.port=port;
124 this.maxPort=port+10;
125 }
126
127 public int getPort() {
128 return port;
129 }
130
131 public void setAddress(InetAddress inet) {
132 this.inet=inet;
133 }
134
135 /**
136 * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
137 */
138 public void setAddress(String inet) {
139 try {
140 this.inet= InetAddress.getByName( inet );
141 } catch( Exception ex ) {
142 log.error("Error parsing "+inet,ex);
143 }
144 }
145
146 public String getAddress() {
147 if( inet!=null)
148 return inet.toString();
149 return "/0.0.0.0";
150 }
151
152 /**
153 * Sets the timeout in ms of the server sockets created by this
154 * server. This method allows the developer to make servers
155 * more or less responsive to having their server sockets
156 * shut down.
157 *
158 * <p>By default this value is 1000ms.
159 */
160 public void setServerTimeout(int timeout) {
161 this.serverTimeout = timeout;
162 }
163 public int getServerTimeout() {
164 return serverTimeout;
165 }
166
167 public void setTcpNoDelay( boolean b ) {
168 tcpNoDelay=b;
169 }
170
171 public boolean getTcpNoDelay() {
172 return tcpNoDelay;
173 }
174
175 public void setSoLinger( int i ) {
176 linger=i;
177 }
178
179 public int getSoLinger() {
180 return linger;
181 }
182
183 public void setSoTimeout( int i ) {
184 socketTimeout=i;
185 }
186
187 public int getSoTimeout() {
188 return socketTimeout;
189 }
190
191 public void setMaxPort( int i ) {
192 maxPort=i;
193 }
194
195 public int getMaxPort() {
196 return maxPort;
197 }
198
199 public void setBufferSize(int bs) {
200 bufferSize = bs;
201 }
202
203 public int getBufferSize() {
204 return bufferSize;
205 }
206
207 /** At startup we'll look for the first free port in the range.
208 The difference between this port and the beggining of the range
209 is the 'id'.
210 This is usefull for lb cases ( less config ).
211 */
212 public int getInstanceId() {
213 return port-startPort;
214 }
215
216 /** If set to false, the thread pool will be created in
217 * non-daemon mode, and will prevent main from exiting
218 */
219 public void setDaemon( boolean b ) {
220 tp.setDaemon( b );
221 }
222
223 public boolean getDaemon() {
224 return tp.getDaemon();
225 }
226
227
228 public void setMaxThreads( int i ) {
229 if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
230 tp.setMaxThreads(i);
231 }
232
233 public void setMinSpareThreads( int i ) {
234 if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
235 tp.setMinSpareThreads(i);
236 }
237
238 public void setMaxSpareThreads( int i ) {
239 if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
240 tp.setMaxSpareThreads(i);
241 }
242
243 public int getMaxThreads() {
244 return tp.getMaxThreads();
245 }
246
247 public int getMinSpareThreads() {
248 return tp.getMinSpareThreads();
249 }
250
251 public int getMaxSpareThreads() {
252 return tp.getMaxSpareThreads();
253 }
254
/**
 * Accepts a backlog value but ignores it: the body is empty, and init()
 * always passes a backlog argument of 0 when it creates the ServerSocket.
 *
 * @param i requested backlog (unused)
 */
public void setBacklog(int i) {
}
257
258
259 /* ==================== ==================== */
// Listening socket; created by init(), closed by destroy().
ServerSocket sSocket;
// Note slots used on a MsgContext to carry the per-connection state:
// the accepted Socket ...
final int socketNote=1;
// ... its InputStream ...
final int isNote=2;
// ... its OutputStream ...
final int osNote=3;
// ... and the Notification object reused by invoke().
final int notifNote=4;
// While true, accept() blocks in wait() and processConnection() drops connections.
boolean paused = false;
266
267 public void pause() throws Exception {
268 synchronized(this) {
269 paused = true;
270 unLockSocket();
271 }
272 }
273
274 public void resume() throws Exception {
275 synchronized(this) {
276 paused = false;
277 notify();
278 }
279 }
280
281
282 public void accept( MsgContext ep ) throws IOException {
283 if( sSocket==null ) return;
284 synchronized(this) {
285 while(paused) {
286 try{
287 wait();
288 } catch(InterruptedException ie) {
289 //Ignore, since can't happen
290 }
291 }
292 }
293 Socket s=sSocket.accept();
294 ep.setNote( socketNote, s );
295 if(log.isDebugEnabled() )
296 log.debug("Accepted socket " + s );
297
298 try {
299 setSocketOptions(s);
300 } catch(SocketException sex) {
301 log.debug("Error initializing Socket Options", sex);
302 }
303
304 requestCount++;
305
306 InputStream is=new BufferedInputStream(s.getInputStream());
307 OutputStream os;
308 if( bufferSize > 0 )
309 os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
310 else
311 os = s.getOutputStream();
312 ep.setNote( isNote, is );
313 ep.setNote( osNote, os );
314 ep.setControl( tp );
315 }
316
317 private void setSocketOptions(Socket s) throws SocketException {
318 if( socketTimeout > 0 )
319 s.setSoTimeout( socketTimeout );
320
321 s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
322
323 if( linger > 0 )
324 s.setSoLinger( true, linger);
325 }
326
327 public void resetCounters() {
328 requestCount=0;
329 }
330
331 /** Called after you change some fields at runtime using jmx.
332 Experimental for now.
333 */
334 public void reinit() throws IOException {
335 destroy();
336 init();
337 }
338
339 /**
340 * jmx:managed-operation
341 */
342 public void init() throws IOException {
343 // Find a port.
344 if (startPort == 0) {
345 port = 0;
346 if(log.isInfoEnabled())
347 log.info("JK: ajp13 disabling channelSocket");
348 running = true;
349 return;
350 }
351 if (maxPort < startPort)
352 maxPort = startPort;
353 for( int i=startPort; i<=maxPort; i++ ) {
354 try {
355 if( inet == null ) {
356 sSocket = new ServerSocket( i, 0 );
357 } else {
358 sSocket=new ServerSocket( i, 0, inet );
359 }
360 port=i;
361 break;
362 } catch( IOException ex ) {
363 if(log.isInfoEnabled())
364 log.info("Port busy " + i + " " + ex.toString());
365 continue;
366 }
367 }
368
369 if( sSocket==null ) {
370 log.error("Can't find free port " + startPort + " " + maxPort );
371 return;
372 }
373 if(log.isInfoEnabled())
374 log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
375
376 // If this is not the base port and we are the 'main' channleSocket and
377 // SHM didn't already set the localId - we'll set the instance id
378 if( "channelSocket".equals( name ) &&
379 port != startPort &&
380 (wEnv.getLocalId()==0) ) {
381 wEnv.setLocalId( port - startPort );
382 }
383 if( serverTimeout > 0 )
384 sSocket.setSoTimeout( serverTimeout );
385
386 // XXX Reverse it -> this is a notification generator !!
387 if( next==null && wEnv!=null ) {
388 if( nextName!=null )
389 setNext( wEnv.getHandler( nextName ) );
390 if( next==null )
391 next=wEnv.getHandler( "dispatch" );
392 if( next==null )
393 next=wEnv.getHandler( "request" );
394 }
395 JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
396 running = true;
397
398 // Run a thread that will accept connections.
399 // XXX Try to find a thread first - not sure how...
400 if( this.domain != null ) {
401 try {
402 tpOName=new ObjectName(domain + ":type=ThreadPool,name=" +
403 getChannelName());
404
405 Registry.getRegistry(null, null)
406 .registerComponent(tp, tpOName, null);
407
408 rgOName = new ObjectName
409 (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
410 Registry.getRegistry(null, null)
411 .registerComponent(global, rgOName, null);
412 } catch (Exception e) {
413 log.error("Can't register threadpool" );
414 }
415 }
416
417 tp.start();
418 SocketAcceptor acceptAjp=new SocketAcceptor( this );
419 tp.runIt( acceptAjp);
420
421 }
422
// JMX name under which init() registers the thread pool (null until then).
ObjectName tpOName;
// JMX name under which init() registers the global request processor.
ObjectName rgOName;
// Aggregate that registerRequest() attaches each request processor to.
RequestGroupInfo global=new RequestGroupInfo();
// Note id obtained in init(); registerRequest() stores the request's ObjectName under it.
int JMXRequestNote;
427
428 public void start() throws IOException{
429 if( sSocket==null )
430 init();
431 }
432
433 public void stop() throws IOException {
434 destroy();
435 }
436
437 public void registerRequest(Request req, MsgContext ep, int count) {
438 if(this.domain != null) {
439 try {
440 RequestInfo rp=req.getRequestProcessor();
441 rp.setGlobalProcessor(global);
442 ObjectName roname = new ObjectName
443 (getDomain() + ":type=RequestProcessor,worker="+
444 getChannelName()+",name=JkRequest" +count);
445 ep.setNote(JMXRequestNote, roname);
446
447 Registry.getRegistry(null, null).registerComponent( rp, roname, null);
448 } catch( Exception ex ) {
449 log.warn("Error registering request");
450 }
451 }
452 }
453
/**
 * Intentionally empty: this channel does nothing on open.
 *
 * @param ep connection context (unused)
 * @throws IOException never thrown by this implementation
 */
public void open(MsgContext ep) throws IOException {
}
456
457
458 public void close(MsgContext ep) throws IOException {
459 Socket s=(Socket)ep.getNote( socketNote );
460 s.close();
461 }
462
463 private void unLockSocket() throws IOException {
464 // Need to create a connection to unlock the accept();
465 Socket s;
466 InetAddress ladr = inet;
467
468 if(port == 0)
469 return;
470 if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
471 ladr = InetAddress.getLocalHost();
472 }
473 s=new Socket(ladr, port );
474 // setting soLinger to a small value will help shutdown the
475 // connection quicker
476 s.setSoLinger(true, 0);
477
478 s.close();
479 }
480
481 public void destroy() throws IOException {
482 running = false;
483 try {
484 /* If we disabled the channel return */
485 if (port == 0)
486 return;
487 tp.shutdown();
488
489 if(!paused) {
490 unLockSocket();
491 }
492
493 sSocket.close(); // XXX?
494
495 if( tpOName != null ) {
496 Registry.getRegistry(null, null).unregisterComponent(tpOName);
497 }
498 if( rgOName != null ) {
499 Registry.getRegistry(null, null).unregisterComponent(rgOName);
500 }
501 } catch(Exception e) {
502 log.info("Error shutting down the channel " + port + " " +
503 e.toString());
504 if( log.isDebugEnabled() ) log.debug("Trace", e);
505 }
506 }
507
508 public int send( Msg msg, MsgContext ep)
509 throws IOException {
510 msg.end(); // Write the packet header
511 byte buf[]=msg.getBuffer();
512 int len=msg.getLen();
513
514 if(log.isTraceEnabled() )
515 log.trace("send() " + len + " " + buf[4] );
516
517 OutputStream os=(OutputStream)ep.getNote( osNote );
518 os.write( buf, 0, len );
519 return len;
520 }
521
522 public int flush( Msg msg, MsgContext ep)
523 throws IOException {
524 if( bufferSize > 0 ) {
525 OutputStream os=(OutputStream)ep.getNote( osNote );
526 os.flush();
527 }
528 return 0;
529 }
530
531 public int receive( Msg msg, MsgContext ep )
532 throws IOException {
533 if (log.isDebugEnabled()) {
534 log.debug("receive() ");
535 }
536
537 byte buf[]=msg.getBuffer();
538 int hlen=msg.getHeaderLength();
539
540 // XXX If the length in the packet header doesn't agree with the
541 // actual number of bytes read, it should probably return an error
542 // value. Also, callers of this method never use the length
543 // returned -- should probably return true/false instead.
544
545 int rd = this.read(ep, buf, 0, hlen );
546
547 if(rd < 0) {
548 // Most likely normal apache restart.
549 // log.warn("Wrong message " + rd );
550 return rd;
551 }
552
553 msg.processHeader();
554
555 /* After processing the header we know the body
556 length
557 */
558 int blen=msg.getLen();
559
560 // XXX check if enough space - it's assert()-ed !!!
561
562 int total_read = 0;
563
564 total_read = this.read(ep, buf, hlen, blen);
565
566 if ((total_read <= 0) && (blen > 0)) {
567 log.warn("can't read body, waited #" + blen);
568 return -1;
569 }
570
571 if (total_read != blen) {
572 log.warn( "incomplete read, waited #" + blen +
573 " got only " + total_read);
574 return -2;
575 }
576
577 return total_read;
578 }
579
580 /**
581 * Read N bytes from the InputStream, and ensure we got them all
582 * Under heavy load we could experience many fragmented packets
583 * just read Unix Network Programming to recall that a call to
584 * read didn't ensure you got all the data you want
585 *
586 * from read() Linux manual
587 *
588 * On success, the number of bytes read is returned (zero indicates end
589 * of file),and the file position is advanced by this number.
590 * It is not an error if this number is smaller than the number of bytes
591 * requested; this may happen for example because fewer bytes
592 * are actually available right now (maybe because we were close to
593 * end-of-file, or because we are reading from a pipe, or from a
594 * terminal), or because read() was interrupted by a signal.
595 * On error, -1 is returned, and errno is set appropriately. In this
596 * case it is left unspecified whether the file position (if any) changes.
597 *
598 **/
599 public int read( MsgContext ep, byte[] b, int offset, int len)
600 throws IOException {
601 InputStream is=(InputStream)ep.getNote( isNote );
602 int pos = 0;
603 int got;
604
605 while(pos < len) {
606 try {
607 got = is.read(b, pos + offset, len - pos);
608 } catch(SocketException sex) {
609 if(pos > 0) {
610 log.info("Error reading data after "+pos+"bytes",sex);
611 } else {
612 log.debug("Error reading data", sex);
613 }
614 got = -1;
615 }
616 if (log.isTraceEnabled()) {
617 log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
618 offset + " " + len + " = " + got );
619 }
620
621 // connection just closed by remote.
622 if (got <= 0) {
623 // This happens periodically, as apache restarts
624 // periodically.
625 // It should be more gracefull ! - another feature for Ajp14
626 // log.warn( "server has closed the current connection (-1)" );
627 return -3;
628 }
629
630 pos += got;
631 }
632 return pos;
633 }
634
// Loop flag for acceptConnections() and processConnection(); init() sets it,
// destroy() clears it.
// NOTE(review): written by destroy() and read by pool threads without
// volatile or locking - confirm visibility is acceptable.
protected boolean running=true;
636
637 /** Accept incoming connections, dispatch to the thread pool
638 */
639 void acceptConnections() {
640 if( log.isDebugEnabled() )
641 log.debug("Accepting ajp connections on " + port);
642 while( running ) {
643 try{
644 MsgContext ep=createMsgContext();
645 ep.setSource(this);
646 ep.setWorkerEnv( wEnv );
647 this.accept(ep);
648
649 if( !running ) break;
650
651 // Since this is a long-running connection, we don't care
652 // about the small GC
653 SocketConnection ajpConn=
654 new SocketConnection(this, ep);
655 tp.runIt( ajpConn );
656 }catch(Exception ex) {
657 if (running)
658 log.warn("Exception executing accept" ,ex);
659 }
660 }
661 }
662
/**
 * Process a single ajp connection.
 * Receives messages in a loop and passes each one to invoke() until the
 * channel stops or is paused, a receive reports a non-positive status, or
 * invoke() returns something other than JkHandler.OK. The socket is always
 * closed afterwards and the JMX registration made for the request, if any,
 * is removed.
 *
 * @param ep context of the accepted connection
 */
void processConnection(MsgContext ep) {
    try {
        // One receive buffer is reused for every message on this connection.
        MsgAjp recv=new MsgAjp();
        while( running ) {
            if(paused) { // Drop the connection on pause
                break;
            }
            int status= this.receive( recv, ep );
            if( status <= 0 ) {
                // -3 is what read() returns when the peer stopped sending.
                if( status==-3)
                    log.debug( "server has been restarted or reset this connection" );
                else
                    log.warn("Closing ajp connection " + status );
                break;
            }
            ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());

            // NOTE(review): type 0 presumably matches none of the HANDLE_*
            // cases in invoke(), so the message goes to the next handler - confirm.
            ep.setType( 0 );
            // Will call next
            status= this.invoke( recv, ep );
            if( status!= JkHandler.OK ) {
                log.warn("processCallbacks status " + status );
                break;
            }
        }
    } catch( Exception ex ) {
        // Resets and read timeouts are expected; only log them at debug level.
        String msg = ex.getMessage();
        if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
            log.debug( "Server has been restarted or reset this connection");
        else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
            log.debug( "connection timeout reached");
        else
            log.error( "Error, processing connection", ex);
    } finally {
        /*
         * Whatever happened to this connection (remote closed it, timeout, read error)
         * the socket SHOULD be closed, or we may be in situation where the webserver
         * will continue to think the socket is still open and will forward request
         * to tomcat without receiving ever a reply
         */
        try {
            this.close( ep );
        }
        catch( Exception e) {
            log.error( "Error, closing connection", e);
        }
        // Undo what registerRequest() did for this connection, if anything.
        try{
            Request req = (Request)ep.getRequest();
            if( req != null ) {
                ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
                if( roname != null ) {
                    Registry.getRegistry(null, null).unregisterComponent(roname);
                }
                req.getRequestProcessor().setGlobalProcessor(null);
            }
        } catch( Exception ee) {
            log.error( "Error, releasing connection",ee);
        }
    }
}
725
726 // XXX This should become handleNotification
727 public int invoke( Msg msg, MsgContext ep ) throws IOException {
728 int type=ep.getType();
729
730 switch( type ) {
731 case JkHandler.HANDLE_RECEIVE_PACKET:
732 if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
733 return receive( msg, ep );
734 case JkHandler.HANDLE_SEND_PACKET:
735 return send( msg, ep );
736 case JkHandler.HANDLE_FLUSH:
737 return flush( msg, ep );
738 }
739
740 if( log.isDebugEnabled() )
741 log.debug("Call next " + type + " " + next);
742
743 // Send notification
744 if( nSupport!=null ) {
745 Notification notif=(Notification)ep.getNote(notifNote);
746 if( notif==null ) {
747 notif=new Notification("channelSocket.message", ep, requestCount );
748 ep.setNote( notifNote, notif);
749 }
750 nSupport.sendNotification(notif);
751 }
752
753 if( next != null ) {
754 return next.invoke( msg, ep );
755 } else {
756 log.info("No next ");
757 }
758
759 return OK;
760 }
761
762 public boolean isSameAddress(MsgContext ep) {
763 Socket s=(Socket)ep.getNote( socketNote );
764 return isSameAddress( s.getLocalAddress(), s.getInetAddress());
765 }
766
767 public String getChannelName() {
768 String encodedAddr = "";
769 if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
770 encodedAddr = getAddress();
771 if (encodedAddr.startsWith("/"))
772 encodedAddr = encodedAddr.substring(1);
773 encodedAddr = URLEncoder.encode(encodedAddr) + "-";
774 }
775 return ("jk-" + encodedAddr + port);
776 }
777
778 /**
779 * Return <code>true</code> if the specified client and server addresses
780 * are the same. This method works around a bug in the IBM 1.1.8 JVM on
781 * Linux, where the address bytes are returned reversed in some
782 * circumstances.
783 *
784 * @param server The server's InetAddress
785 * @param client The client's InetAddress
786 */
787 public static boolean isSameAddress(InetAddress server, InetAddress client)
788 {
789 // Compare the byte array versions of the two addresses
790 byte serverAddr[] = server.getAddress();
791 byte clientAddr[] = client.getAddress();
792 if (serverAddr.length != clientAddr.length)
793 return (false);
794 boolean match = true;
795 for (int i = 0; i < serverAddr.length; i++) {
796 if (serverAddr[i] != clientAddr[i]) {
797 match = false;
798 break;
799 }
800 }
801 if (match)
802 return (true);
803
804 // Compare the reversed form of the two addresses
805 for (int i = 0; i < serverAddr.length; i++) {
806 if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
807 return (false);
808 }
809 return (true);
810 }
811
812 public void sendNewMessageNotification(Notification notification) {
813 if( nSupport!= null )
814 nSupport.sendNotification(notification);
815 }
816
// Broadcaster helper; created lazily by addNotificationListener(), so it
// stays null until the first listener is added.
private NotificationBroadcasterSupport nSupport= null;
818
819 public void addNotificationListener(NotificationListener listener,
820 NotificationFilter filter,
821 Object handback)
822 throws IllegalArgumentException
823 {
824 if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
825 nSupport.addNotificationListener(listener, filter, handback);
826 }
827
828 public void removeNotificationListener(NotificationListener listener)
829 throws ListenerNotFoundException
830 {
831 if( nSupport!=null)
832 nSupport.removeNotificationListener(listener);
833 }
834
// Notification metadata returned by getNotificationInfo(); empty until
// setNotificationInfo() replaces it.
MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
836
837 public void setNotificationInfo( MBeanNotificationInfo info[]) {
838 this.notifInfo=info;
839 }
840
841 public MBeanNotificationInfo[] getNotificationInfo() {
842 return notifInfo;
843 }
844
845 static class SocketAcceptor implements ThreadPoolRunnable {
846 ChannelSocket wajp;
847
848 SocketAcceptor(ChannelSocket wajp ) {
849 this.wajp=wajp;
850 }
851
852 public Object[] getInitData() {
853 return null;
854 }
855
856 public void runIt(Object thD[]) {
857 wajp.acceptConnections();
858 }
859 }
860
861 static class SocketConnection implements ThreadPoolRunnable {
862 ChannelSocket wajp;
863 MsgContext ep;
864
865 SocketConnection(ChannelSocket wajp, MsgContext ep) {
866 this.wajp=wajp;
867 this.ep=ep;
868 }
869
870
871 public Object[] getInitData() {
872 return null;
873 }
874
875 public void runIt(Object perTh[]) {
876 wajp.processConnection(ep);
877 ep = null;
878 }
879 }
880
881 }
882