Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/objectstyle/cayenne/dataport/DataPort.java


1   /* ====================================================================
2    *
3    * The ObjectStyle Group Software License, Version 1.0
4    *
5    * Copyright (c) 2002-2003 The ObjectStyle Group
6    * and individual authors of the software.  All rights reserved.
7    *
8    * Redistribution and use in source and binary forms, with or without
9    * modification, are permitted provided that the following conditions
10   * are met:
11   *
12   * 1. Redistributions of source code must retain the above copyright
13   *    notice, this list of conditions and the following disclaimer.
14   *
15   * 2. Redistributions in binary form must reproduce the above copyright
16   *    notice, this list of conditions and the following disclaimer in
17   *    the documentation and/or other materials provided with the
18   *    distribution.
19   *
20   * 3. The end-user documentation included with the redistribution, if
21   *    any, must include the following acknowlegement:
22   *       "This product includes software developed by the
23   *        ObjectStyle Group (http://objectstyle.org/)."
24   *    Alternately, this acknowlegement may appear in the software itself,
25   *    if and wherever such third-party acknowlegements normally appear.
26   *
27   * 4. The names "ObjectStyle Group" and "Cayenne"
28   *    must not be used to endorse or promote products derived
29   *    from this software without prior written permission. For written
30   *    permission, please contact andrus@objectstyle.org.
31   *
32   * 5. Products derived from this software may not be called "ObjectStyle"
33   *    nor may "ObjectStyle" appear in their names without prior written
34   *    permission of the ObjectStyle Group.
35   *
36   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39   * DISCLAIMED.  IN NO EVENT SHALL THE OBJECTSTYLE GROUP OR
40   * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
41   * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
42   * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
43   * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
44   * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
45   * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
46   * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
47   * SUCH DAMAGE.
48   * ====================================================================
49   *
50   * This software consists of voluntary contributions made by many
51   * individuals on behalf of the ObjectStyle Group.  For more
52   * information on the ObjectStyle Group, please see
53   * <http://objectstyle.org/>.
54   *
55   */
56  package org.objectstyle.cayenne.dataport;
57  
58  import java.util.ArrayList;
59  import java.util.Collections;
60  import java.util.Iterator;
61  import java.util.List;
62  import java.util.Map;
63  
64  import org.objectstyle.cayenne.CayenneException;
65  import org.objectstyle.cayenne.access.DataNode;
66  import org.objectstyle.cayenne.access.QueryResult;
67  import org.objectstyle.cayenne.access.ResultIterator;
68  import org.objectstyle.cayenne.access.util.IteratedSelectObserver;
69  import org.objectstyle.cayenne.map.DbEntity;
70  import org.objectstyle.cayenne.map.DerivedDbEntity;
71  import org.objectstyle.cayenne.query.DeleteQuery;
72  import org.objectstyle.cayenne.query.InsertBatchQuery;
73  import org.objectstyle.cayenne.query.SelectQuery;
74  
75  /**
76   * Engine to port data between two DataNodes. These nodes can potentially connect
77   * to databases from different vendors. The only assumption is that all of the 
78   * DbEntities (tables) being ported are present in both source and destination databases, 
79   * and are adequately described by Cayenne mapping. 
80   * 
81   * <p>DataPort implements a Cayenne-based algorithm to read data from one data node
82   * and write to another. It uses DataPortDelegate interface to decouple porting logic 
83   * from such things like filtering entities (include/exclude from port based on some criteria),
84   * logging the progress of port operation, qualifying the queries, etc. It is possible to build 
85   * various configurable interfaces for this tool. E.g. Cayenne implements CayenneDataPort 
86   * Ant task.
87   * </p>
88   * 
89   * @author Andrei Adamchik
90   */
91  public class DataPort {
92      public static final int INSERT_BATCH_SIZE = 1000;
93  
94      protected DataNode sourceNode;
95      protected DataNode destinationNode;
96      protected List entities;
97      protected boolean cleaningDestination;
98      protected DataPortDelegate delegate;
99  
100     /**
101      * Creates new DataPort instance, initializing it 
102      * with a DataPortDelegate.  
103      */
104     public DataPort(DataPortDelegate delegate) {
105         super();
106         this.delegate = delegate;
107     }
108 
109     /**
110      * Runs DataPort. The instance must be fully configured by the time this method
111      * is invoked, having its delegate, source and destinatio nodes, and a list of entities 
112      * set up. 
113      */
114     public void execute() throws Exception {
115         // sanity check
116         if (sourceNode == null) {
117             throw new CayenneException("Can't port data, source node is null.");
118         }
119 
120         if (destinationNode == null) {
121             throw new CayenneException("Can't port data, destination node is null.");
122         }
123 
124         // the simple equality check may actually detect problems with misconfigred nodes
125         // it is not as dumb as it may look at first
126         if (sourceNode == destinationNode) {
127             throw new CayenneException("Can't port data, source and target nodes are the same.");
128         }
129 
130         if (entities == null || entities.isEmpty()) {
131             return;
132         }
133 
134         // sort entities for insertion
135         List sorted = new ArrayList(entities);
136         destinationNode.getDependencySorter().sortDbEntities(sorted, false);
137 
138         if (cleaningDestination) {
139             // reverse insertion order for deletion
140             List entitiesInDeleteOrder = new ArrayList(sorted.size());
141             entitiesInDeleteOrder.addAll(sorted);
142             Collections.reverse(entitiesInDeleteOrder);
143             processDelete(entitiesInDeleteOrder);
144         }
145 
146         processInsert(sorted);
147     }
148 
149     /**
150      * Cleans up destination tables data.
151      */
152     protected void processDelete(List entities) {
153         // Allow delegate to modify the list of entities
154         // any way it wants. For instance delegate may filter 
155         // or sort the list (though it doesn't have to, and can simply
156         // pass through the original list). 
157         if (delegate != null) {
158             entities = delegate.willCleanData(this, entities);
159         }
160 
161         if (entities == null || entities.isEmpty()) {
162             return;
163         }
164 
165         // Using QueryResult as observer for the data cleanup.
166         // This allows to collect query statistics and pass it to the delegate.
167         QueryResult observer = new QueryResult();
168 
169         // Delete data from entities one by one
170         Iterator it = entities.iterator();
171         while (it.hasNext()) {
172             DbEntity entity = (DbEntity) it.next();
173 
174             // skip derived DbEntities. Should we consult delegate ?
175             // Using derived entities may allow things like materialized views....
176             if (entity instanceof DerivedDbEntity) {
177                 continue;
178             }
179 
180             DeleteQuery query = new DeleteQuery();
181 
182             // using DbEntity as a query root is unusual but will still work with Cayenne
183             query.setRoot(entity);
184 
185             // notify delegate that delete is about to happen
186             if (delegate != null) {
187                 delegate.willCleanData(this, entity, query);
188             }
189 
190             // perform delete query
191             observer.clear();
192             destinationNode.performQuery(query, observer);
193 
194             // notify delegate that delete just happened
195             if (delegate != null) {
196                 // observer will store query statistics
197                 int count = observer.getFirstUpdateCount(query);
198                 delegate.didCleanData(this, entity, count);
199             }
200         }
201     }
202 
203     /**
204      * Reads source data from source, saving it to destination.
205      */
206     protected void processInsert(List entities) throws Exception {
207         // Allow delegate to modify the list of entities
208         // any way it wants. For instance delegate may filter 
209         // or sort the list (though it doesn't have to, and can simply
210         // pass through the original list). 
211         if (delegate != null) {
212             entities = delegate.willCleanData(this, entities);
213         }
214 
215         if (entities == null || entities.isEmpty()) {
216             return;
217         }
218 
219         // Create an observer for to get the iterated result
220         // instead of getting each table as a list
221         IteratedSelectObserver observer = new IteratedSelectObserver();
222 
223         // Using QueryResult as observer for the data cleanup.
224         // This allows to collect query statistics and pass it to the delegate.
225         QueryResult insertObserver = new QueryResult();
226 
227         // process ordered list of entities one by one
228         Iterator it = entities.iterator();
229         while (it.hasNext()) {
230             insertObserver.clear();
231 
232             DbEntity entity = (DbEntity) it.next();
233 
234             // skip derived DbEntities. Should we consult delegate ?
235             // Using derived entities may allow things like materialized views....
236             if (entity instanceof DerivedDbEntity) {
237                 continue;
238             }
239 
240             SelectQuery select = new SelectQuery();
241             select.setRoot(entity);
242             select.setFetchingDataRows(true);
243 
244             sourceNode.performQuery(select, observer);
245             ResultIterator result = observer.getResultIterator();
246             InsertBatchQuery insert =
247                 new InsertBatchQuery(entity, INSERT_BATCH_SIZE);
248 
249             if (delegate != null) {
250                 delegate.willPortEntity(this, entity, select);
251             }
252 
253             try {
254 
255                 // Split insertions into the same table into batches of 1000. 
256                 // This will allow to process tables of arbitrary big size
257                 // without running out of memory. 
258                 int currentRow = 0;
259 
260                 while (result.hasNextRow()) {
261                     if (currentRow > 0
262                         && currentRow % INSERT_BATCH_SIZE == 0) {
263                         // end of the batch detected... commit and start a new insert query
264                         destinationNode.performQuery(insert, insertObserver);
265                         insert =
266                             new InsertBatchQuery(entity, INSERT_BATCH_SIZE);
267                         insertObserver.clear();
268                     }
269 
270                     currentRow++;
271 
272                     Map nextRow = result.nextDataRow();
273                     insert.add(nextRow);
274                 }
275 
276                 // commit remaining batch if needed
277                 if (insert.size() > 0) {
278                     destinationNode.performQuery(insert, insertObserver);
279                 }
280 
281                 if (delegate != null) {
282                     delegate.didPortEntity(this, entity, currentRow);
283                 }
284             } finally {
285                 try {
286                     // don't forget to close ResultIterator
287                     result.close();
288                 } catch (CayenneException ex) {
289                 }
290             }
291         }
292     }
293 
294     public List getEntities() {
295         return entities;
296     }
297 
298     public DataNode getSourceNode() {
299         return sourceNode;
300     }
301 
302     public DataNode getDestinationNode() {
303         return destinationNode;
304     }
305 
306     /**
307      * Sets the initial list of entities to process. This list can
308      * be later modified by the delegate.
309      */
310     public void setEntities(List entities) {
311         this.entities = entities;
312     }
313 
314     /**
315      * Sets the DataNode serving as a source of the ported data.
316      */
317     public void setSourceNode(DataNode sourceNode) {
318         this.sourceNode = sourceNode;
319     }
320 
321     /**
322      * Sets the DataNode serving as a destination of the ported data.
323      */
324     public void setDestinationNode(DataNode destinationNode) {
325         this.destinationNode = destinationNode;
326     }
327 
328     public DataPortDelegate getDelegate() {
329         return delegate;
330     }
331 
332     public void setDelegate(DataPortDelegate delegate) {
333         this.delegate = delegate;
334     }
335 
336     public boolean isCleaningDestination() {
337         return cleaningDestination;
338     }
339 
340     /**
341      * Defines whether DataPort should delete all data from destination
342      * tables before doing the port. 
343      */
344     public void setCleaningDestination(boolean cleaningDestination) {
345         this.cleaningDestination = cleaningDestination;
346     }
347 
348 }