1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.resource.connectionmanager;
23
24 import java.util.ArrayList;
25 import java.util.Collections;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.Set;
29
30 import javax.resource.ResourceException;
31 import javax.resource.spi.ConnectionRequestInfo;
32 import javax.resource.spi.ManagedConnection;
33 import javax.resource.spi.ManagedConnectionFactory;
34 import javax.resource.spi.ValidatingManagedConnectionFactory;
35 import javax.security.auth.Subject;
36
37 import org.jboss.logging.Logger;
38 import org.jboss.resource.JBossResourceException;
39 import org.jboss.util.UnreachableStatementException;
40
41 import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
42 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
43
44 /**
45 * The internal pool implementation
46 *
47 * @author <a href="mailto:d_jencks@users.sourceforge.net">David Jencks</a>
48 * @author <a href="mailto:adrian@jboss.org">Adrian Brock</a>
49 * @author <a href="mailto:weston.price@jboss.com">Weston Price</a>
50 * @version $Revision: 71846 $
51 */
52 public class InternalManagedConnectionPool implements IdleConnectionRemovalSupport
53 {
54 /** The managed connection factory */
55 private final ManagedConnectionFactory mcf;
56
57 /** The connection listener factory */
58 private final ConnectionListenerFactory clf;
59
60 /** The default subject */
61 private final Subject defaultSubject;
62
63 /** The default connection request information */
64 private final ConnectionRequestInfo defaultCri;
65
66 /** The pooling parameters */
67 private final PoolParams poolParams;
68
69 /** Copy of the maximum size from the pooling parameters.
70 * Dynamic changes to this value are not compatible with
71 * the semaphore which cannot change be dynamically changed.
72 */
73 private int maxSize;
74
75 /** The available connection event listeners */
76 private ArrayList cls;
77
78 /** The permits used to control who can checkout a connection */
79 private final FIFOSemaphore permits;
80
81 /** The log */
82 private final Logger log;
83
84 /** Whether trace is enabled */
85 private final boolean trace;
86
87 /** Stats */
88 private final Counter connectionCounter = new Counter();
89
90 /** The checked out connections */
91 private final HashSet checkedOut = new HashSet();
92
93 /** Whether the pool has been started */
94 private boolean started = false;
95
96 /** Whether the pool has been shutdown */
97 private SynchronizedBoolean shutdown = new SynchronizedBoolean(false);
98
99 /** the max connections ever checked out **/
100 private volatile int maxUsedConnections = 0;
101
102 /**
103 * Create a new internal pool
104 *
105 * @param mcf the managed connection factory
106 * @param subject the subject
107 * @param cri the connection request information
108 * @param poolParams the pooling parameters
109 * @param log the log
110 */
111 protected InternalManagedConnectionPool(ManagedConnectionFactory mcf, ConnectionListenerFactory clf, Subject subject,
112 ConnectionRequestInfo cri, PoolParams poolParams, Logger log)
113 {
114 this.mcf = mcf;
115 this.clf = clf;
116 defaultSubject = subject;
117 defaultCri = cri;
118 this.poolParams = poolParams;
119 this.maxSize = poolParams.maxSize;
120 this.log = log;
121 this.trace = log.isTraceEnabled();
122 cls = new ArrayList(this.maxSize);
123 permits = new FIFOSemaphore(this.maxSize);
124
125 if(poolParams.prefill){
126
127 PoolFiller.fillPool(this);
128
129 }
130 }
131
132 /**
133 * Initialize the pool
134 */
135 protected void initialize()
136 {
137 if (poolParams.idleTimeout != 0)
138 IdleRemover.registerPool(this, poolParams.idleTimeout);
139
140 if (poolParams.backgroundInterval > 0)
141 {
142
143 log.debug("Registering for background validation at interval " + poolParams.backgroundInterval);
144 ConnectionValidator.registerPool(this, poolParams.backgroundInterval);
145
146 }
147 }
148
149 public long getAvailableConnections()
150 {
151 return permits.permits();
152 }
153
154 public int getMaxConnectionsInUseCount()
155 {
156 return maxUsedConnections;
157 }
158
159 public int getConnectionInUseCount()
160 {
161 return checkedOut.size();
162 }
163
164 /**
165 * todo distinguish between connection dying while match called
166 * and bad match strategy. In latter case we should put it back in
167 * the pool.
168 */
169 public ConnectionListener getConnection(Subject subject, ConnectionRequestInfo cri) throws ResourceException
170 {
171 subject = (subject == null) ? defaultSubject : subject;
172 cri = (cri == null) ? defaultCri : cri;
173 long startWait = System.currentTimeMillis();
174 try
175 {
176 connectionCounter.updateBlockTime(System.currentTimeMillis() - startWait);
177
178 if (permits.attempt(poolParams.blockingTimeout))
179 {
180 long poolBlockTime = System.currentTimeMillis() - startWait ;
181 connectionCounter.updateBlockTime(poolBlockTime);
182
183 //We have a permit to get a connection. Is there one in the pool already?
184 ConnectionListener cl = null;
185 do
186 {
187 synchronized (cls)
188 {
189 if (shutdown.get())
190 {
191 permits.release();
192 throw new ResourceException("The pool has been shutdown");
193 }
194
195 if (cls.size() > 0)
196 {
197 cl = (ConnectionListener) cls.remove(cls.size() - 1);
198 checkedOut.add(cl);
199 int size = (int) (maxSize - permits.permits());
200 if (size > maxUsedConnections)
201 maxUsedConnections = size;
202 }
203 }
204 if (cl != null)
205 {
206 //Yes, we retrieved a ManagedConnection from the pool. Does it match?
207 try
208 {
209 Object matchedMC = mcf.matchManagedConnections(Collections.singleton(cl.getManagedConnection()),
210 subject, cri);
211 if (matchedMC != null)
212 {
213 if (trace)
214 log.trace("supplying ManagedConnection from pool: " + cl);
215 cl.grantPermit(true);
216 return cl;
217 }
218
219 //Match did not succeed but no exception was thrown.
220 //Either we have the matching strategy wrong or the
221 //connection died while being checked. We need to
222 //distinguish these cases, but for now we always
223 //destroy the connection.
224 log.warn("Destroying connection that could not be successfully matched: " + cl);
225 synchronized (cls)
226 {
227 checkedOut.remove(cl);
228 }
229 doDestroy(cl);
230 cl = null;
231 }
232 catch (Throwable t)
233 {
234 log.warn("Throwable while trying to match ManagedConnection, destroying connection: " + cl, t);
235 synchronized (cls)
236 {
237 checkedOut.remove(cl);
238 }
239 doDestroy(cl);
240 cl = null;
241 }
242 //We made it here, something went wrong and we should validate if we should continue attempting to acquire a connection
243 if(poolParams.useFastFail)
244 {
245 log.trace("Fast failing for connection attempt. No more attempts will be made to acquire connection from pool and a new connection will be created immeadiately");
246 break;
247 }
248
249 }
250 }
251 while (cls.size() > 0);//end of do loop
252
253 //OK, we couldnt find a working connection from the pool. Make a new one.
254 try
255 {
256 //No, the pool was empty, so we have to make a new one.
257 cl = createConnectionEventListener(subject, cri);
258 synchronized (cls)
259 {
260 checkedOut.add(cl);
261 int size = (int) (maxSize - permits.permits());
262 if (size > maxUsedConnections)
263 maxUsedConnections = size;
264 }
265
266 //lack of synch on "started" probably ok, if 2 reads occur we will just
267 //run fillPool twice, no harm done.
268 if (started == false)
269 {
270 started = true;
271 if (poolParams.minSize > 0)
272 PoolFiller.fillPool(this);
273 }
274 if (trace)
275 log.trace("supplying new ManagedConnection: " + cl);
276 cl.grantPermit(true);
277 return cl;
278 }
279 catch (Throwable t)
280 {
281 log.warn("Throwable while attempting to get a new connection: " + cl, t);
282 //return permit and rethrow
283 synchronized (cls)
284 {
285 checkedOut.remove(cl);
286 }
287 permits.release();
288 JBossResourceException.rethrowAsResourceException("Unexpected throwable while trying to create a connection: " + cl, t);
289 throw new UnreachableStatementException();
290 }
291 }
292 else
293 {
294 // we timed out
295 throw new ResourceException("No ManagedConnections available within configured blocking timeout ( "
296 + poolParams.blockingTimeout + " [ms] )");
297 }
298
299 }
300 catch (InterruptedException ie)
301 {
302 long end = System.currentTimeMillis() - startWait;
303 connectionCounter.updateBlockTime(end);
304 throw new ResourceException("Interrupted while requesting permit! Waited " + end + " ms");
305 }
306 }
307
308 public void returnConnection(ConnectionListener cl, boolean kill)
309 {
310 synchronized (cls)
311 {
312 if (cl.getState() == ConnectionListener.DESTROYED)
313 {
314 if (trace)
315 log.trace("ManagedConnection is being returned after it was destroyed" + cl);
316 if (cl.hasPermit())
317 {
318 // release semaphore
319 cl.grantPermit(false);
320 permits.release();
321 }
322
323 return;
324 }
325 }
326
327 if (trace)
328 log.trace("putting ManagedConnection back into pool kill=" + kill + " cl=" + cl);
329 try
330 {
331 cl.getManagedConnection().cleanup();
332 }
333 catch (ResourceException re)
334 {
335 log.warn("ResourceException cleaning up ManagedConnection: " + cl, re);
336 kill = true;
337 }
338
339 synchronized (cls)
340 {
341 // We need to destroy this one
342 if (cl.getState() == ConnectionListener.DESTROY || cl.getState() == ConnectionListener.DESTROYED)
343 kill = true;
344 checkedOut.remove(cl);
345
346 // This is really an error
347 if (kill == false && cls.size() >= poolParams.maxSize)
348 {
349 log.warn("Destroying returned connection, maximum pool size exceeded " + cl);
350 kill = true;
351 }
352
353 // If we are destroying, check the connection is not in the pool
354 if (kill)
355 {
356 // Adrian Brock: A resource adapter can asynchronously notify us that
357 // a connection error occurred.
358 // This could happen while the connection is not checked out.
359 // e.g. JMS can do this via an ExceptionListener on the connection.
360 // I have twice had to reinstate this line of code, PLEASE DO NOT REMOVE IT!
361 cls.remove(cl);
362 }
363 // return to the pool
364 else
365 {
366 cl.used();
367 if (cls.contains(cl) == false)
368 cls.add(cl);
369 else
370 log.warn("Attempt to return connection twice (ignored): " + cl, new Throwable("STACKTRACE"));
371 }
372
373 if (cl.hasPermit())
374 {
375 // release semaphore
376 cl.grantPermit(false);
377 permits.release();
378 }
379 }
380
381 if (kill)
382 {
383 if (trace)
384 log.trace("Destroying returned connection " + cl);
385 doDestroy(cl);
386 }
387
388 }
389
390 public void flush()
391 {
392 ArrayList destroy = null;
393 synchronized (cls)
394 {
395 if (trace)
396 log.trace("Flushing pool checkedOut=" + checkedOut + " inPool=" + cls);
397
398 // Mark checked out connections as requiring destruction
399 for (Iterator i = checkedOut.iterator(); i.hasNext();)
400 {
401 ConnectionListener cl = (ConnectionListener) i.next();
402 if (trace)
403 log.trace("Flush marking checked out connection for destruction " + cl);
404 cl.setState(ConnectionListener.DESTROY);
405 }
406 // Destroy connections in the pool
407 while (cls.size() > 0)
408 {
409 ConnectionListener cl = (ConnectionListener) cls.remove(0);
410 if (destroy == null)
411 destroy = new ArrayList();
412 destroy.add(cl);
413 }
414 }
415
416 // We need to destroy some connections
417 if (destroy != null)
418 {
419 for (int i = 0; i < destroy.size(); ++i)
420 {
421 ConnectionListener cl = (ConnectionListener) destroy.get(i);
422 if (trace)
423 log.trace("Destroying flushed connection " + cl);
424 doDestroy(cl);
425 }
426
427 // We destroyed something, check the minimum.
428 if (shutdown.get() == false && poolParams.minSize > 0)
429 PoolFiller.fillPool(this);
430 }
431 }
432
433 public void removeIdleConnections()
434 {
435 ArrayList destroy = null;
436 long timeout = System.currentTimeMillis() - poolParams.idleTimeout;
437 while (true)
438 {
439 synchronized (cls)
440 {
441
442 // Nothing left to destroy
443 if (cls.size() == 0)
444 break;
445
446 // Check the first in the list
447 ConnectionListener cl = (ConnectionListener) cls.get(0);
448 if (cl.isTimedOut(timeout) && shouldRemove())
449 {
450 connectionCounter.incTimedOut();
451 // We need to destroy this one
452 cls.remove(0);
453 if (destroy == null)
454 destroy = new ArrayList();
455 destroy.add(cl);
456 }
457 else
458 {
459 //They were inserted chronologically, so if this one isn't timed out, following ones won't be either.
460 break;
461 }
462 }
463 }
464
465 // We found some connections to destroy
466 if (destroy != null)
467 {
468 for (int i = 0; i < destroy.size(); ++i)
469 {
470 ConnectionListener cl = (ConnectionListener) destroy.get(i);
471 if (trace)
472 log.trace("Destroying timedout connection " + cl);
473 doDestroy(cl);
474 }
475
476 // We destroyed something, check the minimum.
477 if (shutdown.get() == false && poolParams.minSize > 0)
478 PoolFiller.fillPool(this);
479 }
480 }
481
482
483 /**
484 * For testing
485 */
486 public void shutdownWithoutClear()
487 {
488 IdleRemover.unregisterPool(this);
489 IdleRemover.waitForBackgroundThread();
490 ConnectionValidator.unRegisterPool(this);
491 ConnectionValidator.waitForBackgroundThread();
492
493 fillToMin();
494 shutdown.set(true);
495 }
496
497 public void shutdown()
498 {
499 shutdown.set(true);
500 IdleRemover.unregisterPool(this);
501 ConnectionValidator.unRegisterPool(this);
502 flush();
503 }
504
505 public void fillToMin()
506 {
507 while (true)
508 {
509 // Get a permit - avoids a race when the pool is nearly full
510 // Also avoids unnessary fill checking when all connections are checked out
511 try
512 {
513 if (permits.attempt(poolParams.blockingTimeout))
514 {
515 try
516 {
517 if (shutdown.get())
518 return;
519
520 // We already have enough connections
521 if (getMinSize() - connectionCounter.getGuaranteedCount() <= 0)
522 return;
523
524 // Create a connection to fill the pool
525 try
526 {
527 ConnectionListener cl = createConnectionEventListener(defaultSubject, defaultCri);
528 synchronized (cls)
529 {
530 if (trace)
531 log.trace("Filling pool cl=" + cl);
532 cls.add(cl);
533 }
534 }
535 catch (ResourceException re)
536 {
537 log.warn("Unable to fill pool ", re);
538 return;
539 }
540 }
541 finally
542 {
543 permits.release();
544 }
545 }
546 }
547 catch (InterruptedException ignored)
548 {
549 log.trace("Interrupted while requesting permit in fillToMin");
550 }
551 }
552 }
553
554 public int getConnectionCount()
555 {
556 return connectionCounter.getCount();
557 }
558
559 public long getTotalBlockTime()
560 {
561 return connectionCounter.getTotalBlockTime();
562 }
563
564 public int getTimedOut()
565 {
566 return connectionCounter.getTimedOut();
567 }
568
569 public long getAverageBlockTime()
570 {
571 return connectionCounter.getTotalBlockTime() / getConnectionCreatedCount();
572 }
573
574 public long getMaxWaitTime()
575 {
576 return connectionCounter.getMaxWaitTime();
577 }
578
579 public int getConnectionCreatedCount()
580 {
581 return connectionCounter.getCreatedCount();
582 }
583
584 public int getConnectionDestroyedCount()
585 {
586 return connectionCounter.getDestroyedCount();
587 }
588
589 Set getConnectionListeners()
590 {
591 synchronized (cls)
592 {
593 Set result = new HashSet();
594 result.addAll(cls);
595 result.addAll(checkedOut);
596 return result;
597 }
598 }
599
600 /**
601 * Create a connection event listener
602 *
603 * @param subject the subject
604 * @param cri the connection request information
605 * @return the new listener
606 * @throws ResourceException for any error
607 */
608 private ConnectionListener createConnectionEventListener(Subject subject, ConnectionRequestInfo cri)
609 throws ResourceException
610 {
611 ManagedConnection mc = mcf.createManagedConnection(subject, cri);
612 connectionCounter.inc();
613 try
614 {
615 return clf.createConnectionListener(mc, this);
616 }
617 catch (ResourceException re)
618 {
619 connectionCounter.dec();
620 mc.destroy();
621 throw re;
622 }
623 }
624
625 /**
626 * Destroy a connection
627 *
628 * @param cl the connection to destroy
629 */
630 private void doDestroy(ConnectionListener cl)
631 {
632 if (cl.getState() == ConnectionListener.DESTROYED)
633 {
634 log.trace("ManagedConnection is already destroyed " + cl);
635 return;
636 }
637
638 connectionCounter.dec();
639 cl.setState(ConnectionListener.DESTROYED);
640 try
641 {
642 cl.getManagedConnection().destroy();
643 }
644 catch (Throwable t)
645 {
646 log.debug("Exception destroying ManagedConnection " + cl, t);
647 }
648
649 }
650
651 private boolean shouldRemove()
652 {
653 boolean remove = true;
654
655 if(poolParams.stictMin)
656 {
657 remove = cls.size() > poolParams.minSize;
658
659 log.trace("StrictMin is active. Current connection will be removed is " + remove);
660
661 }
662
663 return remove;
664
665 }
666
667 public void validateConnections() throws Exception
668 {
669
670 if (trace)
671 log.trace("Attempting to validate connections for pool " + this);
672
673 if (permits.attempt(poolParams.blockingTimeout))
674 {
675
676 boolean destroyed = false;
677
678 try
679 {
680
681 while (true)
682 {
683
684 ConnectionListener cl = null;
685
686 synchronized (cls)
687 {
688 if (cls.size() == 0)
689 {
690
691 break;
692
693 }
694
695 cl = removeForFrequencyCheck();
696
697 }
698
699 if (cl == null)
700 {
701
702 break;
703 }
704
705 try
706 {
707
708 Set candidateSet = Collections.singleton(cl.getManagedConnection());
709
710 if (mcf instanceof ValidatingManagedConnectionFactory)
711 {
712 ValidatingManagedConnectionFactory vcf = (ValidatingManagedConnectionFactory) mcf;
713 candidateSet = vcf.getInvalidConnections(candidateSet);
714
715 if (candidateSet != null && candidateSet.size() > 0)
716 {
717
718 if (cl.getState() != ConnectionListener.DESTROY)
719 {
720 doDestroy(cl);
721 destroyed = true;
722
723 }
724 }
725
726 }
727 else
728 {
729 log.warn("warning: background validation was specified with a non compliant ManagedConnectionFactory interface.");
730 }
731
732 }
733 finally
734 {
735 if(!destroyed)
736 {
737 synchronized (cls)
738 {
739 returnForFrequencyCheck(cl);
740 }
741
742 }
743
744 }
745
746 }
747
748 }
749 finally
750 {
751 permits.release();
752
753 if (destroyed && shutdown.get() == false && poolParams.minSize > 0)
754 {
755 PoolFiller.fillPool(this);
756 }
757
758 }
759
760 }
761
762 }
763 private ConnectionListener removeForFrequencyCheck()
764 {
765
766 log.debug("Checking for connection within frequency");
767
768 ConnectionListener cl = null;
769
770 for (Iterator iter = cls.iterator(); iter.hasNext();)
771 {
772
773 cl = (ConnectionListener) iter.next();
774 long lastCheck = cl.getLastValidatedTime();
775
776 if ((System.currentTimeMillis() - lastCheck) >= poolParams.backgroundInterval)
777 {
778 cls.remove(cl);
779 break;
780
781 }
782 else
783 {
784 cl = null;
785 }
786
787 }
788
789 return cl;
790 }
791
792 private void returnForFrequencyCheck(ConnectionListener cl)
793 {
794
795 log.debug("Returning for connection within frequency");
796
797 cl.setLastValidatedTime(System.currentTimeMillis());
798 cls.add(cl);
799
800 }
801 /**
802 * Guard against configurations or
803 * dynamic changes that may increase the minimum
804 * beyond the maximum
805 */
806 private int getMinSize()
807 {
808 if (poolParams.minSize > maxSize)
809 return maxSize;
810
811 return poolParams.minSize;
812 }
813
814 public static class PoolParams
815 {
816 public int minSize = 0;
817
818 public int maxSize = 10;
819
820 public int blockingTimeout = 30000; // milliseconds
821
822 public long idleTimeout = 1000 * 60 * 30; // milliseconds, 30 minutes.
823
824 public long backgroundInterval = 0;
825
826 public boolean prefill;
827
828 public boolean stictMin;
829
830 //Do we want to immeadiately break when a connection cannot be matched and not evaluate the rest of the pool?
831 public boolean useFastFail;
832 }
833
834 /**
835 * Stats
836 */
837 private static class Counter
838 {
839 private int created = 0;
840
841 private int destroyed = 0;
842
843 // Total wait time to get Connection from Pool.
844 private long totalBlockTime;
845
846 // Idle timed out Connection Count.
847 private int timedOut;
848
849 // The maximum wait time */
850 private long maxWaitTime;
851
852 synchronized int getGuaranteedCount()
853 {
854 return created - destroyed;
855 }
856
857 int getCount()
858 {
859 return created - destroyed;
860 }
861
862 int getCreatedCount()
863 {
864 return created;
865 }
866
867 int getDestroyedCount()
868 {
869 return destroyed;
870 }
871
872 synchronized void inc()
873 {
874 ++created;
875 }
876
877 synchronized void dec()
878 {
879 ++destroyed;
880 }
881
882 synchronized void updateBlockTime(long latest)
883 {
884 totalBlockTime += latest;
885 if (maxWaitTime < latest)
886 maxWaitTime = latest;
887 }
888
889 long getTotalBlockTime()
890 {
891 return totalBlockTime;
892 }
893
894 int getTimedOut()
895 {
896 return timedOut;
897 }
898
899 synchronized void incTimedOut()
900 {
901 ++timedOut;
902 }
903
904 long getMaxWaitTime()
905 {
906 return maxWaitTime;
907 }
908 }
909 }