1 /*
2 * Copyright 2005 The Apache Software Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.tomcat.util.net;
18
19 import java.net.InetAddress;
20 import java.util.ArrayList;
21 import java.util.HashMap;
22 import java.util.Stack;
23
24 import org.apache.commons.logging.Log;
25 import org.apache.commons.logging.LogFactory;
26 import org.apache.tomcat.jni.OS;
27 import org.apache.tomcat.jni.Address;
28 import org.apache.tomcat.jni.Error;
29 import org.apache.tomcat.jni.File;
30 import org.apache.tomcat.jni.Library;
31 import org.apache.tomcat.jni.Poll;
32 import org.apache.tomcat.jni.Pool;
33 import org.apache.tomcat.jni.Socket;
34 import org.apache.tomcat.jni.Status;
35 import org.apache.tomcat.jni.SSL;
36 import org.apache.tomcat.jni.SSLContext;
37 import org.apache.tomcat.jni.SSLSocket;
38 import org.apache.tomcat.util.res.StringManager;
39 import org.apache.tomcat.util.threads.ThreadWithAttributes;
40
41 /**
42 * APR tailored thread pool, providing the following services:
43 * <ul>
44 * <li>Socket acceptor thread</li>
45 * <li>Socket poller thread</li>
46 * <li>Sendfile thread</li>
47 * <li>Worker threads pool</li>
48 * </ul>
49 *
50 * When switching to Java 5, there's an opportunity to use the virtual
51 * machine's thread pool.
52 *
53 * @author Mladen Turk
54 * @author Remy Maucherat
55 */
56 public class AprEndpoint {
57
58
59 // -------------------------------------------------------------- Constants
60
61
62 protected static Log log = LogFactory.getLog(AprEndpoint.class);
63
64 protected static StringManager sm =
65 StringManager.getManager("org.apache.tomcat.util.net.res");
66
67
68 /**
69 * The Request attribute key for the cipher suite.
70 */
71 public static final String CIPHER_SUITE_KEY = "javax.servlet.request.cipher_suite";
72
73 /**
74 * The Request attribute key for the key size.
75 */
76 public static final String KEY_SIZE_KEY = "javax.servlet.request.key_size";
77
78 /**
79 * The Request attribute key for the client certificate chain.
80 */
81 public static final String CERTIFICATE_KEY = "javax.servlet.request.X509Certificate";
82
83 /**
84 * The Request attribute key for the session id.
85 * This one is a Tomcat extension to the Servlet spec.
86 */
87 public static final String SESSION_ID_KEY = "javax.servlet.request.ssl_session";
88
89
90 // ----------------------------------------------------------------- Fields
91
92
93 /**
94 * Synchronization object.
95 */
96 protected final Object threadSync = new Object();
97
98
99 /**
100 * The acceptor thread.
101 */
102 protected Thread acceptorThread = null;
103
104
105 /**
106 * The socket poller thread.
107 */
108 protected Thread pollerThread = null;
109
110
111 /**
112 * The sendfile thread.
113 */
114 protected Thread sendfileThread = null;
115
116
117 /**
118 * Available processors.
119 */
120 // FIXME: Stack is synced, which makes it a non optimal choice
121 protected Stack workers = new Stack();
122
123
124 /**
125 * Running state of the endpoint.
126 */
127 protected volatile boolean running = false;
128
129
130 /**
131 * Will be set to true whenever the endpoint is paused.
132 */
133 protected volatile boolean paused = false;
134
135
136 /**
137 * Track the initialization state of the endpoint.
138 */
139 protected boolean initialized = false;
140
141
142 /**
143 * Current worker threads busy count.
144 */
145 protected int curThreadsBusy = 0;
146
147
148 /**
149 * Current worker threads count.
150 */
151 protected int curThreads = 0;
152
153
154 /**
155 * Sequence number used to generate thread names.
156 */
157 protected int sequence = 0;
158
159
160 /**
161 * Root APR memory pool.
162 */
163 protected long rootPool = 0;
164
165
166 /**
167 * Server socket "pointer".
168 */
169 protected long serverSock = 0;
170
171
172 /**
173 * APR memory pool for the server socket.
174 */
175 protected long serverSockPool = 0;
176
177
178 /**
179 * SSL context.
180 */
181 protected long sslContext = 0;
182
183
184 // ------------------------------------------------------------- Properties
185
186
187 /**
188 * Maximum amount of worker threads.
189 */
190 protected int maxThreads = 60;
191 public void setMaxThreads(int maxThreads) { this.maxThreads = maxThreads; }
192 public int getMaxThreads() { return maxThreads; }
193
194
195 /**
196 * Priority of the acceptor and poller threads.
197 */
198 protected int threadPriority = Thread.NORM_PRIORITY;
199 public void setThreadPriority(int threadPriority) { this.threadPriority = threadPriority; }
200 public int getThreadPriority() { return threadPriority; }
201
202
203 /**
204 * Size of the socket poller.
205 */
206 protected int pollerSize = 768;
207 public void setPollerSize(int pollerSize) { this.pollerSize = pollerSize; }
208 public int getPollerSize() { return pollerSize; }
209
210
211 /**
212 * Size of the sendfile (= concurrent files which can be served).
213 */
214 protected int sendfileSize = 256;
215 public void setSendfileSize(int sendfileSize) { this.sendfileSize = sendfileSize; }
216 public int getSendfileSize() { return sendfileSize; }
217
218
219 /**
220 * Server socket port.
221 */
222 protected int port;
223 public int getPort() { return port; }
224 public void setPort(int port ) { this.port=port; }
225
226
227 /**
228 * Address for the server socket.
229 */
230 protected InetAddress address;
231 public InetAddress getAddress() { return address; }
232 public void setAddress(InetAddress address) { this.address = address; }
233
234
235 /**
236 * Handling of accepted sockets.
237 */
238 protected Handler handler = null;
239 public void setHandler(Handler handler ) { this.handler = handler; }
240 public Handler getHandler() { return handler; }
241
242
243 /**
244 * Allows the server developer to specify the backlog that
245 * should be used for server sockets. By default, this value
246 * is 100.
247 */
248 protected int backlog = 100;
249 public void setBacklog(int backlog) { if (backlog > 0) this.backlog = backlog; }
250 public int getBacklog() { return backlog; }
251
252
253 /**
254 * Socket TCP no delay.
255 */
256 protected boolean tcpNoDelay = false;
257 public boolean getTcpNoDelay() { return tcpNoDelay; }
258 public void setTcpNoDelay(boolean tcpNoDelay) { this.tcpNoDelay = tcpNoDelay; }
259
260
261 /**
262 * Socket linger.
263 */
264 protected int soLinger = 100;
265 public int getSoLinger() { return soLinger; }
266 public void setSoLinger(int soLinger) { this.soLinger = soLinger; }
267
268
269 /**
270 * Socket timeout.
271 */
272 protected int soTimeout = -1;
273 public int getSoTimeout() { return soTimeout; }
274 public void setSoTimeout(int soTimeout) { this.soTimeout = soTimeout; }
275
276
277 /**
278 * Timeout on first request read before going to the poller, in ms.
279 */
280 protected int firstReadTimeout = 100;
281 public int getFirstReadTimeout() { return firstReadTimeout; }
282 public void setFirstReadTimeout(int firstReadTimeout) { this.firstReadTimeout = firstReadTimeout; }
283
284
285 /**
286 * Poll interval, in microseconds. The smaller the value, the more CPU the poller
287 * will use, but the more responsive to activity it will be.
288 */
289 protected int pollTime = 5000;
290 public int getPollTime() { return pollTime; }
291 public void setPollTime(int pollTime) { this.pollTime = pollTime; }
292
293
294 /**
295 * The default is true - the created threads will be
296 * in daemon mode. If set to false, the control thread
297 * will not be daemon - and will keep the process alive.
298 */
299 protected boolean daemon = true;
300 public void setDaemon(boolean b) { daemon = b; }
301 public boolean getDaemon() { return daemon; }
302
303
304 /**
305 * Name of the thread pool, which will be used for naming child threads.
306 */
307 protected String name = "TP";
308 public void setName(String name) { this.name = name; }
309 public String getName() { return name; }
310
311
312 /**
313 * Use endfile for sending static files.
314 */
315 protected boolean useSendfile = true;
316 public void setUseSendfile(boolean useSendfile) { this.useSendfile = useSendfile; }
317 public boolean getUseSendfile() { return useSendfile; }
318
319
320 /**
321 * Number of keepalive sockets.
322 */
323 protected int keepAliveCount = 0;
324 public int getKeepAliveCount() { return keepAliveCount; }
325
326
327 /**
328 * Number of sendfile sockets.
329 */
330 protected int sendfileCount = 0;
331 public int getSendfileCount() { return sendfileCount; }
332
333
334 /**
335 * The socket poller.
336 */
337 protected Poller poller = null;
338 public Poller getPoller() { return poller; }
339
340
341 /**
342 * The static file sender.
343 */
344 protected Sendfile sendfile = null;
345 public Sendfile getSendfile() { return sendfile; }
346
347
348 /**
349 * Dummy maxSpareThreads property.
350 */
351 public int getMaxSpareThreads() { return 0; }
352
353
354 /**
355 * Dummy minSpareThreads property.
356 */
357 public int getMinSpareThreads() { return 0; }
358
359
360 /**
361 * SSL engine.
362 */
363 protected String SSLEngine = "off";
364 public String getSSLEngine() { return SSLEngine; }
365 public void setSSLEngine(String SSLEngine) { this.SSLEngine = SSLEngine; }
366
367
368 /**
369 * SSL protocols.
370 */
371 protected String SSLProtocol = "all";
372 public String getSSLProtocol() { return SSLProtocol; }
373 public void setSSLProtocol(String SSLProtocol) { this.SSLProtocol = SSLProtocol; }
374
375
376 /**
377 * SSL password (if a cert is encrypted, and no password has been provided, a callback
378 * will ask for a password).
379 */
380 protected String SSLPassword = null;
381 public String getSSLPassword() { return SSLPassword; }
382 public void setSSLPassword(String SSLPassword) { this.SSLPassword = SSLPassword; }
383
384
385 /**
386 * SSL cipher suite.
387 */
388 protected String SSLCipherSuite = "ALL";
389 public String getSSLCipherSuite() { return SSLCipherSuite; }
390 public void setSSLCipherSuite(String SSLCipherSuite) { this.SSLCipherSuite = SSLCipherSuite; }
391
392
393 /**
394 * SSL certificate file.
395 */
396 protected String SSLCertificateFile = null;
397 public String getSSLCertificateFile() { return SSLCertificateFile; }
398 public void setSSLCertificateFile(String SSLCertificateFile) { this.SSLCertificateFile = SSLCertificateFile; }
399
400
401 /**
402 * SSL certificate key file.
403 */
404 protected String SSLCertificateKeyFile = null;
405 public String getSSLCertificateKeyFile() { return SSLCertificateKeyFile; }
406 public void setSSLCertificateKeyFile(String SSLCertificateKeyFile) { this.SSLCertificateKeyFile = SSLCertificateKeyFile; }
407
408
409 /**
410 * SSL certificate chain file.
411 */
412 protected String SSLCertificateChainFile = null;
413 public String getSSLCertificateChainFile() { return SSLCertificateChainFile; }
414 public void setSSLCertificateChainFile(String SSLCertificateChainFile) { this.SSLCertificateChainFile = SSLCertificateChainFile; }
415
416
417 /**
418 * SSL CA certificate path.
419 */
420 protected String SSLCACertificatePath = null;
421 public String getSSLCACertificatePath() { return SSLCACertificatePath; }
422 public void setSSLCACertificatePath(String SSLCACertificatePath) { this.SSLCACertificatePath = SSLCACertificatePath; }
423
424
425 /**
426 * SSL CA certificate file.
427 */
428 protected String SSLCACertificateFile = null;
429 public String getSSLCACertificateFile() { return SSLCACertificateFile; }
430 public void setSSLCACertificateFile(String SSLCACertificateFile) { this.SSLCACertificateFile = SSLCACertificateFile; }
431
432
433 /**
434 * SSL CA revocation path.
435 */
436 protected String SSLCARevocationPath = null;
437 public String getSSLCARevocationPath() { return SSLCARevocationPath; }
438 public void setSSLCARevocationPath(String SSLCARevocationPath) { this.SSLCARevocationPath = SSLCARevocationPath; }
439
440
441 /**
442 * SSL CA revocation file.
443 */
444 protected String SSLCARevocationFile = null;
445 public String getSSLCARevocationFile() { return SSLCARevocationFile; }
446 public void setSSLCARevocationFile(String SSLCARevocationFile) { this.SSLCARevocationFile = SSLCARevocationFile; }
447
448
449 /**
450 * SSL verify client.
451 */
452 protected String SSLVerifyClient = "none";
453 public String getSSLVerifyClient() { return SSLVerifyClient; }
454 public void setSSLVerifyClient(String SSLVerifyClient) { this.SSLVerifyClient = SSLVerifyClient; }
455
456
457 /**
458 * SSL verify depth.
459 */
460 protected int SSLVerifyDepth = 10;
461 public int getSSLVerifyDepth() { return SSLVerifyDepth; }
462 public void setSSLVerifyDepth(int SSLVerifyDepth) { this.SSLVerifyDepth = SSLVerifyDepth; }
463
464
465 // --------------------------------------------------------- Public Methods
466
467
468 /**
469 * Return the amount of threads that are managed by the pool.
470 *
471 * @return the amount of threads that are managed by the pool
472 */
473 public int getCurrentThreadCount() {
474 return curThreads;
475 }
476
477
478 /**
479 * Return the amount of threads currently busy.
480 *
481 * @return the amount of threads currently busy
482 */
483 public int getCurrentThreadsBusy() {
484 return curThreadsBusy;
485 }
486
487
488 /**
489 * Return the state of the endpoint.
490 *
491 * @return true if the endpoint is running, false otherwise
492 */
493 public boolean isRunning() {
494 return running;
495 }
496
497
498 /**
499 * Return the state of the endpoint.
500 *
501 * @return true if the endpoint is paused, false otherwise
502 */
503 public boolean isPaused() {
504 return paused;
505 }
506
507
508 // ----------------------------------------------- Public Lifecycle Methods
509
510
511 /**
512 * Initialize the endpoint.
513 */
514 public void init()
515 throws Exception {
516
517 if (initialized)
518 return;
519
520 // Create the root APR memory pool
521 rootPool = Pool.create(0);
522 // Create the pool for the server socket
523 serverSockPool = Pool.create(rootPool);
524 // Create the APR address that will be bound
525 String addressStr = null;
526 if (address == null) {
527 addressStr = null;
528 } else {
529 addressStr = address.getHostAddress();
530 }
531 long inetAddress = Address.info(addressStr, Socket.APR_INET,
532 port, 0, rootPool);
533 // Create the APR server socket
534 serverSock = Socket.create(Socket.APR_INET, Socket.SOCK_STREAM,
535 Socket.APR_PROTO_TCP, rootPool);
536 if (OS.IS_UNIX) {
537 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
538 }
539 // Deal with the firewalls that tend to drop the inactive sockets
540 Socket.optSet(serverSock, Socket.APR_SO_KEEPALIVE, 1);
541 // Bind the server socket
542 int ret = Socket.bind(serverSock, inetAddress);
543 if (ret != 0) {
544 throw new Exception(sm.getString("endpoint.init.bind", "" + ret));
545 }
546 // Start listening on the server socket
547 ret = Socket.listen(serverSock, backlog);
548 if (ret != 0) {
549 throw new Exception(sm.getString("endpoint.init.listen", "" + ret));
550 }
551 if (OS.IS_WIN32 || OS.IS_WIN64) {
552 // On Windows set the reuseaddr flag after the bind/listen
553 Socket.optSet(serverSock, Socket.APR_SO_REUSEADDR, 1);
554 }
555
556 // Sendfile usage on systems which don't support it cause major problems
557 if (useSendfile && !Library.APR_HAS_SENDFILE) {
558 log.warn(sm.getString("endpoint.sendfile.nosupport"));
559 useSendfile = false;
560 }
561
562 // Delay accepting of new connections until data is available
563 // Only Linux kernels 2.4 + have that implemented
564 // on other platforms this call is noop and will return APR_ENOTIMPL.
565 Socket.optSet(serverSock, Socket.APR_TCP_DEFER_ACCEPT, 1);
566
567 // Initialize SSL if needed
568 if (!"off".equalsIgnoreCase(SSLEngine)) {
569 // Initialize SSL
570 // FIXME: one per VM call ?
571 if ("on".equalsIgnoreCase(SSLEngine)) {
572 SSL.initialize(null);
573 } else {
574 SSL.initialize(SSLEngine);
575 }
576 // SSL protocol
577 int value = SSL.SSL_PROTOCOL_ALL;
578 if ("SSLv2".equalsIgnoreCase(SSLProtocol)) {
579 value = SSL.SSL_PROTOCOL_SSLV2;
580 } else if ("SSLv3".equalsIgnoreCase(SSLProtocol)) {
581 value = SSL.SSL_PROTOCOL_SSLV3;
582 } else if ("TLSv1".equalsIgnoreCase(SSLProtocol)) {
583 value = SSL.SSL_PROTOCOL_TLSV1;
584 } else if ("SSLv2+SSLv3".equalsIgnoreCase(SSLProtocol)) {
585 value = SSL.SSL_PROTOCOL_SSLV2 | SSL.SSL_PROTOCOL_SSLV3;
586 }
587 // Create SSL Context
588 sslContext = SSLContext.make(rootPool, value, SSL.SSL_MODE_SERVER);
589 // List the ciphers that the client is permitted to negotiate
590 SSLContext.setCipherSuite(sslContext, SSLCipherSuite);
591 // Load Server key and certificate
592 SSLContext.setCertificate(sslContext, SSLCertificateFile, SSLCertificateKeyFile, SSLPassword, SSL.SSL_AIDX_RSA);
593 // Support Client Certificates
594 if (SSLCACertificateFile != null) {
595 SSLContext.setCACertificate(sslContext, SSLCACertificateFile, null);
596 }
597 // Client certificate verification
598 value = SSL.SSL_CVERIFY_NONE;
599 if ("optional".equalsIgnoreCase(SSLVerifyClient)) {
600 value = SSL.SSL_CVERIFY_OPTIONAL;
601 } else if ("require".equalsIgnoreCase(SSLVerifyClient)) {
602 value = SSL.SSL_CVERIFY_REQUIRE;
603 } else if ("optionalNoCA".equalsIgnoreCase(SSLVerifyClient)) {
604 value = SSL.SSL_CVERIFY_OPTIONAL_NO_CA;
605 }
606 SSLContext.setVerify(sslContext, value, SSLVerifyDepth);
607 // For now, sendfile is not supported with SSL
608 useSendfile = false;
609 }
610
611 initialized = true;
612
613 }
614
615
616 /**
617 * Start the APR endpoint, creating acceptor, poller and sendfile threads.
618 */
619 public void start()
620 throws Exception {
621 // Initialize socket if not done before
622 if (!initialized) {
623 init();
624 }
625 if (!running) {
626 running = true;
627 paused = false;
628
629 // Start acceptor thread
630 acceptorThread = new Thread(new Acceptor(), getName() + "-Acceptor");
631 acceptorThread.setPriority(getThreadPriority());
632 acceptorThread.setDaemon(true);
633 acceptorThread.start();
634
635 // Start poller thread
636 poller = new Poller();
637 poller.init();
638 pollerThread = new Thread(poller, getName() + "-Poller");
639 pollerThread.setPriority(getThreadPriority());
640 pollerThread.setDaemon(true);
641 pollerThread.start();
642
643 // Start sendfile thread
644 if (useSendfile) {
645 sendfile = new Sendfile();
646 sendfile.init();
647 sendfileThread = new Thread(sendfile, getName() + "-Sendfile");
648 sendfileThread.setPriority(getThreadPriority());
649 sendfileThread.setDaemon(true);
650 sendfileThread.start();
651 }
652 }
653 }
654
655
656 /**
657 * Pause the endpoint, which will make it stop accepting new sockets.
658 */
659 public void pause() {
660 if (running && !paused) {
661 paused = true;
662 unlockAccept();
663 }
664 }
665
666
667 /**
668 * Resume the endpoint, which will make it start accepting new sockets
669 * again.
670 */
671 public void resume() {
672 if (running) {
673 paused = false;
674 }
675 }
676
677
678 /**
679 * Stop the endpoint. This will cause all processing threads to stop.
680 */
681 public void stop() {
682 if (running) {
683 running = false;
684 unlockAccept();
685 poller.destroy();
686 if (useSendfile) {
687 sendfile.destroy();
688 }
689 acceptorThread = null;
690 pollerThread = null;
691 sendfileThread = null;
692 }
693 }
694
695
696 /**
697 * Deallocate APR memory pools, and close server socket.
698 */
699 public void destroy() throws Exception {
700 if (running) {
701 stop();
702 }
703 Pool.destroy(serverSockPool);
704 serverSockPool = 0;
705 // Close server socket
706 Socket.close(serverSock);
707 serverSock = 0;
708 sslContext = 0;
709 // Close all APR memory pools and resources
710 Pool.destroy(rootPool);
711 rootPool = 0;
712 initialized = false ;
713 }
714
715
716 // ------------------------------------------------------ Protected Methods
717
718
719 /**
720 * Get a sequence number used for thread naming.
721 */
722 protected int getSequence() {
723 return sequence++;
724 }
725
726
727 /**
728 * Unlock the server socket accept using a bugus connection.
729 */
730 protected void unlockAccept() {
731 java.net.Socket s = null;
732 try {
733 // Need to create a connection to unlock the accept();
734 if (address == null) {
735 s = new java.net.Socket("127.0.0.1", port);
736 } else {
737 s = new java.net.Socket(address, port);
738 // setting soLinger to a small value will help shutdown the
739 // connection quicker
740 s.setSoLinger(true, 0);
741 }
742 } catch(Exception e) {
743 if (log.isDebugEnabled()) {
744 log.debug(sm.getString("endpoint.debug.unlock", "" + port), e);
745 }
746 } finally {
747 if (s != null) {
748 try {
749 s.close();
750 } catch (Exception e) {
751 // Ignore
752 }
753 }
754 }
755 }
756
757
758 /**
759 * Process the specified connection.
760 */
761 protected boolean setSocketOptions(long socket) {
762 // Process the connection
763 int step = 1;
764 try {
765
766 // 1: Set socket options: timeout, linger, etc
767 if (soLinger >= 0)
768 Socket.optSet(socket, Socket.APR_SO_LINGER, soLinger);
769 if (tcpNoDelay)
770 Socket.optSet(socket, Socket.APR_TCP_NODELAY, (tcpNoDelay ? 1 : 0));
771 if (soTimeout > 0)
772 Socket.timeoutSet(socket, soTimeout * 1000);
773
774 // 2: SSL handshake
775 step = 2;
776 if (sslContext != 0) {
777 SSLSocket.attach(sslContext, socket);
778 if (SSLSocket.handshake(socket) != 0) {
779 if (log.isDebugEnabled()) {
780 log.debug(sm.getString("endpoint.err.handshake") + ": " + SSL.getLastError());
781 }
782 return false;
783 }
784 }
785
786 } catch (Throwable t) {
787 if (step == 2) {
788 if (log.isDebugEnabled()) {
789 log.debug(sm.getString("endpoint.err.handshake"), t);
790 }
791 } else {
792 log.error(sm.getString("endpoint.err.unexpected"), t);
793 }
794 // Tell to close the socket
795 return false;
796 }
797 return true;
798 }
799
800
801 /**
802 * Create (or allocate) and return an available processor for use in
803 * processing a specific HTTP request, if possible. If the maximum
804 * allowed processors have already been created and are in use, return
805 * <code>null</code> instead.
806 */
807 protected Worker createWorkerThread() {
808
809 synchronized (workers) {
810 if (workers.size() > 0) {
811 curThreadsBusy++;
812 return ((Worker) workers.pop());
813 }
814 if ((maxThreads > 0) && (curThreads < maxThreads)) {
815 curThreadsBusy++;
816 return (newWorkerThread());
817 } else {
818 if (maxThreads < 0) {
819 curThreadsBusy++;
820 return (newWorkerThread());
821 } else {
822 return (null);
823 }
824 }
825 }
826
827 }
828
829
830 /**
831 * Create and return a new processor suitable for processing HTTP
832 * requests and returning the corresponding responses.
833 */
834 protected Worker newWorkerThread() {
835
836 Worker workerThread = new Worker();
837 workerThread.start();
838 return (workerThread);
839
840 }
841
842
843 /**
844 * Return a new worker thread, and block while to worker is available.
845 */
846 protected Worker getWorkerThread() {
847 // Allocate a new worker thread
848 Worker workerThread = createWorkerThread();
849 while (workerThread == null) {
850 try {
851 synchronized (workers) {
852 workers.wait();
853 }
854 } catch (InterruptedException e) {
855 // Ignore
856 }
857 workerThread = createWorkerThread();
858 }
859 return workerThread;
860 }
861
862
863 /**
864 * Recycle the specified Processor so that it can be used again.
865 *
866 * @param workerThread The processor to be recycled
867 */
868 protected void recycleWorkerThread(Worker workerThread) {
869 synchronized (workers) {
870 workers.push(workerThread);
871 curThreadsBusy--;
872 workers.notify();
873 }
874 }
875
876
877 // --------------------------------------------------- Acceptor Inner Class
878
879
880 /**
881 * Server socket acceptor thread.
882 */
883 protected class Acceptor implements Runnable {
884
885
886 /**
887 * The background thread that listens for incoming TCP/IP connections and
888 * hands them off to an appropriate processor.
889 */
890 public void run() {
891
892 // Loop until we receive a shutdown command
893 while (running) {
894
895 // Loop if endpoint is paused
896 while (paused) {
897 try {
898 Thread.sleep(1000);
899 } catch (InterruptedException e) {
900 // Ignore
901 }
902 }
903
904 // Allocate a new worker thread
905 Worker workerThread = getWorkerThread();
906
907 // Accept the next incoming connection from the server socket
908 try {
909 long socket = Socket.accept(serverSock);
910 // Hand this socket off to an appropriate processor
911 if (setSocketOptions(socket)) {
912 workerThread.assign(socket);
913 } else {
914 // Close socket and pool right away
915 Socket.destroy(socket);
916 }
917 } catch (Exception e) {
918 log.error(sm.getString("endpoint.accept.fail"), e);
919 }
920
921 // The processor will recycle itself when it finishes
922
923 }
924
925 // Notify the threadStop() method that we have shut ourselves down
926 synchronized (threadSync) {
927 threadSync.notifyAll();
928 }
929
930 }
931
932 }
933
934
935 // ----------------------------------------------------- Poller Inner Class
936
937
938 /**
939 * Poller class.
940 */
941 public class Poller implements Runnable {
942
943 protected long serverPollset = 0;
944 protected long pool = 0;
945 protected long[] desc;
946
947 protected long[] addS;
948 protected int addCount = 0;
949
950 /**
951 * Create the poller. With some versions of APR, the maximum poller size will
952 * be 62 (reocmpiling APR is necessary to remove this limitation).
953 */
954 protected void init() {
955 pool = Pool.create(serverSockPool);
956 try {
957 serverPollset = Poll.create(pollerSize, pool, 0, soTimeout * 1000);
958 } catch (Error e) {
959 if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
960 try {
961 // Use WIN32 maximum poll size
962 pollerSize = 62;
963 serverPollset = Poll.create(pollerSize, pool, 0, soTimeout * 1000);
964 log.warn(sm.getString("endpoint.poll.limitedpollsize"));
965 } catch (Error err) {
966 log.error(sm.getString("endpoint.poll.initfail"), e);
967 }
968 } else {
969 log.error(sm.getString("endpoint.poll.initfail"), e);
970 }
971 }
972 desc = new long[pollerSize * 2];
973 keepAliveCount = 0;
974 addS = new long[pollerSize];
975 addCount = 0;
976 }
977
978 /**
979 * Destroy the poller.
980 */
981 protected void destroy() {
982 // Close all sockets in the add queue
983 for (int i = 0; i < addCount; i--) {
984 Socket.destroy(addS[i]);
985 }
986 // Close all sockets still in the poller
987 int rv = Poll.pollset(serverPollset, desc);
988 if (rv > 0) {
989 for (int n = 0; n < rv; n++) {
990 Socket.destroy(desc[n*2+1]);
991 }
992 }
993 Pool.destroy(pool);
994 keepAliveCount = 0;
995 addCount = 0;
996 }
997
998 /**
999 * Add specified socket and associated pool to the poller. The socket will
1000 * be added to a temporary array, and polled first after a maximum amount
1001 * of time equal to pollTime (in most cases, latency will be much lower,
1002 * however).
1003 *
1004 * @param socket to add to the poller
1005 */
1006 public void add(long socket) {
1007 synchronized (addS) {
1008 // Add socket to the list. Newly added sockets will wait
1009 // at most for pollTime before being polled
1010 if (addCount >= addS.length) {
1011 // Can't do anything: close the socket right away
1012 Socket.destroy(socket);
1013 return;
1014 }
1015 addS[addCount] = socket;
1016 addCount++;
1017 addS.notify();
1018 }
1019 }
1020
1021 /**
1022 * The background thread that listens for incoming TCP/IP connections and
1023 * hands them off to an appropriate processor.
1024 */
1025 public void run() {
1026
1027 long maintainTime = 0;
1028 // Loop until we receive a shutdown command
1029 while (running) {
1030 // Loop if endpoint is paused
1031 while (paused) {
1032 try {
1033 // TODO: We can easly do the maintenance here
1034 Thread.sleep(1000);
1035 } catch (InterruptedException e) {
1036 // Ignore
1037 }
1038 }
1039
1040 while (keepAliveCount < 1 && addCount < 1) {
1041 // Reset maintain time.
1042 maintainTime = 0;
1043 try {
1044 synchronized (addS) {
1045 addS.wait();
1046 }
1047 } catch (InterruptedException e) {
1048 // Ignore
1049 }
1050 }
1051
1052 try {
1053 // Add sockets which are waiting to the poller
1054 if (addCount > 0) {
1055 synchronized (addS) {
1056 for (int i = (addCount - 1); i >= 0; i--) {
1057 int rv = Poll.add
1058 (serverPollset, addS[i], Poll.APR_POLLIN);
1059 if (rv == Status.APR_SUCCESS) {
1060 keepAliveCount++;
1061 } else {
1062 // Can't do anything: close the socket right away
1063 Socket.destroy(addS[i]);
1064 }
1065 }
1066 addCount = 0;
1067 }
1068 }
1069 maintainTime += pollTime;
1070 // Pool for the specified interval
1071 int rv = Poll.poll(serverPollset, pollTime, desc, true);
1072 if (rv > 0) {
1073 keepAliveCount -= rv;
1074 for (int n = 0; n < rv; n++) {
1075 // Check for failed sockets
1076 if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1077 || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
1078 // Close socket and clear pool
1079 Socket.destroy(desc[n*2+1]);
1080 continue;
1081 }
1082 // Hand this socket off to a worker
1083 getWorkerThread().assign(desc[n*2+1]);
1084 }
1085 } else if (rv < 0) {
1086 /* Any non timeup error is critical */
1087 if (-rv != Status.TIMEUP) {
1088 log.error(sm.getString("endpoint.poll.fail", Error.strerror(-rv)));
1089 // Handle poll critical failure
1090 synchronized (this) {
1091 destroy();
1092 init();
1093 }
1094 }
1095 }
1096 if (soTimeout > 0 && maintainTime > 1000000L) {
1097 rv = Poll.maintain(serverPollset, desc, true);
1098 maintainTime = 0;
1099 if (rv > 0) {
1100 keepAliveCount -= rv;
1101 for (int n = 0; n < rv; n++) {
1102 // Close socket and clear pool
1103 Socket.destroy(desc[n]);
1104 }
1105 }
1106 }
1107 } catch (Throwable t) {
1108 log.error(sm.getString("endpoint.poll.error"), t);
1109 }
1110
1111 }
1112
1113 // Notify the threadStop() method that we have shut ourselves down
1114 synchronized (threadSync) {
1115 threadSync.notifyAll();
1116 }
1117
1118 }
1119
1120 }
1121
1122
1123 // ----------------------------------------------------- Worker Inner Class
1124
1125
1126 /**
1127 * Server processor class.
1128 */
1129 protected class Worker implements Runnable {
1130
1131
1132 protected Thread thread = null;
1133 protected boolean available = false;
1134 protected long socket = 0;
1135
1136
1137 /**
1138 * Process an incoming TCP/IP connection on the specified socket. Any
1139 * exception that occurs during processing must be logged and swallowed.
1140 * <b>NOTE</b>: This method is called from our Connector's thread. We
1141 * must assign it to our own thread so that multiple simultaneous
1142 * requests can be handled.
1143 *
1144 * @param socket TCP socket to process
1145 */
1146 protected synchronized void assign(long socket) {
1147
1148 // Wait for the Processor to get the previous Socket
1149 while (available) {
1150 try {
1151 wait();
1152 } catch (InterruptedException e) {
1153 }
1154 }
1155
1156 // Store the newly available Socket and notify our thread
1157 this.socket = socket;
1158 available = true;
1159 notifyAll();
1160
1161 }
1162
1163
1164 /**
1165 * Await a newly assigned Socket from our Connector, or <code>null</code>
1166 * if we are supposed to shut down.
1167 */
1168 protected synchronized long await() {
1169
1170 // Wait for the Connector to provide a new Socket
1171 while (!available) {
1172 try {
1173 wait();
1174 } catch (InterruptedException e) {
1175 }
1176 }
1177
1178 // Notify the Connector that we have received this Socket
1179 long socket = this.socket;
1180 available = false;
1181 notifyAll();
1182
1183 return (socket);
1184
1185 }
1186
1187
1188 /**
1189 * The background thread that listens for incoming TCP/IP connections and
1190 * hands them off to an appropriate processor.
1191 */
1192 public void run() {
1193
1194 // Process requests until we receive a shutdown signal
1195 while (running) {
1196
1197 // Wait for the next socket to be assigned
1198 long socket = await();
1199 if (socket == 0)
1200 continue;
1201
1202 // Process the request from this socket
1203 if (!handler.process(socket)) {
1204 // Close socket and pool
1205 Socket.destroy(socket);
1206 socket = 0;
1207 }
1208
1209 // Finish up this request
1210 recycleWorkerThread(this);
1211
1212 }
1213
1214 // Tell threadStop() we have shut ourselves down successfully
1215 synchronized (this) {
1216 threadSync.notifyAll();
1217 }
1218
1219 }
1220
1221
1222 /**
1223 * Start the background processing thread.
1224 */
1225 public void start() {
1226 thread = new ThreadWithAttributes(AprEndpoint.this, this);
1227 thread.setName(getName() + "-" + (++curThreads));
1228 thread.setDaemon(true);
1229 thread.start();
1230 }
1231
1232
1233 }
1234
1235
1236 // ----------------------------------------------- SendfileData Inner Class
1237
1238
1239 /**
1240 * SendfileData class.
1241 */
1242 public static class SendfileData {
1243 // File
1244 public String fileName;
1245 public long fd;
1246 public long fdpool;
1247 // Range information
1248 public long start;
1249 public long end;
1250 // Socket and socket pool
1251 public long socket;
1252 // Position
1253 public long pos;
1254 // KeepAlive flag
1255 public boolean keepAlive;
1256 }
1257
1258
1259 // --------------------------------------------------- Sendfile Inner Class
1260
1261
1262 /**
1263 * Sendfile class.
1264 */
1265 public class Sendfile implements Runnable {
1266
1267 protected long sendfilePollset = 0;
1268 protected long pool = 0;
1269 protected long[] desc;
1270 protected HashMap sendfileData;
1271
1272 protected ArrayList addS;
1273
1274 /**
1275 * Create the sendfile poller. With some versions of APR, the maximum poller size will
1276 * be 62 (reocmpiling APR is necessary to remove this limitation).
1277 */
1278 protected void init() {
1279 pool = Pool.create(serverSockPool);
1280 try {
1281 sendfilePollset = Poll.create(sendfileSize, pool, 0, soTimeout * 1000);
1282 } catch (Error e) {
1283 if (Status.APR_STATUS_IS_EINVAL(e.getError())) {
1284 try {
1285 // Use WIN32 maximum poll size
1286 sendfileSize = 62;
1287 sendfilePollset = Poll.create(sendfileSize, pool, 0, soTimeout * 1000);
1288 log.warn(sm.getString("endpoint.poll.limitedpollsize"));
1289 } catch (Error err) {
1290 log.error(sm.getString("endpoint.poll.initfail"), e);
1291 }
1292 } else {
1293 log.error(sm.getString("endpoint.poll.initfail"), e);
1294 }
1295 }
1296 desc = new long[sendfileSize * 2];
1297 sendfileData = new HashMap(sendfileSize);
1298 addS = new ArrayList();
1299 }
1300
1301 /**
1302 * Destroy the poller.
1303 */
1304 protected void destroy() {
1305 // Close any socket remaining in the add queue
1306 for (int i = (addS.size() - 1); i >= 0; i--) {
1307 SendfileData data = (SendfileData) addS.get(i);
1308 Socket.destroy(data.socket);
1309 }
1310 // Close all sockets still in the poller
1311 int rv = Poll.pollset(sendfilePollset, desc);
1312 if (rv > 0) {
1313 for (int n = 0; n < rv; n++) {
1314 Socket.destroy(desc[n*2+1]);
1315 }
1316 }
1317 Pool.destroy(pool);
1318 sendfileData.clear();
1319 }
1320
1321 /**
1322 * Add the sendfile data to the sendfile poller. Note that in most cases,
1323 * the initial non blocking calls to sendfile will return right away, and
1324 * will be handled asynchronously inside the kernel. As a result,
1325 * the poller will never be used.
1326 *
1327 * @param data containing the reference to the data which should be snet
1328 * @return true if all the data has been sent right away, and false
1329 * otherwise
1330 */
1331 public boolean add(SendfileData data) {
1332 // Initialize fd from data given
1333 try {
1334 data.fdpool = Socket.pool(data.socket);
1335 data.fd = File.open
1336 (data.fileName, File.APR_FOPEN_READ
1337 | File.APR_FOPEN_SENDFILE_ENABLED | File.APR_FOPEN_BINARY,
1338 0, data.fdpool);
1339 data.pos = data.start;
1340 // Set the socket to nonblocking mode
1341 Socket.timeoutSet(data.socket, 0);
1342 while (true) {
1343 long nw = Socket.sendfilen(data.socket, data.fd,
1344 data.pos, data.end - data.pos, 0);
1345 if (nw < 0) {
1346 if (!(-nw == Status.EAGAIN)) {
1347 Socket.destroy(data.socket);
1348 data.socket = 0;
1349 return false;
1350 } else {
1351 // Break the loop and add the socket to poller.
1352 break;
1353 }
1354 } else {
1355 data.pos = data.pos + nw;
1356 if (data.pos >= data.end) {
1357 // Entire file has been sent
1358 Pool.destroy(data.fdpool);
1359 // Set back socket to blocking mode
1360 Socket.timeoutSet(data.socket, soTimeout * 1000);
1361 return true;
1362 }
1363 }
1364 }
1365 } catch (Exception e) {
1366 log.error(sm.getString("endpoint.sendfile.error"), e);
1367 return false;
1368 }
1369 // Add socket to the list. Newly added sockets will wait
1370 // at most for pollTime before being polled
1371 synchronized (addS) {
1372 addS.add(data);
1373 addS.notify();
1374 }
1375 return false;
1376 }
1377
1378 /**
1379 * Remove socket from the poller.
1380 *
1381 * @param data the sendfile data which should be removed
1382 */
1383 protected void remove(SendfileData data) {
1384 int rv = Poll.remove(sendfilePollset, data.socket);
1385 if (rv == Status.APR_SUCCESS) {
1386 sendfileCount--;
1387 }
1388 sendfileData.remove(data);
1389 }
1390
1391 /**
1392 * The background thread that listens for incoming TCP/IP connections and
1393 * hands them off to an appropriate processor.
1394 */
1395 public void run() {
1396
1397 // Loop until we receive a shutdown command
1398 while (running) {
1399
1400 // Loop if endpoint is paused
1401 while (paused) {
1402 try {
1403 Thread.sleep(1000);
1404 } catch (InterruptedException e) {
1405 // Ignore
1406 }
1407 }
1408
1409 while (sendfileCount < 1 && addS.size() < 1) {
1410 try {
1411 synchronized (addS) {
1412 addS.wait();
1413 }
1414 } catch (InterruptedException e) {
1415 // Ignore
1416 }
1417 }
1418
1419 try {
1420 // Add socket to the poller
1421 if (addS.size() > 0) {
1422 synchronized (addS) {
1423 for (int i = (addS.size() - 1); i >= 0; i--) {
1424 SendfileData data = (SendfileData) addS.get(i);
1425 int rv = Poll.add(sendfilePollset, data.socket, Poll.APR_POLLOUT);
1426 if (rv == Status.APR_SUCCESS) {
1427 sendfileData.put(new Long(data.socket), data);
1428 sendfileCount++;
1429 } else {
1430 log.warn(sm.getString("endpoint.sendfile.addfail", "" + rv));
1431 // Can't do anything: close the socket right away
1432 Socket.destroy(data.socket);
1433 }
1434 }
1435 addS.clear();
1436 }
1437 }
1438 // Pool for the specified interval
1439 int rv = Poll.poll(sendfilePollset, pollTime, desc, false);
1440 if (rv > 0) {
1441 for (int n = 0; n < rv; n++) {
1442 // Get the sendfile state
1443 SendfileData state =
1444 (SendfileData) sendfileData.get(new Long(desc[n*2+1]));
1445 // Problem events
1446 if (((desc[n*2] & Poll.APR_POLLHUP) == Poll.APR_POLLHUP)
1447 || ((desc[n*2] & Poll.APR_POLLERR) == Poll.APR_POLLERR)) {
1448 // Close socket and clear pool
1449 remove(state);
1450 // Destroy file descriptor pool, which should close the file
1451 // Close the socket, as the reponse would be incomplete
1452 Socket.destroy(state.socket);
1453 continue;
1454 }
1455 // Write some data using sendfile
1456 long nw = Socket.sendfilen(state.socket, state.fd,
1457 state.pos,
1458 state.end - state.pos, 0);
1459 if (nw < 0) {
1460 // Close socket and clear pool
1461 remove(state);
1462 // Close the socket, as the reponse would be incomplete
1463 // This will close the file too.
1464 Socket.destroy(state.socket);
1465 continue;
1466 }
1467
1468 state.pos = state.pos + nw;
1469 if (state.pos >= state.end) {
1470 remove(state);
1471 if (state.keepAlive) {
1472 // Destroy file descriptor pool, which should close the file
1473 Pool.destroy(state.fdpool);
1474 Socket.timeoutSet(state.socket, soTimeout * 1000);
1475 // If all done hand this socket off to a worker for
1476 // processing of further requests
1477 getWorkerThread().assign(state.socket);
1478 } else {
1479 // Close the socket since this is
1480 // the end of not keep-alive request.
1481 Socket.destroy(state.socket);
1482 }
1483 }
1484 }
1485 } else if (rv < 0) {
1486 /* Any non timeup error is critical */
1487 if (-rv == Status.TIMEUP)
1488 rv = 0;
1489 else {
1490 log.error(sm.getString("endpoint.poll.fail", Error.strerror(-rv)));
1491 // Handle poll critical failure
1492 synchronized (this) {
1493 destroy();
1494 init();
1495 }
1496 }
1497 }
1498 /* TODO: See if we need to call the maintain for sendfile poller */
1499 } catch (Throwable t) {
1500 log.error(sm.getString("endpoint.poll.error"), t);
1501 }
1502 }
1503
1504 // Notify the threadStop() method that we have shut ourselves down
1505 synchronized (threadSync) {
1506 threadSync.notifyAll();
1507 }
1508
1509 }
1510
1511 }
1512
1513
1514 // -------------------------------------- ConnectionHandler Inner Interface
1515
1516
/**
 * Minimal callback contract through which the endpoint hands a socket to
 * the code that processes it. Per thread data is to be stored in the
 * ThreadWithAttributes extra folders, or alternately in thread local
 * fields.
 */
public interface Handler {

    /**
     * Process the given socket.
     *
     * @param socket the socket to process
     * @return <code>false</code> if the caller should close the socket,
     *         <code>true</code> otherwise
     */
    boolean process(long socket);

}
1525
1526
1527 }