Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » pool » [javadoc | source]
    1   /**
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.activemq.pool;
   18   
   19   import java.util.HashMap;
   20   import java.util.Iterator;
   21   import java.util.LinkedList;
   22   import java.util.Map;
   23   import java.util.concurrent.atomic.AtomicBoolean;
   24   import javax.jms.Connection;
   25   import javax.jms.ConnectionFactory;
   26   import javax.jms.JMSException;
   27   import org.apache.activemq.ActiveMQConnection;
   28   import org.apache.activemq.ActiveMQConnectionFactory;
   29   import org.apache.activemq.Service;
   30   import org.apache.activemq.util.IOExceptionSupport;
   31   import org.apache.commons.logging.Log;
   32   import org.apache.commons.logging.LogFactory;
   33   import org.apache.commons.pool.ObjectPoolFactory;
   34   import org.apache.commons.pool.impl.GenericObjectPoolFactory;
   35   
   36   /**
   37    * A JMS provider which pools Connection, Session and MessageProducer instances
   38    * so it can be used with tools like Spring's <a
   39    * href="http://activemq.org/Spring+Support">JmsTemplate</a>.
   40    * 
   41    * <b>NOTE</b> this implementation is only intended for use when sending
   42    * messages. It does not deal with pooling of consumers; for that look at a
   43    * library like <a href="http://jencks.org/">Jencks</a> such as in <a
   44    * href="http://jencks.org/Message+Driven+POJOs">this example</a>
   45    * 
   46    * @org.apache.xbean.XBean element="pooledConnectionFactory"
   47    * 
   48    * @version $Revision: 1.1 $
   49    */
   50   public class PooledConnectionFactory implements ConnectionFactory, Service {
   51       private static final transient Log LOG = LogFactory.getLog(PooledConnectionFactory.class);
   52       private ConnectionFactory connectionFactory;
   53       private Map<ConnectionKey, LinkedList<ConnectionPool>> cache = new HashMap<ConnectionKey, LinkedList<ConnectionPool>>();
   54       private ObjectPoolFactory poolFactory;
   55       private int maximumActive = 500;
   56       private int maxConnections = 1;
   57       private int idleTimeout = 30 * 1000;
   58       private AtomicBoolean stopped = new AtomicBoolean(false);
   59   
   60       public PooledConnectionFactory() {
   61           this(new ActiveMQConnectionFactory());
   62       }
   63   
   64       public PooledConnectionFactory(String brokerURL) {
   65           this(new ActiveMQConnectionFactory(brokerURL));
   66       }
   67   
   68       public PooledConnectionFactory(ActiveMQConnectionFactory connectionFactory) {
   69           this.connectionFactory = connectionFactory;
   70       }
   71   
   72       public ConnectionFactory getConnectionFactory() {
   73           return connectionFactory;
   74       }
   75   
   76       public void setConnectionFactory(ConnectionFactory connectionFactory) {
   77           this.connectionFactory = connectionFactory;
   78       }
   79   
   80       public Connection createConnection() throws JMSException {
   81           return createConnection(null, null);
   82       }
   83   
   84       public synchronized Connection createConnection(String userName, String password) throws JMSException {
   85           if (stopped.get()) {
   86               LOG.debug("PooledConnectionFactory is stopped, skip create new connection.");
   87               return null;
   88           }
   89           
   90           ConnectionKey key = new ConnectionKey(userName, password);
   91           LinkedList<ConnectionPool> pools = cache.get(key);
   92   
   93           if (pools == null) {
   94               pools = new LinkedList<ConnectionPool>();
   95               cache.put(key, pools);
   96           }
   97   
   98           ConnectionPool connection = null;
   99           if (pools.size() == maxConnections) {
  100               connection = pools.removeFirst();
  101           }
  102   
  103           // Now.. we might get a connection, but it might be that we need to
  104           // dump it..
  105           if (connection != null && connection.expiredCheck()) {
  106               connection = null;
  107           }
  108   
  109           if (connection == null) {
  110               ActiveMQConnection delegate = createConnection(key);
  111               connection = createConnectionPool(delegate);
  112           }
  113           pools.add(connection);
  114           return new PooledConnection(connection);
  115       }
  116   
  117       protected ConnectionPool createConnectionPool(ActiveMQConnection connection) {
  118           ConnectionPool result =  new ConnectionPool(connection, getPoolFactory());
  119           result.setIdleTimeout(getIdleTimeout());
  120           return result;
  121       }
  122   
  123       protected ActiveMQConnection createConnection(ConnectionKey key) throws JMSException {
  124           if (key.getUserName() == null && key.getPassword() == null) {
  125               return (ActiveMQConnection)connectionFactory.createConnection();
  126           } else {
  127               return (ActiveMQConnection)connectionFactory.createConnection(key.getUserName(), key.getPassword());
  128           }
  129       }
  130   
  131       /**
  132        * @see org.apache.activemq.service.Service#start()
  133        */
  134       public void start() {
  135           try {
  136               stopped.set(false);
  137               createConnection();
  138           } catch (JMSException e) {
  139               LOG.warn("Create pooled connection during start failed.", e);
  140               IOExceptionSupport.create(e);
  141           }
  142       }
  143   
  144       public void stop() {
  145           LOG.debug("Stop the PooledConnectionFactory, number of connections in cache: "+cache.size());
  146           stopped.set(true);
  147           for (Iterator<LinkedList<ConnectionPool>> iter = cache.values().iterator(); iter.hasNext();) {
  148               LinkedList list = iter.next();
  149               for (Iterator i = list.iterator(); i.hasNext();) {
  150                   ConnectionPool connection = (ConnectionPool) i.next();
  151                   try {
  152                       connection.close();
  153                   }catch(Exception e) {
  154                       LOG.warn("Close connection failed",e);
  155                   }
  156               }
  157           }
  158           cache.clear();
  159       }
  160   
  161       public ObjectPoolFactory getPoolFactory() {
  162           if (poolFactory == null) {
  163               poolFactory = createPoolFactory();
  164           }
  165           return poolFactory;
  166       }
  167   
  168       /**
  169        * Sets the object pool factory used to create individual session pools for
  170        * each connection
  171        */
  172       public void setPoolFactory(ObjectPoolFactory poolFactory) {
  173           this.poolFactory = poolFactory;
  174       }
  175   
  176       public int getMaximumActive() {
  177           return maximumActive;
  178       }
  179   
  180       /**
  181        * Sets the maximum number of active sessions per connection
  182        */
  183       public void setMaximumActive(int maximumActive) {
  184           this.maximumActive = maximumActive;
  185       }
  186   
  187       /**
  188        * @return the maxConnections
  189        */
  190       public int getMaxConnections() {
  191           return maxConnections;
  192       }
  193   
  194       /**
  195        * @param maxConnections the maxConnections to set
  196        */
  197       public void setMaxConnections(int maxConnections) {
  198           this.maxConnections = maxConnections;
  199       }
  200   
  201       protected ObjectPoolFactory createPoolFactory() {
  202           return new GenericObjectPoolFactory(null, maximumActive);
  203       }
  204   
  205       public int getIdleTimeout() {
  206           return idleTimeout;
  207       }
  208   
  209       public void setIdleTimeout(int idleTimeout) {
  210           this.idleTimeout = idleTimeout;
  211       }
  212   }

Save This Page
Home » activemq-parent-5.3.1-source-release » org.apache » activemq » pool » [javadoc | source]