Source code: org/activemq/store/jdbc/JDBCMessageStore.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.store.jdbc;
19
20 import java.io.IOException;
21 import java.sql.Connection;
22 import java.sql.SQLException;
23
24 import javax.jms.JMSException;
25
26 import org.activemq.io.WireFormat;
27 import org.activemq.message.ActiveMQMessage;
28 import org.activemq.message.MessageAck;
29 import org.activemq.service.MessageIdentity;
30 import org.activemq.store.MessageStore;
31 import org.activemq.store.RecoveryListener;
32 import org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
33 import org.activemq.util.JMSExceptionHelper;
34 import org.activemq.util.LongSequenceGenerator;
35
36 /**
37 * @version $Revision: 1.1 $
38 */
39 public class JDBCMessageStore implements MessageStore {
40
41 protected final WireFormat wireFormat;
42 protected final String destinationName;
43 protected final LongSequenceGenerator sequenceGenerator;
44 protected final JDBCAdapter adapter;
45 protected final JDBCPersistenceAdapter persistenceAdapter;
46
47 public JDBCMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
48 this.persistenceAdapter = persistenceAdapter;
49 this.adapter = adapter;
50 this.sequenceGenerator = adapter.getSequenceGenerator();
51 this.wireFormat = wireFormat;
52 this.destinationName = destinationName;
53 }
54
55 public void addMessage(ActiveMQMessage message) throws JMSException {
56
57 // Serialize the Message..
58 String messageID = message.getJMSMessageID();
59 byte data[];
60 try {
61 data = wireFormat.toBytes(message);
62 } catch (IOException e) {
63 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
64 }
65
66 long seq=sequenceGenerator.getNextSequenceId();
67
68 // Get a connection and insert the message into the DB.
69 Connection c = null;
70 try {
71 c = persistenceAdapter.getConnection();
72 adapter.doAddMessage(c, seq, messageID, destinationName, data, message.getJMSExpiration());
73 } catch (SQLException e) {
74 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + messageID + " in container: " + e, e);
75 } finally {
76 persistenceAdapter.returnConnection(c);
77 }
78
79 MessageIdentity answer = message.getJMSMessageIdentity();
80 answer.setSequenceNumber(new Long(seq));
81 }
82
83
84 public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
85
86 long id;
87 try {
88 id = getMessageSequenceId(identity);
89 } catch (JMSException e1) {
90 return null;
91 }
92
93 // Get a connection and pull the message out of the DB
94 Connection c = null;
95 try {
96 c = persistenceAdapter.getConnection();
97 byte data[] = adapter.doGetMessage(c, id);
98 if( data==null )
99 return null;
100
101 ActiveMQMessage answer = (ActiveMQMessage) wireFormat.fromBytes(data);;
102 answer.setJMSMessageID(identity.getMessageID());
103 answer.setJMSMessageIdentity(identity);
104 return answer;
105 } catch (IOException e) {
106 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
107 } catch (SQLException e) {
108 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID() + " in container: " + e, e);
109 } finally {
110 persistenceAdapter.returnConnection(c);
111 }
112 }
113
114 /**
115 * @param identity
116 * @return
117 * @throws JMSException
118 */
119 protected long getMessageSequenceId(MessageIdentity identity) throws JMSException {
120 Object sequenceNumber = identity.getSequenceNumber();
121 if (sequenceNumber != null && sequenceNumber.getClass() == Long.class) {
122 return ((Long) sequenceNumber).longValue();
123 } else {
124 // Get a connection and pull the message out of the DB
125 Connection c = null;
126 try {
127 c = persistenceAdapter.getConnection();
128 Long rc = adapter.getMessageSequenceId(c, identity.getMessageID());
129 if (rc == null)
130 throw new JMSException("Could not locate message in database with message id: "
131 + identity.getMessageID());
132 return rc.longValue();
133 } catch (SQLException e) {
134 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + identity.getMessageID()
135 + " in container: " + e, e);
136 } finally {
137 persistenceAdapter.returnConnection(c);
138 }
139 }
140 }
141
142 public void removeMessage(MessageAck ack) throws JMSException {
143 long seq = getMessageSequenceId(ack.getMessageIdentity());
144
145 // Get a connection and remove the message from the DB
146 Connection c = null;
147 try {
148 c = persistenceAdapter.getConnection();
149 adapter.doRemoveMessage(c, seq);
150 } catch (SQLException e) {
151 throw JMSExceptionHelper.newJMSException("Failed to broker message: " + ack.getMessageID() + " in container: " + e, e);
152 } finally {
153 persistenceAdapter.returnConnection(c);
154 }
155 }
156
157
158 public void recover(final RecoveryListener listener) throws JMSException {
159
160 // Get all the Message ids out of the database.
161 Connection c = null;
162 try {
163 c = persistenceAdapter.getConnection();
164 adapter.doRecover(c, destinationName, new MessageListResultHandler() {
165 public void onMessage(long seq, String messageID) throws JMSException {
166 listener.recoverMessage(new MessageIdentity(messageID, new Long(seq)));
167 }
168 });
169
170 } catch (SQLException e) {
171 throw JMSExceptionHelper.newJMSException("Failed to recover container. Reason: " + e, e);
172 } finally {
173 persistenceAdapter.returnConnection(c);
174 }
175 }
176
177 public void start() throws JMSException {
178 }
179
180 public void stop() throws JMSException {
181 }
182
183 /**
184 * @see org.activemq.store.MessageStore#removeAllMessages()
185 */
186 public void removeAllMessages() throws JMSException {
187 // Get a connection and remove the message from the DB
188 Connection c = null;
189 try {
190 c = persistenceAdapter.getConnection();
191 adapter.doRemoveAllMessages(c, destinationName);
192 } catch (SQLException e) {
193 throw JMSExceptionHelper.newJMSException("Failed to broker remove all messages: " + e, e);
194 } finally {
195 persistenceAdapter.returnConnection(c);
196 }
197 }
198 }