Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/activemq/store/jdbc/JDBCMessageStore.java


1   /** 
2    * 
3    * Copyright 2004 Hiram Chirino
4    * 
5    * Licensed under the Apache License, Version 2.0 (the "License"); 
6    * you may not use this file except in compliance with the License. 
7    * You may obtain a copy of the License at 
8    * 
9    * http://www.apache.org/licenses/LICENSE-2.0
10   * 
11   * Unless required by applicable law or agreed to in writing, software
12   * distributed under the License is distributed on an "AS IS" BASIS, 
13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
14   * See the License for the specific language governing permissions and 
15   * limitations under the License. 
16   * 
17   **/
18  package org.activemq.store.jdbc;
19  
20  import java.io.IOException;
21  import java.sql.Connection;
22  import java.sql.SQLException;
23  
24  import javax.jms.JMSException;
25  
26  import org.activemq.io.WireFormat;
27  import org.activemq.message.ActiveMQMessage;
28  import org.activemq.message.MessageAck;
29  import org.activemq.service.MessageIdentity;
30  import org.activemq.store.MessageStore;
31  import org.activemq.store.RecoveryListener;
32  import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
33  import org.activemq.util.JMSExceptionHelper;
34  import org.activemq.util.LongSequenceGenerator;
35  
36  /**
37   * @version $Revision: 1.1 $
38   */
39  public class JDBCMessageStore implements MessageStore {
40      
41      protected final WireFormat wireFormat;
42      protected final String destinationName;
43      protected final LongSequenceGenerator sequenceGenerator;
44      protected final JDBCAdapter adapter;
45    protected final JDBCPersistenceAdapter persistenceAdapter;
46  
47      public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
48          this.persistenceAdapter = persistenceAdapter;
49      this.adapter = adapter;
50          this.sequenceGenerator = adapter.getSequenceGenerator();
51          this.wireFormat = wireFormat;
52          this.destinationName = destinationName;
53      }
54  
55      public void addMessage(ActiveMQMessage message) throws JMSException {
56          
57          // Serialize the Message..
58          String messageID = message.getJMSMessageID();
59          byte data[];
60          try {
61              data = wireFormat.toBytes(message);
62          } catch (IOException e) {
63              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
64          }
65          
66          long seq=sequenceGenerator.getNextSequenceId();
67  
68          // Get a connection and insert the message into the DB.
69          Connection c = null;
70          try {
71              c = persistenceAdapter.getConnection();            
72              adapter.doAddMessage(c, seq, messageID, destinationName, data, message.getJMSExpiration());
73          } catch (SQLException e) {
74              throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
75          } finally {
76            persistenceAdapter.returnConnection(c);
77          }
78  
79          MessageIdentity answer = message.getJMSMessageIdentity();
80          answer.setSequenceNumber(new Long(seq));
81      }
82  
83  
84      public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
85  
86        long id;
87          try {
88              id = getMessageSequenceId(identity);
89          } catch (JMSException e1) {
90              return null;
91          }
92          
93          // Get a connection and pull the message out of the DB
94          Connection c = null;
95          try {
96              c = persistenceAdapter.getConnection();            
97              byte data[] = adapter.doGetMessage(c, id);
98              if( data==null )
99                  return null;
100             
101             ActiveMQMessage answer = (ActiveMQMessage) wireFormat.fromBytes(data);;
102             answer.setJMSMessageID(identity.getMessageID());
103             answer.setJMSMessageIdentity(identity);
104             return answer;            
105         } catch (IOException e) {
106             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
107         } catch (SQLException e) {
108             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
109         } finally {
110           persistenceAdapter.returnConnection(c);
111         }
112     }
113 
114     /**
115      * @param identity
116      * @return
117      * @throws JMSException
118      */
119     protected long getMessageSequenceId(MessageIdentity identity) throws JMSException {
120         Object sequenceNumber = identity.getSequenceNumber();
121         if (sequenceNumber != null && sequenceNumber.getClass() == Long.class) {
122             return ((Long) sequenceNumber).longValue();
123         } else {
124             // Get a connection and pull the message out of the DB
125             Connection c = null;
126             try {
127                 c = persistenceAdapter.getConnection();
128                 Long rc = adapter.getMessageSequenceId(c, identity.getMessageID());
129                 if (rc == null)
130                     throw new JMSException("Could not locate message in database with message id: "
131                             + identity.getMessageID());
132                 return rc.longValue();
133             } catch (SQLException e) {
134                 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID()
135                         + " in container: " + e, e);
136             } finally {
137                 persistenceAdapter.returnConnection(c);
138             }
139         }
140     }
141 
142   public void removeMessage(MessageAck ack) throws JMSException {
143         long seq = getMessageSequenceId(ack.getMessageIdentity());
144 
145         // Get a connection and remove the message from the DB
146         Connection c = null;
147         try {
148             c = persistenceAdapter.getConnection();            
149             adapter.doRemoveMessage(c, seq);
150         } catch (SQLException e) {
151             throw JMSExceptionHelper.newJMSException("Failed to broker message: " + ack.getMessageID() + " in container: " + e, e);
152         } finally {
153           persistenceAdapter.returnConnection(c);
154         }
155     }
156 
157 
158     public void recover(final RecoveryListener listener) throws JMSException {
159         
160         // Get all the Message ids out of the database.
161         Connection c = null;
162         try {
163             c = persistenceAdapter.getConnection();            
164             adapter.doRecover(c, destinationName, new MessageListResultHandler() {
165                 public void onMessage(long seq, String messageID) throws JMSException {
166                     listener.recoverMessage(new MessageIdentity(messageID, new Long(seq)));                
167                 }
168             });     
169             
170         } catch (SQLException e) {
171             throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
172         } finally {
173           persistenceAdapter.returnConnection(c);
174         } 
175     }
176 
177     public void start() throws JMSException {
178     }
179 
180     public void stop() throws JMSException {
181     }
182 
183     /**
184      * @see org.activemq.store.MessageStore#removeAllMessages()
185      */
186     public void removeAllMessages() throws JMSException {
187         // Get a connection and remove the message from the DB
188         Connection c = null;
189         try {
190             c = persistenceAdapter.getConnection();            
191             adapter.doRemoveAllMessages(c, destinationName);
192         } catch (SQLException e) {
193             throw JMSExceptionHelper.newJMSException("Failed to broker remove all messages: " + e, e);
194         } finally {
195           persistenceAdapter.returnConnection(c);
196         }
197     }
198 }