1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.openjpa.datacache;
20
21 import java.io.ObjectStreamException;
22 import java.io.Serializable;
23 import java.util.AbstractList;
24 import java.util.ArrayList;
25 import java.util.BitSet;
26 import java.util.Collections;
27 import java.util.Date;
28 import java.util.List;
29 import java.util.Locale;
30 import java.util.Map;
31 import java.util.TreeMap;
32
33 import org.apache.commons.collections.map.LinkedMap;
34 import org.apache.openjpa.kernel.FetchConfiguration;
35 import org.apache.openjpa.kernel.LockLevels;
36 import org.apache.openjpa.kernel.QueryContext;
37 import org.apache.openjpa.kernel.StoreContext;
38 import org.apache.openjpa.kernel.StoreQuery;
39 import org.apache.openjpa.kernel.exps.AggregateListener;
40 import org.apache.openjpa.kernel.exps.FilterListener;
41 import org.apache.openjpa.lib.rop.ListResultObjectProvider;
42 import org.apache.openjpa.lib.rop.ResultObjectProvider;
43 import org.apache.openjpa.meta.ClassMetaData;
44 import org.apache.openjpa.meta.JavaTypes;
45 import org.apache.openjpa.meta.MetaDataRepository;
46 import org.apache.openjpa.util.ObjectNotFoundException;
47 import serp.util.Numbers;
48
49 /**
50 * A {@link StoreQuery} implementation that caches the OIDs involved in
51 * the query, and can determine whether or not the query has been dirtied.
52 *
53 * @author Patrick Linskey
54 * @since 0.2.5.0
55 */
56 public class QueryCacheStoreQuery
57 implements StoreQuery {
58
59 private final StoreQuery _query;
60 private final QueryCache _cache;
61 private StoreContext _sctx;
62 private MetaDataRepository _repos;
63
64 /**
65 * Create a new instance that delegates to <code>query</code> if no
66 * cached results are available.
67 */
68 public QueryCacheStoreQuery(StoreQuery query, QueryCache cache) {
69 _query = query;
70 _cache = cache;
71 }
72
73 /**
74 * Return the {@link QueryCache} that this object is associated with.
75 */
76 public QueryCache getCache() {
77 return _cache;
78 }
79
80 /**
81 * Delegate.
82 */
83 public StoreQuery getDelegate() {
84 return _query;
85 }
86
87 /**
88 * Look in the query cache for a result for the given query
89 * key. Only look if this query is being executed outside a
90 * transaction or in a transaction with IgnoreChanges set to true
91 * or in a transaction with IgnoreChanges set to false but in which
92 * none of the classes involved in this query have been touched.
93 * Caching is not used when using object locking.
94 * This is because we must obtain locks on the
95 * data, and it is likely that making n trips to the database to
96 * make the locks will be slower than running the query against
97 * the database.
98 * If the fetch configuration has query caching disabled,
99 * then this method returns <code>null</code>.
100 * Return the list if we meet the above criteria and if a list
101 * is found for <code>qk</code>. Else, return
102 * <code>null</code>.
103 * This implementation means that queries against the cache
104 * are of READ_COMMITTED isolation level. It'd be nice to support
105 * READ_SERIALIZABLE -- to do so, we'd just return false when in
106 * a transaction.
107 */
108 private List checkCache(QueryKey qk) {
109 if (qk == null)
110 return null;
111 FetchConfiguration fetch = getContext().getFetchConfiguration();
112 if (!fetch.getQueryCacheEnabled())
113 return null;
114 if (fetch.getReadLockLevel() > LockLevels.LOCK_NONE)
115 return null;
116
117 // get the cached data
118 QueryResult res = _cache.get(qk);
119 if (res == null)
120 return null;
121 if (res.isEmpty())
122 return Collections.EMPTY_LIST;
123
124 int projs = getContext().getProjectionAliases().length;
125 if (projs == 0) {
126 // make sure the data cache contains the oids for the query result;
127 // if it doesn't, then using the result could be slower than not
128 // using it because of the individual by-oid lookups
129 ClassMetaData meta = _repos.getMetaData(getContext().
130 getCandidateType(), _sctx.getClassLoader(), true);
131 if (meta.getDataCache() == null)
132 return null;
133
134 BitSet idxs = meta.getDataCache().containsAll(res);
135
136 // eventually we should optimize this to figure out how many objects
137 // the cache is missing and if only a few do a bulk fetch for them
138 int len = idxs.length();
139 if (len < res.size())
140 return null;
141 for (int i = 0; i < len; i++)
142 if (!idxs.get(i))
143 return null;
144 }
145 return new CachedList(res, projs != 0, _sctx);
146 }
147
148 /**
149 * Wrap the result object provider returned by our delegate in a
150 * caching provider.
151 */
152 private ResultObjectProvider wrapResult(ResultObjectProvider rop,
153 QueryKey key) {
154 if (key == null)
155 return rop;
156 return new CachingResultObjectProvider(rop, getContext().
157 getProjectionAliases().length > 0, key);
158 }
159
160 /**
161 * Copy a projection element for caching / returning.
162 */
163 private static Object copyProjection(Object obj, StoreContext ctx) {
164 if (obj == null)
165 return null;
166 switch (JavaTypes.getTypeCode(obj.getClass())) {
167 case JavaTypes.STRING:
168 case JavaTypes.BOOLEAN_OBJ:
169 case JavaTypes.BYTE_OBJ:
170 case JavaTypes.CHAR_OBJ:
171 case JavaTypes.DOUBLE_OBJ:
172 case JavaTypes.FLOAT_OBJ:
173 case JavaTypes.INT_OBJ:
174 case JavaTypes.LONG_OBJ:
175 case JavaTypes.SHORT_OBJ:
176 case JavaTypes.BIGDECIMAL:
177 case JavaTypes.BIGINTEGER:
178 case JavaTypes.OID:
179 return obj;
180 case JavaTypes.DATE:
181 return ((Date) obj).clone();
182 case JavaTypes.LOCALE:
183 return ((Locale) obj).clone();
184 default:
185 if (obj instanceof CachedObjectId)
186 return fromObjectId(((CachedObjectId) obj).oid, ctx);
187 Object oid = ctx.getObjectId(obj);
188 if (oid != null)
189 return new CachedObjectId(oid);
190 return obj;
191 }
192 }
193
194 /**
195 * Return the result object based on its cached oid.
196 */
197 private static Object fromObjectId(Object oid, StoreContext sctx) {
198 if (oid == null)
199 return null;
200
201 Object obj = sctx.find(oid, null, null, null, 0);
202 if (obj == null)
203 throw new ObjectNotFoundException(oid);
204 return obj;
205 }
206
207 public Object writeReplace()
208 throws ObjectStreamException {
209 return _query;
210 }
211
212 public QueryContext getContext() {
213 return _query.getContext();
214 }
215
216 public void setContext(QueryContext qctx) {
217 _query.setContext(qctx);
218 _sctx = qctx.getStoreContext();
219 _repos = _sctx.getConfiguration().getMetaDataRepositoryInstance();
220 }
221
222 public boolean setQuery(Object query) {
223 return _query.setQuery(query);
224 }
225
226 public FilterListener getFilterListener(String tag) {
227 return _query.getFilterListener(tag);
228 }
229
230 public AggregateListener getAggregateListener(String tag) {
231 return _query.getAggregateListener(tag);
232 }
233
234 public Object newCompilationKey() {
235 return _query.newCompilationKey();
236 }
237
238 public Object newCompilation() {
239 return _query.newCompilation();
240 }
241
242 public void populateFromCompilation(Object comp) {
243 _query.populateFromCompilation(comp);
244 }
245
246 public void invalidateCompilation() {
247 _query.invalidateCompilation();
248 }
249
250 public boolean supportsDataStoreExecution() {
251 return _query.supportsDataStoreExecution();
252 }
253
254 public boolean supportsInMemoryExecution() {
255 return _query.supportsInMemoryExecution();
256 }
257
258 public Executor newInMemoryExecutor(ClassMetaData meta, boolean subs) {
259 return _query.newInMemoryExecutor(meta, subs);
260 }
261
262 public Executor newDataStoreExecutor(ClassMetaData meta, boolean subs) {
263 Executor ex = _query.newDataStoreExecutor(meta, subs);
264 return new QueryCacheExecutor(ex, meta, subs);
265 }
266
267 public boolean supportsAbstractExecutors() {
268 return _query.supportsAbstractExecutors();
269 }
270
271 public boolean requiresCandidateType() {
272 return _query.requiresCandidateType();
273 }
274
275 public boolean requiresParameterDeclarations() {
276 return _query.requiresParameterDeclarations();
277 }
278
279 public boolean supportsParameterDeclarations() {
280 return _query.supportsParameterDeclarations();
281 }
282
283 /**
284 * Caching executor.
285 */
286 private static class QueryCacheExecutor
287 implements Executor {
288
289 private final Executor _ex;
290 private final Class _candidate;
291 private final boolean _subs;
292
293 public QueryCacheExecutor(Executor ex, ClassMetaData meta,
294 boolean subs) {
295 _ex = ex;
296 _candidate = (meta == null) ? null : meta.getDescribedType();
297 _subs = subs;
298 }
299
300 public ResultObjectProvider executeQuery(StoreQuery q, Object[] params,
301 Range range) {
302 QueryCacheStoreQuery cq = (QueryCacheStoreQuery) q;
303 QueryKey key = QueryKey.newInstance(cq.getContext(),
304 _ex.isPacking(q), params, _candidate, _subs, range.start,
305 range.end);
306 List cached = cq.checkCache(key);
307 if (cached != null)
308 return new ListResultObjectProvider(cached);
309
310 ResultObjectProvider rop = _ex.executeQuery(cq.getDelegate(),
311 params, range);
312 return cq.wrapResult(rop, key);
313 }
314
315 /**
316 * Clear the cached queries associated with the access path
317 * classes in the query. This is done when bulk operations
318 * (such as deletes or updates) are performed so that the
319 * cache remains up-to-date.
320 */
321 private void clearAccessPath(StoreQuery q) {
322 if (q == null)
323 return;
324
325 ClassMetaData[] cmd = getAccessPathMetaDatas(q);
326 if (cmd == null || cmd.length == 0)
327 return;
328
329 List classes = new ArrayList(cmd.length);
330 for (int i = 0; i < cmd.length; i++)
331 classes.add(cmd[i].getDescribedType());
332
333 // evict from the query cache
334 QueryCacheStoreQuery cq = (QueryCacheStoreQuery) q;
335 cq.getCache().onTypesChanged(new TypesChangedEvent
336 (q.getContext(), classes));
337
338 // evict from the data cache
339 for (int i = 0; i < cmd.length; i++) {
340 if (cmd[i].getDataCache() != null)
341 cmd[i].getDataCache().removeAll(
342 cmd[i].getDescribedType(), true);
343 }
344 }
345
346 public Number executeDelete(StoreQuery q, Object[] params) {
347 try {
348 return _ex.executeDelete(unwrap(q), params);
349 } finally {
350 clearAccessPath(q);
351 }
352 }
353
354 public Number executeUpdate(StoreQuery q, Object[] params) {
355 try {
356 return _ex.executeUpdate(unwrap(q), params);
357 } finally {
358 clearAccessPath(q);
359 }
360 }
361
362 public String[] getDataStoreActions(StoreQuery q, Object[] params,
363 Range range) {
364 return EMPTY_STRINGS;
365 }
366
367 public void validate(StoreQuery q) {
368 _ex.validate(unwrap(q));
369 }
370
371 public void getRange(StoreQuery q, Object[] params, Range range) {
372 _ex.getRange(q, params, range);
373 }
374
375 public Object getOrderingValue(StoreQuery q, Object[] params,
376 Object resultObject, int orderIndex) {
377 return _ex.getOrderingValue(unwrap(q), params, resultObject,
378 orderIndex);
379 }
380
381 public boolean[] getAscending(StoreQuery q) {
382 return _ex.getAscending(unwrap(q));
383 }
384
385 public boolean isPacking(StoreQuery q) {
386 return _ex.isPacking(unwrap(q));
387 }
388
389 public String getAlias(StoreQuery q) {
390 return _ex.getAlias(unwrap(q));
391 }
392
393 public Class getResultClass(StoreQuery q) {
394 return _ex.getResultClass(unwrap(q));
395 }
396
397 public String[] getProjectionAliases(StoreQuery q) {
398 return _ex.getProjectionAliases(unwrap(q));
399 }
400
401 public Class[] getProjectionTypes(StoreQuery q) {
402 return _ex.getProjectionTypes(unwrap(q));
403 }
404
405 public ClassMetaData[] getAccessPathMetaDatas(StoreQuery q) {
406 return _ex.getAccessPathMetaDatas(unwrap(q));
407 }
408
409 public int getOperation(StoreQuery q) {
410 return _ex.getOperation(unwrap(q));
411 }
412
413 public boolean isAggregate(StoreQuery q) {
414 return _ex.isAggregate(unwrap(q));
415 }
416
417 public boolean hasGrouping(StoreQuery q) {
418 return _ex.hasGrouping(unwrap(q));
419 }
420
421 public LinkedMap getParameterTypes(StoreQuery q) {
422 return _ex.getParameterTypes(unwrap(q));
423 }
424
425 public Map getUpdates(StoreQuery q) {
426 return _ex.getUpdates(unwrap(q));
427 }
428
429 private static StoreQuery unwrap(StoreQuery q) {
430 return ((QueryCacheStoreQuery) q).getDelegate();
431 }
432 }
433
434 /**
435 * Result list implementation for a cached query result. Package-protected
436 * for testing.
437 */
438 public static class CachedList
439 extends AbstractList
440 implements Serializable {
441
442 private final QueryResult _res;
443 private final boolean _proj;
444 private final StoreContext _sctx;
445
446 public CachedList(QueryResult res, boolean proj, StoreContext ctx) {
447 _res = res;
448 _proj = proj;
449 _sctx = ctx;
450 }
451
452 public Object get(int idx) {
453 if (!_proj)
454 return fromObjectId(_res.get(idx), _sctx);
455
456 Object[] cached = (Object[]) _res.get(idx);
457 if (cached == null)
458 return null;
459 Object[] uncached = new Object[cached.length];
460 for (int i = 0; i < cached.length; i++)
461 uncached[i] = copyProjection(cached[i], _sctx);
462 return uncached;
463 }
464
465 public int size() {
466 return _res.size();
467 }
468
469 public Object writeReplace()
470 throws ObjectStreamException {
471 return new ArrayList(this);
472 }
473 }
474
475 /**
476 * A wrapper around a {@link ResultObjectProvider} that builds up a list of
477 * all the OIDs in this list and registers that list with the
478 * query cache. Abandons monitoring and registering if one of the classes
479 * in the access path is modified while the query results are being loaded.
480 */
481 private class CachingResultObjectProvider
482 implements ResultObjectProvider, TypesChangedListener {
483
484 private final ResultObjectProvider _rop;
485 private final boolean _proj;
486 private final QueryKey _qk;
487 private final TreeMap _data = new TreeMap();
488 private boolean _maintainCache = true;
489 private int _pos = -1;
490
491 // used to determine list size without necessarily calling size(),
492 // which may require a DB trip or return Integer.MAX_VALUE
493 private int _max = -1;
494 private int _size = Integer.MAX_VALUE;
495
496 /**
497 * Constructor. Supply delegate result provider and our query key.
498 */
499 public CachingResultObjectProvider(ResultObjectProvider rop,
500 boolean proj, QueryKey key) {
501 _rop = rop;
502 _proj = proj;
503 _qk = key;
504 _cache.addTypesChangedListener(this);
505 }
506
507 /**
508 * Stop caching.
509 */
510 private void abortCaching() {
511 if (!_maintainCache)
512 return;
513
514 // this can be called via an event from another thread
515 synchronized (this) {
516 // it's important that we set this flag first so that any
517 // subsequent calls to this object are bypassed.
518 _maintainCache = false;
519 _cache.removeTypesChangedListener(this);
520 _data.clear();
521 }
522 }
523
524 /**
525 * Check whether we've buffered all results, while optionally adding
526 * the given result.
527 */
528 private void checkFinished(Object obj, boolean result) {
529 // this can be called at the same time as abortCaching via
530 // a types changed event
531 boolean finished = false;
532 synchronized (this) {
533 if (_maintainCache) {
534 if (result) {
535 Integer index = Numbers.valueOf(_pos);
536 if (!_data.containsKey(index)) {
537 Object cached;
538 if (obj == null)
539 cached = null;
540 else if (!_proj)
541 cached = _sctx.getObjectId(obj);
542 else {
543 Object[] arr = (Object[]) obj;
544 Object[] cp = new Object[arr.length];
545 for (int i = 0; i < arr.length; i++)
546 cp[i] = copyProjection(arr[i], _sctx);
547 cached = cp;
548 }
549 if (cached != null)
550 _data.put(index, cached);
551 }
552 }
553 finished = _size == _data.size();
554 }
555 }
556
557 if (finished) {
558 // an abortCaching call can sneak in here via onExpire; the
559 // cache is locked during event firings, so the lock here will
560 // wait for it (or will force the next firing to wait)
561 _cache.writeLock();
562 try {
563 // make sure we didn't abort
564 if (_maintainCache) {
565 QueryResult res = new QueryResult(_qk, _data.values());
566 _cache.put(_qk, res);
567 abortCaching();
568 }
569 }
570 finally {
571 _cache.writeUnlock();
572 }
573 }
574 }
575
576 public boolean supportsRandomAccess() {
577 return _rop.supportsRandomAccess();
578 }
579
580 public void open()
581 throws Exception {
582 _rop.open();
583 }
584
585 public Object getResultObject()
586 throws Exception {
587 Object obj = _rop.getResultObject();
588 checkFinished(obj, true);
589 return obj;
590 }
591
592 public boolean next()
593 throws Exception {
594 _pos++;
595 boolean next = _rop.next();
596 if (!next && _pos == _max + 1) {
597 _size = _pos;
598 checkFinished(null, false);
599 } else if (next && _pos > _max)
600 _max = _pos;
601 return next;
602 }
603
604 public boolean absolute(int pos)
605 throws Exception {
606 _pos = pos;
607 boolean valid = _rop.absolute(pos);
608 if (!valid && _pos == _max + 1) {
609 _size = _pos;
610 checkFinished(null, false);
611 } else if (valid && _pos > _max)
612 _max = _pos;
613 return valid;
614 }
615
616 public int size()
617 throws Exception {
618 if (_size != Integer.MAX_VALUE)
619 return _size;
620 int size = _rop.size();
621 _size = size;
622 checkFinished(null, false);
623 return size;
624 }
625
626 public void reset()
627 throws Exception {
628 _rop.reset();
629 _pos = -1;
630 }
631
632 public void close()
633 throws Exception {
634 abortCaching();
635 _rop.close();
636 }
637
638 public void handleCheckedException(Exception e) {
639 _rop.handleCheckedException(e);
640 }
641
642 public void onTypesChanged(TypesChangedEvent ev) {
643 if (_qk.changeInvalidatesQuery(ev.getTypes()))
644 abortCaching();
645 }
646 }
647
648 /**
649 * Struct to recognize cached oids.
650 */
651 private static class CachedObjectId {
652
653 public final Object oid;
654
655 public CachedObjectId (Object oid)
656 {
657 this.oid = oid;
658 }
659 }
660 }