Home » apache-openjpa-1.1.0-source » org.apache.openjpa » kernel » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    * http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing,
   13    * software distributed under the License is distributed on an
   14    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    * KIND, either express or implied.  See the License for the
   16    * specific language governing permissions and limitations
   17    * under the License.    
   18    */
   19   package org.apache.openjpa.kernel;
   20   
   21   import java.io.ObjectStreamException;
   22   import java.util.ArrayList;
   23   import java.util.Collection;
   24   import java.util.Collections;
   25   import java.util.HashMap;
   26   import java.util.Iterator;
   27   import java.util.LinkedList;
   28   import java.util.List;
   29   import java.util.Map;
   30   import java.util.Properties;
   31   import java.util.Set;
   32   import javax.transaction.Status;
   33   import javax.transaction.Synchronization;
   34   import javax.transaction.Transaction;
   35   import javax.transaction.TransactionManager;
   36   
   37   import org.apache.commons.lang.StringUtils;
   38   import org.apache.commons.collections.set.MapBackedSet;
   39   import org.apache.openjpa.conf.OpenJPAConfiguration;
   40   import org.apache.openjpa.conf.OpenJPAVersion;
   41   import org.apache.openjpa.conf.BrokerValue;
   42   import org.apache.openjpa.conf.OpenJPAConfigurationImpl;
   43   import org.apache.openjpa.datacache.DataCacheStoreManager;
   44   import org.apache.openjpa.ee.ManagedRuntime;
   45   import org.apache.openjpa.enhance.PCRegistry;
   46   import org.apache.openjpa.enhance.PersistenceCapable;
   47   import org.apache.openjpa.enhance.ManagedClassSubclasser;
   48   import org.apache.openjpa.event.BrokerFactoryEvent;
   49   import org.apache.openjpa.event.RemoteCommitEventManager;
   50   import org.apache.openjpa.lib.conf.Configuration;
   51   import org.apache.openjpa.lib.conf.Configurations;
   52   import org.apache.openjpa.lib.log.Log;
   53   import org.apache.openjpa.lib.util.J2DoPrivHelper;
   54   import org.apache.openjpa.lib.util.Localizer;
   55   import java.util.concurrent.ConcurrentHashMap;
   56   import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashSet;
   57   import java.util.concurrent.locks.ReentrantLock;
   58   import org.apache.openjpa.meta.MetaDataRepository;
   59   import org.apache.openjpa.util.GeneralException;
   60   import org.apache.openjpa.util.InvalidStateException;
   61   import org.apache.openjpa.util.OpenJPAException;
   62   import org.apache.openjpa.util.UserException;
   63   
   64   /**
   65    * Abstract implementation of the {@link BrokerFactory}
   66    * that must be subclassed for a specific runtime.
   67    *
   68    * @author Abe White
   69    */
   70   public abstract class AbstractBrokerFactory
   71       implements BrokerFactory {
   72   
   73       private static final Localizer _loc = Localizer.forPackage
   74           (AbstractBrokerFactory.class);
   75   
   76       // static mapping of configurations to pooled broker factories
   77       private static final Map _pool = Collections.synchronizedMap(new HashMap());
   78   
   79       // configuration
   80       private final OpenJPAConfiguration _conf;
   81       private transient boolean _readOnly = false;
   82       private transient boolean _closed = false;
   83       private transient RuntimeException _closedException = null;
   84       private Map _userObjects = null;
   85   
   86       // internal lock: spec forbids synchronization on this object
   87       private final ReentrantLock _lock = new ReentrantLock();
   88   
   89       // maps global transactions to associated brokers
   90       private transient ConcurrentHashMap _transactional
   91           = new ConcurrentHashMap();
   92   
   93       // weak-ref tracking of open brokers
   94       private transient Set _brokers;
   95   
   96       // cache the class names loaded from the persistent classes property so
   97       // that we can re-load them for each new broker
   98       private transient Collection _pcClassNames = null;
   99       private transient Collection _pcClassLoaders = null;
  100       private transient boolean _persistentTypesLoaded = false;
  101   
  102       // lifecycle listeners to pass to each broker
  103       private transient Map _lifecycleListeners = null;
  104   
  105       // transaction listeners to pass to each broker
  106       private transient List _transactionListeners = null;
  107   
  108       // key under which this instance can be stored in the broker pool
  109       // and later identified
  110       private Object _poolKey;
  111   
  112       /**
  113        * Return an internal factory pool key for the given configuration.
  114        *
  115        * @since 1.1.0
  116        */
  117       protected static Object toPoolKey(Map map) {
  118           Object key = Configurations.getProperty("Id", map);
  119           return ( key != null) ? key : map;
  120       }
  121   
  122       /**
  123        * Register <code>factory</code> in the pool under <code>key</code>.
  124        *
  125        * @since 1.1.0
  126        */
  127       protected static void pool(Object key, AbstractBrokerFactory factory) {
  128           synchronized(_pool) {
  129               _pool.put(key, factory);
  130               factory.setPoolKey(key);
  131               factory.makeReadOnly();
  132           }
  133       }
  134   
  135       /**
  136        * Return the pooled factory matching the given key, or null
  137        * if none. The key must be of the form created by {@link #getPoolKey}.
  138        */
  139       public static AbstractBrokerFactory getPooledFactoryForKey(Object key) {
  140           return (AbstractBrokerFactory) _pool.get(key);
  141       }
  142   
  143       /**
  144        * Constructor. Configuration must be provided on construction.
  145        */
  146       protected AbstractBrokerFactory(OpenJPAConfiguration config) {
  147           _conf = config;
  148           _brokers = newBrokerSet();
  149           getPcClassLoaders();
  150       }
  151   
  152       /**
  153        * Return the configuration for this factory.
  154        */
  155       public OpenJPAConfiguration getConfiguration() {
  156           return _conf;
  157       }
  158   
  159       public Broker newBroker() {
  160           return newBroker(_conf.getConnectionUserName(),
  161               _conf.getConnectionPassword());
  162       }
  163   
  164       public Broker newBroker(String user, String pass) {
  165           return newBroker(user, pass, _conf.isTransactionModeManaged(),
  166               _conf.getConnectionRetainModeConstant());
  167       }
  168   
  169       public Broker newBroker(boolean managed, int connRetainMode) {
  170           return newBroker(_conf.getConnectionUserName(),
  171               _conf.getConnectionPassword(), managed, connRetainMode);
  172       }
  173   
  174       public Broker newBroker(String user, String pass, boolean managed,
  175           int connRetainMode) {
  176           return newBroker(user, pass, managed, connRetainMode, true);
  177       }
  178   
  179       public Broker newBroker(String user, String pass, boolean managed,
  180           int connRetainMode, boolean findExisting) {
  181           try {
  182               assertOpen();
  183               makeReadOnly();
  184   
  185               BrokerImpl broker = null;
  186               if (findExisting)
  187                   broker = findBroker(user, pass, managed);
  188               if (broker == null) {
  189                   broker = newBrokerImpl(user, pass);
  190                   initializeBroker(managed, connRetainMode, broker, false);
  191               }
  192               return broker;
  193           } catch (OpenJPAException ke) {
  194               throw ke;
  195           } catch (RuntimeException re) {
  196               throw new GeneralException(re);
  197           }
  198       }
  199   
  200       void initializeBroker(boolean managed, int connRetainMode,
  201           BrokerImpl broker, boolean fromDeserialization) {
  202           assertOpen();
  203           makeReadOnly();
  204   
  205           // decorate the store manager for data caching and custom
  206           // result object providers; always make sure it's a delegating
  207           // store manager, because it's easier for users to deal with
  208           // that way
  209           StoreManager sm = newStoreManager();
  210           DelegatingStoreManager dsm = null;
  211           if (_conf.getDataCacheManagerInstance().getSystemDataCache()
  212               != null)
  213               dsm = new DataCacheStoreManager(sm);
  214           dsm = new ROPStoreManager((dsm == null) ? sm : dsm);
  215   
  216           broker.initialize(this, dsm, managed, connRetainMode,
  217               fromDeserialization);
  218           if (!fromDeserialization)
  219               addListeners(broker);
  220   
  221           // if we're using remote events, register the event manager so
  222           // that it can broadcast commit notifications from the broker
  223           RemoteCommitEventManager remote = _conf.
  224               getRemoteCommitEventManager();
  225           if (remote.areRemoteEventsEnabled())
  226               broker.addTransactionListener(remote);
  227   
  228           loadPersistentTypes(broker.getClassLoader());
  229           _brokers.add(broker);
  230           _conf.setReadOnly(Configuration.INIT_STATE_FROZEN);
  231       }
  232   
  233       /**
  234        * Add factory-registered lifecycle listeners to the broker.
  235        */
  236       protected void addListeners(BrokerImpl broker) {
  237           if (_lifecycleListeners != null && !_lifecycleListeners.isEmpty()) {
  238               Map.Entry entry;
  239               for (Iterator itr = _lifecycleListeners.entrySet().iterator();
  240                   itr.hasNext();) {
  241                   entry = (Map.Entry) itr.next();
  242                   broker.addLifecycleListener(entry.getKey(), (Class[])
  243                       entry.getValue());
  244               }
  245           }
  246   
  247           if (_transactionListeners != null && !_transactionListeners.isEmpty()) {
  248               for (Iterator itr = _transactionListeners.iterator();
  249                   itr.hasNext(); ) {
  250                   broker.addTransactionListener(itr.next());
  251               }
  252           }
  253       }
  254   
  255       /**
  256        * Load the configured persistent classes list. Performed automatically
  257        * whenever a broker is created.
  258        */
  259       private void loadPersistentTypes(ClassLoader envLoader) {
  260           // if we've loaded the persistent types and the class name list
  261           // is empty, then we can simply return. Note that there is a
  262           // potential threading scenario in which _persistentTypesLoaded is
  263           // false when read, but the work to populate _pcClassNames has
  264           // already been done. This is ok; _pcClassNames can tolerate
  265           // concurrent access, so the worst case is that the list is
  266           // persistent type data is processed multiple times, which this
  267           // algorithm takes into account.
  268           if (_persistentTypesLoaded && _pcClassNames.isEmpty())
  269               return;
  270   
  271           // cache persistent type names if not already
  272           ClassLoader loader = _conf.getClassResolverInstance().
  273               getClassLoader(getClass(), envLoader);
  274           Collection toRedefine = new ArrayList();
  275           if (!_persistentTypesLoaded) {
  276               Collection clss = _conf.getMetaDataRepositoryInstance().
  277                   loadPersistentTypes(false, loader);
  278               if (clss.isEmpty())
  279                   _pcClassNames = Collections.EMPTY_SET;
  280               else {
  281                   Collection c = new ArrayList(clss.size());
  282                   for (Iterator itr = clss.iterator(); itr.hasNext();) {
  283                       Class cls = (Class) itr.next();
  284                       c.add(cls.getName());
  285                       if (needsSub(cls))
  286                           toRedefine.add(cls);
  287                   }
  288                   getPcClassLoaders().add(loader);
  289                   _pcClassNames = c;
  290               }
  291               _persistentTypesLoaded = true;
  292           } else {
  293               // reload with this loader
  294               if (getPcClassLoaders().add(loader)) {
  295                   for (Iterator itr = _pcClassNames.iterator(); itr.hasNext();) {
  296                       try {
  297                           Class cls =
  298                               Class.forName((String) itr.next(), true, loader);
  299                           if (needsSub(cls))
  300                               toRedefine.add(cls);
  301                       } catch (Throwable t) {
  302                           _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME)
  303                               .warn(null, t);
  304                       }
  305                   }
  306               }
  307           }
  308   
  309           // get the ManagedClassSubclasser into the loop
  310           ManagedClassSubclasser.prepareUnenhancedClasses(
  311               _conf, toRedefine, envLoader);
  312       }
  313   
  314       private boolean needsSub(Class cls) {
  315           return !cls.isInterface()
  316               && !PersistenceCapable.class.isAssignableFrom(cls);
  317       }
  318   
  319       public void addLifecycleListener(Object listener, Class[] classes) {
  320           lock();
  321           try {
  322               assertOpen();
  323               if (_lifecycleListeners == null)
  324                   _lifecycleListeners = new HashMap(7);
  325               _lifecycleListeners.put(listener, classes);
  326           } finally {
  327               unlock();
  328           }
  329       }
  330   
  331       public void removeLifecycleListener(Object listener) {
  332           lock();
  333           try {
  334               assertOpen();
  335               if (_lifecycleListeners != null)
  336                   _lifecycleListeners.remove(listener);
  337           } finally {
  338               unlock();
  339           }
  340       }
  341   
  342       public void addTransactionListener(Object listener) {
  343           lock();
  344           try {
  345               assertOpen();
  346               if (_transactionListeners == null)
  347                   _transactionListeners = new LinkedList();
  348               _transactionListeners.add(listener);
  349           } finally {
  350               unlock();
  351           }
  352       }
  353   
  354       public void removeTransactionListener(Object listener) {
  355           lock();
  356           try {
  357               assertOpen();
  358               if (_transactionListeners != null)
  359                   _transactionListeners.remove(listener);
  360           } finally {
  361               unlock();
  362           }
  363       }
  364   
  365       /**
  366        * Returns true if this broker factory is closed.
  367        */
  368       public boolean isClosed() {
  369           return _closed;
  370       }
  371   
  372       public void close() {
  373           lock();
  374           try {
  375               assertOpen();
  376               assertNoActiveTransaction();
  377   
  378               // remove from factory pool
  379               synchronized (_pool) {
  380                   if (_pool.get(_poolKey) == this)
  381                       _pool.remove(_poolKey);
  382               }
  383   
  384               // close all brokers
  385               Broker broker;
  386               for (Iterator itr = _brokers.iterator(); itr.hasNext();) {
  387                   broker = (Broker) itr.next();
  388                   // Check for null because _brokers may contain weak references
  389                   if ((broker != null) && (!broker.isClosed()))
  390                       broker.close();
  391               }
  392   
  393               if(_conf.metaDataRepositoryAvailable()) {
  394                   // remove metadata repository from listener list
  395                   PCRegistry.removeRegisterClassListener
  396                       (_conf.getMetaDataRepositoryInstance());
  397               }
  398   
  399               _conf.close();
  400               _closed = true;
  401               Log log = _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
  402               if (log.isTraceEnabled())
  403                   _closedException = new IllegalStateException();
  404           } finally {
  405               unlock();
  406           }
  407       }
  408   
  409       /**
  410        * Subclasses should override this method to add a <code>Platform</code>
  411        * property listing the runtime platform, such as:
  412        * <code>OpenJPA JDBC Edition: Oracle Database</code>
  413        */
  414       public Properties getProperties() {
  415           // required props are VendorName and VersionNumber
  416           Properties props = new Properties();
  417           props.setProperty("VendorName", OpenJPAVersion.VENDOR_NAME);
  418           props.setProperty("VersionNumber", OpenJPAVersion.VERSION_NUMBER);
  419           props.setProperty("VersionId", OpenJPAVersion.VERSION_ID);
  420           return props;
  421       }
  422   
  423       public Object getUserObject(Object key) {
  424           lock();
  425           try {
  426               assertOpen();
  427               return (_userObjects == null) ? null : _userObjects.get(key);
  428           } finally {
  429               unlock();
  430           }
  431       }
  432   
  433       public Object putUserObject(Object key, Object val) {
  434           lock();
  435           try {
  436               assertOpen();
  437               if (val == null)
  438                   return (_userObjects == null) ? null : _userObjects.remove(key);
  439   
  440               if (_userObjects == null)
  441                   _userObjects = new HashMap();
  442               return _userObjects.put(key, val);
  443           } finally {
  444               unlock();
  445           }
  446       }
  447   
  448       public void lock() {
  449           _lock.lock();
  450       }
  451   
  452       public void unlock() {
  453           _lock.unlock();
  454       }
  455   
  456       /**
  457        * Replaces the factory with this JVMs pooled version if it exists. Also
  458        * freezes the factory.
  459        */
  460       protected Object readResolve()
  461           throws ObjectStreamException {
  462           AbstractBrokerFactory factory = getPooledFactoryForKey(_poolKey);
  463           if (factory != null)
  464               return factory;
  465   
  466           // reset these transient fields to empty values
  467           _transactional = new ConcurrentHashMap();
  468           _brokers = newBrokerSet();
  469   
  470           makeReadOnly();
  471           return this;
  472       }
  473   
  474       private Set newBrokerSet() {
  475           BrokerValue bv;
  476           if (_conf instanceof OpenJPAConfigurationImpl)
  477               bv = ((OpenJPAConfigurationImpl) _conf).brokerPlugin;
  478           else
  479               bv = (BrokerValue) _conf.getValue(BrokerValue.KEY);
  480   
  481           if (FinalizingBrokerImpl.class.isAssignableFrom(
  482               bv.getTemplateBrokerType(_conf))) {
  483               return MapBackedSet.decorate(new ConcurrentHashMap(),
  484                   new Object() { });
  485           } else {
  486               return new ConcurrentReferenceHashSet(
  487                   ConcurrentReferenceHashSet.WEAK);
  488           }
  489       }
  490   
  491       ////////////////////////
  492       // Methods for Override
  493       ////////////////////////
  494   
  495       /**
  496        * Return a new StoreManager for this runtime. Note that the instance
  497        * returned here may be wrapped before being passed to the
  498        * {@link #newBroker} method.
  499        */
  500       protected abstract StoreManager newStoreManager();
  501   
  502       /**
  503        * Find a pooled broker, or return null if none. If using
  504        * managed transactions, looks for a transactional broker;
  505        * otherwise returns null by default. This method will be called before
  506        * {@link #newStoreManager} so that factory subclasses implementing
  507        * pooling can return a matching manager before a new {@link StoreManager}
  508        * is created.
  509        */
  510       protected BrokerImpl findBroker(String user, String pass, boolean managed) {
  511           if (managed)
  512               return findTransactionalBroker(user, pass);
  513           return null;
  514       }
  515   
  516       /**
  517        * Return a broker configured with the proper settings.
  518        * By default, this method constructs a new
  519        * BrokerImpl of the class set for this factory.
  520        */
  521       protected BrokerImpl newBrokerImpl(String user, String pass) {
  522           BrokerImpl broker = _conf.newBrokerInstance(user, pass);
  523           if (broker == null)
  524               throw new UserException(_loc.get("no-broker-class",
  525                   _conf.getBrokerImpl()));
  526   
  527           return broker;
  528       }
  529   
  530       /**
  531        * Setup transient state used by this factory based on the
  532        * current configuration, which will subsequently be locked down. This
  533        * method will be called before the first broker is requested,
  534        * and will be re-called each time the factory is deserialized into a JVM
  535        * that has no configuration for this data store.
  536        */
  537       protected void setup() {
  538       }
  539   
  540       /////////////
  541       // Utilities
  542       /////////////
  543   
  544       /**
  545        * Find a managed runtime broker associated with the
  546        * current transaction, or returns null if none.
  547        */
  548       protected BrokerImpl findTransactionalBroker(String user, String pass) {
  549           Transaction trans;
  550           ManagedRuntime mr = _conf.getManagedRuntimeInstance();
  551           Object txKey;
  552           try {
  553               trans = mr.getTransactionManager().
  554                   getTransaction();
  555               txKey = mr.getTransactionKey();
  556   
  557               if (trans == null
  558                   || trans.getStatus() == Status.STATUS_NO_TRANSACTION
  559                   || trans.getStatus() == Status.STATUS_UNKNOWN)
  560                   return null;
  561           } catch (OpenJPAException ke) {
  562               throw ke;
  563           } catch (Exception e) {
  564               throw new GeneralException(e);
  565           }
  566   
  567           Collection brokers = (Collection) _transactional.get(txKey);
  568           if (brokers != null) {
  569               // we don't need to synchronize on brokers since one JTA transaction
  570               // can never be active on multiple concurrent threads.
  571               BrokerImpl broker;
  572               for (Iterator itr = brokers.iterator(); itr.hasNext();) {
  573                   broker = (BrokerImpl) itr.next();
  574                   if (StringUtils.equals(broker.getConnectionUserName(),
  575                       user) && StringUtils.equals
  576                       (broker.getConnectionPassword(), pass))
  577                       return broker;
  578               }
  579           }
  580           return null;
  581       }
  582   
  583       /**
  584        * Configures the given broker with the current factory option settings.
  585        */
  586       protected void configureBroker(BrokerImpl broker) {
  587           broker.setOptimistic(_conf.getOptimistic());
  588           broker.setNontransactionalRead(_conf.getNontransactionalRead());
  589           broker.setNontransactionalWrite(_conf.getNontransactionalWrite());
  590           broker.setRetainState(_conf.getRetainState());
  591           broker.setRestoreState(_conf.getRestoreStateConstant());
  592           broker.setAutoClear(_conf.getAutoClearConstant());
  593           broker.setIgnoreChanges(_conf.getIgnoreChanges());
  594           broker.setMultithreaded(_conf.getMultithreaded());
  595           broker.setAutoDetach(_conf.getAutoDetachConstant());
  596           broker.setDetachState(_conf.getDetachStateInstance().
  597               getDetachState());
  598       }
  599   
  600       /**
  601        * Freezes the configuration of this factory.
  602        */
  603       public void makeReadOnly() {
  604           if (_readOnly)
  605               return;
  606   
  607           lock();
  608           try {
  609               // check again
  610               if (_readOnly)
  611                   return;
  612               _readOnly = true;
  613   
  614               Log log = _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
  615               if (log.isInfoEnabled())
  616                   log.info(getFactoryInitializationBanner());
  617               if (log.isTraceEnabled()) {
  618                   Map props = _conf.toProperties(true);
  619                   String lineSep = J2DoPrivHelper.getLineSeparator();
  620                   StringBuffer buf = new StringBuffer();
  621                   Map.Entry entry;
  622                   for (Iterator itr = props.entrySet().iterator();
  623                       itr.hasNext();) {
  624                       entry = (Map.Entry) itr.next();
  625                       buf.append(entry.getKey()).append(": ").
  626                           append(entry.getValue());
  627                       if (itr.hasNext())
  628                           buf.append(lineSep);
  629                   }
  630                   log.trace(_loc.get("factory-properties", buf.toString()));
  631               }
  632   
  633               // setup transient state
  634               setup();
  635   
  636               // register the metdata repository to auto-load persistent types
  637               // and make sure types are enhanced
  638               MetaDataRepository repos = _conf.getMetaDataRepositoryInstance();
  639               repos.setValidate(repos.VALIDATE_RUNTIME, true);
  640               repos.setResolve(repos.MODE_MAPPING_INIT, true);
  641               PCRegistry.addRegisterClassListener(repos);
  642   
  643               // freeze underlying configuration and eagerly initialize to
  644               // avoid synchronization
  645               _conf.setReadOnly(Configuration.INIT_STATE_FREEZING);
  646               _conf.instantiateAll();
  647   
  648               // fire an event for all the broker factory listeners
  649               // registered on the configuration.
  650               _conf.getBrokerFactoryEventManager().fireEvent(
  651                   new BrokerFactoryEvent(this,
  652                       BrokerFactoryEvent.BROKER_FACTORY_CREATED));
  653           } finally {
  654               unlock();
  655           }
  656       }
  657   
  658       /**
  659        * Return an object to be written to the log when this broker factory
  660        * initializes. This happens after the configuration is fully loaded.
  661        */
  662       protected Object getFactoryInitializationBanner() {
  663           return _loc.get("factory-init", OpenJPAVersion.VERSION_NUMBER);
  664       }
  665   
  666       /**
  667        * Throw an exception if the factory is closed.  The exact message and
  668        * content of the exception varies whether TRACE is enabled or not.
  669        */
  670       private void assertOpen() {
  671           if (_closed) {
  672               if (_closedException == null)  // TRACE not enabled
  673                   throw new InvalidStateException(_loc
  674                           .get("closed-factory-notrace"));
  675               else
  676                   throw new InvalidStateException(_loc.get("closed-factory"))
  677                           .setCause(_closedException);
  678           }
  679       }
  680   
  681       ////////////////////
  682       // Broker utilities
  683       ////////////////////
  684   
  685       /**
  686        * Throws a {@link UserException} if a transaction is active. The thrown
  687        * exception will contain all the Brokers with active transactions as
  688        * failed objects in the nested exceptions.
  689        */
  690       private void assertNoActiveTransaction() {
  691           Collection excs;
  692           if (_transactional.isEmpty())
  693               return;
  694   
  695           excs = new ArrayList(_transactional.size());
  696           for (Iterator trans = _transactional.values().iterator();
  697               trans.hasNext();) {
  698               Collection brokers = (Collection) trans.next();
  699               for (Iterator itr = brokers.iterator(); itr.hasNext();) {
  700                   excs.add(new InvalidStateException(_loc.get("active")).
  701                       setFailedObject(itr.next()));
  702               }
  703           }
  704   
  705           if (!excs.isEmpty())
  706               throw new InvalidStateException(_loc.get("nested-exceps")).
  707                   setNestedThrowables((Throwable[]) excs.toArray
  708                       (new Throwable[excs.size()]));
  709       }
  710   
  711       /**
  712        * Synchronize the given broker with a managed transaction,
  713        * optionally starting one if none is in progress.
  714        *
  715        * @return true if synched with transaction, false otherwise
  716        */
  717       boolean syncWithManagedTransaction(BrokerImpl broker, boolean begin) {
  718           Transaction trans;
  719           try {
  720               ManagedRuntime mr = broker.getManagedRuntime();
  721               TransactionManager tm = mr.getTransactionManager();
  722               trans = tm.getTransaction();
  723               if (trans != null
  724                   && (trans.getStatus() == Status.STATUS_NO_TRANSACTION
  725                   || trans.getStatus() == Status.STATUS_UNKNOWN))
  726                   trans = null;
  727   
  728               if (trans == null && begin) {
  729                   tm.begin();
  730                   trans = tm.getTransaction();
  731               } else if (trans == null)
  732                   return false;
  733   
  734               // synch broker and trans
  735               trans.registerSynchronization(broker);
  736   
  737               // we don't need to synchronize on brokers or guard against multiple
  738               // threads using the same trans since one JTA transaction can never
  739               // be active on multiple concurrent threads.
  740               Object txKey = mr.getTransactionKey();
  741               Collection brokers = (Collection) _transactional.get(txKey);
  742               
  743               if (brokers == null) {
  744                   brokers = new ArrayList(2);
  745                   _transactional.put(txKey, brokers);
  746                   trans.registerSynchronization(new RemoveTransactionSync(txKey));
  747               }
  748               brokers.add(broker);
  749               
  750               return true;
  751           } catch (OpenJPAException ke) {
  752               throw ke;
  753           } catch (Exception e) {
  754               throw new GeneralException(e);
  755           }
  756       }
  757   
  758       /**
  759        * Returns a set of all the open brokers associated with this factory. The
  760        * returned set is unmodifiable, and may contain null references.
  761        */
  762       public Collection getOpenBrokers() {
  763           return Collections.unmodifiableCollection(_brokers);
  764       }
  765   
  766       /**
  767        * Release <code>broker</code> from any internal data structures. This
  768        * is invoked by <code>broker</code> after the broker is fully closed.
  769        *
  770        * @since 1.1.0
  771        */
  772       protected void releaseBroker(BrokerImpl broker) {
  773           _brokers.remove(broker);
  774       }
  775   
  776       /**
  777        * @return a key that can be used to obtain this broker factory from the
  778        * pool at a later time.
  779        *
  780        * @since 1.1.0
  781        */
  782       public Object getPoolKey() {
  783           return _poolKey;
  784       }
  785   
  786       /**
  787        * Set a key that can be used to obtain this broker factory from the
  788        * pool at a later time.
  789        *
  790        * @since 1.1.0
  791        */
  792       void setPoolKey(Object key) {
  793           _poolKey = key;
  794       }
  795   
  796       /**
  797        * Simple synchronization listener to remove completed transactions
  798        * from our cache.
  799        */
  800       private class RemoveTransactionSync
  801           implements Synchronization {
  802   
  803           private final Object _trans;
  804   
  805           public RemoveTransactionSync(Object trans) {
  806               _trans = trans;
  807           }
  808   
  809           public void beforeCompletion() {
  810           }
  811   
  812           public void afterCompletion(int status) {
  813               _transactional.remove (_trans);
  814   		}
  815   	}
  816       
  817       /**
  818        * Method insures that deserialized EMF has this reference re-instantiated
  819        */
  820       private Collection getPcClassLoaders() {
  821          if (_pcClassLoaders == null)
  822            _pcClassLoaders = new ConcurrentReferenceHashSet(
  823                ConcurrentReferenceHashSet.WEAK);
  824             
  825          return _pcClassLoaders;
  826       }
  827   }

Save This Page
Home » apache-openjpa-1.1.0-source » org.apache.openjpa » kernel » [javadoc | source]