Source code: org/objectstyle/cayenne/dataport/DataPort.java
1 /* ====================================================================
2 *
3 * The ObjectStyle Group Software License, Version 1.0
4 *
5 * Copyright (c) 2002-2003 The ObjectStyle Group
6 * and individual authors of the software. All rights reserved.
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 *
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in
17 * the documentation and/or other materials provided with the
18 * distribution.
19 *
20 * 3. The end-user documentation included with the redistribution, if
21 * any, must include the following acknowlegement:
22 * "This product includes software developed by the
23 * ObjectStyle Group (http://objectstyle.org/)."
24 * Alternately, this acknowlegement may appear in the software itself,
25 * if and wherever such third-party acknowlegements normally appear.
26 *
27 * 4. The names "ObjectStyle Group" and "Cayenne"
28 * must not be used to endorse or promote products derived
29 * from this software without prior written permission. For written
30 * permission, please contact andrus@objectstyle.org.
31 *
32 * 5. Products derived from this software may not be called "ObjectStyle"
33 * nor may "ObjectStyle" appear in their names without prior written
34 * permission of the ObjectStyle Group.
35 *
36 * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
37 * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
38 * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
39 * DISCLAIMED. IN NO EVENT SHALL THE OBJECTSTYLE GROUP OR
40 * ITS CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
41 * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
42 * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
43 * USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
44 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
45 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
46 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
47 * SUCH DAMAGE.
48 * ====================================================================
49 *
50 * This software consists of voluntary contributions made by many
51 * individuals on behalf of the ObjectStyle Group. For more
52 * information on the ObjectStyle Group, please see
53 * <http://objectstyle.org/>.
54 *
55 */
56 package org.objectstyle.cayenne.dataport;
57
58 import java.util.ArrayList;
59 import java.util.Collections;
60 import java.util.Iterator;
61 import java.util.List;
62 import java.util.Map;
63
64 import org.objectstyle.cayenne.CayenneException;
65 import org.objectstyle.cayenne.access.DataNode;
66 import org.objectstyle.cayenne.access.QueryResult;
67 import org.objectstyle.cayenne.access.ResultIterator;
68 import org.objectstyle.cayenne.access.util.IteratedSelectObserver;
69 import org.objectstyle.cayenne.map.DbEntity;
70 import org.objectstyle.cayenne.map.DerivedDbEntity;
71 import org.objectstyle.cayenne.query.DeleteQuery;
72 import org.objectstyle.cayenne.query.InsertBatchQuery;
73 import org.objectstyle.cayenne.query.SelectQuery;
74
75 /**
76 * Engine to port data between two DataNodes. These nodes can potentially connect
77 * to databases from different vendors. The only assumption is that all of the
78 * DbEntities (tables) being ported are present in both source and destination databases,
79 * and are adequately described by Cayenne mapping.
80 *
81 * <p>DataPort implements a Cayenne-based algorithm to read data from one data node
82 * and write to another. It uses DataPortDelegate interface to decouple porting logic
83 * from such things like filtering entities (include/exclude from port based on some criteria),
84 * logging the progress of port operation, qualifying the queries, etc. It is possible to build
85 * various configurable interfaces for this tool. E.g. Cayenne implements CayenneDataPort
86 * Ant task.
87 * </p>
88 *
89 * @author Andrei Adamchik
90 */
91 public class DataPort {
92 public static final int INSERT_BATCH_SIZE = 1000;
93
94 protected DataNode sourceNode;
95 protected DataNode destinationNode;
96 protected List entities;
97 protected boolean cleaningDestination;
98 protected DataPortDelegate delegate;
99
100 /**
101 * Creates new DataPort instance, initializing it
102 * with a DataPortDelegate.
103 */
104 public DataPort(DataPortDelegate delegate) {
105 super();
106 this.delegate = delegate;
107 }
108
109 /**
110 * Runs DataPort. The instance must be fully configured by the time this method
111 * is invoked, having its delegate, source and destinatio nodes, and a list of entities
112 * set up.
113 */
114 public void execute() throws Exception {
115 // sanity check
116 if (sourceNode == null) {
117 throw new CayenneException("Can't port data, source node is null.");
118 }
119
120 if (destinationNode == null) {
121 throw new CayenneException("Can't port data, destination node is null.");
122 }
123
124 // the simple equality check may actually detect problems with misconfigred nodes
125 // it is not as dumb as it may look at first
126 if (sourceNode == destinationNode) {
127 throw new CayenneException("Can't port data, source and target nodes are the same.");
128 }
129
130 if (entities == null || entities.isEmpty()) {
131 return;
132 }
133
134 // sort entities for insertion
135 List sorted = new ArrayList(entities);
136 destinationNode.getDependencySorter().sortDbEntities(sorted, false);
137
138 if (cleaningDestination) {
139 // reverse insertion order for deletion
140 List entitiesInDeleteOrder = new ArrayList(sorted.size());
141 entitiesInDeleteOrder.addAll(sorted);
142 Collections.reverse(entitiesInDeleteOrder);
143 processDelete(entitiesInDeleteOrder);
144 }
145
146 processInsert(sorted);
147 }
148
149 /**
150 * Cleans up destination tables data.
151 */
152 protected void processDelete(List entities) {
153 // Allow delegate to modify the list of entities
154 // any way it wants. For instance delegate may filter
155 // or sort the list (though it doesn't have to, and can simply
156 // pass through the original list).
157 if (delegate != null) {
158 entities = delegate.willCleanData(this, entities);
159 }
160
161 if (entities == null || entities.isEmpty()) {
162 return;
163 }
164
165 // Using QueryResult as observer for the data cleanup.
166 // This allows to collect query statistics and pass it to the delegate.
167 QueryResult observer = new QueryResult();
168
169 // Delete data from entities one by one
170 Iterator it = entities.iterator();
171 while (it.hasNext()) {
172 DbEntity entity = (DbEntity) it.next();
173
174 // skip derived DbEntities. Should we consult delegate ?
175 // Using derived entities may allow things like materialized views....
176 if (entity instanceof DerivedDbEntity) {
177 continue;
178 }
179
180 DeleteQuery query = new DeleteQuery();
181
182 // using DbEntity as a query root is unusual but will still work with Cayenne
183 query.setRoot(entity);
184
185 // notify delegate that delete is about to happen
186 if (delegate != null) {
187 delegate.willCleanData(this, entity, query);
188 }
189
190 // perform delete query
191 observer.clear();
192 destinationNode.performQuery(query, observer);
193
194 // notify delegate that delete just happened
195 if (delegate != null) {
196 // observer will store query statistics
197 int count = observer.getFirstUpdateCount(query);
198 delegate.didCleanData(this, entity, count);
199 }
200 }
201 }
202
203 /**
204 * Reads source data from source, saving it to destination.
205 */
206 protected void processInsert(List entities) throws Exception {
207 // Allow delegate to modify the list of entities
208 // any way it wants. For instance delegate may filter
209 // or sort the list (though it doesn't have to, and can simply
210 // pass through the original list).
211 if (delegate != null) {
212 entities = delegate.willCleanData(this, entities);
213 }
214
215 if (entities == null || entities.isEmpty()) {
216 return;
217 }
218
219 // Create an observer for to get the iterated result
220 // instead of getting each table as a list
221 IteratedSelectObserver observer = new IteratedSelectObserver();
222
223 // Using QueryResult as observer for the data cleanup.
224 // This allows to collect query statistics and pass it to the delegate.
225 QueryResult insertObserver = new QueryResult();
226
227 // process ordered list of entities one by one
228 Iterator it = entities.iterator();
229 while (it.hasNext()) {
230 insertObserver.clear();
231
232 DbEntity entity = (DbEntity) it.next();
233
234 // skip derived DbEntities. Should we consult delegate ?
235 // Using derived entities may allow things like materialized views....
236 if (entity instanceof DerivedDbEntity) {
237 continue;
238 }
239
240 SelectQuery select = new SelectQuery();
241 select.setRoot(entity);
242 select.setFetchingDataRows(true);
243
244 sourceNode.performQuery(select, observer);
245 ResultIterator result = observer.getResultIterator();
246 InsertBatchQuery insert =
247 new InsertBatchQuery(entity, INSERT_BATCH_SIZE);
248
249 if (delegate != null) {
250 delegate.willPortEntity(this, entity, select);
251 }
252
253 try {
254
255 // Split insertions into the same table into batches of 1000.
256 // This will allow to process tables of arbitrary big size
257 // without running out of memory.
258 int currentRow = 0;
259
260 while (result.hasNextRow()) {
261 if (currentRow > 0
262 && currentRow % INSERT_BATCH_SIZE == 0) {
263 // end of the batch detected... commit and start a new insert query
264 destinationNode.performQuery(insert, insertObserver);
265 insert =
266 new InsertBatchQuery(entity, INSERT_BATCH_SIZE);
267 insertObserver.clear();
268 }
269
270 currentRow++;
271
272 Map nextRow = result.nextDataRow();
273 insert.add(nextRow);
274 }
275
276 // commit remaining batch if needed
277 if (insert.size() > 0) {
278 destinationNode.performQuery(insert, insertObserver);
279 }
280
281 if (delegate != null) {
282 delegate.didPortEntity(this, entity, currentRow);
283 }
284 } finally {
285 try {
286 // don't forget to close ResultIterator
287 result.close();
288 } catch (CayenneException ex) {
289 }
290 }
291 }
292 }
293
294 public List getEntities() {
295 return entities;
296 }
297
298 public DataNode getSourceNode() {
299 return sourceNode;
300 }
301
302 public DataNode getDestinationNode() {
303 return destinationNode;
304 }
305
306 /**
307 * Sets the initial list of entities to process. This list can
308 * be later modified by the delegate.
309 */
310 public void setEntities(List entities) {
311 this.entities = entities;
312 }
313
314 /**
315 * Sets the DataNode serving as a source of the ported data.
316 */
317 public void setSourceNode(DataNode sourceNode) {
318 this.sourceNode = sourceNode;
319 }
320
321 /**
322 * Sets the DataNode serving as a destination of the ported data.
323 */
324 public void setDestinationNode(DataNode destinationNode) {
325 this.destinationNode = destinationNode;
326 }
327
328 public DataPortDelegate getDelegate() {
329 return delegate;
330 }
331
332 public void setDelegate(DataPortDelegate delegate) {
333 this.delegate = delegate;
334 }
335
336 public boolean isCleaningDestination() {
337 return cleaningDestination;
338 }
339
340 /**
341 * Defines whether DataPort should delete all data from destination
342 * tables before doing the port.
343 */
344 public void setCleaningDestination(boolean cleaningDestination) {
345 this.cleaningDestination = cleaningDestination;
346 }
347
348 }