1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.openjpa.kernel;
20
21 import java.io.ObjectStreamException;
22 import java.util.ArrayList;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.Iterator;
27 import java.util.LinkedList;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Properties;
31 import java.util.Set;
32 import javax.transaction.Status;
33 import javax.transaction.Synchronization;
34 import javax.transaction.Transaction;
35 import javax.transaction.TransactionManager;
36
37 import org.apache.commons.lang.StringUtils;
38 import org.apache.commons.collections.set.MapBackedSet;
39 import org.apache.openjpa.conf.OpenJPAConfiguration;
40 import org.apache.openjpa.conf.OpenJPAVersion;
41 import org.apache.openjpa.conf.BrokerValue;
42 import org.apache.openjpa.conf.OpenJPAConfigurationImpl;
43 import org.apache.openjpa.datacache.DataCacheStoreManager;
44 import org.apache.openjpa.ee.ManagedRuntime;
45 import org.apache.openjpa.enhance.PCRegistry;
46 import org.apache.openjpa.enhance.PersistenceCapable;
47 import org.apache.openjpa.enhance.ManagedClassSubclasser;
48 import org.apache.openjpa.event.BrokerFactoryEvent;
49 import org.apache.openjpa.event.RemoteCommitEventManager;
50 import org.apache.openjpa.lib.conf.Configuration;
51 import org.apache.openjpa.lib.conf.Configurations;
52 import org.apache.openjpa.lib.log.Log;
53 import org.apache.openjpa.lib.util.J2DoPrivHelper;
54 import org.apache.openjpa.lib.util.Localizer;
55 import java.util.concurrent.ConcurrentHashMap;
56 import org.apache.openjpa.lib.util.concurrent.ConcurrentReferenceHashSet;
57 import java.util.concurrent.locks.ReentrantLock;
58 import org.apache.openjpa.meta.MetaDataRepository;
59 import org.apache.openjpa.util.GeneralException;
60 import org.apache.openjpa.util.InvalidStateException;
61 import org.apache.openjpa.util.OpenJPAException;
62 import org.apache.openjpa.util.UserException;
63
64 /**
65 * Abstract implementation of the {@link BrokerFactory}
66 * that must be subclassed for a specific runtime.
67 *
68 * @author Abe White
69 */
70 public abstract class AbstractBrokerFactory
71 implements BrokerFactory {
72
73 private static final Localizer _loc = Localizer.forPackage
74 (AbstractBrokerFactory.class);
75
76 // static mapping of configurations to pooled broker factories
77 private static final Map _pool = Collections.synchronizedMap(new HashMap());
78
79 // configuration
80 private final OpenJPAConfiguration _conf;
81 private transient boolean _readOnly = false;
82 private transient boolean _closed = false;
83 private transient RuntimeException _closedException = null;
84 private Map _userObjects = null;
85
86 // internal lock: spec forbids synchronization on this object
87 private final ReentrantLock _lock = new ReentrantLock();
88
89 // maps global transactions to associated brokers
90 private transient ConcurrentHashMap _transactional
91 = new ConcurrentHashMap();
92
93 // weak-ref tracking of open brokers
94 private transient Set _brokers;
95
96 // cache the class names loaded from the persistent classes property so
97 // that we can re-load them for each new broker
98 private transient Collection _pcClassNames = null;
99 private transient Collection _pcClassLoaders = null;
100 private transient boolean _persistentTypesLoaded = false;
101
102 // lifecycle listeners to pass to each broker
103 private transient Map _lifecycleListeners = null;
104
105 // transaction listeners to pass to each broker
106 private transient List _transactionListeners = null;
107
108 // key under which this instance can be stored in the broker pool
109 // and later identified
110 private Object _poolKey;
111
112 /**
113 * Return an internal factory pool key for the given configuration.
114 *
115 * @since 1.1.0
116 */
117 protected static Object toPoolKey(Map map) {
118 Object key = Configurations.getProperty("Id", map);
119 return ( key != null) ? key : map;
120 }
121
122 /**
123 * Register <code>factory</code> in the pool under <code>key</code>.
124 *
125 * @since 1.1.0
126 */
127 protected static void pool(Object key, AbstractBrokerFactory factory) {
128 synchronized(_pool) {
129 _pool.put(key, factory);
130 factory.setPoolKey(key);
131 factory.makeReadOnly();
132 }
133 }
134
135 /**
136 * Return the pooled factory matching the given key, or null
137 * if none. The key must be of the form created by {@link #getPoolKey}.
138 */
139 public static AbstractBrokerFactory getPooledFactoryForKey(Object key) {
140 return (AbstractBrokerFactory) _pool.get(key);
141 }
142
/**
 * Constructor. Configuration must be provided on construction.
 * Also creates the set that tracks open brokers and the collection of
 * class loaders used for persistent types.
 */
protected AbstractBrokerFactory(OpenJPAConfiguration config) {
    this._conf = config;
    // newBrokerSet() reads _conf, so it must run after the assignment
    this._brokers = newBrokerSet();
    // called for its side effect of creating the loader collection
    getPcClassLoaders();
}
151
152 /**
153 * Return the configuration for this factory.
154 */
155 public OpenJPAConfiguration getConfiguration() {
156 return _conf;
157 }
158
159 public Broker newBroker() {
160 return newBroker(_conf.getConnectionUserName(),
161 _conf.getConnectionPassword());
162 }
163
164 public Broker newBroker(String user, String pass) {
165 return newBroker(user, pass, _conf.isTransactionModeManaged(),
166 _conf.getConnectionRetainModeConstant());
167 }
168
169 public Broker newBroker(boolean managed, int connRetainMode) {
170 return newBroker(_conf.getConnectionUserName(),
171 _conf.getConnectionPassword(), managed, connRetainMode);
172 }
173
174 public Broker newBroker(String user, String pass, boolean managed,
175 int connRetainMode) {
176 return newBroker(user, pass, managed, connRetainMode, true);
177 }
178
179 public Broker newBroker(String user, String pass, boolean managed,
180 int connRetainMode, boolean findExisting) {
181 try {
182 assertOpen();
183 makeReadOnly();
184
185 BrokerImpl broker = null;
186 if (findExisting)
187 broker = findBroker(user, pass, managed);
188 if (broker == null) {
189 broker = newBrokerImpl(user, pass);
190 initializeBroker(managed, connRetainMode, broker, false);
191 }
192 return broker;
193 } catch (OpenJPAException ke) {
194 throw ke;
195 } catch (RuntimeException re) {
196 throw new GeneralException(re);
197 }
198 }
199
200 void initializeBroker(boolean managed, int connRetainMode,
201 BrokerImpl broker, boolean fromDeserialization) {
202 assertOpen();
203 makeReadOnly();
204
205 // decorate the store manager for data caching and custom
206 // result object providers; always make sure it's a delegating
207 // store manager, because it's easier for users to deal with
208 // that way
209 StoreManager sm = newStoreManager();
210 DelegatingStoreManager dsm = null;
211 if (_conf.getDataCacheManagerInstance().getSystemDataCache()
212 != null)
213 dsm = new DataCacheStoreManager(sm);
214 dsm = new ROPStoreManager((dsm == null) ? sm : dsm);
215
216 broker.initialize(this, dsm, managed, connRetainMode,
217 fromDeserialization);
218 if (!fromDeserialization)
219 addListeners(broker);
220
221 // if we're using remote events, register the event manager so
222 // that it can broadcast commit notifications from the broker
223 RemoteCommitEventManager remote = _conf.
224 getRemoteCommitEventManager();
225 if (remote.areRemoteEventsEnabled())
226 broker.addTransactionListener(remote);
227
228 loadPersistentTypes(broker.getClassLoader());
229 _brokers.add(broker);
230 _conf.setReadOnly(Configuration.INIT_STATE_FROZEN);
231 }
232
233 /**
234 * Add factory-registered lifecycle listeners to the broker.
235 */
236 protected void addListeners(BrokerImpl broker) {
237 if (_lifecycleListeners != null && !_lifecycleListeners.isEmpty()) {
238 Map.Entry entry;
239 for (Iterator itr = _lifecycleListeners.entrySet().iterator();
240 itr.hasNext();) {
241 entry = (Map.Entry) itr.next();
242 broker.addLifecycleListener(entry.getKey(), (Class[])
243 entry.getValue());
244 }
245 }
246
247 if (_transactionListeners != null && !_transactionListeners.isEmpty()) {
248 for (Iterator itr = _transactionListeners.iterator();
249 itr.hasNext(); ) {
250 broker.addTransactionListener(itr.next());
251 }
252 }
253 }
254
255 /**
256 * Load the configured persistent classes list. Performed automatically
257 * whenever a broker is created.
258 */
259 private void loadPersistentTypes(ClassLoader envLoader) {
260 // if we've loaded the persistent types and the class name list
261 // is empty, then we can simply return. Note that there is a
262 // potential threading scenario in which _persistentTypesLoaded is
263 // false when read, but the work to populate _pcClassNames has
264 // already been done. This is ok; _pcClassNames can tolerate
265 // concurrent access, so the worst case is that the list is
266 // persistent type data is processed multiple times, which this
267 // algorithm takes into account.
268 if (_persistentTypesLoaded && _pcClassNames.isEmpty())
269 return;
270
271 // cache persistent type names if not already
272 ClassLoader loader = _conf.getClassResolverInstance().
273 getClassLoader(getClass(), envLoader);
274 Collection toRedefine = new ArrayList();
275 if (!_persistentTypesLoaded) {
276 Collection clss = _conf.getMetaDataRepositoryInstance().
277 loadPersistentTypes(false, loader);
278 if (clss.isEmpty())
279 _pcClassNames = Collections.EMPTY_SET;
280 else {
281 Collection c = new ArrayList(clss.size());
282 for (Iterator itr = clss.iterator(); itr.hasNext();) {
283 Class cls = (Class) itr.next();
284 c.add(cls.getName());
285 if (needsSub(cls))
286 toRedefine.add(cls);
287 }
288 getPcClassLoaders().add(loader);
289 _pcClassNames = c;
290 }
291 _persistentTypesLoaded = true;
292 } else {
293 // reload with this loader
294 if (getPcClassLoaders().add(loader)) {
295 for (Iterator itr = _pcClassNames.iterator(); itr.hasNext();) {
296 try {
297 Class cls =
298 Class.forName((String) itr.next(), true, loader);
299 if (needsSub(cls))
300 toRedefine.add(cls);
301 } catch (Throwable t) {
302 _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME)
303 .warn(null, t);
304 }
305 }
306 }
307 }
308
309 // get the ManagedClassSubclasser into the loop
310 ManagedClassSubclasser.prepareUnenhancedClasses(
311 _conf, toRedefine, envLoader);
312 }
313
314 private boolean needsSub(Class cls) {
315 return !cls.isInterface()
316 && !PersistenceCapable.class.isAssignableFrom(cls);
317 }
318
319 public void addLifecycleListener(Object listener, Class[] classes) {
320 lock();
321 try {
322 assertOpen();
323 if (_lifecycleListeners == null)
324 _lifecycleListeners = new HashMap(7);
325 _lifecycleListeners.put(listener, classes);
326 } finally {
327 unlock();
328 }
329 }
330
331 public void removeLifecycleListener(Object listener) {
332 lock();
333 try {
334 assertOpen();
335 if (_lifecycleListeners != null)
336 _lifecycleListeners.remove(listener);
337 } finally {
338 unlock();
339 }
340 }
341
342 public void addTransactionListener(Object listener) {
343 lock();
344 try {
345 assertOpen();
346 if (_transactionListeners == null)
347 _transactionListeners = new LinkedList();
348 _transactionListeners.add(listener);
349 } finally {
350 unlock();
351 }
352 }
353
354 public void removeTransactionListener(Object listener) {
355 lock();
356 try {
357 assertOpen();
358 if (_transactionListeners != null)
359 _transactionListeners.remove(listener);
360 } finally {
361 unlock();
362 }
363 }
364
365 /**
366 * Returns true if this broker factory is closed.
367 */
368 public boolean isClosed() {
369 return _closed;
370 }
371
372 public void close() {
373 lock();
374 try {
375 assertOpen();
376 assertNoActiveTransaction();
377
378 // remove from factory pool
379 synchronized (_pool) {
380 if (_pool.get(_poolKey) == this)
381 _pool.remove(_poolKey);
382 }
383
384 // close all brokers
385 Broker broker;
386 for (Iterator itr = _brokers.iterator(); itr.hasNext();) {
387 broker = (Broker) itr.next();
388 // Check for null because _brokers may contain weak references
389 if ((broker != null) && (!broker.isClosed()))
390 broker.close();
391 }
392
393 if(_conf.metaDataRepositoryAvailable()) {
394 // remove metadata repository from listener list
395 PCRegistry.removeRegisterClassListener
396 (_conf.getMetaDataRepositoryInstance());
397 }
398
399 _conf.close();
400 _closed = true;
401 Log log = _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
402 if (log.isTraceEnabled())
403 _closedException = new IllegalStateException();
404 } finally {
405 unlock();
406 }
407 }
408
409 /**
410 * Subclasses should override this method to add a <code>Platform</code>
411 * property listing the runtime platform, such as:
412 * <code>OpenJPA JDBC Edition: Oracle Database</code>
413 */
414 public Properties getProperties() {
415 // required props are VendorName and VersionNumber
416 Properties props = new Properties();
417 props.setProperty("VendorName", OpenJPAVersion.VENDOR_NAME);
418 props.setProperty("VersionNumber", OpenJPAVersion.VERSION_NUMBER);
419 props.setProperty("VersionId", OpenJPAVersion.VERSION_ID);
420 return props;
421 }
422
423 public Object getUserObject(Object key) {
424 lock();
425 try {
426 assertOpen();
427 return (_userObjects == null) ? null : _userObjects.get(key);
428 } finally {
429 unlock();
430 }
431 }
432
433 public Object putUserObject(Object key, Object val) {
434 lock();
435 try {
436 assertOpen();
437 if (val == null)
438 return (_userObjects == null) ? null : _userObjects.remove(key);
439
440 if (_userObjects == null)
441 _userObjects = new HashMap();
442 return _userObjects.put(key, val);
443 } finally {
444 unlock();
445 }
446 }
447
448 public void lock() {
449 _lock.lock();
450 }
451
452 public void unlock() {
453 _lock.unlock();
454 }
455
456 /**
457 * Replaces the factory with this JVMs pooled version if it exists. Also
458 * freezes the factory.
459 */
460 protected Object readResolve()
461 throws ObjectStreamException {
462 AbstractBrokerFactory factory = getPooledFactoryForKey(_poolKey);
463 if (factory != null)
464 return factory;
465
466 // reset these transient fields to empty values
467 _transactional = new ConcurrentHashMap();
468 _brokers = newBrokerSet();
469
470 makeReadOnly();
471 return this;
472 }
473
474 private Set newBrokerSet() {
475 BrokerValue bv;
476 if (_conf instanceof OpenJPAConfigurationImpl)
477 bv = ((OpenJPAConfigurationImpl) _conf).brokerPlugin;
478 else
479 bv = (BrokerValue) _conf.getValue(BrokerValue.KEY);
480
481 if (FinalizingBrokerImpl.class.isAssignableFrom(
482 bv.getTemplateBrokerType(_conf))) {
483 return MapBackedSet.decorate(new ConcurrentHashMap(),
484 new Object() { });
485 } else {
486 return new ConcurrentReferenceHashSet(
487 ConcurrentReferenceHashSet.WEAK);
488 }
489 }
490
491 ////////////////////////
492 // Methods for Override
493 ////////////////////////
494
/**
 * Return a new StoreManager for this runtime. Note that the instance
 * returned here is wrapped by <code>initializeBroker</code> (optionally
 * in a data cache store manager, and always in a result object provider
 * store manager) before being handed to the broker.
 */
