1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.openjpa.datacache;
20
21 import java.util.ArrayList;
22 import java.util.BitSet;
23 import java.util.Collection;
24 import java.util.Collections;
25 import java.util.HashMap;
26 import java.util.HashSet;
27 import java.util.Iterator;
28 import java.util.List;
29 import java.util.Map;
30 import java.util.Map.Entry;
31
32 import org.apache.openjpa.enhance.PCDataGenerator;
33 import org.apache.openjpa.kernel.DelegatingStoreManager;
34 import org.apache.openjpa.kernel.FetchConfiguration;
35 import org.apache.openjpa.kernel.LockLevels;
36 import org.apache.openjpa.kernel.OpenJPAStateManager;
37 import org.apache.openjpa.kernel.PCState;
38 import org.apache.openjpa.kernel.QueryLanguages;
39 import org.apache.openjpa.kernel.StoreContext;
40 import org.apache.openjpa.kernel.StoreManager;
41 import org.apache.openjpa.kernel.StoreQuery;
42 import org.apache.openjpa.meta.ClassMetaData;
43 import org.apache.openjpa.meta.MetaDataRepository;
44 import org.apache.openjpa.util.OpenJPAId;
45 import org.apache.openjpa.util.OptimisticException;
46
47 /**
48 * StoreManager proxy that delegates to a data cache when possible.
49 *
50 * @author Patrick Linskey
51 * @nojavadoc
52 */
53 public class DataCacheStoreManager
54 extends DelegatingStoreManager {
55
56 // all the state managers changed in this transaction
57 private Collection _inserts = null; // statemanagers
58 private Map _updates = null; // statemanager -> fmd set
59 private Collection _deletes = null; // statemanagers
60
61 // the owning context
62 private StoreContext _ctx = null;
63
64 // pc data generator
65 private PCDataGenerator _gen = null;
66
67 /**
68 * Constructor.
69 *
70 * @param sm the store manager to delegate to
71 */
72 public DataCacheStoreManager(StoreManager sm) {
73 super(sm);
74 }
75
76 public void setContext(StoreContext ctx) {
77 _ctx = ctx;
78 _gen = ctx.getConfiguration().getDataCacheManagerInstance().
79 getPCDataGenerator();
80 super.setContext(ctx);
81 }
82
83 public void begin() {
84 super.begin();
85 }
86
87 public void commit() {
88 try {
89 super.commit();
90 updateCaches();
91 } finally {
92 _inserts = null;
93 _updates = null;
94 _deletes = null;
95 }
96 }
97
98 public void rollback() {
99 try {
100 super.rollback();
101 } finally {
102 _inserts = null;
103 _updates = null;
104 _deletes = null;
105 }
106 }
107
108 /**
109 * Evict all members of the given classes.
110 */
111 private void evictTypes(Collection classes) {
112 if (classes.isEmpty())
113 return;
114
115 MetaDataRepository mdr = _ctx.getConfiguration().
116 getMetaDataRepositoryInstance();
117 ClassLoader loader = _ctx.getClassLoader();
118
119 Class cls;
120 DataCache cache;
121 for (Iterator itr = classes.iterator(); itr.hasNext();) {
122 cls = (Class) itr.next();
123 cache = mdr.getMetaData(cls, loader, false).getDataCache();
124 if (cache != null)
125 cache.removeAll(cls, false);
126 }
127 }
128
129 /**
130 * Update all caches with the committed inserts, updates, and deletes.
131 */
132 private void updateCaches() {
133 // map each data cache to the modifications we need to perform
134 Map modMap = null;
135 Modifications mods;
136 OpenJPAStateManager sm;
137 DataCachePCData data;
138 DataCache cache;
139
140 // create pc datas for inserts
141 if (_ctx.getPopulateDataCache() && _inserts != null) {
142 for (Iterator itr = _inserts.iterator(); itr.hasNext();) {
143 sm = (OpenJPAStateManager) itr.next();
144 cache = sm.getMetaData().getDataCache();
145 if (cache == null)
146 continue;
147
148 if (modMap == null)
149 modMap = new HashMap();
150 mods = getModifications(modMap, cache);
151 data = newPCData(sm);
152 data.store(sm);
153 mods.additions.add(new PCDataHolder(data, sm));
154 }
155 }
156
157 // update pcdatas for updates
158 Map.Entry entry;
159 if (_updates != null) {
160 BitSet fields;
161 for (Iterator itr = _updates.entrySet().iterator();
162 itr.hasNext();) {
163 entry = (Map.Entry) itr.next();
164 sm = (OpenJPAStateManager) entry.getKey();
165 fields = (BitSet) entry.getValue();
166
167 cache = sm.getMetaData().getDataCache();
168 if (cache == null)
169 continue;
170
171 // it's ok not to clone the object that we get from the cache,
172 // since we're inside the commit() method, so any modifications
173 // to the underlying cache are valid. If the commit had not
174 // already succeeded, then we'd want to clone the retrieved
175 // object.
176 if (modMap == null)
177 modMap = new HashMap();
178 data = cache.get(sm.getObjectId());
179 mods = getModifications(modMap, cache);
180
181 // data should always be non-null, since the object is
182 // dirty, but maybe it got dropped from the cache in the
183 // interim
184 if (data == null) {
185 data = newPCData(sm);
186 data.store(sm);
187 mods.newUpdates.add(new PCDataHolder(data, sm));
188 } else {
189 data.store(sm, fields);
190 mods.existingUpdates.add(new PCDataHolder(data, sm));
191 }
192 }
193 }
194
195 // remove pcdatas for deletes
196 if (_deletes != null) {
197 for (Iterator itr = _deletes.iterator(); itr.hasNext();) {
198 sm = (OpenJPAStateManager) itr.next();
199 cache = sm.getMetaData().getDataCache();
200 if (cache == null)
201 continue;
202
203 if (modMap == null)
204 modMap = new HashMap();
205 mods = getModifications(modMap, cache);
206 mods.deletes.add(sm.getObjectId());
207 }
208 }
209
210 // notify the caches of the changes
211 if (modMap != null) {
212 for (Iterator itr = modMap.entrySet().iterator(); itr.hasNext();) {
213 entry = (Map.Entry) itr.next();
214 cache = (DataCache) entry.getKey();
215 mods = (Modifications) entry.getValue();
216
217 // make sure we're not caching old versions
218 cache.writeLock();
219 try {
220 transformToVersionSafePCDatas(cache, mods.additions);
221 transformToVersionSafePCDatas(cache, mods.newUpdates);
222 transformToVersionSafePCDatas(cache, mods.existingUpdates);
223 cache.commit(mods.additions, mods.newUpdates,
224 mods.existingUpdates, mods.deletes);
225 } finally {
226 cache.writeUnlock();
227 }
228 }
229 }
230
231 // if we were in largeTransaction mode, then we have recorded
232 // the classes of updated/deleted objects and these now need to be
233 // evicted
234 if (_ctx.isTrackChangesByType()) {
235 evictTypes(_ctx.getDeletedTypes());
236 evictTypes(_ctx.getUpdatedTypes());
237 }
238
239 // and notify the query cache. notify in one batch to reduce synch
240 QueryCache queryCache = _ctx.getConfiguration().
241 getDataCacheManagerInstance().getSystemQueryCache();
242 if (queryCache != null) {
243 Collection pers = _ctx.getPersistedTypes();
244 Collection del = _ctx.getDeletedTypes();
245 Collection up = _ctx.getUpdatedTypes();
246 int size = pers.size() + del.size() + up.size();
247 if (size > 0) {
248 Collection types = new ArrayList(size);
249 types.addAll(pers);
250 types.addAll(del);
251 types.addAll(up);
252 queryCache.onTypesChanged(new TypesChangedEvent(this, types));
253 }
254 }
255 }
256
257 /**
258 * Transforms a collection of {@link PCDataHolder}s that might contain
259 * stale instances into a collection of up-to-date {@link DataCachePCData}s.
260 */
261 private void transformToVersionSafePCDatas(DataCache cache,
262 List holders) {
263
264 Map<Object,Integer> ids = new HashMap<Object,Integer>(holders.size());
265 // this list could be removed if DataCache.getAll() took a Collection
266 List idList = new ArrayList(holders.size());
267 int i = 0;
268 for (PCDataHolder holder : (List<PCDataHolder>) holders) {
269 ids.put(holder.sm.getObjectId(), i++);
270 idList.add(holder.sm.getObjectId());
271 }
272
273 Map<Object,DataCachePCData> pcdatas = cache.getAll(idList);
274 for (Entry<Object,DataCachePCData> entry : pcdatas.entrySet()) {
275 Integer index = ids.get(entry.getKey());
276 DataCachePCData oldpc = entry.getValue();
277 PCDataHolder holder = (PCDataHolder) holders.get(index);
278 if (oldpc != null && compareVersion(holder.sm,
279 holder.sm.getVersion(), oldpc.getVersion()) == VERSION_EARLIER)
280 holders.remove(index);
281 else
282 holders.set(index, holder.pcdata);
283 }
284 }
285
286 /**
287 * Return a {@link Modifications} instance to track modifications
288 * to the given cache, creating and caching the instance if it does
289 * not already exist in the given map.
290 */
291 private static Modifications getModifications(Map modMap, DataCache cache) {
292 Modifications mods = (Modifications) modMap.get(cache);
293 if (mods == null) {
294 mods = new Modifications();
295 modMap.put(cache, mods);
296 }
297 return mods;
298 }
299
300 public boolean exists(OpenJPAStateManager sm, Object edata) {
301 DataCache cache = sm.getMetaData().getDataCache();
302 if (cache != null && !isLocking(null)
303 && cache.contains(sm.getObjectId()))
304 return true;
305 return super.exists(sm, edata);
306 }
307
308 public boolean syncVersion(OpenJPAStateManager sm, Object edata) {
309 DataCache cache = sm.getMetaData().getDataCache();
310 if (cache == null || sm.isEmbedded())
311 return super.syncVersion(sm, edata);
312
313 DataCachePCData data;
314 Object version = null;
315 data = cache.get(sm.getObjectId());
316 if (!isLocking(null) && data != null)
317 version = data.getVersion();
318
319 // if we have a cached version update from there
320 if (version != null) {
321 if (!version.equals(sm.getVersion())) {
322 sm.setVersion(version);
323 return false;
324 }
325 return true;
326 }
327
328 // use data store version
329 return super.syncVersion(sm, edata);
330 }
331
332 public boolean initialize(OpenJPAStateManager sm, PCState state,
333 FetchConfiguration fetch, Object edata) {
334 DataCache cache = sm.getMetaData().getDataCache();
335 if (cache == null || sm.isEmbedded())
336 return super.initialize(sm, state, fetch, edata);
337
338 DataCachePCData data = cache.get(sm.getObjectId());
339 if (data != null && !isLocking(fetch)) {
340 //### the 'data.type' access here probably needs to be
341 //### addressed for bug 511
342 sm.initialize(data.getType(), state);
343 data.load(sm, fetch, edata);
344 return true;
345 }
346
347 // initialize from store manager
348 if (!super.initialize(sm, state, fetch, edata))
349 return false;
350 if (!_ctx.getPopulateDataCache())
351 return true;
352
353 // make sure that we're not trying to cache an old version
354 cache.writeLock();
355 try {
356 data = cache.get(sm.getObjectId());
357 if (data != null && compareVersion(sm, sm.getVersion(),
358 data.getVersion()) == VERSION_EARLIER)
359 return true;
360
361 // cache newly loaded info. It is safe to cache data frorm
362 // initialize() because this method is only called upon
363 // initial load of the data.
364 if (data == null)
365 data = newPCData(sm);
366 data.store(sm);
367 cache.put(data);
368 } finally {
369 cache.writeUnlock();
370 }
371 return true;
372 }
373
374 public boolean load(OpenJPAStateManager sm, BitSet fields,
375 FetchConfiguration fetch, int lockLevel, Object edata) {
376 DataCache cache = sm.getMetaData().getDataCache();
377 if (cache == null || sm.isEmbedded())
378 return super.load(sm, fields, fetch, lockLevel, edata);
379
380 DataCachePCData data = cache.get(sm.getObjectId());
381 if (lockLevel == LockLevels.LOCK_NONE && !isLocking(fetch)
382 && data != null)
383 data.load(sm, fields, fetch, edata);
384 if (fields.length() == 0)
385 return true;
386
387 // load from store manager; clone the set of still-unloaded fields
388 // so that if the store manager decides to modify it it won't affect us
389 if (!super.load(sm, (BitSet) fields.clone(), fetch, lockLevel, edata))
390 return false;
391 if (!_ctx.getPopulateDataCache())
392 return true;
393 // Do not load changes into cache if the instance has been flushed
394 if (sm.isFlushed())
395 return true;
396
397 // make sure that we're not trying to cache an old version
398 cache.writeLock();
399 try {
400 data = cache.get(sm.getObjectId());
401 if (data != null && compareVersion(sm, sm.getVersion(),
402 data.getVersion()) == VERSION_EARLIER)
403 return true;
404
405 // cache newly loaded info
406 boolean isNew = data == null;
407 if (isNew)
408 data = newPCData(sm);
409 data.store(sm, fields);
410 if (isNew)
411 cache.put(data);
412 else
413 cache.update(data);
414 } finally {
415 cache.writeUnlock();
416 }
417 return true;
418 }
419
420 public Collection loadAll(Collection sms, PCState state, int load,
421 FetchConfiguration fetch, Object edata) {
422 if (isLocking(fetch))
423 return super.loadAll(sms, state, load, fetch, edata);
424
425 Map unloaded = null;
426 List smList = null;
427 Map caches = new HashMap();
428 OpenJPAStateManager sm;
429 DataCache cache;
430 DataCachePCData data;
431 BitSet fields;
432
433 for (Iterator itr = sms.iterator(); itr.hasNext();) {
434 sm = (OpenJPAStateManager) itr.next();
435 cache = sm.getMetaData().getDataCache();
436 if (cache == null || sm.isEmbedded()) {
437 unloaded = addUnloaded(sm, null, unloaded);
438 continue;
439 }
440
441 if (sm.getManagedInstance() == null
442 || load != FORCE_LOAD_NONE
443 || sm.getPCState() == PCState.HOLLOW) {
444 smList = (List) caches.get(cache);
445 if (smList == null) {
446 smList = new ArrayList();
447 caches.put(cache, smList);
448 }
449 smList.add(sm);
450 } else if (!cache.contains(sm.getObjectId()))
451 unloaded = addUnloaded(sm, null, unloaded);
452 }
453
454 for (Iterator itr = caches.keySet().iterator(); itr.hasNext();) {
455 cache = (DataCache) itr.next();
456 smList = (List) caches.get(cache);
457 List oidList = new ArrayList(smList.size());
458
459 for (itr=smList.iterator();itr.hasNext();) {
460 sm = (OpenJPAStateManager) itr.next();
461 oidList.add((OpenJPAId) sm.getObjectId());
462 }
463
464 Map dataMap = cache.getAll(oidList);
465
466 for (itr=smList.iterator();itr.hasNext();) {
467 sm = (OpenJPAStateManager) itr.next();
468 data = (DataCachePCData) dataMap.get(
469 (OpenJPAId) sm.getObjectId());
470
471 if (sm.getManagedInstance() == null) {
472 if (data != null) {
473 //### the 'data.type' access here probably needs
474 //### to be addressed for bug 511
475 sm.initialize(data.getType(), state);
476 data.load(sm, fetch, edata);
477 } else
478 unloaded = addUnloaded(sm, null, unloaded);
479 } else if (load != FORCE_LOAD_NONE
480 || sm.getPCState() == PCState.HOLLOW) {
481 data = cache.get(sm.getObjectId());
482 if (data != null) {
483 // load unloaded fields
484 fields = sm.getUnloaded(fetch);
485 data.load(sm, fields, fetch, edata);
486 if (fields.length() > 0)
487 unloaded = addUnloaded(sm, fields, unloaded);
488 } else
489 unloaded = addUnloaded(sm, null, unloaded);
490 }
491 }
492 }
493
494 if (unloaded == null)
495 return Collections.EMPTY_LIST;
496
497 // load with delegate
498 Collection failed = super.loadAll(unloaded.keySet(), state, load,
499 fetch, edata);
500 if (!_ctx.getPopulateDataCache())
501 return failed;
502
503 // for each loaded instance, merge loaded state into cached data
504 Map.Entry entry;
505 boolean isNew;
506 for (Iterator itr = unloaded.entrySet().iterator(); itr.hasNext();) {
507 entry = (Map.Entry) itr.next();
508 sm = (OpenJPAStateManager) entry.getKey();
509 fields = (BitSet) entry.getValue();
510
511 cache = sm.getMetaData().getDataCache();
512 if (cache == null || sm.isEmbedded() || (failed != null
513 && failed.contains(sm.getId())))
514 continue;
515
516 // make sure that we're not trying to cache an old version
517 cache.writeLock();
518 try {
519 data = cache.get(sm.getObjectId());
520 if (data != null && compareVersion(sm, sm.getVersion(),
521 data.getVersion()) == VERSION_EARLIER)
522 continue;
523
524 isNew = data == null;
525 if (isNew)
526 data = newPCData(sm);
527 if (fields == null)
528 data.store(sm);
529 else
530 data.store(sm, fields);
531 if (isNew)
532 cache.put(data);
533 else
534 cache.update(data);
535 } finally {
536 cache.writeUnlock();
537 }
538 }
539 return failed;
540 }
541
542 /**
543 * Helper method to add an unloaded instance to the given map.
544 */
545 private static Map addUnloaded(OpenJPAStateManager sm, BitSet fields,
546 Map unloaded) {
547 if (unloaded == null)
548 unloaded = new HashMap();
549 unloaded.put(sm, fields);
550 return unloaded;
551 }
552
553 public Collection flush(Collection states) {
554 Collection exceps = super.flush(states);
555
556 // if there were errors evict bad instances and don't record changes
557 if (!exceps.isEmpty()) {
558 for (Iterator iter = exceps.iterator(); iter.hasNext(); ) {
559 Exception e = (Exception) iter.next();
560 if (e instanceof OptimisticException)
561 notifyOptimisticLockFailure((OptimisticException) e);
562 }
563 return exceps;
564 }
565
566 // if large transaction mode don't record individual changes
567 if (_ctx.isTrackChangesByType())
568 return exceps;
569
570 OpenJPAStateManager sm;
571 for (Iterator itr = states.iterator(); itr.hasNext();) {
572 sm = (OpenJPAStateManager) itr.next();
573
574 if (sm.getPCState() == PCState.PNEW && !sm.isFlushed()) {
575 if (_inserts == null)
576 _inserts = new ArrayList();
577 _inserts.add(sm);
578
579 // may have been re-persisted
580 if (_deletes != null)
581 _deletes.remove(sm);
582 } else if (_inserts != null
583 && (sm.getPCState() == PCState.PNEWDELETED
584 || sm.getPCState() == PCState.PNEWFLUSHEDDELETED))
585 _inserts.remove(sm);
586 else if (sm.getPCState() == PCState.PDIRTY) {
587 if (_updates == null)
588 _updates = new HashMap();
589 _updates.put(sm, sm.getDirty());
590 } else if (sm.getPCState() == PCState.PDELETED) {
591 if (_deletes == null)
592 _deletes = new HashSet();
593 _deletes.add(sm);
594 }
595 }
596 return Collections.EMPTY_LIST;
597 }
598
599 /**
600 * Fire local staleness detection events from the cache the OID (if
601 * available) that resulted in an optimistic lock exception iff the
602 * version information in the cache matches the version information
603 * in the state manager for the failed instance. This means that we
604 * will evict data from the cache for records that should have
605 * successfully committed according to the data cache but
606 * did not. The only predictable reason that could cause this behavior
607 * is a concurrent out-of-band modification to the database that was not
608 * communicated to the cache. This logic makes OpenJPA's data cache
609 * somewhat tolerant of such behavior, in that the cache will be cleaned
610 * up as failures occur.
611 */
612 private void notifyOptimisticLockFailure(OptimisticException e) {
613 Object o = e.getFailedObject();
614 OpenJPAStateManager sm = _ctx.getStateManager(o);
615 if (sm == null)
616 return;
617 Object oid = sm.getId();
618 boolean remove;
619
620 // this logic could be more efficient -- we could aggregate
621 // all the cache->oid changes, and then use DataCache.removeAll()
622 // and less write locks to do the mutation.
623 ClassMetaData meta = sm.getMetaData();
624 DataCache cache = meta.getDataCache();
625 if (cache == null)
626 return;
627
628 cache.writeLock();
629 try {
630 DataCachePCData data = cache.get(oid);
631 if (data == null)
632 return;
633
634 switch (compareVersion(sm, sm.getVersion(), data.getVersion())) {
635 case StoreManager.VERSION_LATER:
636 case StoreManager.VERSION_SAME:
637 // This tx's current version is later than or the same as
638 // the data cache version. In this case, the commit should
639 // have succeeded from the standpoint of the cache. Remove
640 // the instance from cache in the hopes that the cache is
641 // out of sync.
642 remove = true;
643 break;
644 case StoreManager.VERSION_EARLIER:
645 // This tx's current version is earlier than the data
646 // cache version. This is a normal optimistic lock failure.
647 // Do not clean up the cache; it probably already has the
648 // right values, and if not, it'll get cleaned up by a tx
649 // that fails in one of the other case statements.
650 remove = false;
651 break;
652 case StoreManager.VERSION_DIFFERENT:
653 // The version strategy for the failed object does not
654 // store enough information to optimize for expected
655 // failures. Clean up the cache.
656 remove = true;
657 break;
658 default:
659 // Unexpected return value. Remove to be future-proof.
660 remove = true;
661 break;
662 }
663 if (remove)
664 // remove directly instead of via the RemoteCommitListener
665 // since we have a write lock here already, so this is more
666 // efficient than read-locking and then write-locking later.
667 cache.remove(sm.getId());
668 } finally {
669 cache.writeUnlock();
670 }
671
672 // fire off a remote commit stalenesss detection event.
673 _ctx.getConfiguration().getRemoteCommitEventManager()
674 .fireLocalStaleNotification(oid);
675 }
676
677 public StoreQuery newQuery(String language) {
678 StoreQuery q = super.newQuery(language);
679
680 // if the query can't be parsed or it's using a non-parsed language
681 // (one for which there is no ExpressionParser), we can't cache it.
682 if (q == null || QueryLanguages.parserForLanguage(language) == null)
683 return q;
684
685 QueryCache queryCache = _ctx.getConfiguration().
686 getDataCacheManagerInstance().getSystemQueryCache();
687 if (queryCache == null)
688 return q;
689
690 return new QueryCacheStoreQuery(q, queryCache);
691 }
692
693 /**
694 * Create a new cacheable instance for the given state manager.
695 */
696 private DataCachePCData newPCData(OpenJPAStateManager sm) {
697 ClassMetaData meta = sm.getMetaData();
698 if (_gen != null)
699 return (DataCachePCData) _gen.generatePCData
700 (sm.getObjectId(), meta);
701 return new DataCachePCDataImpl(sm.fetchObjectId(), meta);
702 }
703
704 /**
705 * Return whether the context is locking loaded data.
706 */
707 private boolean isLocking(FetchConfiguration fetch) {
708 if (fetch == null)
709 fetch = _ctx.getFetchConfiguration();
710 return fetch.getReadLockLevel() > LockLevels.LOCK_NONE;
711 }
712
713 /**
714 * Structure used during the commit process to track cache modifications.
715 */
716 private static class Modifications {
717
718 public final List additions = new ArrayList();
719 public final List newUpdates = new ArrayList();
720 public final List existingUpdates = new ArrayList();
721 public final List deletes = new ArrayList();
722 }
723
724 private static class PCDataHolder {
725
726 public final DataCachePCData pcdata;
727 public final OpenJPAStateManager sm;
728
729 public PCDataHolder(DataCachePCData pcdata,
730 OpenJPAStateManager sm) {
731 this.pcdata = pcdata;
732 this.sm = sm;
733 }
734 }
735 }
736