Source code: org/activemq/store/journal/JournalTopicMessageStore.java
1 /**
2 *
3 * Copyright 2004 Hiram Chirino
4 * Copyright 2004 Protique Ltd
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 **/
19 package org.activemq.store.journal;
20
21 import java.util.HashMap;
22 import java.util.Iterator;
23
24 import javax.jms.JMSException;
25
26 import org.activeio.journal.RecordLocation;
27 import org.activemq.message.ConsumerInfo;
28 import org.activemq.service.MessageIdentity;
29 import org.activemq.service.SubscriberEntry;
30 import org.activemq.service.Transaction;
31 import org.activemq.service.TransactionManager;
32 import org.activemq.service.TransactionTask;
33 import org.activemq.store.RecoveryListener;
34 import org.activemq.store.TopicMessageStore;
35 import org.activemq.util.Callback;
36 import org.apache.commons.logging.Log;
37 import org.apache.commons.logging.LogFactory;
38
39 /**
40 * A MessageStore that uses a Journal to store it's messages.
41 *
42 * @version $Revision: 1.1 $
43 */
44 public class JournalTopicMessageStore extends JournalMessageStore implements TopicMessageStore {
45 private static final Log log = LogFactory.getLog(JournalTopicMessageStore.class);
46
47 private TopicMessageStore longTermStore;
48 private HashMap ackedLastAckLocations = new HashMap();
49
50 public JournalTopicMessageStore(JournalPersistenceAdapter adapter, TopicMessageStore checkpointStore, String destinationName) {
51 super(adapter, checkpointStore, destinationName);
52 this.longTermStore = checkpointStore;
53 }
54
55 public void recoverSubscription(String subscriptionId, MessageIdentity lastDispatchedMessage, RecoveryListener listener) throws JMSException {
56 peristenceAdapter.checkpoint(true);
57 longTermStore.recoverSubscription(subscriptionId, lastDispatchedMessage, listener);
58 }
59
60 public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
61 return longTermStore.getSubscriberEntry(info);
62 }
63
64 public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
65 peristenceAdapter.checkpoint(true);
66 longTermStore.setSubscriberEntry(info, subscriberEntry);
67 }
68
69 public MessageIdentity getLastestMessageIdentity() throws JMSException {
70 return longTermStore.getLastestMessageIdentity();
71 }
72
73 public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
74 longTermStore.incrementMessageCount(messageId);
75 }
76
77 public void decrementMessageCountAndMaybeDelete(MessageIdentity messageId) throws JMSException {
78 longTermStore.decrementMessageCountAndMaybeDelete(messageId);
79 }
80
81 /**
82 */
83 public void setLastAcknowledgedMessageIdentity(final String subscription, final MessageIdentity messageIdentity) throws JMSException {
84 final boolean debug = log.isDebugEnabled();
85 final RecordLocation location = peristenceAdapter.writePacket(destinationName, subscription, messageIdentity, false);
86 if( !TransactionManager.isCurrentTransaction() ) {
87 if( debug )
88 log.debug("Journalled acknowledge: "+messageIdentity.getMessageID()+" at "+location);
89 acknowledge(subscription, messageIdentity, location);
90 } else {
91 if( debug )
92 log.debug("Journalled in flight acknowledge: "+messageIdentity.getMessageID()+" at "+location);
93
94 synchronized (this) {
95 inFlightTxLocations.add(location);
96 }
97 final Transaction tx = TransactionManager.getContexTransaction();
98 JournalAck ack = new JournalAck(destinationName,subscription,messageIdentity.getMessageID(), tx.getTransactionId());
99 transactionStore.acknowledge(this, ack, location);
100 tx.addPostCommitTask(new TransactionTask(){
101 public void execute() throws Throwable {
102 if( debug )
103 log.debug("In flight acknowledge commit: "+messageIdentity.getMessageID()+" at "+location);
104
105 synchronized (JournalTopicMessageStore.this) {
106 inFlightTxLocations.remove(location);
107 acknowledge(subscription, messageIdentity, location);
108 }
109 }
110 });
111 tx.addPostRollbackTask(new TransactionTask(){
112 public void execute() throws Throwable {
113 if( debug )
114 log.debug("In flight acknowledge rollback: "+messageIdentity.getMessageID()+" at "+location);
115 // TODO Auto-generated method stub
116 synchronized (JournalTopicMessageStore.this) {
117 inFlightTxLocations.remove(location);
118 }
119 }
120 });
121
122 }
123 }
124
125 private void acknowledge(String subscription, MessageIdentity messageIdentity, RecordLocation location) {
126 synchronized(this) {
127 lastLocation = location;
128 ackedLastAckLocations.put(subscription,messageIdentity);
129 }
130 }
131
132 public RecordLocation checkpoint() throws JMSException {
133
134 // swap the acks before check pointing the added messages since we don't want to ack
135 // a message that has not been checkpointed yet.
136 final HashMap cpAckedLastAckLocations;
137 synchronized(this) {
138 cpAckedLastAckLocations = this.ackedLastAckLocations;
139 this.ackedLastAckLocations = new HashMap();
140 }
141
142 // Check point the added messages.
143 RecordLocation rc = super.checkpoint();
144
145 if( log.isDebugEnabled() ) {
146 log.debug("Checkpoint acknowledgments: "+cpAckedLastAckLocations);
147 }
148
149 transactionTemplate.run(new Callback() {
150 public void execute() throws Throwable {
151
152 // Checkpoint the acknowledged messages.
153 Iterator iterator = cpAckedLastAckLocations.keySet().iterator();
154 while (iterator.hasNext()) {
155 String subscription = (String) iterator.next();
156 MessageIdentity identity = (MessageIdentity) cpAckedLastAckLocations.get(subscription);
157 longTermStore.setLastAcknowledgedMessageIdentity(subscription, identity);
158 }
159
160 }
161
162 });
163
164 return rc;
165 }
166
167 /**
168 * @return Returns the longTermStore.
169 */
170 public TopicMessageStore getLongTermTopicMessageStore() {
171 return longTermStore;
172 }
173
174 public void deleteSubscription(String subscription) throws JMSException {
175 peristenceAdapter.checkpoint(true);
176 longTermStore.deleteSubscription(subscription);
177 }
178
179 public void replayAcknowledge(String subscription, MessageIdentity identity) {
180 try {
181 longTermStore.setLastAcknowledgedMessageIdentity(subscription,identity);
182 }
183 catch (Throwable e) {
184 log.debug("Could not replay acknowledge for message '"+identity.getMessageID()+"'. Message may have already been acknowledged. reason: " + e);
185 }
186 }
187
188 }