protected abstract StoreManager newStoreManager();
501
502 /**
503 * Find a pooled broker, or return null if none. If using
504 * managed transactions, looks for a transactional broker;
505 * otherwise returns null by default. This method will be called before
506 * {@link #newStoreManager} so that factory subclasses implementing
507 * pooling can return a matching manager before a new {@link StoreManager}
508 * is created.
509 */
510 protected BrokerImpl findBroker(String user, String pass, boolean managed) {
511 if (managed)
512 return findTransactionalBroker(user, pass);
513 return null;
514 }
515
516 /**
517 * Return a broker configured with the proper settings.
518 * By default, this method constructs a new
519 * BrokerImpl of the class set for this factory.
520 */
521 protected BrokerImpl newBrokerImpl(String user, String pass) {
522 BrokerImpl broker = _conf.newBrokerInstance(user, pass);
523 if (broker == null)
524 throw new UserException(_loc.get("no-broker-class",
525 _conf.getBrokerImpl()));
526
527 return broker;
528 }
529
/**
 * Setup transient state used by this factory based on the
 * current configuration, which will subsequently be locked down. This
 * method is invoked by {@link #makeReadOnly} the first time the factory
 * is frozen, while the factory lock is held; that happens before the
 * first broker is handed out, and again when a deserialized factory that
 * is not replaced by a pooled one is frozen. The default implementation
 * does nothing.
 */
