1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19 package org.apache.openjpa.jdbc.kernel;
20
21 import java.sql.Connection;
22 import java.sql.PreparedStatement;
23 import java.sql.Statement;
24 import java.sql.SQLException;
25 import java.util.ArrayList;
26 import java.util.Iterator;
27 import java.util.List;
28
29 import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
30 import org.apache.openjpa.jdbc.meta.ClassMapping;
31 import org.apache.openjpa.jdbc.schema.Column;
32 import org.apache.openjpa.jdbc.sql.Row;
33 import org.apache.openjpa.jdbc.sql.RowImpl;
34 import org.apache.openjpa.jdbc.sql.SQLExceptions;
35 import org.apache.openjpa.kernel.OpenJPAStateManager;
36 import org.apache.openjpa.lib.log.Log;
37 import org.apache.openjpa.lib.util.Localizer;
38 import org.apache.openjpa.util.OptimisticException;
39
40 /**
41 * Batch prepared statement manager implementation. This prepared statement
42 * manager will utilize the JDBC addBatch() and exceuteBatch() to batch the SQL
43 * statements together to improve the execution performance.
44 *
45 * @author Teresa Kan
46 */
47
48 public class BatchingPreparedStatementManagerImpl extends
49 PreparedStatementManagerImpl {
50
51 private final static Localizer _loc = Localizer
52 .forPackage(BatchingPreparedStatementManagerImpl.class);
53
54 private String _batchedSql = null;
55 private List _batchedRows = new ArrayList();
56 private int _batchLimit;
57 private boolean _disableBatch = false;
58 private transient Log _log = null;
59
60 /**
61 * Constructor. Supply connection.
62 */
63 public BatchingPreparedStatementManagerImpl(JDBCStore store,
64 Connection conn, int batchLimit) {
65 super(store, conn);
66 _batchLimit = batchLimit;
67 _log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC);
68 if (_log.isTraceEnabled())
69 _log.trace(_loc.get("batch_limit", String.valueOf(_batchLimit)));
70 }
71
72 /**
73 * Flush the given row immediately or deferred the flush in batch.
74 */
75 protected void flushAndUpdate(RowImpl row) throws SQLException {
76 if (isBatchDisabled(row)) {
77 // if there were some statements batched before, then
78 // we need to flush them out first before processing the
79 // current non batch process.
80 flushBatch();
81
82 super.flushAndUpdate(row);
83 } else {
84 // process the SQL statement, either execute it immediately or
85 // batch it for later execution.
86 String sql = row.getSQL(_dict);
87 if (_batchedSql == null) {
88 // brand new SQL
89 _batchedSql = sql;
90 } else if (!sql.equals(_batchedSql)) {
91 // SQL statements changed.
92 switch (_batchedRows.size()) {
93 case 0:
94 break;
95 case 1:
96 // single entry in cache, direct SQL execution.
97 super.flushAndUpdate((RowImpl) _batchedRows.get(0));
98 _batchedRows.clear();
99 break;
100 default:
101 // flush all entries in cache in batch.
102 flushBatch();
103 }
104 _batchedSql = sql;
105 }
106 _batchedRows.add(row);
107 }
108 }
109
110 /*
111 * Compute if batching is disabled, based on values of batch limit
112 * and database characteristics.
113 */
114 private boolean isBatchDisabled(RowImpl row) {
115 boolean rtnVal = true;
116 if (_batchLimit != 0 && !_disableBatch) {
117 String sql = row.getSQL(_dict);
118 OpenJPAStateManager sm = row.getPrimaryKey();
119 ClassMapping cmd = null;
120 if (sm != null)
121 cmd = (ClassMapping) sm.getMetaData();
122 Column[] autoAssign = null;
123 if (row.getAction() == Row.ACTION_INSERT)
124 autoAssign = row.getTable().getAutoAssignedColumns();
125 // validate batch capability
126 _disableBatch = _dict
127 .validateBatchProcess(row, autoAssign, sm, cmd);
128 rtnVal = _disableBatch;
129 }
130 return rtnVal;
131 }
132
133 /**
134 * flush all cached up statements to be executed as a single or batched
135 * prepared statements.
136 */
137 protected void flushBatch() {
138 if (_batchedSql != null && _batchedRows.size() > 0) {
139 PreparedStatement ps = null;
140 try {
141 RowImpl onerow = null;
142 ps = _conn.prepareStatement(_batchedSql);
143 if (_batchedRows.size() == 1) {
144 // execute a single row.
145 onerow = (RowImpl) _batchedRows.get(0);
146 flushSingleRow(onerow, ps);
147 } else {
148 // cache has more than one rows, execute as batch.
149 int count = 0;
150 int batchedRowsBaseIndex = 0;
151 Iterator itr = _batchedRows.iterator();
152 while (itr.hasNext()) {
153 onerow = (RowImpl) itr.next();
154 if (_batchLimit == 1) {
155 flushSingleRow(onerow, ps);
156 } else {
157 if (count < _batchLimit || _batchLimit == -1) {
158 onerow.flush(ps, _dict, _store);
159 ps.addBatch();
160 count++;
161 } else {
162 // reach the batchLimit, execute the batch
163 int[] rtn = ps.executeBatch();
164 checkUpdateCount(rtn, batchedRowsBaseIndex);
165
166 batchedRowsBaseIndex += _batchLimit;
167
168 onerow.flush(ps, _dict, _store);
169 ps.addBatch();
170 // reset the count to 1 for new batch
171 count = 1;
172 }
173 }
174 }
175 // end of the loop, execute the batch
176 int[] rtn = ps.executeBatch();
177 checkUpdateCount(rtn, batchedRowsBaseIndex);
178 }
179 } catch (SQLException se) {
180 SQLException sqex = se.getNextException();
181 if (sqex == null)
182 sqex = se;
183 throw SQLExceptions.getStore(sqex, ps, _dict);
184 } finally {
185 _batchedSql = null;
186 _batchedRows.clear();
187 if (ps != null) {
188 try {
189 ps.close();
190 } catch (SQLException sqex) {
191 throw SQLExceptions.getStore(sqex, ps, _dict);
192 }
193 }
194 }
195 }
196 }
197
198 /*
199 * Execute an update of a single row.
200 */
201 private void flushSingleRow(RowImpl row, PreparedStatement ps)
202 throws SQLException {
203 row.flush(ps, _dict, _store);
204 int count = ps.executeUpdate();
205 if (count != 1) {
206 Object failed = row.getFailedObject();
207 if (failed != null)
208 _exceptions.add(new OptimisticException(failed));
209 else if (row.getAction() == Row.ACTION_INSERT)
210 throw new SQLException(_loc.get("update-failed-no-failed-obj",
211 String.valueOf(count), row.getSQL(_dict)).getMessage());
212 }
213 }
214
215 /*
216 * Process executeBatch function array of return counts.
217 */
218 private void checkUpdateCount(int[] count, int batchedRowsBaseIndex)
219 throws SQLException {
220 int cnt = 0;
221 Object failed = null;
222 for (int i = 0; i < count.length; i++) {
223 cnt = count[i];
224 RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i);
225 switch (cnt) {
226 case Statement.EXECUTE_FAILED: // -3
227 failed = row.getFailedObject();
228 if (failed != null || row.getAction() == Row.ACTION_UPDATE)
229 _exceptions.add(new OptimisticException(failed));
230 else if (row.getAction() == Row.ACTION_INSERT)
231 throw new SQLException(_loc.get(
232 "update-failed-no-failed-obj",
233 String.valueOf(count[i]), _batchedSql).getMessage());
234 break;
235 case Statement.SUCCESS_NO_INFO: // -2
236 if (_log.isTraceEnabled())
237 _log.trace(_loc.get("batch_update_info",
238 String.valueOf(cnt), _batchedSql).getMessage());
239 break;
240 case 0: // no row is inserted, treats it as failed
241 // case
242 failed = row.getFailedObject();
243 if ((failed != null || row.getAction() == Row.ACTION_INSERT))
244 throw new SQLException(_loc.get(
245 "update-failed-no-failed-obj",
246 String.valueOf(count[i]), _batchedSql).getMessage());
247 }
248 }
249 }
250 }