1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.jk.common;
19
20 import java.io.BufferedInputStream;
21 import java.io.BufferedOutputStream;
22 import java.io.IOException;
23 import java.io.InputStream;
24 import java.io.OutputStream;
25 import java.net.URLEncoder;
26 import java.net.InetAddress;
27 import java.net.ServerSocket;
28 import java.net.Socket;
29 import java.net.SocketException;
30
31 import javax.management.ListenerNotFoundException;
32 import javax.management.MBeanNotificationInfo;
33 import javax.management.Notification;
34 import javax.management.NotificationBroadcaster;
35 import javax.management.NotificationBroadcasterSupport;
36 import javax.management.NotificationFilter;
37 import javax.management.NotificationListener;
38 import javax.management.ObjectName;
39
40 import org.apache.jk.core.JkHandler;
41 import org.apache.jk.core.Msg;
42 import org.apache.jk.core.MsgContext;
43 import org.apache.jk.core.JkChannel;
44 import org.apache.jk.core.WorkerEnv;
45 import org.apache.coyote.Request;
46 import org.apache.coyote.RequestGroupInfo;
47 import org.apache.coyote.RequestInfo;
48 import org.apache.tomcat.util.modeler.Registry;
49 import org.apache.tomcat.util.threads.ThreadPool;
50 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
51
52 /**
53 * Accept ( and send ) TCP messages.
54 *
55 * @author Costin Manolache
56 * @author Bill Barker
57 * jmx:mbean name="jk:service=ChannelNioSocket"
58 * description="Accept socket connections"
59 * jmx:notification name="org.apache.coyote.INVOKE
60 * jmx:notification-handler name="org.apache.jk.JK_SEND_PACKET
61 * jmx:notification-handler name="org.apache.jk.JK_RECEIVE_PACKET
62 * jmx:notification-handler name="org.apache.jk.JK_FLUSH
63 *
64 * Jk can use multiple protocols/transports.
65 * Various container adapters should load this object ( as a bean ),
66 * set configurations and use it. Note that the connector will handle
67 * all incoming protocols - it's not specific to ajp1x. The protocol
68 * is abstracted by MsgContext/Message/Channel.
69 *
70 * A lot of the 'original' behavior is hardcoded - this uses Ajp13 wire protocol,
71 * TCP, Ajp14 API etc.
72 * As we add other protocols/transports/APIs this will change, the current goal
73 * is to get the same level of functionality as in the original jk connector.
74 *
75 * XXX Make the 'message type' pluggable
76 */
77 public class ChannelSocket extends JkHandler
78 implements NotificationBroadcaster, JkChannel {
79 private static org.apache.juli.logging.Log log =
80 org.apache.juli.logging.LogFactory.getLog( ChannelSocket.class );
81
82 private int startPort=8009;
83 private int maxPort=8019; // 0 for backward compat.
84 private int port=startPort;
85 private InetAddress inet;
86 private int serverTimeout;
87 private boolean tcpNoDelay=true; // nodelay to true by default
88 private int linger=100;
89 private int socketTimeout;
90 private int bufferSize = -1;
91 private int packetSize = 8*1024;
92
93 private long requestCount=0;
94
95 ThreadPool tp=ThreadPool.createThreadPool(true);
96
97 /* ==================== Tcp socket options ==================== */
98
99 /**
100 * jmx:managed-constructor description="default constructor"
101 */
102 public ChannelSocket() {
103 // This should be integrated with the domain setup
104 }
105
106 public ThreadPool getThreadPool() {
107 return tp;
108 }
109
110 public long getRequestCount() {
111 return requestCount;
112 }
113
114 /** Set the port for the ajp13 channel.
115 * To support seemless load balancing and jni, we treat this
116 * as the 'base' port - we'll try up until we find one that is not
117 * used. We'll also provide the 'difference' to the main coyote
118 * handler - that will be our 'sessionID' and the position in
119 * the scoreboard and the suffix for the unix domain socket.
120 *
121 * jmx:managed-attribute description="Port to listen" access="READ_WRITE"
122 */
123 public void setPort( int port ) {
124 this.startPort=port;
125 this.port=port;
126 this.maxPort=port+10;
127 }
128
129 public int getPort() {
130 return port;
131 }
132
133 public void setAddress(InetAddress inet) {
134 this.inet=inet;
135 }
136
137 /**
138 * jmx:managed-attribute description="Bind on a specified address" access="READ_WRITE"
139 */
140 public void setAddress(String inet) {
141 try {
142 this.inet= InetAddress.getByName( inet );
143 } catch( Exception ex ) {
144 log.error("Error parsing "+inet,ex);
145 }
146 }
147
148 public String getAddress() {
149 if( inet!=null)
150 return inet.toString();
151 return "/0.0.0.0";
152 }
153
154 /**
155 * Sets the timeout in ms of the server sockets created by this
156 * server. This method allows the developer to make servers
157 * more or less responsive to having their server sockets
158 * shut down.
159 *
160 * <p>By default this value is 1000ms.
161 */
162 public void setServerTimeout(int timeout) {
163 this.serverTimeout = timeout;
164 }
165 public int getServerTimeout() {
166 return serverTimeout;
167 }
168
169 public void setTcpNoDelay( boolean b ) {
170 tcpNoDelay=b;
171 }
172
173 public boolean getTcpNoDelay() {
174 return tcpNoDelay;
175 }
176
177 public void setSoLinger( int i ) {
178 linger=i;
179 }
180
181 public int getSoLinger() {
182 return linger;
183 }
184
185 public void setSoTimeout( int i ) {
186 socketTimeout=i;
187 }
188
189 public int getSoTimeout() {
190 return socketTimeout;
191 }
192
193 public void setMaxPort( int i ) {
194 maxPort=i;
195 }
196
197 public int getMaxPort() {
198 return maxPort;
199 }
200
201 public void setBufferSize(int bs) {
202 bufferSize = bs;
203 }
204
205 public int getBufferSize() {
206 return bufferSize;
207 }
208
209 public void setPacketSize(int ps) {
210 if(ps < 8*1024) {
211 ps = 8*1024;
212 }
213 packetSize = ps;
214 }
215
216 public int getPacketSize() {
217 return packetSize;
218 }
219
220 /** At startup we'll look for the first free port in the range.
221 The difference between this port and the beggining of the range
222 is the 'id'.
223 This is usefull for lb cases ( less config ).
224 */
225 public int getInstanceId() {
226 return port-startPort;
227 }
228
229 /** If set to false, the thread pool will be created in
230 * non-daemon mode, and will prevent main from exiting
231 */
232 public void setDaemon( boolean b ) {
233 tp.setDaemon( b );
234 }
235
236 public boolean getDaemon() {
237 return tp.getDaemon();
238 }
239
240
241 public void setMaxThreads( int i ) {
242 if( log.isDebugEnabled()) log.debug("Setting maxThreads " + i);
243 tp.setMaxThreads(i);
244 }
245
246 public void setMinSpareThreads( int i ) {
247 if( log.isDebugEnabled()) log.debug("Setting minSpareThreads " + i);
248 tp.setMinSpareThreads(i);
249 }
250
251 public void setMaxSpareThreads( int i ) {
252 if( log.isDebugEnabled()) log.debug("Setting maxSpareThreads " + i);
253 tp.setMaxSpareThreads(i);
254 }
255
256 public int getMaxThreads() {
257 return tp.getMaxThreads();
258 }
259
260 public int getMinSpareThreads() {
261 return tp.getMinSpareThreads();
262 }
263
264 public int getMaxSpareThreads() {
265 return tp.getMaxSpareThreads();
266 }
267
268 public void setBacklog(int i) {
269 }
270
271
272 /* ==================== ==================== */
273 ServerSocket sSocket;
274 final int socketNote=1;
275 final int isNote=2;
276 final int osNote=3;
277 final int notifNote=4;
278 boolean paused = false;
279
280 public void pause() throws Exception {
281 synchronized(this) {
282 paused = true;
283 unLockSocket();
284 }
285 }
286
287 public void resume() throws Exception {
288 synchronized(this) {
289 paused = false;
290 notify();
291 }
292 }
293
294
295 public void accept( MsgContext ep ) throws IOException {
296 if( sSocket==null ) return;
297 synchronized(this) {
298 while(paused) {
299 try{
300 wait();
301 } catch(InterruptedException ie) {
302 //Ignore, since can't happen
303 }
304 }
305 }
306 Socket s=sSocket.accept();
307 ep.setNote( socketNote, s );
308 if(log.isDebugEnabled() )
309 log.debug("Accepted socket " + s );
310
311 try {
312 setSocketOptions(s);
313 } catch(SocketException sex) {
314 log.debug("Error initializing Socket Options", sex);
315 }
316
317 requestCount++;
318
319 InputStream is=new BufferedInputStream(s.getInputStream());
320 OutputStream os;
321 if( bufferSize > 0 )
322 os = new BufferedOutputStream( s.getOutputStream(), bufferSize);
323 else
324 os = s.getOutputStream();
325 ep.setNote( isNote, is );
326 ep.setNote( osNote, os );
327 ep.setControl( tp );
328 }
329
330 private void setSocketOptions(Socket s) throws SocketException {
331 if( socketTimeout > 0 )
332 s.setSoTimeout( socketTimeout );
333
334 s.setTcpNoDelay( tcpNoDelay ); // set socket tcpnodelay state
335
336 if( linger > 0 )
337 s.setSoLinger( true, linger);
338 }
339
340 public void resetCounters() {
341 requestCount=0;
342 }
343
344 /** Called after you change some fields at runtime using jmx.
345 Experimental for now.
346 */
347 public void reinit() throws IOException {
348 destroy();
349 init();
350 }
351
352 /**
353 * jmx:managed-operation
354 */
355 public void init() throws IOException {
356 // Find a port.
357 if (startPort == 0) {
358 port = 0;
359 if(log.isInfoEnabled())
360 log.info("JK: ajp13 disabling channelSocket");
361 running = true;
362 return;
363 }
364 if (maxPort < startPort)
365 maxPort = startPort;
366 for( int i=startPort; i<=maxPort; i++ ) {
367 try {
368 if( inet == null ) {
369 sSocket = new ServerSocket( i, 0 );
370 } else {
371 sSocket=new ServerSocket( i, 0, inet );
372 }
373 port=i;
374 break;
375 } catch( IOException ex ) {
376 if(log.isInfoEnabled())
377 log.info("Port busy " + i + " " + ex.toString());
378 continue;
379 }
380 }
381
382 if( sSocket==null ) {
383 log.error("Can't find free port " + startPort + " " + maxPort );
384 return;
385 }
386 if(log.isInfoEnabled())
387 log.info("JK: ajp13 listening on " + getAddress() + ":" + port );
388
389 // If this is not the base port and we are the 'main' channleSocket and
390 // SHM didn't already set the localId - we'll set the instance id
391 if( "channelSocket".equals( name ) &&
392 port != startPort &&
393 (wEnv.getLocalId()==0) ) {
394 wEnv.setLocalId( port - startPort );
395 }
396 if( serverTimeout > 0 )
397 sSocket.setSoTimeout( serverTimeout );
398
399 // XXX Reverse it -> this is a notification generator !!
400 if( next==null && wEnv!=null ) {
401 if( nextName!=null )
402 setNext( wEnv.getHandler( nextName ) );
403 if( next==null )
404 next=wEnv.getHandler( "dispatch" );
405 if( next==null )
406 next=wEnv.getHandler( "request" );
407 }
408 JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
409 running = true;
410
411 // Run a thread that will accept connections.
412 // XXX Try to find a thread first - not sure how...
413 if( this.domain != null ) {
414 try {
415 tpOName=new ObjectName(domain + ":type=ThreadPool,name=" +
416 getChannelName());
417
418 Registry.getRegistry(null, null)
419 .registerComponent(tp, tpOName, null);
420
421 rgOName = new ObjectName
422 (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
423 Registry.getRegistry(null, null)
424 .registerComponent(global, rgOName, null);
425 } catch (Exception e) {
426 log.error("Can't register threadpool" );
427 }
428 }
429
430 tp.start();
431 SocketAcceptor acceptAjp=new SocketAcceptor( this );
432 tp.runIt( acceptAjp);
433
434 }
435
436 ObjectName tpOName;
437 ObjectName rgOName;
438 RequestGroupInfo global=new RequestGroupInfo();
439 int JMXRequestNote;
440
441 public void start() throws IOException{
442 if( sSocket==null )
443 init();
444 }
445
446 public void stop() throws IOException {
447 destroy();
448 }
449
450 public void registerRequest(Request req, MsgContext ep, int count) {
451 if(this.domain != null) {
452 try {
453 RequestInfo rp=req.getRequestProcessor();
454 rp.setGlobalProcessor(global);
455 ObjectName roname = new ObjectName
456 (getDomain() + ":type=RequestProcessor,worker="+
457 getChannelName()+",name=JkRequest" +count);
458 ep.setNote(JMXRequestNote, roname);
459
460 Registry.getRegistry(null, null).registerComponent( rp, roname, null);
461 } catch( Exception ex ) {
462 log.warn("Error registering request");
463 }
464 }
465 }
466
467 public void open(MsgContext ep) throws IOException {
468 }
469
470
471 public void close(MsgContext ep) throws IOException {
472 Socket s=(Socket)ep.getNote( socketNote );
473 s.close();
474 }
475
476 private void unLockSocket() throws IOException {
477 // Need to create a connection to unlock the accept();
478 Socket s;
479 InetAddress ladr = inet;
480
481 if(port == 0)
482 return;
483 if (ladr == null || "0.0.0.0".equals(ladr.getHostAddress())) {
484 ladr = InetAddress.getLocalHost();
485 }
486 s=new Socket(ladr, port );
487 // setting soLinger to a small value will help shutdown the
488 // connection quicker
489 s.setSoLinger(true, 0);
490
491 s.close();
492 }
493
494 public void destroy() throws IOException {
495 running = false;
496 try {
497 /* If we disabled the channel return */
498 if (port == 0)
499 return;
500 tp.shutdown();
501
502 if(!paused) {
503 unLockSocket();
504 }
505
506 sSocket.close(); // XXX?
507
508 if( tpOName != null ) {
509 Registry.getRegistry(null, null).unregisterComponent(tpOName);
510 }
511 if( rgOName != null ) {
512 Registry.getRegistry(null, null).unregisterComponent(rgOName);
513 }
514 } catch(Exception e) {
515 log.info("Error shutting down the channel " + port + " " +
516 e.toString());
517 if( log.isDebugEnabled() ) log.debug("Trace", e);
518 }
519 }
520
521 public int send( Msg msg, MsgContext ep)
522 throws IOException {
523 msg.end(); // Write the packet header
524 byte buf[]=msg.getBuffer();
525 int len=msg.getLen();
526
527 if(log.isTraceEnabled() )
528 log.trace("send() " + len + " " + buf[4] );
529
530 OutputStream os=(OutputStream)ep.getNote( osNote );
531 os.write( buf, 0, len );
532 return len;
533 }
534
535 public int flush( Msg msg, MsgContext ep)
536 throws IOException {
537 if( bufferSize > 0 ) {
538 OutputStream os=(OutputStream)ep.getNote( osNote );
539 os.flush();
540 }
541 return 0;
542 }
543
544 public int receive( Msg msg, MsgContext ep )
545 throws IOException {
546 if (log.isDebugEnabled()) {
547 log.debug("receive() ");
548 }
549
550 byte buf[]=msg.getBuffer();
551 int hlen=msg.getHeaderLength();
552
553 // XXX If the length in the packet header doesn't agree with the
554 // actual number of bytes read, it should probably return an error
555 // value. Also, callers of this method never use the length
556 // returned -- should probably return true/false instead.
557
558 int rd = this.read(ep, buf, 0, hlen );
559
560 if(rd < 0) {
561 // Most likely normal apache restart.
562 // log.warn("Wrong message " + rd );
563 return rd;
564 }
565
566 msg.processHeader();
567
568 /* After processing the header we know the body
569 length
570 */
571 int blen=msg.getLen();
572
573 // XXX check if enough space - it's assert()-ed !!!
574
575 int total_read = 0;
576
577 total_read = this.read(ep, buf, hlen, blen);
578
579 if ((total_read <= 0) && (blen > 0)) {
580 log.warn("can't read body, waited #" + blen);
581 return -1;
582 }
583
584 if (total_read != blen) {
585 log.warn( "incomplete read, waited #" + blen +
586 " got only " + total_read);
587 return -2;
588 }
589
590 return total_read;
591 }
592
593 /**
594 * Read N bytes from the InputStream, and ensure we got them all
595 * Under heavy load we could experience many fragmented packets
596 * just read Unix Network Programming to recall that a call to
597 * read didn't ensure you got all the data you want
598 *
599 * from read() Linux manual
600 *
601 * On success, the number of bytes read is returned (zero indicates end
602 * of file),and the file position is advanced by this number.
603 * It is not an error if this number is smaller than the number of bytes
604 * requested; this may happen for example because fewer bytes
605 * are actually available right now (maybe because we were close to
606 * end-of-file, or because we are reading from a pipe, or from a
607 * terminal), or because read() was interrupted by a signal.
608 * On error, -1 is returned, and errno is set appropriately. In this
609 * case it is left unspecified whether the file position (if any) changes.
610 *
611 **/
612 public int read( MsgContext ep, byte[] b, int offset, int len)
613 throws IOException {
614 InputStream is=(InputStream)ep.getNote( isNote );
615 int pos = 0;
616 int got;
617
618 while(pos < len) {
619 try {
620 got = is.read(b, pos + offset, len - pos);
621 } catch(SocketException sex) {
622 if(pos > 0) {
623 log.info("Error reading data after "+pos+"bytes",sex);
624 } else {
625 log.debug("Error reading data", sex);
626 }
627 got = -1;
628 }
629 if (log.isTraceEnabled()) {
630 log.trace("read() " + b + " " + (b==null ? 0: b.length) + " " +
631 offset + " " + len + " = " + got );
632 }
633
634 // connection just closed by remote.
635 if (got <= 0) {
636 // This happens periodically, as apache restarts
637 // periodically.
638 // It should be more gracefull ! - another feature for Ajp14
639 // log.warn( "server has closed the current connection (-1)" );
640 return -3;
641 }
642
643 pos += got;
644 }
645 return pos;
646 }
647
648 protected boolean running=true;
649
650 /** Accept incoming connections, dispatch to the thread pool
651 */
652 void acceptConnections() {
653 if( log.isDebugEnabled() )
654 log.debug("Accepting ajp connections on " + port);
655 while( running ) {
656 try{
657 MsgContext ep=createMsgContext(packetSize);
658 ep.setSource(this);
659 ep.setWorkerEnv( wEnv );
660 this.accept(ep);
661
662 if( !running ) break;
663
664 // Since this is a long-running connection, we don't care
665 // about the small GC
666 SocketConnection ajpConn=
667 new SocketConnection(this, ep);
668 tp.runIt( ajpConn );
669 }catch(Exception ex) {
670 if (running)
671 log.warn("Exception executing accept" ,ex);
672 }
673 }
674 }
675
676 /** Process a single ajp connection.
677 */
678 void processConnection(MsgContext ep) {
679 try {
680 MsgAjp recv=new MsgAjp(packetSize);
681 while( running ) {
682 if(paused) { // Drop the connection on pause
683 break;
684 }
685 int status= this.receive( recv, ep );
686 if( status <= 0 ) {
687 if( status==-3)
688 log.debug( "server has been restarted or reset this connection" );
689 else
690 log.warn("Closing ajp connection " + status );
691 break;
692 }
693 ep.setLong( MsgContext.TIMER_RECEIVED, System.currentTimeMillis());
694
695 ep.setType( 0 );
696 // Will call next
697 status= this.invoke( recv, ep );
698 if( status!= JkHandler.OK ) {
699 log.warn("processCallbacks status " + status );
700 break;
701 }
702 }
703 } catch( Exception ex ) {
704 String msg = ex.getMessage();
705 if( msg != null && msg.indexOf( "Connection reset" ) >= 0)
706 log.debug( "Server has been restarted or reset this connection");
707 else if (msg != null && msg.indexOf( "Read timed out" ) >=0 )
708 log.debug( "connection timeout reached");
709 else
710 log.error( "Error, processing connection", ex);
711 } finally {
712 /*
713 * Whatever happened to this connection (remote closed it, timeout, read error)
714 * the socket SHOULD be closed, or we may be in situation where the webserver
715 * will continue to think the socket is still open and will forward request
716 * to tomcat without receiving ever a reply
717 */
718 try {
719 this.close( ep );
720 }
721 catch( Exception e) {
722 log.error( "Error, closing connection", e);
723 }
724 try{
725 Request req = (Request)ep.getRequest();
726 if( req != null ) {
727 ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
728 if( roname != null ) {
729 Registry.getRegistry(null, null).unregisterComponent(roname);
730 }
731 req.getRequestProcessor().setGlobalProcessor(null);
732 }
733 } catch( Exception ee) {
734 log.error( "Error, releasing connection",ee);
735 }
736 }
737 }
738
739 // XXX This should become handleNotification
740 public int invoke( Msg msg, MsgContext ep ) throws IOException {
741 int type=ep.getType();
742
743 switch( type ) {
744 case JkHandler.HANDLE_RECEIVE_PACKET:
745 if( log.isDebugEnabled()) log.debug("RECEIVE_PACKET ?? ");
746 return receive( msg, ep );
747 case JkHandler.HANDLE_SEND_PACKET:
748 return send( msg, ep );
749 case JkHandler.HANDLE_FLUSH:
750 return flush( msg, ep );
751 }
752
753 if( log.isDebugEnabled() )
754 log.debug("Call next " + type + " " + next);
755
756 // Send notification
757 if( nSupport!=null ) {
758 Notification notif=(Notification)ep.getNote(notifNote);
759 if( notif==null ) {
760 notif=new Notification("channelSocket.message", ep, requestCount );
761 ep.setNote( notifNote, notif);
762 }
763 nSupport.sendNotification(notif);
764 }
765
766 if( next != null ) {
767 return next.invoke( msg, ep );
768 } else {
769 log.info("No next ");
770 }
771
772 return OK;
773 }
774
775 public boolean isSameAddress(MsgContext ep) {
776 Socket s=(Socket)ep.getNote( socketNote );
777 return isSameAddress( s.getLocalAddress(), s.getInetAddress());
778 }
779
780 public String getChannelName() {
781 String encodedAddr = "";
782 if (inet != null && !"0.0.0.0".equals(inet.getHostAddress())) {
783 encodedAddr = getAddress();
784 if (encodedAddr.startsWith("/"))
785 encodedAddr = encodedAddr.substring(1);
786 encodedAddr = URLEncoder.encode(encodedAddr) + "-";
787 }
788 return ("jk-" + encodedAddr + port);
789 }
790
791 /**
792 * Return <code>true</code> if the specified client and server addresses
793 * are the same. This method works around a bug in the IBM 1.1.8 JVM on
794 * Linux, where the address bytes are returned reversed in some
795 * circumstances.
796 *
797 * @param server The server's InetAddress
798 * @param client The client's InetAddress
799 */
800 public static boolean isSameAddress(InetAddress server, InetAddress client)
801 {
802 // Compare the byte array versions of the two addresses
803 byte serverAddr[] = server.getAddress();
804 byte clientAddr[] = client.getAddress();
805 if (serverAddr.length != clientAddr.length)
806 return (false);
807 boolean match = true;
808 for (int i = 0; i < serverAddr.length; i++) {
809 if (serverAddr[i] != clientAddr[i]) {
810 match = false;
811 break;
812 }
813 }
814 if (match)
815 return (true);
816
817 // Compare the reversed form of the two addresses
818 for (int i = 0; i < serverAddr.length; i++) {
819 if (serverAddr[i] != clientAddr[(serverAddr.length-1)-i])
820 return (false);
821 }
822 return (true);
823 }
824
825 public void sendNewMessageNotification(Notification notification) {
826 if( nSupport!= null )
827 nSupport.sendNotification(notification);
828 }
829
830 private NotificationBroadcasterSupport nSupport= null;
831
832 public void addNotificationListener(NotificationListener listener,
833 NotificationFilter filter,
834 Object handback)
835 throws IllegalArgumentException
836 {
837 if( nSupport==null ) nSupport=new NotificationBroadcasterSupport();
838 nSupport.addNotificationListener(listener, filter, handback);
839 }
840
841 public void removeNotificationListener(NotificationListener listener)
842 throws ListenerNotFoundException
843 {
844 if( nSupport!=null)
845 nSupport.removeNotificationListener(listener);
846 }
847
848 MBeanNotificationInfo notifInfo[]=new MBeanNotificationInfo[0];
849
850 public void setNotificationInfo( MBeanNotificationInfo info[]) {
851 this.notifInfo=info;
852 }
853
854 public MBeanNotificationInfo[] getNotificationInfo() {
855 return notifInfo;
856 }
857
858 static class SocketAcceptor implements ThreadPoolRunnable {
859 ChannelSocket wajp;
860
861 SocketAcceptor(ChannelSocket wajp ) {
862 this.wajp=wajp;
863 }
864
865 public Object[] getInitData() {
866 return null;
867 }
868
869 public void runIt(Object thD[]) {
870 wajp.acceptConnections();
871 }
872 }
873
874 static class SocketConnection implements ThreadPoolRunnable {
875 ChannelSocket wajp;
876 MsgContext ep;
877
878 SocketConnection(ChannelSocket wajp, MsgContext ep) {
879 this.wajp=wajp;
880 this.ep=ep;
881 }
882
883
884 public Object[] getInitData() {
885 return null;
886 }
887
888 public void runIt(Object perTh[]) {
889 wajp.processConnection(ep);
890 ep = null;
891 }
892 }
893
894 }
895