protected void setup() {
}
539
540 /////////////
541 // Utilities
542 /////////////
543
544 /**
545 * Find a managed runtime broker associated with the
546 * current transaction, or returns null if none.
547 */
548 protected BrokerImpl findTransactionalBroker(String user, String pass) {
549 Transaction trans;
550 ManagedRuntime mr = _conf.getManagedRuntimeInstance();
551 Object txKey;
552 try {
553 trans = mr.getTransactionManager().
554 getTransaction();
555 txKey = mr.getTransactionKey();
556
557 if (trans == null
558 || trans.getStatus() == Status.STATUS_NO_TRANSACTION
559 || trans.getStatus() == Status.STATUS_UNKNOWN)
560 return null;
561 } catch (OpenJPAException ke) {
562 throw ke;
563 } catch (Exception e) {
564 throw new GeneralException(e);
565 }
566
567 Collection brokers = (Collection) _transactional.get(txKey);
568 if (brokers != null) {
569 // we don't need to synchronize on brokers since one JTA transaction
570 // can never be active on multiple concurrent threads.
571 BrokerImpl broker;
572 for (Iterator itr = brokers.iterator(); itr.hasNext();) {
573 broker = (BrokerImpl) itr.next();
574 if (StringUtils.equals(broker.getConnectionUserName(),
575 user) && StringUtils.equals
576 (broker.getConnectionPassword(), pass))
577 return broker;
578 }
579 }
580 return null;
581 }
582
/**
 * Configures the given broker with the current factory option settings.
 * Each option is read from the configuration and applied through the
 * corresponding broker setter, one after another.
 *
 * @param broker the broker to configure
 */
