Source code: org/activemq/store/jdbc/adapter/DefaultStatementProvider.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.store.jdbc.adapter;
19
20 import org.activemq.store.jdbc.StatementProvider;
21
22
23 /**
24 * @version $Revision: 1.1 $
25 */
26 public class DefaultStatementProvider implements StatementProvider {
27
28 protected String tablePrefix = "";
29 protected String messageTableName = "ACTIVEMQ_MSGS";
30 protected String txTableName = "ACTIVEMQ_TXS";
31 protected String durableSubAcksTableName = "ACTIVEMQ_ACKS";
32
33 protected String binaryDataType = "BLOB";
34 protected String containerNameDataType = "VARCHAR(250)";
35 protected String xidDataType = "VARCHAR(250)";
36 protected String msgIdDataType = "VARCHAR(250)";
37 protected String subscriptionIdDataType = "VARCHAR(250)";
38 protected String sequenceDataType = "INTEGER";
39 protected String charDataType = "CHAR(1)";
40 protected String longDataType = "BIGINT";
41 protected String stringIdDataType = "VARCHAR(250)";
42
43 public String [] getCreateSchemaStatments() {
44 return new String[]{
45 "CREATE TABLE "+tablePrefix+messageTableName+"("
46 +"ID "+sequenceDataType+" NOT NULL"
47 +", CONTAINER "+containerNameDataType
48 +", MSGID "+msgIdDataType
49 +", MSG "+binaryDataType
50 +", PRIMARY KEY ( ID ) )",
51 "CREATE INDEX "+tablePrefix+messageTableName+"_MIDX ON "+tablePrefix+messageTableName+" (MSGID)",
52 "CREATE INDEX "+tablePrefix+messageTableName+"_CIDX ON "+tablePrefix+messageTableName+" (CONTAINER)",
53
54 "CREATE TABLE "+tablePrefix+txTableName+"("
55 +"XID "+xidDataType+" NOT NULL"
56 +", PRIMARY KEY ( XID ))",
57
58 "CREATE TABLE "+tablePrefix+durableSubAcksTableName+"("
59 +"SUB "+subscriptionIdDataType+" NOT NULL"
60 +", CONTAINER "+containerNameDataType+" NOT NULL"
61 +", LAST_ACKED_ID "+sequenceDataType
62 +", SE_ID INTEGER"
63 +", SE_CLIENT_ID "+stringIdDataType
64 +", SE_CONSUMER_NAME "+stringIdDataType
65 +", SE_SELECTOR "+stringIdDataType
66 +", PRIMARY KEY ( SUB, CONTAINER ))",
67 "CREATE INDEX "+tablePrefix+durableSubAcksTableName+"_CIDX ON "+tablePrefix+durableSubAcksTableName+" (CONTAINER)",
68 "ALTER TABLE "+tablePrefix+messageTableName+" ADD EXPIRATION "+longDataType,
69 "ALTER TABLE "+tablePrefix+messageTableName+" ADD SENT_TO_DEADLETTER "+charDataType
70 };
71 }
72
73 public String [] getDropSchemaStatments() {
74 return new String[]{
75 "DROP TABLE "+tablePrefix+durableSubAcksTableName+"",
76 "DROP TABLE "+tablePrefix+messageTableName+"",
77 "DROP TABLE "+tablePrefix+txTableName+""
78 };
79 }
80
81 public String getAddMessageStatment() {
82 return "INSERT INTO "+tablePrefix+messageTableName+"(ID, CONTAINER, MSGID, MSG, EXPIRATION) VALUES (?, ?, ?, ?, ?)";
83 }
84 public String getUpdateMessageStatment() {
85 return "UPDATE "+tablePrefix+messageTableName+" SET MSG=? WHERE ID=?";
86 }
87 public String getRemoveMessageStatment() {
88 return "DELETE FROM "+tablePrefix+messageTableName+" WHERE ID=?";
89 }
90 public String getFindMessageSequenceIdStatment() {
91 return "SELECT ID FROM "+tablePrefix+messageTableName+" WHERE MSGID=?";
92 }
93 public String getFindMessageStatment() {
94 return "SELECT MSG FROM "+tablePrefix+messageTableName+" WHERE ID=?";
95 }
96 public String getFindMessageAttributesStatment() {
97 return "SELECT CONTAINER, MSGID, SENT_TO_DEADLETTER FROM "+tablePrefix+messageTableName+" WHERE ID=?";
98 }
99 public String getFindMessageAttributesForUpdateStatment() {
100 return "SELECT CONTAINER, MSGID, SENT_TO_DEADLETTER FROM "+tablePrefix+messageTableName+" WHERE ID=? FOR UPDATE";
101 }
102 public String getFindAllMessagesStatment() {
103 return "SELECT ID, MSGID FROM "+tablePrefix+messageTableName+" WHERE CONTAINER=? ORDER BY ID";
104 }
105 public String getFindLastSequenceIdInMsgs() {
106 return "SELECT MAX(ID) FROM "+tablePrefix+messageTableName;
107 }
108 public String getFindLastSequenceIdInAcks() {
109 return "SELECT MAX(LAST_ACKED_ID) FROM "+tablePrefix+durableSubAcksTableName;
110 }
111
112 public String getAddXidStatment() {
113 return "INSERT INTO "+tablePrefix+txTableName+"(XID) VALUES (?)";
114 }
115 public String getRemoveXidStatment() {
116 return "DELETE FROM "+tablePrefix+txTableName+" WHERE XID=?";
117 }
118 public String getFindAllXidStatment() {
119 return "SELECT XID FROM "+tablePrefix+txTableName+"";
120 }
121
122 public String getCreateDurableSubStatment() {
123 return "INSERT INTO "+tablePrefix+durableSubAcksTableName
124 +"(SE_ID, SE_CLIENT_ID, SE_CONSUMER_NAME, SE_SELECTOR, SUB, CONTAINER, LAST_ACKED_ID) "
125 +"VALUES (?, ?, ?, ?, ?, ?, ?)";
126 }
127
128 public String getUpdateDurableSubStatment() {
129 return "UPDATE "+tablePrefix+durableSubAcksTableName
130 +" SET SE_ID=?, SE_CLIENT_ID=?, SE_CONSUMER_NAME=?, SE_SELECTOR=? WHERE SUB=? AND CONTAINER=?";
131 }
132
133 public String getFindDurableSubStatment() {
134 return "SELECT SE_ID, SE_CLIENT_ID, SE_CONSUMER_NAME, SE_SELECTOR, CONTAINER=? "+tablePrefix+durableSubAcksTableName
135 +" WHERE SUB=? AND CONTAINER=?";
136 }
137
138 public String getUpdateLastAckOfDurableSub() {
139 return "UPDATE "+tablePrefix+durableSubAcksTableName
140 +" SET LAST_ACKED_ID=? WHERE SUB=? AND CONTAINER=?";
141 }
142
143 public String getDeleteSubscriptionStatment() {
144 return "DELETE FROM "+tablePrefix+durableSubAcksTableName
145 +" WHERE SUB=? AND CONTAINER=?";
146 }
147
148 public String getFindAllDurableSubMessagesStatment() {
149 return "SELECT M.ID, M.MSGID FROM "
150 +tablePrefix+messageTableName+" M, "
151 +tablePrefix+durableSubAcksTableName +" D "
152 +" WHERE D.CONTAINER=? AND D.SUB=? "
153 +" AND M.CONTAINER=D.CONTAINER AND M.ID > D.LAST_ACKED_ID"
154 +" ORDER BY M.ID";
155 }
156
157
158 public String getRemoveAllMessagesStatment() {
159 return "DELETE FROM "+tablePrefix+messageTableName+" WHERE CONTAINER=?";
160 }
161
162 public String getRemoveAllSubscriptionsStatment() {
163 return "DELETE FROM "+tablePrefix+durableSubAcksTableName+" WHERE CONTAINER=?";
164 }
165
166 public String getDeleteOldMessagesStatment() {
167 return "DELETE FROM "+tablePrefix+messageTableName+
168 " WHERE ID <= ( SELECT MIN("+tablePrefix+durableSubAcksTableName+".LAST_ACKED_ID) " +
169 "FROM "+tablePrefix+durableSubAcksTableName+" WHERE " +
170 tablePrefix+durableSubAcksTableName+".CONTAINER="+tablePrefix+messageTableName+
171 ".CONTAINER)";
172 }
173
174 public String getFindExpiredMessagesStatment() {
175 return "SELECT ID, CONTAINER, MSGID, SENT_TO_DEADLETTER FROM "+tablePrefix+messageTableName+
176 " WHERE ( EXPIRATION<>0 AND EXPIRATION<?)";
177 }
178
179 public String getSetDeadLetterFlagStatement() {
180 return "UPDATE "+tablePrefix+messageTableName
181 +" SET SENT_TO_DEADLETTER='Y' WHERE ID=?";
182 }
183
184 public String getDeleteMessageStatement() {
185 return "DELETE FROM "+tablePrefix+messageTableName
186 +" WHERE ID=? AND MSGID=?";
187 }
188
189 /**
190 * @return Returns the containerNameDataType.
191 */
192 public String getContainerNameDataType() {
193 return containerNameDataType;
194 }
195 /**
196 * @param containerNameDataType The containerNameDataType to set.
197 */
198 public void setContainerNameDataType(String containerNameDataType) {
199 this.containerNameDataType = containerNameDataType;
200 }
201 /**
202 * @return Returns the messageDataType.
203 */
204 public String getBinaryDataType() {
205 return binaryDataType;
206 }
207 /**
208 * @param messageDataType The messageDataType to set.
209 */
210 public void setBinaryDataType(String messageDataType) {
211 this.binaryDataType = messageDataType;
212 }
213 /**
214 * @return Returns the messageTableName.
215 */
216 public String getMessageTableName() {
217 return messageTableName;
218 }
219 /**
220 * @param messageTableName The messageTableName to set.
221 */
222 public void setMessageTableName(String messageTableName) {
223 this.messageTableName = messageTableName;
224 }
225 /**
226 * @return Returns the msgIdDataType.
227 */
228 public String getMsgIdDataType() {
229 return msgIdDataType;
230 }
231 /**
232 * @param msgIdDataType The msgIdDataType to set.
233 */
234 public void setMsgIdDataType(String msgIdDataType) {
235 this.msgIdDataType = msgIdDataType;
236 }
237 /**
238 * @return Returns the sequenceDataType.
239 */
240 public String getSequenceDataType() {
241 return sequenceDataType;
242 }
243 /**
244 * @param sequenceDataType The sequenceDataType to set.
245 */
246 public void setSequenceDataType(String sequenceDataType) {
247 this.sequenceDataType = sequenceDataType;
248 }
249 /**
250 * @return Returns the tablePrefix.
251 */
252 public String getTablePrefix() {
253 return tablePrefix;
254 }
255 /**
256 * @param tablePrefix The tablePrefix to set.
257 */
258 public void setTablePrefix(String tablePrefix) {
259 this.tablePrefix = tablePrefix;
260 }
261 /**
262 * @return Returns the txTableName.
263 */
264 public String getTxTableName() {
265 return txTableName;
266 }
267 /**
268 * @param txTableName The txTableName to set.
269 */
270 public void setTxTableName(String txTableName) {
271 this.txTableName = txTableName;
272 }
273 /**
274 * @return Returns the xidDataType.
275 */
276 public String getXidDataType() {
277 return xidDataType;
278 }
279 /**
280 * @param xidDataType The xidDataType to set.
281 */
282 public void setXidDataType(String xidDataType) {
283 this.xidDataType = xidDataType;
284 }
285 /**
286 * @return Returns the durableSubAcksTableName.
287 */
288 public String getDurableSubAcksTableName() {
289 return durableSubAcksTableName;
290 }
291 /**
292 * @param durableSubAcksTableName The durableSubAcksTableName to set.
293 */
294 public void setDurableSubAcksTableName(String durableSubAcksTableName) {
295 this.durableSubAcksTableName = durableSubAcksTableName;
296 }
297 /**
298 * @return Returns the subscriptionIdDataType.
299 */
300 public String getSubscriptionIdDataType() {
301 return subscriptionIdDataType;
302 }
303 /**
304 * @param subscriptionIdDataType The subscriptionIdDataType to set.
305 */
306 public void setSubscriptionIdDataType(String subscriptionIdDataType) {
307 this.subscriptionIdDataType = subscriptionIdDataType;
308 }
309
310 public String getLongDataType() {
311 return longDataType;
312 }
313
314 public void setLongDataType(String longDataType) {
315 this.longDataType = longDataType;
316 }
317
318 public String getStringIdDataType() {
319 return stringIdDataType;
320 }
321
322 public void setStringIdDataType(String stringIdDataType) {
323 this.stringIdDataType = stringIdDataType;
324 }
325
326 }