1 /*
2 * JBoss, Home of Professional Open Source.
3 * Copyright 2006, Red Hat Middleware LLC, and individual contributors
4 * as indicated by the @author tags. See the copyright.txt file in the
5 * distribution for a full listing of individual contributors.
6 *
7 * This is free software; you can redistribute it and/or modify it
8 * under the terms of the GNU Lesser General Public License as
9 * published by the Free Software Foundation; either version 2.1 of
10 * the License, or (at your option) any later version.
11 *
12 * This software is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * Lesser General Public License for more details.
16 *
17 * You should have received a copy of the GNU Lesser General Public
18 * License along with this software; if not, write to the Free
19 * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
20 * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
21 */
22 package org.jboss.mq.pm.jdbc2;
23
24 import java.io.ByteArrayInputStream;
25 import java.io.ByteArrayOutputStream;
26 import java.io.IOException;
27 import java.io.ObjectInputStream;
28 import java.io.ObjectOutputStream;
29 import java.io.StreamCorruptedException;
30 import java.sql.Connection;
31 import java.sql.PreparedStatement;
32 import java.sql.ResultSet;
33 import java.sql.SQLException;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.Map;
37 import java.util.Properties;
38
39 import javax.jms.JMSException;
40 import javax.management.AttributeNotFoundException;
41 import javax.management.InstanceNotFoundException;
42 import javax.management.MBeanException;
43 import javax.management.ObjectName;
44 import javax.management.ReflectionException;
45 import javax.naming.InitialContext;
46 import javax.naming.NamingException;
47 import javax.sql.DataSource;
48 import javax.transaction.Transaction;
49 import javax.transaction.TransactionManager;
50 import javax.transaction.xa.Xid;
51
52 import org.jboss.mq.SpyDestination;
53 import org.jboss.mq.SpyJMSException;
54 import org.jboss.mq.SpyMessage;
55 import org.jboss.mq.SpyTopic;
56 import org.jboss.mq.pm.CacheStore;
57 import org.jboss.mq.pm.PersistenceManagerExt;
58 import org.jboss.mq.pm.Tx;
59 import org.jboss.mq.pm.TxManager;
60 import org.jboss.mq.server.JMSDestination;
61 import org.jboss.mq.server.MessageCache;
62 import org.jboss.mq.server.MessageReference;
63 import org.jboss.system.ServiceMBeanSupport;
64 import org.jboss.tm.TransactionManagerLocator;
65 import org.jboss.tm.TransactionTimeoutConfiguration;
66 import org.jboss.util.UnreachableStatementException;
67
68 import EDU.oswego.cs.dl.util.concurrent.SynchronizedLong;
69
70 /**
71 * This class manages all persistence related services for JDBC based
72 * persistence.
73 *
74 * @author Jayesh Parayali (jayeshpk1@yahoo.com)
75 * @author Hiram Chirino (cojonudo14@hotmail.com)
76 * @author Adrian Brock (adrian@jboss.com)
77 * @version $Revision: 73848 $
78 */
79 public class PersistenceManager extends ServiceMBeanSupport
80 implements PersistenceManagerMBean, org.jboss.mq.pm.PersistenceManager, CacheStore, PersistenceManagerExt
81 {
82 /** FAQ about concurrency problems */
83 private static String CONCURRENCY_WARNING = "\nCommon reasons for missing messages are \n1) You have multiple JBossMQs running over the same database.\n2) You are using a replicating database that is not keeping up with replication.";
84
85 /////////////////////////////////////////////////////////////////////////////////
86 //
87 // TX state attibutes
88 //
89 /////////////////////////////////////////////////////////////////////////////////
90
91 /** The next transaction id */
92 protected SynchronizedLong nextTransactionId = new SynchronizedLong(0l);
93
94 /** The jta transaction manager */
95 protected TxManager txManager;
96
97 /** The DataSource */
98 protected DataSource datasource;
99
100 /** The JBossMQ transaction mananger */
101 protected TransactionManager tm;
102
103 /** The override recovery timeout */
104 private int recoveryTimeout = 0;
105
106 /** The recovery retries */
107 private int recoveryRetries = 0;
108
109 /** The recover messages chunk */
110 private int recoverMessagesChunk = 0;
111
112 /** The statement retries */
113 private int statementRetries = 5;
114
115 /////////////////////////////////////////////////////////////////////////////////
116 //
117 // JDBC Access Attributes
118 //
119 /////////////////////////////////////////////////////////////////////////////////
120
121 protected String UPDATE_MARKED_MESSAGES = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=?";
122 protected String UPDATE_MARKED_MESSAGES_XARECOVERY = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID NOT IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID IS NOT NULL)";
123 protected String UPDATE_MARKED_MESSAGES_WITH_TX = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE TXOP=? AND TXID=?";
124 protected String DELETE_MARKED_MESSAGES_WITH_TX =
125 "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS) AND TXOP=?";
126 protected String DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
127 "DELETE FROM JMS_MESSAGES WHERE TXID IN (SELECT TXID FROM JMS_TRANSACTIONS WHERE XID = NULL) AND TXOP=?";
128 protected String DELETE_TX = "DELETE FROM JMS_TRANSACTIONS WHERE TXID = ?";
129 protected String DELETE_MARKED_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXID=? AND TXOP=?";
130 protected String DELETE_TEMPORARY_MESSAGES = "DELETE FROM JMS_MESSAGES WHERE TXOP = 'T'";
131 protected String INSERT_TX = "INSERT INTO JMS_TRANSACTIONS (TXID) values(?)";
132 protected String INSERT_TX_XARECOVERY = "INSERT INTO JMS_TRANSACTIONS (TXID, XID) values(?, ?)";
133 protected String DELETE_ALL_TX = "DELETE FROM JMS_TRANSACTIONS";
134 protected String DELETE_ALL_TX_XARECOVERY = "DELETE FROM JMS_TRANSACTIONS WHERE XID = NULL";
135 protected String SELECT_MAX_TX = "SELECT MAX(TXID) FROM (SELECT MAX(TXID) FROM JMS_TRANSACTIONS UNION SELECT MAX(TXID) FROM JMS_MESSAGES)";
136 protected String SELECT_ALL_TX_XARECOVERY = "SELECT TXID, XID FROM JMS_TRANSACTIONS";
137 protected String SELECT_MESSAGES_IN_DEST = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE DESTINATION=?";
138 protected String SELECT_MESSAGES_IN_DEST_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE DESTINATION=?";
139 protected String SELECT_MESSAGE_KEYS_IN_DEST = "SELECT MESSAGEID FROM JMS_MESSAGES WHERE DESTINATION=?";
140 protected String SELECT_MESSAGE = "SELECT MESSAGEID, MESSAGEBLOB FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
141 protected String SELECT_MESSAGE_XARECOVERY = "SELECT MESSAGEID, MESSAGEBLOB, TXID, TXOP FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
142 protected String INSERT_MESSAGE =
143 "INSERT INTO JMS_MESSAGES (MESSAGEID, DESTINATION, MESSAGEBLOB, TXID, TXOP) VALUES(?,?,?,?,?)";
144 protected String MARK_MESSAGE = "UPDATE JMS_MESSAGES SET TXID=?, TXOP=? WHERE MESSAGEID=? AND DESTINATION=?";
145 protected String DELETE_MESSAGE = "DELETE FROM JMS_MESSAGES WHERE MESSAGEID=? AND DESTINATION=?";
146 protected String UPDATE_MESSAGE = "UPDATE JMS_MESSAGES SET MESSAGEBLOB=? WHERE MESSAGEID=? AND DESTINATION=?";
147 protected String CREATE_MESSAGE_TABLE =
148 "CREATE TABLE JMS_MESSAGES ( MESSAGEID INTEGER NOT NULL, "
149 + "DESTINATION VARCHAR(32) NOT NULL, TXID INTEGER, TXOP CHAR(1),"
150 + "MESSAGEBLOB OBJECT, PRIMARY KEY (MESSAGEID, DESTINATION) )";
151 protected String CREATE_IDX_MESSAGE_TXOP_TXID = "CREATE INDEX JMS_MESSAGES_TXOP_TXID ON JMS_MESSAGES (TXOP, TXID)";
152 protected String CREATE_IDX_MESSAGE_DESTINATION = "CREATE INDEX JMS_MESSAGES_DESTINATION ON JMS_MESSAGES (DESTINATION)";
153 protected String CREATE_TX_TABLE = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, PRIMARY KEY (TXID) )";
154 protected String CREATE_TX_TABLE_XARECOVERY = "CREATE TABLE JMS_TRANSACTIONS ( TXID INTEGER, XID OBJECT, PRIMARY KEY (TXID) )";
155
156 protected static final int OBJECT_BLOB = 0;
157 protected static final int BYTES_BLOB = 1;
158 protected static final int BINARYSTREAM_BLOB = 2;
159 protected static final int BLOB_BLOB = 3;
160
161 protected int blobType = OBJECT_BLOB;
162 protected boolean createTables;
163
164 protected int connectionRetryAttempts = 5;
165
166 protected boolean xaRecovery = false;
167
168 /////////////////////////////////////////////////////////////////////////////////
169 //
170 // Constructor.
171 //
172 /////////////////////////////////////////////////////////////////////////////////
173 public PersistenceManager() throws javax.jms.JMSException
174 {
175 txManager = new TxManager(this);
176 }
177
178 /**
179 * This inner class helps handle the tx management of the jdbc connections.
180 *
181 */
182 protected class TransactionManagerStrategy
183 {
184 boolean rollback = false;
185
186 Transaction threadTx;
187
188 void startTX() throws JMSException
189 {
190 try
191 {
192 // Thread arriving must be clean (jboss doesn't set the thread
193 // previously). However optimized calls come with associated
194 // thread for example. We suspend the thread association here, and
195 // resume in the finally block of the following try.
196 threadTx = tm.suspend();
197
198 // Always begin a transaction
199 tm.begin();
200 }
201 catch (Exception e)
202 {
203 try
204 {
205 if (threadTx != null)
206 tm.resume(threadTx);
207 }
208 catch (Exception ignore)
209 {
210 }
211 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
212 }
213 }
214
215 void setRollbackOnly() throws JMSException
216 {
217 rollback = true;
218 }
219
220 void endTX() throws JMSException
221 {
222 try
223 {
224 if (rollback)
225 {
226 tm.rollback();
227 }
228 else
229 {
230 tm.commit();
231 }
232 }
233 catch (Exception e)
234 {
235 throw new SpyJMSException("Could not start a transaction with the transaction manager.", e);
236 }
237 finally
238 {
239 try
240 {
241 if (threadTx != null)
242 tm.resume(threadTx);
243 }
244 catch (Exception ignore)
245 {
246 }
247 }
248 }
249 }
250
251 /////////////////////////////////////////////////////////////////////////////////
252 //
253 // TX Resolution.
254 //
255 /////////////////////////////////////////////////////////////////////////////////
256
257 synchronized protected void createSchema() throws JMSException
258 {
259 TransactionManagerStrategy tms = new TransactionManagerStrategy();
260 tms.startTX();
261 Connection c = null;
262 PreparedStatement stmt = null;
263 boolean threadWasInterrupted = Thread.interrupted();
264 try
265 {
266 if (createTables)
267 {
268 c = this.getConnection();
269
270 boolean createdMessageTable = false;
271 try
272 {
273 stmt = c.prepareStatement(CREATE_MESSAGE_TABLE);
274 stmt.executeUpdate();
275 createdMessageTable = true;
276 }
277 catch (SQLException e)
278 {
279 log.debug("Could not create table with SQL: " + CREATE_MESSAGE_TABLE, e);
280 }
281 finally
282 {
283 try
284 {
285 if (stmt != null)
286 stmt.close();
287 }
288 catch (Throwable ignored)
289 {
290 log.trace("Ignored: " + ignored);
291 }
292 stmt = null;
293 }
294
295 if (createdMessageTable)
296 {
297 try
298 {
299 stmt = c.prepareStatement(CREATE_IDX_MESSAGE_TXOP_TXID);
300 stmt.executeUpdate();
301 }
302 catch (SQLException e)
303 {
304 log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_TXOP_TXID, e);
305 }
306 finally
307 {
308 try
309 {
310 if (stmt != null)
311 stmt.close();
312 }
313 catch (Throwable ignored)
314 {
315 log.trace("Ignored: " + ignored);
316 }
317 stmt = null;
318 }
319 try
320 {
321 stmt = c.prepareStatement(CREATE_IDX_MESSAGE_DESTINATION);
322 stmt.executeUpdate();
323 }
324 catch (SQLException e)
325 {
326 log.debug("Could not create index with SQL: " + CREATE_IDX_MESSAGE_DESTINATION, e);
327 }
328 finally
329 {
330 try
331 {
332 if (stmt != null)
333 stmt.close();
334 }
335 catch (Throwable ignored)
336 {
337 log.trace("Ignored: " + ignored);
338 }
339 stmt = null;
340 }
341 }
342
343 String createTxTable = CREATE_TX_TABLE;
344 if (xaRecovery)
345 createTxTable = CREATE_TX_TABLE_XARECOVERY;
346 try
347 {
348 stmt = c.prepareStatement(createTxTable);
349 stmt.executeUpdate();
350 }
351 catch (SQLException e)
352 {
353 log.debug("Could not create table with SQL: " + createTxTable, e);
354 }
355 finally
356 {
357 try
358 {
359 if (stmt != null)
360 stmt.close();
361 }
362 catch (Throwable ignored)
363 {
364 log.trace("Ignored: " + ignored);
365 }
366 stmt = null;
367 }
368 }
369 }
370 catch (SQLException e)
371 {
372 tms.setRollbackOnly();
373 throw new SpyJMSException("Could not get a connection for jdbc2 table construction ", e);
374 }
375 finally
376 {
377 try
378 {
379 if (stmt != null)
380 stmt.close();
381 }
382 catch (Throwable ignore)
383 {
384 }
385 stmt = null;
386 try
387 {
388 if (c != null)
389 c.close();
390 }
391 catch (Throwable ignore)
392 {
393 }
394 c = null;
395 tms.endTX();
396
397 // Restore the interrupted state of the thread
398 if (threadWasInterrupted)
399 Thread.currentThread().interrupt();
400 }
401 }
402
403 synchronized protected void resolveAllUncommitedTXs() throws JMSException
404 {
405 // We perform recovery in a different thread to the table creation
406 // Postgres doesn't like create table failing in the same transaction
407 // as other operations
408
409 TransactionManagerStrategy tms = new TransactionManagerStrategy();
410 tms.startTX();
411 Connection c = null;
412 PreparedStatement stmt = null;
413 ResultSet rs = null;
414 boolean threadWasInterrupted = Thread.interrupted();
415 try
416 {
417 c = this.getConnection();
418
419 // Find out what the next TXID should be
420 stmt = c.prepareStatement(SELECT_MAX_TX);
421 rs = stmt.executeQuery();
422 if (rs.next())
423 nextTransactionId.set(rs.getLong(1) + 1);
424 rs.close();
425 rs = null;
426 stmt.close();
427 stmt = null;
428
429 // Delete all the temporary messages.
430 stmt = c.prepareStatement(DELETE_TEMPORARY_MESSAGES);
431 stmt.executeUpdate();
432 stmt.close();
433 stmt = null;
434
435 // Delete all the messages that were added but thier tx's were not commited.
436 String deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX;
437 if (xaRecovery)
438 deleteMarkedMessagesWithTx = DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY;
439 stmt = c.prepareStatement(deleteMarkedMessagesWithTx);
440 stmt.setString(1, "A");
441 stmt.executeUpdate();
442 stmt.close();
443 stmt = null;
444
445 // Restore all the messages that were removed but their tx's were not commited.
446 String updateMarkedMessages = UPDATE_MARKED_MESSAGES;
447 if (xaRecovery)
448 updateMarkedMessages = UPDATE_MARKED_MESSAGES_XARECOVERY;
449 stmt = c.prepareStatement(updateMarkedMessages);
450 stmt.setNull(1, java.sql.Types.BIGINT);
451 stmt.setString(2, "A");
452 stmt.setString(3, "D");
453 stmt.executeUpdate();
454 stmt.close();
455 stmt = null;
456
457 // Now recovery is complete, clear the transaction table.
458 String deleteAllTx = DELETE_ALL_TX;
459 if (xaRecovery)
460 deleteAllTx = DELETE_ALL_TX_XARECOVERY;
461 stmt = c.prepareStatement(deleteAllTx);
462 stmt.execute();
463 stmt.close();
464 stmt = null;
465
466 // If we are doing XARecovery restore the prepared transactions
467 if (xaRecovery)
468 {
469 stmt = c.prepareStatement(SELECT_ALL_TX_XARECOVERY);
470 rs = stmt.executeQuery();
471 while (rs.next())
472 {
473 long txid = rs.getLong(1);
474 Xid xid = extractXid(rs, 2);
475 Tx tx = new Tx(txid);
476 tx.setXid(xid);
477 tx.checkPersisted();
478 txManager.restoreTx(tx);
479 }
480 rs.close();
481 rs = null;
482 stmt.close();
483 stmt = null;
484 }
485 }
486 catch (Exception e)
487 {
488 tms.setRollbackOnly();
489 throw new SpyJMSException("Could not resolve uncommited transactions. Message recovery may not be accurate", e);
490 }
491 finally
492 {
493 try
494 {
495 if (rs != null)
496 rs.close();
497 }
498 catch (Throwable ignore)
499 {
500 }
501 try
502 {
503 if (stmt != null)
504 stmt.close();
505 }
506 catch (Throwable ignore)
507 {
508 }
509 try
510 {
511 if (c != null)
512 c.close();
513 }
514 catch (Throwable ignore)
515 {
516 }
517 tms.endTX();
518
519 // Restore the interrupted state of the thread
520 if (threadWasInterrupted)
521 Thread.currentThread().interrupt();
522 }
523 }
524
525 /////////////////////////////////////////////////////////////////////////////////
526 //
527 // Message Recovery
528 //
529 /////////////////////////////////////////////////////////////////////////////////
530
531 synchronized public void restoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
532 {
533 if (jmsDest == null)
534 throw new IllegalArgumentException("Must supply non null JMSDestination to restoreQueue");
535 if (dest == null)
536 throw new IllegalArgumentException("Must supply non null SpyDestination to restoreQueue");
537
538 boolean canOverrideTimeout = (tm instanceof TransactionTimeoutConfiguration);
539 int previousTimeout = 0;
540 try
541 {
542 // Set our timeout
543 if (recoveryTimeout != 0)
544 {
545 if (canOverrideTimeout)
546 {
547 previousTimeout = ((TransactionTimeoutConfiguration) tm).getTransactionTimeout();
548 tm.setTransactionTimeout(recoveryTimeout);
549 }
550 else
551 {
552 log.debug("Cannot override recovery timeout, TransactionManager does implement " + TransactionTimeoutConfiguration.class.getName());
553 }
554 }
555
556 // restore the queue
557 try
558 {
559 internalRestoreQueue(jmsDest, dest);
560 }
561 finally
562 {
563 // restore the transaction timeout
564 if (recoveryTimeout != 0 && canOverrideTimeout)
565 tm.setTransactionTimeout(previousTimeout);
566 }
567 }
568 catch (Exception e)
569 {
570 SpyJMSException.rethrowAsJMSException("Unexpected error in recovery", e);
571 }
572 }
573
574 synchronized protected void internalRestoreQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
575 {
576 // Work out the prepared transactions
577 Map prepared = null;
578 if (xaRecovery)
579 {
580 prepared = new HashMap();
581 Map map = txManager.getPreparedTransactions();
582 for (Iterator i = map.values().iterator(); i.hasNext();)
583 {
584 TxManager.PreparedInfo info = (TxManager.PreparedInfo) i.next();
585 for (Iterator j = info.getTxids().iterator(); j.hasNext();)
586 {
587 Tx tx = (Tx) j.next();
588 prepared.put(new Long(tx.longValue()), tx);
589 }
590 }
591 }
592
593 TransactionManagerStrategy tms = new TransactionManagerStrategy();
594 tms.startTX();
595 Connection c = null;
596 PreparedStatement stmt = null;
597 PreparedStatement stmt2 = null;
598 ResultSet rs = null;
599 boolean threadWasInterrupted = Thread.interrupted();
600 try
601 {
602 String selectMessagesInDest = SELECT_MESSAGES_IN_DEST;
603 String selectMessage = SELECT_MESSAGE;
604 if (xaRecovery)
605 {
606 selectMessagesInDest = SELECT_MESSAGES_IN_DEST_XARECOVERY;
607 selectMessage = SELECT_MESSAGE_XARECOVERY;
608 }
609 c = this.getConnection();
610 if (recoverMessagesChunk == 0)
611 stmt = c.prepareStatement(selectMessagesInDest);
612 else
613 {
614 stmt = c.prepareStatement(SELECT_MESSAGE_KEYS_IN_DEST);
615 stmt2 = c.prepareStatement(selectMessage);
616 }
617 stmt.setString(1, dest.toString());
618
619 long txid = 0;
620 String txop = null;
621 rs = stmt.executeQuery();
622 int counter = 0;
623 int recovery = 0;
624 while (rs.next())
625 {
626 long msgid = rs.getLong(1);
627 SpyMessage message = null;
628 if (recoverMessagesChunk == 0)
629 {
630 message = extractMessage(rs);
631 if (xaRecovery)
632 {
633 txid = rs.getLong(3);
634 txop = rs.getString(4);
635 }
636 }
637 else
638 {
639 ResultSet rs2 = null;
640 try
641 {
642 stmt2.setLong(1, msgid);
643 stmt2.setString(2, dest.toString());
644 rs2 = stmt2.executeQuery();
645 if (rs2.next())
646 {
647 message = extractMessage(rs2);
648 if (xaRecovery)
649 {
650 txid = rs2.getLong(3);
651 txop = rs2.getString(4);
652 }
653 }
654 else
655 log.warn("Failed to find message msgid=" + msgid +" dest=" + dest);
656 }
657 finally
658 {
659 if (rs2 != null)
660 {
661 try
662 {
663 rs2.close();
664 }
665 catch (Exception ignored)
666 {
667 }
668 }
669 }
670 }
671 // The durable subscription is not serialized
672 if (dest instanceof SpyTopic)
673 message.header.durableSubscriberID = ((SpyTopic) dest).getDurableSubscriptionID();
674
675 if (xaRecovery == false || txid == 0 || txop == null)
676 jmsDest.restoreMessage(message);
677 else
678 {
679 Tx tx = (Tx) prepared.get(new Long(txid));
680 if (tx == null)
681 jmsDest.restoreMessage(message);
682 else if ("A".equals(txop))
683 {
684 jmsDest.restoreMessage(message, tx, Tx.ADD);
685 recovery++;
686 }
687 else if ("D".equals(txop))
688 {
689 jmsDest.restoreMessage(message, tx, Tx.REMOVE);
690 recovery++;
691 }
692 else
693 throw new IllegalStateException("Unknown txop=" + txop + " for msg=" + msgid + " dest=" + dest);
694 }
695 counter++;
696 }
697
698 log.debug("Restored " + counter + " message(s) to: " + dest + " " + recovery + " need recovery.");
699 }
700 catch (IOException e)
701 {
702 tms.setRollbackOnly();
703 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
704 }
705 catch (SQLException e)
706 {
707 tms.setRollbackOnly();
708 throw new SpyJMSException("Could not restore messages to destination : " + dest.toString(), e);
709 }
710 finally
711 {
712 try
713 {
714 if (rs != null)
715 rs.close();
716 }
717 catch (Throwable ignore)
718 {
719 }
720 try
721 {
722 if (stmt != null)
723 stmt.close();
724 }
725 catch (Throwable ignore)
726 {
727 }
728 try
729 {
730 if (c != null)
731 c.close();
732 }
733 catch (Throwable ignore)
734 {
735 }
736 tms.endTX();
737
738 // Restore the interrupted state of the thread
739 if (threadWasInterrupted)
740 Thread.currentThread().interrupt();
741 }
742
743 }
744
745 SpyMessage extractMessage(ResultSet rs) throws SQLException, IOException
746 {
747 try
748 {
749 long messageid = rs.getLong(1);
750
751 SpyMessage message = null;
752
753 if (blobType == OBJECT_BLOB)
754 {
755
756 message = (SpyMessage) rs.getObject(2);
757
758 }
759 else if (blobType == BYTES_BLOB)
760 {
761
762 byte[] st = rs.getBytes(2);
763 ByteArrayInputStream baip = new ByteArrayInputStream(st);
764 ObjectInputStream ois = new ObjectInputStream(baip);
765 message = SpyMessage.readMessage(ois);
766
767 }
768 else if (blobType == BINARYSTREAM_BLOB)
769 {
770
771 ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(2));
772 message = SpyMessage.readMessage(ois);
773
774 }
775 else if (blobType == BLOB_BLOB)
776 {
777
778 ObjectInputStream ois = new ObjectInputStream(rs.getBlob(2).getBinaryStream());
779 message = SpyMessage.readMessage(ois);
780 }
781
782 message.header.messageId = messageid;
783 return message;
784 }
785 catch (StreamCorruptedException e)
786 {
787 throw new IOException("Could not load the message: " + e);
788 }
789 }
790
791 Xid extractXid(ResultSet rs, int column) throws SQLException, IOException, ClassNotFoundException
792 {
793 try
794 {
795 Xid xid = null;
796
797 if (blobType == OBJECT_BLOB)
798 {
799 xid = (Xid) rs.getObject(column);
800 }
801 else if (blobType == BYTES_BLOB)
802 {
803 byte[] st = rs.getBytes(column);
804 ByteArrayInputStream baip = new ByteArrayInputStream(st);
805 ObjectInputStream ois = new ObjectInputStream(baip);
806 xid = (Xid) ois.readObject();
807 }
808 else if (blobType == BINARYSTREAM_BLOB)
809 {
810 ObjectInputStream ois = new ObjectInputStream(rs.getBinaryStream(column));
811 xid = (Xid) ois.readObject();
812 }
813 else if (blobType == BLOB_BLOB)
814 {
815 ObjectInputStream ois = new ObjectInputStream(rs.getBlob(column).getBinaryStream());
816 xid = (Xid) ois.readObject();
817 }
818
819 return xid;
820 }
821 catch (StreamCorruptedException e)
822 {
823 throw new IOException("Could not load the message: " + e);
824 }
825 }
826
827 public void forcePersistentTx(Tx txId) throws javax.jms.JMSException
828 {
829 // No need if already done or not doing xa recovery
830 if (txId.wasPersisted() || xaRecovery == false)
831 return;
832
833 TransactionManagerStrategy tms = new TransactionManagerStrategy();
834 tms.startTX();
835 Connection c = null;
836 boolean threadWasInterrupted = Thread.interrupted();
837 try
838 {
839
840 c = this.getConnection();
841 insertPersistentTx(tms, c, txId);
842 }
843 catch (SQLException e)
844 {
845 tms.setRollbackOnly();
846 throw new SpyJMSException("Could not commit tx: " + txId, e);
847 }
848 finally
849 {
850 try
851 {
852 if (c != null)
853 c.close();
854 }
855 catch (Throwable ignore)
856 {
857 }
858 tms.endTX();
859
860 // Restore the interrupted state of the thread
861 if (threadWasInterrupted)
862 Thread.currentThread().interrupt();
863 }
864 }
865
866 /////////////////////////////////////////////////////////////////////////////////
867 //
868 // TX Commit
869 //
870 /////////////////////////////////////////////////////////////////////////////////
871 public void commitPersistentTx(Tx txId) throws javax.jms.JMSException
872 {
873 if (txId.wasPersisted() == false)
874 return;
875
876 TransactionManagerStrategy tms = new TransactionManagerStrategy();
877 tms.startTX();
878 Connection c = null;
879 boolean threadWasInterrupted = Thread.interrupted();
880 try
881 {
882
883 c = this.getConnection();
884 removeMarkedMessages(c, txId, "D");
885 removeTXRecord(c, txId.longValue());
886
887 }
888 catch (SQLException e)
889 {
890 tms.setRollbackOnly();
891 throw new SpyJMSException("Could not commit tx: " + txId, e);
892 }
893 finally
894 {
895 try
896 {
897 if (c != null)
898 c.close();
899 }
900 catch (Throwable ignore)
901 {
902 }
903 tms.endTX();
904
905 // Restore the interrupted state of the thread
906 if (threadWasInterrupted)
907 Thread.currentThread().interrupt();
908 }
909 }
910
911 public void removeMarkedMessages(Connection c, Tx txid, String mark) throws SQLException
912 {
913 PreparedStatement stmt = null;
914 try
915 {
916 stmt = c.prepareStatement(DELETE_MARKED_MESSAGES);
917 stmt.setLong(1, txid.longValue());
918 stmt.setString(2, mark);
919 stmt.executeUpdate();
920 }
921 finally
922 {
923 try
924 {
925 if (stmt != null)
926 stmt.close();
927 }
928 catch (Throwable e)
929 {
930 }
931 }
932 }
933
934 public void addTXRecord(Connection c, Tx txid) throws SQLException, IOException
935 {
936 PreparedStatement stmt = null;
937 try
938 {
939 String insertTx = INSERT_TX;
940 if (xaRecovery)
941 insertTx = INSERT_TX_XARECOVERY;
942 stmt = c.prepareStatement(insertTx);
943 stmt.setLong(1, txid.longValue());
944 if (xaRecovery)
945 {
946 Xid xid = txid.getXid();
947 if (xid != null)
948 setBlob(stmt, 2, xid);
949 else
950 stmt.setNull(2, java.sql.Types.BLOB);
951 }
952 stmt.executeUpdate();
953 }
954 finally
955 {
956 try
957 {
958 if (stmt != null)
959 stmt.close();
960 }
961 catch (Throwable e)
962 {
963 }
964 }
965 }
966
967 public void removeTXRecord(Connection c, long txid) throws SQLException
968 {
969 PreparedStatement stmt = null;
970 try
971 {
972 stmt = c.prepareStatement(DELETE_TX);
973 stmt.setLong(1, txid);
974 stmt.executeUpdate();
975 }
976 finally
977 {
978 try
979 {
980 if (stmt != null)
981 stmt.close();
982 }
983 catch (Throwable e)
984 {
985 }
986 }
987 }
988
989 /////////////////////////////////////////////////////////////////////////////////
990 //
991 // TX Rollback
992 //
993 /////////////////////////////////////////////////////////////////////////////////
994 public void rollbackPersistentTx(Tx txId) throws JMSException
995 {
996 if (txId.wasPersisted() == false)
997 return;
998
999 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1000 tms.startTX();
1001 Connection c = null;
1002 PreparedStatement stmt = null;
1003 boolean threadWasInterrupted = Thread.interrupted();
1004 try
1005 {
1006
1007 c = this.getConnection();
1008 removeMarkedMessages(c, txId, "A");
1009 removeTXRecord(c, txId.longValue());
1010
1011 // Restore all the messages that were logically removed.
1012 stmt = c.prepareStatement(UPDATE_MARKED_MESSAGES_WITH_TX);
1013 stmt.setNull(1, java.sql.Types.BIGINT);
1014 stmt.setString(2, "A");
1015 stmt.setString(3, "D");
1016 stmt.setLong(4, txId.longValue());
1017 stmt.executeUpdate();
1018 stmt.close();
1019 stmt = null;
1020 }
1021 catch (SQLException e)
1022 {
1023 tms.setRollbackOnly();
1024 throw new SpyJMSException("Could not rollback tx: " + txId, e);
1025 }
1026 finally
1027 {
1028 try
1029 {
1030 if (stmt != null)
1031 stmt.close();
1032 }
1033 catch (Throwable ignore)
1034 {
1035 }
1036 try
1037 {
1038 if (c != null)
1039 c.close();
1040 }
1041 catch (Throwable ignore)
1042 {
1043 }
1044 tms.endTX();
1045
1046 // Restore the interrupted state of the thread
1047 if (threadWasInterrupted)
1048 Thread.currentThread().interrupt();
1049 }
1050
1051 }
1052
1053 /////////////////////////////////////////////////////////////////////////////////
1054 //
1055 // TX Creation
1056 //
1057 /////////////////////////////////////////////////////////////////////////////////
1058 public Tx createPersistentTx() throws JMSException
1059 {
1060 Tx id = new Tx(nextTransactionId.increment());
1061 return id;
1062 }
1063
1064 public void insertPersistentTx(TransactionManagerStrategy tms, Connection c, Tx tx) throws JMSException
1065 {
1066 try
1067 {
1068 if (tx != null && tx.checkPersisted() == false)
1069 addTXRecord(c, tx);
1070 }
1071 catch (Exception e)
1072 {
1073 tms.setRollbackOnly();
1074 throw new SpyJMSException("Could not create tx: " + tx.longValue(), e);
1075 }
1076 }
1077
1078 /////////////////////////////////////////////////////////////////////////////////
1079 //
1080 // Adding a message
1081 //
1082 /////////////////////////////////////////////////////////////////////////////////
1083 public void add(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
1084 {
1085 boolean trace = log.isTraceEnabled();
1086 if (trace)
1087 log.trace("About to add message " + messageRef + " transaction=" + txId);
1088
1089 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1090 tms.startTX();
1091 Connection c = null;
1092 boolean threadWasInterrupted = Thread.interrupted();
1093 try
1094 {
1095 c = this.getConnection();
1096
1097 // Lazily write the peristent transaction
1098 insertPersistentTx(tms, c, txId);
1099
1100 // Synchronize on the message to avoid a race with the softener
1101 synchronized (messageRef)
1102 {
1103 SpyMessage message = messageRef.getMessage();
1104
1105 // has it allready been stored by the message cache interface??
1106 if (messageRef.stored == MessageReference.STORED)
1107 {
1108 if (trace)
1109 log.trace("Updating message " + messageRef + " transaction=" + txId);
1110
1111 markMessage(c, messageRef.messageId, messageRef.getPersistentKey(), txId, "A");
1112 }
1113 else
1114 {
1115 if (trace)
1116 log.trace("Inserting message " + messageRef + " transaction=" + txId);
1117
1118 add(c, messageRef.getPersistentKey(), message, txId, "A");
1119 messageRef.setStored(MessageReference.STORED);
1120 }
1121 if (trace)
1122 log.trace("Added message " + messageRef + " transaction=" + txId);
1123 }
1124 }
1125 catch (IOException e)
1126 {
1127 tms.setRollbackOnly();
1128 throw new SpyJMSException("Could not store message: " + messageRef, e);
1129 }
1130 catch (SQLException e)
1131 {
1132 tms.setRollbackOnly();
1133 throw new SpyJMSException("Could not store message: " + messageRef, e);
1134 }
1135 finally
1136 {
1137 try
1138 {
1139 if (c != null)
1140 c.close();
1141 }
1142 catch (Throwable ignore)
1143 {
1144 }
1145 tms.endTX();
1146
1147 // Restore the interrupted state of the thread
1148 if (threadWasInterrupted)
1149 Thread.currentThread().interrupt();
1150 }
1151 }
1152
1153 protected void add(Connection c, String queue, SpyMessage message, Tx txId, String mark)
1154 throws SQLException, IOException
1155 {
1156 PreparedStatement stmt = null;
1157 try
1158 {
1159
1160 stmt = c.prepareStatement(INSERT_MESSAGE);
1161
1162 stmt.setLong(1, message.header.messageId);
1163 stmt.setString(2, queue);
1164 setBlob(stmt, 3, message);
1165
1166 if (txId != null)
1167 stmt.setLong(4, txId.longValue());
1168 else
1169 stmt.setNull(4, java.sql.Types.BIGINT);
1170 stmt.setString(5, mark);
1171
1172 stmt.executeUpdate();
1173 }
1174 finally
1175 {
1176 try
1177 {
1178 if (stmt != null)
1179 stmt.close();
1180 }
1181 catch (Throwable ignore)
1182 {
1183 }
1184 }
1185 }
1186
1187 public void markMessage(Connection c, long messageid, String destination, Tx txId, String mark)
1188 throws SQLException
1189 {
1190 PreparedStatement stmt = null;
1191 try
1192 {
1193
1194 stmt = c.prepareStatement(MARK_MESSAGE);
1195 if (txId == null)
1196 {
1197 stmt.setNull(1, java.sql.Types.BIGINT);
1198 }
1199 else
1200 {
1201 stmt.setLong(1, txId.longValue());
1202 }
1203 stmt.setString(2, mark);
1204 stmt.setLong(3, messageid);
1205 stmt.setString(4, destination);
1206 stmt.executeUpdate();
1207 }
1208 finally
1209 {
1210 try
1211 {
1212 if (stmt != null)
1213 stmt.close();
1214 }
1215 catch (Throwable ignore)
1216 {
1217 }
1218 }
1219
1220 }
1221
1222 public void setBlob(PreparedStatement stmt, int column, SpyMessage message) throws IOException, SQLException
1223 {
1224 if (blobType == OBJECT_BLOB)
1225 {
1226 stmt.setObject(column, message);
1227 }
1228 else if (blobType == BYTES_BLOB)
1229 {
1230 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1231 ObjectOutputStream oos = new ObjectOutputStream(baos);
1232 SpyMessage.writeMessage(message, oos);
1233 oos.flush();
1234 byte[] messageAsBytes = baos.toByteArray();
1235 stmt.setBytes(column, messageAsBytes);
1236 }
1237 else if (blobType == BINARYSTREAM_BLOB)
1238 {
1239 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1240 ObjectOutputStream oos = new ObjectOutputStream(baos);
1241 SpyMessage.writeMessage(message, oos);
1242 oos.flush();
1243 byte[] messageAsBytes = baos.toByteArray();
1244 ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
1245 stmt.setBinaryStream(column, bais, messageAsBytes.length);
1246 }
1247 else if (blobType == BLOB_BLOB)
1248 {
1249
1250 throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1251 /** TODO:
1252 ByteArrayOutputStream baos= new ByteArrayOutputStream();
1253 ObjectOutputStream oos= new ObjectOutputStream(baos);
1254 oos.writeObject(message);
1255 byte[] messageAsBytes= baos.toByteArray();
1256 ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1257 stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1258 */
1259 }
1260 }
1261
1262 public void setBlob(PreparedStatement stmt, int column, Xid xid) throws IOException, SQLException
1263 {
1264 if (blobType == OBJECT_BLOB)
1265 {
1266 stmt.setObject(column, xid);
1267 }
1268 else if (blobType == BYTES_BLOB)
1269 {
1270 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1271 ObjectOutputStream oos = new ObjectOutputStream(baos);
1272 oos.writeObject(xid);
1273 oos.flush();
1274 byte[] messageAsBytes = baos.toByteArray();
1275 stmt.setBytes(column, messageAsBytes);
1276 }
1277 else if (blobType == BINARYSTREAM_BLOB)
1278 {
1279 ByteArrayOutputStream baos = new ByteArrayOutputStream();
1280 ObjectOutputStream oos = new ObjectOutputStream(baos);
1281 oos.writeObject(xid);
1282 oos.flush();
1283 byte[] messageAsBytes = baos.toByteArray();
1284 ByteArrayInputStream bais = new ByteArrayInputStream(messageAsBytes);
1285 stmt.setBinaryStream(column, bais, messageAsBytes.length);
1286 }
1287 else if (blobType == BLOB_BLOB)
1288 {
1289
1290 throw new RuntimeException("BLOB_TYPE: BLOB_BLOB is not yet implemented.");
1291 /** TODO:
1292 ByteArrayOutputStream baos= new ByteArrayOutputStream();
1293 ObjectOutputStream oos= new ObjectOutputStream(baos);
1294 oos.writeObject(xid);
1295 byte[] messageAsBytes= baos.toByteArray();
1296 ByteArrayInputStream bais= new ByteArrayInputStream(messageAsBytes);
1297 stmt.setBsetBinaryStream(column, bais, messageAsBytes.length);
1298 */
1299 }
1300 }
1301
1302 /////////////////////////////////////////////////////////////////////////////////
1303 //
1304 // Updating a message
1305 //
1306 /////////////////////////////////////////////////////////////////////////////////
1307 public void update(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
1308 {
1309 boolean trace = log.isTraceEnabled();
1310 if (trace)
1311 log.trace("Updating message " + messageRef + " transaction=" + txId);
1312
1313 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1314 tms.startTX();
1315 Connection c = null;
1316 PreparedStatement stmt = null;
1317 boolean threadWasInterrupted = Thread.interrupted();
1318 try
1319 {
1320
1321 c = this.getConnection();
1322 if (txId == null)
1323 {
1324
1325 stmt = c.prepareStatement(UPDATE_MESSAGE);
1326 setBlob(stmt, 1, messageRef.getMessage());
1327 stmt.setLong(2, messageRef.messageId);
1328 stmt.setString(3, messageRef.getPersistentKey());
1329 int rc = stmt.executeUpdate();
1330 if (rc != 1)
1331 throw new SpyJMSException(
1332 "Could not update the message in the database: update affected " + rc + " rows");
1333 }
1334 else
1335 {
1336 throw new SpyJMSException("NYI: Updating a message in a transaction is not currently used");
1337 }
1338 if (trace)
1339 log.trace("Updated message " + messageRef + " transaction=" + txId);
1340
1341 }
1342 catch (IOException e)
1343 {
1344 tms.setRollbackOnly();
1345 throw new SpyJMSException("Could not update message: " + messageRef, e);
1346 }
1347 catch (SQLException e)
1348 {
1349 tms.setRollbackOnly();
1350 throw new SpyJMSException("Could not update message: " + messageRef, e);
1351 }
1352 finally
1353 {
1354 try
1355 {
1356 if (stmt != null)
1357 stmt.close();
1358 }
1359 catch (Throwable ignore)
1360 {
1361 }
1362 try
1363 {
1364 if (c != null)
1365 c.close();
1366 }
1367 catch (Throwable ignore)
1368 {
1369 }
1370 tms.endTX();
1371
1372 // Restore the interrupted state of the thread
1373 if (threadWasInterrupted)
1374 Thread.currentThread().interrupt();
1375 }
1376
1377 }
1378
1379 /////////////////////////////////////////////////////////////////////////////////
1380 //
1381 // Removing a message
1382 //
1383 /////////////////////////////////////////////////////////////////////////////////
1384 public void remove(MessageReference messageRef, Tx txId) throws javax.jms.JMSException
1385 {
1386 boolean trace = log.isTraceEnabled();
1387 if (trace)
1388 log.trace("Removing message " + messageRef + " transaction=" + txId);
1389
1390 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1391 tms.startTX();
1392 Connection c = null;
1393 PreparedStatement stmt = null;
1394 boolean threadWasInterrupted = Thread.interrupted();
1395 try
1396 {
1397 c = this.getConnection();
1398
1399 // Lazily write the peristent transaction
1400 insertPersistentTx(tms, c, txId);
1401
1402 // Synchronize on the message to avoid a race with the softener
1403 synchronized (messageRef)
1404 {
1405 if (txId == null)
1406 {
1407 stmt = c.prepareStatement(DELETE_MESSAGE);
1408 stmt.setLong(1, messageRef.messageId);
1409 stmt.setString(2, messageRef.getPersistentKey());
1410
1411 // Adrian Brock:
1412 // Remove the message from the cache, but don't
1413 // return it to the pool just yet. The queue still holds
1414 // a reference to the message and will return it
1415 // to the pool once it gets enough time slice.
1416 // The alternative is to remove the validation
1417 // for double removal from the cache,
1418 // which I don't want to do because it is useful
1419 // for spotting errors
1420 messageRef.setStored(MessageReference.NOT_STORED);
1421 messageRef.removeDelayed();
1422 }
1423 else
1424 {
1425 stmt = c.prepareStatement(MARK_MESSAGE);
1426 stmt.setLong(1, txId.longValue());
1427 stmt.setString(2, "D");
1428 stmt.setLong(3, messageRef.messageId);
1429 stmt.setString(4, messageRef.getPersistentKey());
1430 }
1431
1432 int tries = 0;
1433 while (true)
1434 {
1435 try
1436 {
1437 int rc = stmt.executeUpdate();
1438
1439 if (tries > 0)
1440 {
1441 if (rc != 1)
1442 throw new SpyJMSException(
1443 "Could not mark the message as deleted in the database: update affected " + rc + " rows." + CONCURRENCY_WARNING);
1444
1445 log.warn("Remove operation worked after " +tries +" retries");
1446 }
1447 break;
1448 }
1449 catch (SQLException e)
1450 {
1451 log.warn("SQLException caught - assuming deadlock detected, try:" + (tries + 1), e);
1452 tries++;
1453 if (tries >= statementRetries)
1454 {
1455 log.error("Retried " + tries + " times, now giving up");
1456 throw new IllegalStateException("Could not remove message after " +tries + "attempts");
1457 }
1458 log.warn("Trying again after a pause");
1459 //Now we wait for a random amount of time to minimise risk of deadlock
1460 Thread.sleep((long)(Math.random() * 500));
1461 }
1462 }
1463
1464 if (trace)
1465 log.trace("Removed message " + messageRef + " transaction=" + txId);
1466 }
1467 }
1468 catch (Exception e)
1469 {
1470 tms.setRollbackOnly();
1471 throw new SpyJMSException("Could not remove message: " + messageRef, e);
1472 }
1473 finally
1474 {
1475 try
1476 {
1477 if (stmt != null)
1478 stmt.close();
1479 }
1480 catch (Throwable ignore)
1481 {
1482 }
1483 try
1484 {
1485 if (c != null)
1486 c.close();
1487 }
1488 catch (Throwable ignore)
1489 {
1490 }
1491 tms.endTX();
1492
1493 // Restore the interrupted state of the thread
1494 if (threadWasInterrupted)
1495 Thread.currentThread().interrupt();
1496 }
1497
1498 }
1499
1500 /////////////////////////////////////////////////////////////////////////////////
1501 //
1502 // Misc. PM functions
1503 //
1504 /////////////////////////////////////////////////////////////////////////////////
1505
1506 public TxManager getTxManager()
1507 {
1508 return txManager;
1509 }
1510
1511 public void closeQueue(JMSDestination jmsDest, SpyDestination dest) throws JMSException
1512 {
1513 // Nothing to clean up, all the state is in the db.
1514 }
1515
1516 public SpyMessage loadFromStorage(MessageReference messageRef) throws JMSException
1517 {
1518 if (log.isTraceEnabled())
1519 log.trace("Loading message from storage " + messageRef);
1520
1521 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1522 tms.startTX();
1523 Connection c = null;
1524 PreparedStatement stmt = null;
1525 ResultSet rs = null;
1526 boolean threadWasInterrupted = Thread.interrupted();
1527 try
1528 {
1529
1530 c = this.getConnection();
1531 stmt = c.prepareStatement(SELECT_MESSAGE);
1532 stmt.setLong(1, messageRef.messageId);
1533 stmt.setString(2, messageRef.getPersistentKey());
1534
1535 rs = stmt.executeQuery();
1536 if (rs.next())
1537 return extractMessage(rs);
1538 else
1539 throw new SpyJMSException("Could not load message from storage: " + messageRef + " " + CONCURRENCY_WARNING);
1540
1541 }
1542 catch (Exception e)
1543 {
1544 tms.setRollbackOnly();
1545 SpyJMSException.rethrowAsJMSException("Could not load message : " + messageRef, e);
1546 throw new UnreachableStatementException();
1547 }
1548 finally
1549 {
1550 try
1551 {
1552 if (rs != null)
1553 rs.close();
1554 }
1555 catch (Throwable ignore)
1556 {
1557 }
1558 try
1559 {
1560 if (stmt != null)
1561 stmt.close();
1562 }
1563 catch (Throwable ignore)
1564 {
1565 }
1566 try
1567 {
1568 if (c != null)
1569 c.close();
1570 }
1571 catch (Throwable ignore)
1572 {
1573 }
1574 tms.endTX();
1575
1576 // Restore the interrupted state of the thread
1577 if (threadWasInterrupted)
1578 Thread.currentThread().interrupt();
1579 }
1580 }
1581
1582 /////////////////////////////////////////////////////////////////////////////////
1583 //
1584 // CacheStore Functions
1585 //
1586 /////////////////////////////////////////////////////////////////////////////////
1587 public void removeFromStorage(MessageReference messageRef) throws JMSException
1588 {
1589 // We don't remove persistent messages sent to persistent queues
1590 if (messageRef.isPersistent())
1591 return;
1592
1593 boolean trace = log.isTraceEnabled();
1594 if (trace)
1595 log.trace("Removing message from storage " + messageRef);
1596
1597 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1598 tms.startTX();
1599 Connection c = null;
1600 PreparedStatement stmt = null;
1601 boolean threadWasInterrupted = Thread.interrupted();
1602 try
1603 {
1604 c = this.getConnection();
1605 stmt = c.prepareStatement(DELETE_MESSAGE);
1606 stmt.setLong(1, messageRef.messageId);
1607 stmt.setString(2, messageRef.getPersistentKey());
1608 stmt.executeUpdate();
1609 messageRef.setStored(MessageReference.NOT_STORED);
1610
1611 if (trace)
1612 log.trace("Removed message from storage " + messageRef);
1613 }
1614 catch (SQLException e)
1615 {
1616 tms.setRollbackOnly();
1617 throw new SpyJMSException("Could not remove message: " + messageRef, e);
1618 }
1619 finally
1620 {
1621 try
1622 {
1623 if (stmt != null)
1624 stmt.close();
1625 }
1626 catch (Throwable ignore)
1627 {
1628 }
1629 try
1630 {
1631 if (c != null)
1632 c.close();
1633 }
1634 catch (Throwable ignore)
1635 {
1636 }
1637 tms.endTX();
1638
1639 // Restore the interrupted state of the thread
1640 if (threadWasInterrupted)
1641 Thread.currentThread().interrupt();
1642 }
1643 }
1644
1645 public void saveToStorage(MessageReference messageRef, SpyMessage message) throws JMSException
1646 {
1647 // Ignore save operations for persistent messages sent to persistent queues
1648 // The queues handle the persistence
1649 if (messageRef.isPersistent())
1650 return;
1651
1652 boolean trace = log.isTraceEnabled();
1653 if (trace)
1654 log.trace("Saving message to storage " + messageRef);
1655
1656 TransactionManagerStrategy tms = new TransactionManagerStrategy();
1657 tms.startTX();
1658 Connection c = null;
1659 boolean threadWasInterrupted = Thread.interrupted();
1660 try
1661 {
1662
1663 c = this.getConnection();
1664 add(c, messageRef.getPersistentKey(), message, null, "T");
1665 messageRef.setStored(MessageReference.STORED);
1666
1667 if (trace)
1668 log.trace("Saved message to storage " + messageRef);
1669 }
1670 catch (IOException e)
1671 {
1672 tms.setRollbackOnly();
1673 throw new SpyJMSException("Could not store message: " + messageRef, e);
1674 }
1675 catch (SQLException e)
1676 {
1677 tms.setRollbackOnly();
1678 throw new SpyJMSException("Could not store message: " + messageRef, e);
1679 }
1680 finally
1681 {
1682 try
1683 {
1684 if (c != null)
1685 c.close();
1686 }
1687 catch (Throwable ignore)
1688 {
1689 }
1690 tms.endTX();
1691
1692 // Restore the interrupted state of the thread
1693 if (threadWasInterrupted)
1694 Thread.currentThread().interrupt();
1695 }
1696 }
1697
1698 /**
1699 * Gets a connection from the datasource, retrying as needed. This was
1700 * implemented because in some minimal configurations (i.e. little logging
1701 * and few services) the database wasn't ready when we tried to get a
1702 * connection. We, therefore, implement a retry loop wich is controled
1703 * by the ConnectionRetryAttempts attribute. Submitted by terry@amicas.com
1704 *
1705 * @return the connection
1706 * @exception SQLException if an error occurs.
1707 */
1708 protected Connection getConnection() throws SQLException
1709 {
1710 int attempts = this.connectionRetryAttempts;
1711 int attemptCount = 0;
1712 SQLException sqlException = null;
1713 while (attempts-- > 0)
1714 {
1715 if (++attemptCount > 1)
1716 {
1717 log.debug("Retrying connection: attempt # " + attemptCount);
1718 }
1719 try
1720 {
1721 sqlException = null;
1722 return datasource.getConnection();
1723 }
1724 catch (SQLException exception)
1725 {
1726 log.debug("Connection attempt # " + attemptCount + " failed with SQLException", exception);
1727 sqlException = exception;
1728 }
1729 finally
1730 {
1731 if (sqlException == null && attemptCount > 1)
1732 {
1733 log.debug("Connection succeeded on attempt # " + attemptCount);
1734 }
1735 }
1736
1737 if (attempts > 0)
1738 {
1739 try
1740 {
1741 Thread.sleep(1500);
1742 }
1743 catch (InterruptedException interruptedException)
1744 {
1745 break;
1746 }
1747 }
1748 }
1749 if (sqlException != null)
1750 {
1751 throw sqlException;
1752 }
1753 throw new SQLException("connection attempt interrupted");
1754 }
1755
1756 /////////////////////////////////////////////////////////////////////////////////
1757 //
1758 // JMX Interface
1759 //
1760 /////////////////////////////////////////////////////////////////////////////////
1761
1762 /** The object name of the DataSource */
1763 protected ObjectName connectionManagerName;
1764
1765 /** The SQL properties */
1766 protected Properties sqlProperties = new Properties();
1767
1768 public void startService() throws Exception
1769 {
1770 UPDATE_MARKED_MESSAGES = sqlProperties.getProperty("UPDATE_MARKED_MESSAGES", UPDATE_MARKED_MESSAGES);
1771 UPDATE_MARKED_MESSAGES_XARECOVERY =
1772 sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_XARECOVERY", UPDATE_MARKED_MESSAGES_XARECOVERY);
1773 UPDATE_MARKED_MESSAGES_WITH_TX =
1774 sqlProperties.getProperty("UPDATE_MARKED_MESSAGES_WITH_TX", UPDATE_MARKED_MESSAGES_WITH_TX);
1775 DELETE_MARKED_MESSAGES_WITH_TX =
1776 sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX", DELETE_MARKED_MESSAGES_WITH_TX);
1777 DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY =
1778 sqlProperties.getProperty("DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY", DELETE_MARKED_MESSAGES_WITH_TX_XARECOVERY);
1779 DELETE_TX = sqlProperties.getProperty("DELETE_TX", DELETE_TX);
1780 DELETE_MARKED_MESSAGES = sqlProperties.getProperty("DELETE_MARKED_MESSAGES", DELETE_MARKED_MESSAGES);
1781 DELETE_TEMPORARY_MESSAGES = sqlProperties.getProperty("DELETE_TEMPORARY_MESSAGES", DELETE_TEMPORARY_MESSAGES);
1782 INSERT_TX = sqlProperties.getProperty("INSERT_TX", INSERT_TX);
1783 INSERT_TX_XARECOVERY = sqlProperties.getProperty("INSERT_TX_XARECOVERY", INSERT_TX_XARECOVERY);
1784 DELETE_ALL_TX = sqlProperties.getProperty("DELETE_ALL_TX", DELETE_ALL_TX);
1785 DELETE_ALL_TX_XARECOVERY = sqlProperties.getProperty("DELETE_ALL_TX_XARECOVERY", DELETE_ALL_TX_XARECOVERY);
1786 SELECT_ALL_TX_XARECOVERY = sqlProperties.getProperty("SELECT_ALL_TX_XARECOVERY", SELECT_ALL_TX_XARECOVERY);
1787 SELECT_MAX_TX = sqlProperties.getProperty("SELECT_MAX_TX", SELECT_MAX_TX);
1788 SELECT_MESSAGES_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST", SELECT_MESSAGES_IN_DEST);
1789 SELECT_MESSAGES_IN_DEST_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGES_IN_DEST_XARECOVERY", SELECT_MESSAGES_IN_DEST_XARECOVERY);
1790 SELECT_MESSAGE_KEYS_IN_DEST = sqlProperties.getProperty("SELECT_MESSAGE_KEYS_IN_DEST", SELECT_MESSAGE_KEYS_IN_DEST);
1791 SELECT_MESSAGE = sqlProperties.getProperty("SELECT_MESSAGE", SELECT_MESSAGE);
1792 SELECT_MESSAGE_XARECOVERY = sqlProperties.getProperty("SELECT_MESSAGE_XARECOVERY", SELECT_MESSAGE_XARECOVERY);
1793 INSERT_MESSAGE = sqlProperties.getProperty("INSERT_MESSAGE", INSERT_MESSAGE);
1794 MARK_MESSAGE = sqlProperties.getProperty("MARK_MESSAGE", MARK_MESSAGE);
1795 DELETE_MESSAGE = sqlProperties.getProperty("DELETE_MESSAGE", DELETE_MESSAGE);
1796 UPDATE_MESSAGE = sqlProperties.getProperty("UPDATE_MESSAGE", UPDATE_MESSAGE);
1797 CREATE_MESSAGE_TABLE = sqlProperties.getProperty("CREATE_MESSAGE_TABLE", CREATE_MESSAGE_TABLE);
1798 CREATE_IDX_MESSAGE_TXOP_TXID = sqlProperties.getProperty("CREATE_IDX_MESSAGE_TXOP_TXID", CREATE_IDX_MESSAGE_TXOP_TXID);
1799 CREATE_IDX_MESSAGE_DESTINATION = sqlProperties.getProperty("CREATE_IDX_MESSAGE_DESTINATION", CREATE_IDX_MESSAGE_DESTINATION);
1800 CREATE_TX_TABLE = sqlProperties.getProperty("CREATE_TX_TABLE", CREATE_TX_TABLE);
1801 CREATE_TX_TABLE_XARECOVERY = sqlProperties.getProperty("CREATE_TX_TABLE_XARECOVERY", CREATE_TX_TABLE_XARECOVERY);
1802 createTables = sqlProperties.getProperty("CREATE_TABLES_ON_STARTUP", "true").equalsIgnoreCase("true");
1803 String s = sqlProperties.getProperty("BLOB_TYPE", "OBJECT_BLOB");
1804
1805 if (s.equals("OBJECT_BLOB"))
1806 {
1807 blobType = OBJECT_BLOB;
1808 }
1809 else if (s.equals("BYTES_BLOB"))
1810 {
1811 blobType = BYTES_BLOB;
1812 }
1813 else if (s.equals("BINARYSTREAM_BLOB"))
1814 {
1815 blobType = BINARYSTREAM_BLOB;
1816 }
1817 else if (s.equals("BLOB_BLOB"))
1818 {
1819 blobType = BLOB_BLOB;
1820 }
1821
1822
1823 // initialize tm and datasource
1824 initializeFields();
1825
1826 log.debug("Creating Schema");
1827 try
1828 {
1829 createSchema();
1830 }
1831 catch (Exception e)
1832 {
1833 log.warn("Error creating schema", e);
1834 }
1835
1836 log.debug("Resolving uncommited TXS");
1837 Throwable error = null;
1838 for (int i = 0; i <= recoveryRetries; ++i)
1839 {
1840 try
1841 {
1842 resolveAllUncommitedTXs();
1843
1844 // done
1845 break;
1846 }
1847 catch (Throwable t)
1848 {
1849 if (i < recoveryRetries)
1850 log.warn("Error resolving transactions retries=" + i + " of " + recoveryRetries, t);
1851 else
1852 error = t;
1853 }
1854 }
1855
1856 if (error != null)
1857 SpyJMSException.rethrowAsJMSException("Unable to resolve transactions retries=" + recoveryRetries, error);
1858 }
1859
1860 protected void initializeFields()
1861 throws MBeanException, AttributeNotFoundException, InstanceNotFoundException, ReflectionException, NamingException
1862 {
1863 //Find the ConnectionFactoryLoader MBean so we can find the datasource
1864 String dsName = (String) getServer().getAttribute(connectionManagerName, "BindName");
1865 //Get an InitialContext
1866
1867 InitialContext ctx = new InitialContext();
1868 datasource = (DataSource) ctx.lookup(dsName);
1869
1870 //Get the Transaction Manager so we can control the jdbc tx
1871 tm = TransactionManagerLocator.locateTransactionManager();
1872 }
1873
1874 public Object getInstance()
1875 {
1876 return this;
1877 }
1878
1879 public ObjectName getMessageCache()
1880 {
1881 throw new UnsupportedOperationException("This is now set on the destination manager");
1882 }
1883
1884 public void setMessageCache(ObjectName messageCache)
1885 {
1886 throw new UnsupportedOperationException("This is now set on the destination manager");
1887 }
1888
1889 public ObjectName getConnectionManager()
1890 {
1891 return connectionManagerName;
1892 }
1893
1894 public void setConnectionManager(ObjectName connectionManagerName)
1895 {
1896 this.connectionManagerName = connectionManagerName;
1897 }
1898
1899 public MessageCache getMessageCacheInstance()
1900 {
1901 throw new UnsupportedOperationException("This is now set on the destination manager");
1902 }
1903
1904 public String getSqlProperties()
1905 {
1906 try
1907 {
1908 ByteArrayOutputStream boa = new ByteArrayOutputStream();
1909 sqlProperties.store(boa, "");
1910 return new String(boa.toByteArray());
1911 }
1912 catch (IOException shouldnothappen)
1913 {
1914 return "";
1915 }
1916 }
1917
1918 public void setSqlProperties(String value)
1919 {
1920 try
1921 {
1922 ByteArrayInputStream is = new ByteArrayInputStream(value.getBytes());
1923 sqlProperties = new Properties();
1924 sqlProperties.load(is);
1925 }
1926 catch (IOException shouldnothappen)
1927 {
1928 }
1929 }
1930
1931 public void setConnectionRetryAttempts(int value)
1932 {
1933 this.connectionRetryAttempts = value;
1934 }
1935
1936 public int getConnectionRetryAttempts()
1937 {
1938 return this.connectionRetryAttempts;
1939 }
1940
1941 public int getRecoveryTimeout()
1942 {
1943 return recoveryTimeout;
1944 }
1945
1946 public void setRecoveryTimeout(int timeout)
1947 {
1948 this.recoveryTimeout = timeout;
1949 }
1950
1951 public int getRecoveryRetries()
1952 {
1953 return recoveryRetries;
1954 }
1955
1956 public void setRecoveryRetries(int retries)
1957 {
1958 this.recoveryRetries = retries;
1959 }
1960
1961 public int getRecoverMessagesChunk()
1962 {
1963 return recoverMessagesChunk;
1964 }
1965
1966 public void setRecoverMessagesChunk(int recoverMessagesChunk)
1967 {
1968 if (recoverMessagesChunk != 0 && recoverMessagesChunk != 1)
1969 {
1970 log.warn("Only the values 0 and 1 are currently support for chunk size, using chunk size=1");
1971 recoverMessagesChunk = 1;
1972 }
1973 this.recoverMessagesChunk = recoverMessagesChunk;
1974 }
1975
1976 public boolean isXARecovery()
1977 {
1978 return xaRecovery;
1979 }
1980
1981 public void setXARecovery(boolean xaRecovery)
1982 {
1983 this.xaRecovery = xaRecovery;
1984 }
1985
1986 public int getStatementRetries()
1987 {
1988 return statementRetries;
1989 }
1990
1991 public void setStatementRetries(int statementRetries)
1992 {
1993 if (statementRetries < 0)
1994 statementRetries = 0;
1995 this.statementRetries = statementRetries;
1996 }
1997 }