protected void configureBroker(BrokerImpl broker) {
    broker.setOptimistic(_conf.getOptimistic());
    broker.setNontransactionalRead(_conf.getNontransactionalRead());
    broker.setNontransactionalWrite(_conf.getNontransactionalWrite());
    broker.setRetainState(_conf.getRetainState());
    broker.setRestoreState(_conf.getRestoreStateConstant());
    broker.setAutoClear(_conf.getAutoClearConstant());
    broker.setIgnoreChanges(_conf.getIgnoreChanges());
    broker.setMultithreaded(_conf.getMultithreaded());
    broker.setAutoDetach(_conf.getAutoDetachConstant());
    // the detach state comes from the configured detach-state plugin
    broker.setDetachState(_conf.getDetachStateInstance().
        getDetachState());
}
599
600 /**
601 * Freezes the configuration of this factory.
602 */
603 public void makeReadOnly() {
604 if (_readOnly)
605 return;
606
607 lock();
608 try {
609 // check again
610 if (_readOnly)
611 return;
612 _readOnly = true;
613
614 Log log = _conf.getLog(OpenJPAConfiguration.LOG_RUNTIME);
615 if (log.isInfoEnabled())
616 log.info(getFactoryInitializationBanner());
617 if (log.isTraceEnabled()) {
618 Map props = _conf.toProperties(true);
619 String lineSep = J2DoPrivHelper.getLineSeparator();
620 StringBuffer buf = new StringBuffer();
621 Map.Entry entry;
622 for (Iterator itr = props.entrySet().iterator();
623 itr.hasNext();) {
624 entry = (Map.Entry) itr.next();
625 buf.append(entry.getKey()).append(": ").
626 append(entry.getValue());
627 if (itr.hasNext())
628 buf.append(lineSep);
629 }
630 log.trace(_loc.get("factory-properties", buf.toString()));
631 }
632
633 // setup transient state
634 setup();
635
636 // register the metdata repository to auto-load persistent types
637 // and make sure types are enhanced
638 MetaDataRepository repos = _conf.getMetaDataRepositoryInstance();
639 repos.setValidate(repos.VALIDATE_RUNTIME, true);
640 repos.setResolve(repos.MODE_MAPPING_INIT, true);
641 PCRegistry.addRegisterClassListener(repos);
642
643 // freeze underlying configuration and eagerly initialize to
644 // avoid synchronization
645 _conf.setReadOnly(Configuration.INIT_STATE_FREEZING);
646 _conf.instantiateAll();
647
648 // fire an event for all the broker factory listeners
649 // registered on the configuration.
650 _conf.getBrokerFactoryEventManager().fireEvent(
651 new BrokerFactoryEvent(this,
652 BrokerFactoryEvent.BROKER_FACTORY_CREATED));
653 } finally {
654 unlock();
655 }
656 }
657
658 /**
659 * Return an object to be written to the log when this broker factory
660 * initializes. This happens after the configuration is fully loaded.
661 */
662 protected Object getFactoryInitializationBanner() {
663 return _loc.get("factory-init", OpenJPAVersion.VERSION_NUMBER);
664 }
665
666 /**
667 * Throw an exception if the factory is closed. The exact message and
668 * content of the exception varies whether TRACE is enabled or not.
669 */
670 private void assertOpen() {
671 if (_closed) {
672 if (_closedException == null) // TRACE not enabled
673 throw new InvalidStateException(_loc
674 .get("closed-factory-notrace"));
675 else
676 throw new InvalidStateException(_loc.get("closed-factory"))
677 .setCause(_closedException);
678 }
679 }
680
681 ////////////////////
682 // Broker utilities
683 ////////////////////
684
685 /**
686 * Throws a {@link UserException} if a transaction is active. The thrown
687 * exception will contain all the Brokers with active transactions as
688 * failed objects in the nested exceptions.
689 */
690 private void assertNoActiveTransaction() {
691 Collection excs;
692 if (_transactional.isEmpty())
693 return;
694
695 excs = new ArrayList(_transactional.size());
696 for (Iterator trans = _transactional.values().iterator();
697 trans.hasNext();) {
698 Collection brokers = (Collection) trans.next();
699 for (Iterator itr = brokers.iterator(); itr.hasNext();) {
700 excs.add(new InvalidStateException(_loc.get("active")).
701 setFailedObject(itr.next()));
702 }
703 }
704
705 if (!excs.isEmpty())
706 throw new InvalidStateException(_loc.get("nested-exceps")).
707 setNestedThrowables((Throwable[]) excs.toArray
708 (new Throwable[excs.size()]));
709 }
710
711 /**
712 * Synchronize the given broker with a managed transaction,
713 * optionally starting one if none is in progress.
714 *
715 * @return true if synched with transaction, false otherwise
716 */
717 boolean syncWithManagedTransaction(BrokerImpl broker, boolean begin) {
718 Transaction trans;
719 try {
720 ManagedRuntime mr = broker.getManagedRuntime();
721 TransactionManager tm = mr.getTransactionManager();
722 trans = tm.getTransaction();
723 if (trans != null
724 && (trans.getStatus() == Status.STATUS_NO_TRANSACTION
725 || trans.getStatus() == Status.STATUS_UNKNOWN))
726 trans = null;
727
728 if (trans == null && begin) {
729 tm.begin();
730 trans = tm.getTransaction();
731 } else if (trans == null)
732 return false;
733
734 // synch broker and trans
735 trans.registerSynchronization(broker);
736
737 // we don't need to synchronize on brokers or guard against multiple
738 // threads using the same trans since one JTA transaction can never
739 // be active on multiple concurrent threads.
740 Object txKey = mr.getTransactionKey();
741 Collection brokers = (Collection) _transactional.get(txKey);
742
743 if (brokers == null) {
744 brokers = new ArrayList(2);
745 _transactional.put(txKey, brokers);
746 trans.registerSynchronization(new RemoveTransactionSync(txKey));
747 }
748 brokers.add(broker);
749
750 return true;
751 } catch (OpenJPAException ke) {
752 throw ke;
753 } catch (Exception e) {
754 throw new GeneralException(e);
755 }
756 }
757
758 /**
759 * Returns a set of all the open brokers associated with this factory. The
760 * returned set is unmodifiable, and may contain null references.
761 */
762 public Collection getOpenBrokers() {
763 return Collections.unmodifiableCollection(_brokers);
764 }
765
766 /**
767 * Release <code>broker</code> from any internal data structures. This
768 * is invoked by <code>broker</code> after the broker is fully closed.
769 *
770 * @since 1.1.0
771 */
772 protected void releaseBroker(BrokerImpl broker) {
773 _brokers.remove(broker);
774 }
775
776 /**
777 * @return a key that can be used to obtain this broker factory from the
778 * pool at a later time.
779 *
780 * @since 1.1.0
781 */
782 public Object getPoolKey() {
783 return _poolKey;
784 }
785
786 /**
787 * Set a key that can be used to obtain this broker factory from the
788 * pool at a later time.
789 *
790 * @since 1.1.0
791 */
792 void setPoolKey(Object key) {
793 _poolKey = key;
794 }
795
796 /**
797 * Simple synchronization listener to remove completed transactions
798 * from our cache.
799 */
800 private class RemoveTransactionSync
801 implements Synchronization {
802
803 private final Object _trans;
804
805 public RemoveTransactionSync(Object trans) {
806 _trans = trans;
807 }
808
809 public void beforeCompletion() {
810 }
811
812 public void afterCompletion(int status) {
813 _transactional.remove (_trans);
814 }
815 }
816
817 /**
818 * Method insures that deserialized EMF has this reference re-instantiated
819 */
820 private Collection getPcClassLoaders() {
821 if (_pcClassLoaders == null)
822 _pcClassLoaders = new ConcurrentReferenceHashSet(
823 ConcurrentReferenceHashSet.WEAK);
824
825 return _pcClassLoaders;
826 }
827 }