1 // $Id: ActionQueue.java 9194 2006-02-01 19:59:07Z steveebersole $
2 package org.hibernate.engine;
3
4 import org.hibernate.action.EntityInsertAction;
5 import org.hibernate.action.EntityDeleteAction;
6 import org.hibernate.action.Executable;
7 import org.hibernate.action.EntityUpdateAction;
8 import org.hibernate.action.CollectionRecreateAction;
9 import org.hibernate.action.CollectionRemoveAction;
10 import org.hibernate.action.CollectionUpdateAction;
11 import org.hibernate.action.EntityIdentityInsertAction;
12 import org.hibernate.action.BulkOperationCleanupAction;
13 import org.hibernate.HibernateException;
14 import org.hibernate.AssertionFailure;
15 import org.hibernate.cache.CacheException;
16 import org.apache.commons.logging.Log;
17 import org.apache.commons.logging.LogFactory;
18
19 import java.util.ArrayList;
20 import java.util.List;
21 import java.util.Set;
22 import java.io.ObjectInputStream;
23 import java.io.IOException;
24 import java.io.Serializable;
25 import java.io.ObjectOutputStream;
26
27 /**
28 * Responsible for maintaining the queue of actions related to events.
29 * </p>
30 * The ActionQueue holds the DML operations queued as part of a session's
31 * transactional-write-behind semantics. DML operations are queued here
32 * until a flush forces them to be executed against the database.
33 *
34 * @author Steve Ebersole
35 */
36 public class ActionQueue {
37
38 private static final Log log = LogFactory.getLog( ActionQueue.class );
39 private static final int INIT_QUEUE_LIST_SIZE = 5;
40
41 private SessionImplementor session;
42
43 // Object insertions, updates, and deletions have list semantics because
44 // they must happen in the right order so as to respect referential
45 // integrity
46 private ArrayList insertions;
47 private ArrayList deletions;
48 private ArrayList updates;
49 // Actually the semantics of the next three are really "Bag"
50 // Note that, unlike objects, collection insertions, updates,
51 // deletions are not really remembered between flushes. We
52 // just re-use the same Lists for convenience.
53 private ArrayList collectionCreations;
54 private ArrayList collectionUpdates;
55 private ArrayList collectionRemovals;
56
57 private ArrayList executions;
58
59 /**
60 * Constructs an action queue bound to the given session.
61 *
62 * @param session The session "owning" this queue.
63 */
64 public ActionQueue(SessionImplementor session) {
65 this.session = session;
66 init();
67 }
68
69 private void init() {
70 insertions = new ArrayList( INIT_QUEUE_LIST_SIZE );
71 deletions = new ArrayList( INIT_QUEUE_LIST_SIZE );
72 updates = new ArrayList( INIT_QUEUE_LIST_SIZE );
73
74 collectionCreations = new ArrayList( INIT_QUEUE_LIST_SIZE );
75 collectionRemovals = new ArrayList( INIT_QUEUE_LIST_SIZE );
76 collectionUpdates = new ArrayList( INIT_QUEUE_LIST_SIZE );
77
78 executions = new ArrayList( INIT_QUEUE_LIST_SIZE * 3 );
79 }
80
81 public void clear() {
82 updates.clear();
83 insertions.clear();
84 deletions.clear();
85
86 collectionCreations.clear();
87 collectionRemovals.clear();
88 collectionUpdates.clear();
89 }
90
91 public void addAction(EntityInsertAction action) {
92 insertions.add( action );
93 }
94
95 public void addAction(EntityDeleteAction action) {
96 deletions.add( action );
97 }
98
99 public void addAction(EntityUpdateAction action) {
100 updates.add( action );
101 }
102
103 public void addAction(CollectionRecreateAction action) {
104 collectionCreations.add( action );
105 }
106
107 public void addAction(CollectionRemoveAction action) {
108 collectionRemovals.add( action );
109 }
110
111 public void addAction(CollectionUpdateAction action) {
112 collectionUpdates.add( action );
113 }
114
115 public void addAction(EntityIdentityInsertAction insert) {
116 insertions.add( insert );
117 }
118
119 public void addAction(BulkOperationCleanupAction cleanupAction) {
120 // Add these directly to the executions queue
121 executions.add( cleanupAction );
122 }
123
124 /**
125 * Perform all currently queued entity-insertion actions.
126 *
127 * @throws HibernateException error executing queued insertion actions.
128 */
129 public void executeInserts() throws HibernateException {
130 executeActions( insertions );
131 }
132
133 /**
134 * Perform all currently queued actions.
135 *
136 * @throws HibernateException error executing queued actions.
137 */
138 public void executeActions() throws HibernateException {
139 executeActions( insertions );
140 executeActions( updates );
141 executeActions( collectionRemovals );
142 executeActions( collectionUpdates );
143 executeActions( collectionCreations );
144 executeActions( deletions );
145 }
146
147 /**
148 * Prepares the internal action queues for execution.
149 *
150 * @throws HibernateException error preparing actions.
151 */
152 public void prepareActions() throws HibernateException {
153 prepareActions( collectionRemovals );
154 prepareActions( collectionUpdates );
155 prepareActions( collectionCreations );
156 }
157
158 /**
159 * Performs cleanup of any held cache softlocks.
160 *
161 * @param success Was the transaction successful.
162 */
163 public void afterTransactionCompletion(boolean success) {
164 int size = executions.size();
165 final boolean invalidateQueryCache = session.getFactory().getSettings().isQueryCacheEnabled();
166 for ( int i = 0; i < size; i++ ) {
167 try {
168 Executable exec = ( Executable ) executions.get(i);
169 try {
170 exec.afterTransactionCompletion( success );
171 }
172 finally {
173 if ( invalidateQueryCache ) {
174 session.getFactory().getUpdateTimestampsCache().invalidate( exec.getPropertySpaces() );
175 }
176 }
177 }
178 catch (CacheException ce) {
179 log.error( "could not release a cache lock", ce );
180 // continue loop
181 }
182 catch (Exception e) {
183 throw new AssertionFailure( "Exception releasing cache locks", e );
184 }
185 }
186 executions.clear();
187 }
188
189 /**
190 * Check whether the given tables/query-spaces are to be executed against
191 * given the currently queued actions.
192 *
193 * @param tables The table/query-spaces to check.
194 * @return True if we contain pending actions against any of the given
195 * tables; false otherwise.
196 */
197 public boolean areTablesToBeUpdated(Set tables) {
198 return areTablesToUpdated( updates, tables ) ||
199 areTablesToUpdated( insertions, tables ) ||
200 areTablesToUpdated( deletions, tables ) ||
201 areTablesToUpdated( collectionUpdates, tables ) ||
202 areTablesToUpdated( collectionCreations, tables ) ||
203 areTablesToUpdated( collectionRemovals, tables );
204 }
205
206 /**
207 * Check whether any insertion or deletion actions are currently queued.
208 *
209 * @return True if insertions or deletions are currently queued; false otherwise.
210 */
211 public boolean areInsertionsOrDeletionsQueued() {
212 return ( insertions.size() > 0 || deletions.size() > 0 );
213 }
214
215 private static boolean areTablesToUpdated(List executables, Set tablespaces) {
216 int size = executables.size();
217 for ( int j = 0; j < size; j++ ) {
218 Serializable[] spaces = ( (Executable) executables.get(j) ).getPropertySpaces();
219 for ( int i = 0; i < spaces.length; i++ ) {
220 if ( tablespaces.contains( spaces[i] ) ) {
221 if ( log.isDebugEnabled() ) log.debug( "changes must be flushed to space: " + spaces[i] );
222 return true;
223 }
224 }
225 }
226 return false;
227 }
228
229 private void executeActions(List list) throws HibernateException {
230 int size = list.size();
231 for ( int i = 0; i < size; i++ ) {
232 execute( (Executable) list.get(i) );
233 }
234 list.clear();
235 session.getBatcher().executeBatch();
236 }
237
238 public void execute(Executable executable) {
239 final boolean lockQueryCache = session.getFactory().getSettings().isQueryCacheEnabled();
240 if ( executable.hasAfterTransactionCompletion() || lockQueryCache ) {
241 executions.add( executable );
242 }
243 if (lockQueryCache) {
244 session.getFactory()
245 .getUpdateTimestampsCache()
246 .preinvalidate( executable.getPropertySpaces() );
247 }
248 executable.execute();
249 }
250
251 private void prepareActions(List queue) throws HibernateException {
252 int size = queue.size();
253 for ( int i=0; i<size; i++ ) {
254 Executable executable = ( Executable ) queue.get(i);
255 executable.beforeExecutions();
256 }
257 }
258
259 /**
260 * Returns a string representation of the object.
261 *
262 * @return a string representation of the object.
263 */
264 public String toString() {
265 return new StringBuffer()
266 .append("ActionQueue[insertions=").append(insertions)
267 .append(" updates=").append(updates)
268 .append(" deletions=").append(deletions)
269 .append(" collectionCreations=").append(collectionCreations)
270 .append(" collectionRemovals=").append(collectionRemovals)
271 .append(" collectionUpdates=").append(collectionUpdates)
272 .append("]")
273 .toString();
274 }
275
276 public int numberOfCollectionRemovals() {
277 return collectionRemovals.size();
278 }
279
280 public int numberOfCollectionUpdates() {
281 return collectionUpdates.size();
282 }
283
284 public int numberOfCollectionCreations() {
285 return collectionCreations.size();
286 }
287
288 public int numberOfDeletions() {
289 return deletions.size();
290 }
291
292 public int numberOfUpdates() {
293 return updates.size();
294 }
295
296 public int numberOfInsertions() {
297 return insertions.size();
298 }
299
300 public void sortCollectionActions() {
301 if ( session.getFactory().getSettings().isOrderUpdatesEnabled() ) {
302 //sort the updates by fk
303 java.util.Collections.sort( collectionCreations );
304 java.util.Collections.sort( collectionUpdates );
305 java.util.Collections.sort( collectionRemovals );
306 }
307 }
308
309 public void sortUpdateActions() {
310 if ( session.getFactory().getSettings().isOrderUpdatesEnabled() ) {
311 //sort the updates by pk
312 java.util.Collections.sort( updates );
313 }
314 }
315
316 public ArrayList cloneDeletions() {
317 return (ArrayList) deletions.clone();
318 }
319
320 public void clearFromFlushNeededCheck(int previousCollectionRemovalSize) {
321 collectionCreations.clear();
322 collectionUpdates.clear();
323 updates.clear();
324 // collection deletions are a special case since update() can add
325 // deletions of collections not loaded by the session.
326 for ( int i = collectionRemovals.size()-1; i >= previousCollectionRemovalSize; i-- ) {
327 collectionRemovals.remove(i);
328 }
329 }
330
331 public boolean hasAnyQueuedActions() {
332 return updates.size() > 0 ||
333 insertions.size() > 0 ||
334 deletions.size() > 0 ||
335 collectionUpdates.size() > 0 ||
336 collectionRemovals.size() > 0 ||
337 collectionCreations.size() > 0;
338 }
339
340 /**
341 * Used by the owning session to explicitly control serialization of the
342 * action queue
343 *
344 * @param oos The stream to which the action queue should get written
345 * @throws IOException
346 */
347 public void serialize(ObjectOutputStream oos) throws IOException {
348 log.trace( "serializing action-queue" );
349
350 int queueSize = insertions.size();
351 log.trace( "starting serialization of [" + queueSize + "] insertions entries" );
352 oos.writeInt( queueSize );
353 for ( int i = 0; i < queueSize; i++ ) {
354 oos.writeObject( insertions.get( i ) );
355 }
356
357 queueSize = deletions.size();
358 log.trace( "starting serialization of [" + queueSize + "] deletions entries" );
359 oos.writeInt( queueSize );
360 for ( int i = 0; i < queueSize; i++ ) {
361 oos.writeObject( deletions.get( i ) );
362 }
363
364 queueSize = updates.size();
365 log.trace( "starting serialization of [" + queueSize + "] updates entries" );
366 oos.writeInt( queueSize );
367 for ( int i = 0; i < queueSize; i++ ) {
368 oos.writeObject( updates.get( i ) );
369 }
370
371 queueSize = collectionUpdates.size();
372 log.trace( "starting serialization of [" + queueSize + "] collectionUpdates entries" );
373 oos.writeInt( queueSize );
374 for ( int i = 0; i < queueSize; i++ ) {
375 oos.writeObject( collectionUpdates.get( i ) );
376 }
377
378 queueSize = collectionRemovals.size();
379 log.trace( "starting serialization of [" + queueSize + "] collectionRemovals entries" );
380 oos.writeInt( queueSize );
381 for ( int i = 0; i < queueSize; i++ ) {
382 oos.writeObject( collectionRemovals.get( i ) );
383 }
384
385 queueSize = collectionCreations.size();
386 log.trace( "starting serialization of [" + queueSize + "] collectionCreations entries" );
387 oos.writeInt( queueSize );
388 for ( int i = 0; i < queueSize; i++ ) {
389 oos.writeObject( collectionCreations.get( i ) );
390 }
391 }
392
393 /**
394 * Used by the owning session to explicitly control deserialization of the
395 * action queue
396 *
397 * @param ois The stream from which to read the action queue
398 * @throws IOException
399 */
400 public static ActionQueue deserialize(
401 ObjectInputStream ois,
402 SessionImplementor session) throws IOException, ClassNotFoundException {
403 log.trace( "deserializing action-queue" );
404 ActionQueue rtn = new ActionQueue( session );
405
406 int queueSize = ois.readInt();
407 log.trace( "starting deserialization of [" + queueSize + "] insertions entries" );
408 rtn.insertions = new ArrayList( queueSize );
409 for ( int i = 0; i < queueSize; i++ ) {
410 rtn.insertions.add( ois.readObject() );
411 }
412
413 queueSize = ois.readInt();
414 log.trace( "starting deserialization of [" + queueSize + "] deletions entries" );
415 rtn.deletions = new ArrayList( queueSize );
416 for ( int i = 0; i < queueSize; i++ ) {
417 rtn.deletions.add( ois.readObject() );
418 }
419
420 queueSize = ois.readInt();
421 log.trace( "starting deserialization of [" + queueSize + "] updates entries" );
422 rtn.updates = new ArrayList( queueSize );
423 for ( int i = 0; i < queueSize; i++ ) {
424 rtn.updates.add( ois.readObject() );
425 }
426
427 queueSize = ois.readInt();
428 log.trace( "starting deserialization of [" + queueSize + "] collectionUpdates entries" );
429 rtn.collectionUpdates = new ArrayList( queueSize );
430 for ( int i = 0; i < queueSize; i++ ) {
431 rtn.collectionUpdates.add( ois.readObject() );
432 }
433
434 queueSize = ois.readInt();
435 log.trace( "starting deserialization of [" + queueSize + "] collectionRemovals entries" );
436 rtn.collectionRemovals = new ArrayList( queueSize );
437 for ( int i = 0; i < queueSize; i++ ) {
438 rtn.collectionRemovals.add( ois.readObject() );
439 }
440
441 queueSize = ois.readInt();
442 log.trace( "starting deserialization of [" + queueSize + "] collectionCreations entries" );
443 rtn.collectionCreations = new ArrayList( queueSize );
444 for ( int i = 0; i < queueSize; i++ ) {
445 rtn.collectionCreations.add( ois.readObject() );
446 }
447 return rtn;
448 }
449
450 }