Home » apache-openjpa-1.1.0-source » org.apache.openjpa.jdbc » kernel » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements.  See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership.  The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License.  You may obtain a copy of the License at
    9    *
   10    * http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing,
   13    * software distributed under the License is distributed on an
   14    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    * KIND, either express or implied.  See the License for the
   16    * specific language governing permissions and limitations
   17    * under the License.    
   18    */
   19   package org.apache.openjpa.jdbc.kernel;
   20   
   21   import java.sql.Connection;
   22   import java.sql.PreparedStatement;
   23   import java.sql.Statement;
   24   import java.sql.SQLException;
   25   import java.util.ArrayList;
   26   import java.util.Iterator;
   27   import java.util.List;
   28   
   29   import org.apache.openjpa.jdbc.conf.JDBCConfiguration;
   30   import org.apache.openjpa.jdbc.meta.ClassMapping;
   31   import org.apache.openjpa.jdbc.schema.Column;
   32   import org.apache.openjpa.jdbc.sql.Row;
   33   import org.apache.openjpa.jdbc.sql.RowImpl;
   34   import org.apache.openjpa.jdbc.sql.SQLExceptions;
   35   import org.apache.openjpa.kernel.OpenJPAStateManager;
   36   import org.apache.openjpa.lib.log.Log;
   37   import org.apache.openjpa.lib.util.Localizer;
   38   import org.apache.openjpa.util.OptimisticException;
   39   
   40   /**
   41    * Batch prepared statement manager implementation. This prepared statement
   42    * manager will utilize the JDBC addBatch() and exceuteBatch() to batch the SQL
   43    * statements together to improve the execution performance.
   44    * 
   45    * @author Teresa Kan
   46    */
   47   
   48   public class BatchingPreparedStatementManagerImpl extends
   49           PreparedStatementManagerImpl {
   50   
   51       private final static Localizer _loc = Localizer
   52               .forPackage(BatchingPreparedStatementManagerImpl.class);
   53   
   54       private String _batchedSql = null;
   55       private List _batchedRows = new ArrayList();
   56       private int _batchLimit;
   57       private boolean _disableBatch = false;
   58       private transient Log _log = null;
   59   
   60       /**
   61        * Constructor. Supply connection.
   62        */
   63       public BatchingPreparedStatementManagerImpl(JDBCStore store,
   64           Connection conn, int batchLimit) {
   65           super(store, conn);
   66           _batchLimit = batchLimit;
   67           _log = store.getConfiguration().getLog(JDBCConfiguration.LOG_JDBC);
   68           if (_log.isTraceEnabled())
   69               _log.trace(_loc.get("batch_limit", String.valueOf(_batchLimit)));
   70       }
   71   
   72       /**
   73        * Flush the given row immediately or deferred the flush in batch.
   74        */
   75       protected void flushAndUpdate(RowImpl row) throws SQLException {
   76           if (isBatchDisabled(row)) {
   77               // if there were some statements batched before, then
   78               // we need to flush them out first before processing the
   79               // current non batch process.
   80               flushBatch();
   81   
   82               super.flushAndUpdate(row);
   83           } else {
   84               // process the SQL statement, either execute it immediately or
   85               // batch it for later execution.
   86               String sql = row.getSQL(_dict);
   87               if (_batchedSql == null) {
   88                   // brand new SQL
   89                   _batchedSql = sql;
   90               } else if (!sql.equals(_batchedSql)) {
   91                   // SQL statements changed.
   92                   switch (_batchedRows.size()) {
   93                   case 0:
   94                       break;
   95                   case 1:
   96                       // single entry in cache, direct SQL execution. 
   97                       super.flushAndUpdate((RowImpl) _batchedRows.get(0));
   98                       _batchedRows.clear();
   99                       break;
  100                   default:
  101                       // flush all entries in cache in batch.
  102                       flushBatch();
  103                   }
  104                   _batchedSql = sql;
  105               }
  106               _batchedRows.add(row);
  107           }
  108       }
  109   
  110       /*
  111        * Compute if batching is disabled, based on values of batch limit
  112        * and database characteristics.
  113        */
  114       private boolean isBatchDisabled(RowImpl row) {
  115           boolean rtnVal = true;
  116           if (_batchLimit != 0 && !_disableBatch) {
  117               String sql = row.getSQL(_dict);
  118               OpenJPAStateManager sm = row.getPrimaryKey();
  119               ClassMapping cmd = null;
  120               if (sm != null)
  121                   cmd = (ClassMapping) sm.getMetaData();
  122               Column[] autoAssign = null;
  123               if (row.getAction() == Row.ACTION_INSERT)
  124                   autoAssign = row.getTable().getAutoAssignedColumns();
  125               // validate batch capability
  126               _disableBatch = _dict
  127                   .validateBatchProcess(row, autoAssign, sm, cmd);
  128               rtnVal = _disableBatch;
  129           }
  130           return rtnVal;
  131       }
  132       
  133       /**
  134        * flush all cached up statements to be executed as a single or batched
  135        * prepared statements.
  136        */
  137       protected void flushBatch() {
  138           if (_batchedSql != null && _batchedRows.size() > 0) {
  139               PreparedStatement ps = null;
  140               try {
  141                   RowImpl onerow = null;
  142                   ps = _conn.prepareStatement(_batchedSql);
  143                   if (_batchedRows.size() == 1) {
  144                       // execute a single row.
  145                       onerow = (RowImpl) _batchedRows.get(0);
  146                       flushSingleRow(onerow, ps);
  147                   } else {
  148                       // cache has more than one rows, execute as batch.
  149                       int count = 0;
  150                       int batchedRowsBaseIndex = 0;
  151                       Iterator itr = _batchedRows.iterator();
  152                       while (itr.hasNext()) {
  153                           onerow = (RowImpl) itr.next();
  154                           if (_batchLimit == 1) {
  155                               flushSingleRow(onerow, ps);
  156                           } else {
  157                               if (count < _batchLimit || _batchLimit == -1) {
  158                                   onerow.flush(ps, _dict, _store);
  159                                   ps.addBatch();
  160                                   count++;
  161                               } else {
  162                                   // reach the batchLimit, execute the batch
  163                                   int[] rtn = ps.executeBatch();
  164                                   checkUpdateCount(rtn, batchedRowsBaseIndex);
  165   
  166                                   batchedRowsBaseIndex += _batchLimit;
  167   
  168                                   onerow.flush(ps, _dict, _store);
  169                                   ps.addBatch();
  170                                   // reset the count to 1 for new batch
  171                                   count = 1;
  172                               }
  173                           }
  174                       }
  175                       // end of the loop, execute the batch
  176                       int[] rtn = ps.executeBatch();
  177                       checkUpdateCount(rtn, batchedRowsBaseIndex);
  178                   }
  179               } catch (SQLException se) {
  180                   SQLException sqex = se.getNextException();
  181                   if (sqex == null)
  182                       sqex = se;
  183                   throw SQLExceptions.getStore(sqex, ps, _dict);
  184               } finally {
  185                   _batchedSql = null;
  186                   _batchedRows.clear();
  187                   if (ps != null) {
  188                       try {
  189                           ps.close();
  190                       } catch (SQLException sqex) {
  191                           throw SQLExceptions.getStore(sqex, ps, _dict);
  192                       }
  193                   }
  194               }
  195           }
  196       }
  197   
  198       /*
  199        * Execute an update of a single row.
  200        */
  201       private void flushSingleRow(RowImpl row, PreparedStatement ps)
  202           throws SQLException {
  203           row.flush(ps, _dict, _store);
  204           int count = ps.executeUpdate();
  205           if (count != 1) {
  206               Object failed = row.getFailedObject();
  207               if (failed != null)
  208                   _exceptions.add(new OptimisticException(failed));
  209               else if (row.getAction() == Row.ACTION_INSERT)
  210                   throw new SQLException(_loc.get("update-failed-no-failed-obj",
  211                       String.valueOf(count), row.getSQL(_dict)).getMessage());
  212           }
  213       }
  214   
  215       /*
  216        * Process executeBatch function array of return counts.
  217        */
  218       private void checkUpdateCount(int[] count, int batchedRowsBaseIndex)
  219           throws SQLException {
  220           int cnt = 0;
  221           Object failed = null;
  222           for (int i = 0; i < count.length; i++) {
  223               cnt = count[i];
  224               RowImpl row = (RowImpl) _batchedRows.get(batchedRowsBaseIndex + i);
  225               switch (cnt) {
  226               case Statement.EXECUTE_FAILED: // -3
  227                   failed = row.getFailedObject();
  228                   if (failed != null || row.getAction() == Row.ACTION_UPDATE)
  229                       _exceptions.add(new OptimisticException(failed));
  230                   else if (row.getAction() == Row.ACTION_INSERT)
  231                       throw new SQLException(_loc.get(
  232                           "update-failed-no-failed-obj",
  233                           String.valueOf(count[i]), _batchedSql).getMessage());
  234                   break;
  235               case Statement.SUCCESS_NO_INFO: // -2
  236                   if (_log.isTraceEnabled())
  237                       _log.trace(_loc.get("batch_update_info",
  238                           String.valueOf(cnt), _batchedSql).getMessage());
  239                   break;
  240               case 0: // no row is inserted, treats it as failed
  241                   // case
  242                   failed = row.getFailedObject();
  243                   if ((failed != null || row.getAction() == Row.ACTION_INSERT))
  244                       throw new SQLException(_loc.get(
  245                           "update-failed-no-failed-obj",
  246                           String.valueOf(count[i]), _batchedSql).getMessage());
  247               }
  248           }
  249       }
  250   }

Save This Page
Home » apache-openjpa-1.1.0-source » org.apache.openjpa.jdbc » kernel » [javadoc | source]