Source code: org/activemq/message/AbstractPacket.java
1 /**
2 *
3 * Copyright 2004 Protique Ltd
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18
19 package org.activemq.message;
20
21 import java.util.Iterator;
22 import java.util.List;
23 import java.util.ArrayList;
24 import java.util.StringTokenizer;
25
26 import org.activemq.util.BitArray;
27 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArraySet;
28
29 /**
30 * Abstract class for a transportable Packet
31 *
32 * @version $Revision: 1.1.1.1 $
33 */
34 public abstract class AbstractPacket implements Packet {
35
36 /**
37 * Message flag indexes (used for writing/reading to/from a Stream
38 */
39 public static final int RECEIPT_REQUIRED_INDEX = 0;
40 public static final int BROKERS_VISITED_INDEX =1;
41 private short id = 0;
42 protected BitArray bitArray;
43 protected transient int cachedHashCode = -1;
44 private boolean receiptRequired;
45 private transient int memoryUsage = 2048;
46 private transient int memoryUsageReferenceCount;
47
48 private CopyOnWriteArraySet brokersVisited;
49
50 protected AbstractPacket(){
51 this.bitArray = new BitArray();
52 }
53
54 /**
55 * @return the unique id for this Packet
56 */
57 public short getId() {
58 return this.id;
59 }
60
61 /**
62 * Set the unique id for this Packet
63 *
64 * @param newId
65 */
66 public void setId(short newId) {
67 this.id = newId;
68 }
69
70 /**
71 * @return true if a Recipt is required
72 */
73 public boolean isReceiptRequired() {
74 return this.receiptRequired;
75 }
76
77 /**
78 * @return false since most packets are not receipt packets
79 */
80 public boolean isReceipt() {
81 return false;
82 }
83
84 /**
85 * Set if a Recipt if required on receiving this Packet
86 *
87 * @param value
88 */
89 public void setReceiptRequired(boolean value) {
90 this.receiptRequired = value;
91 }
92
93 /**
94 * Retrieve if a JMS Message type or not
95 *
96 * @return true if it is a JMS Message
97 */
98 public boolean isJMSMessage() {
99 return false;
100 }
101
102 /**
103 * Tests equality with another instance
104 *
105 * @param obj - the other instance to test equality with
106 * @return Returns true if the objects are equilvant
107 */
108 public boolean equals(Object obj) {
109 boolean result = this == obj;
110 if (!result && obj != null && obj instanceof AbstractPacket) {
111 AbstractPacket other = (AbstractPacket) obj;
112 result = other.id == this.id;
113 }
114 return result;
115 }
116
117 /**
118 * @return Returns hash code for this instance
119 */
120 public int hashCode() {
121 return this.id;
122 }
123
124 /**
125 * Get a hint about how much memory this Packet is consuming
126 *
127 * @return an aproximation of the current memory used by this instance
128 */
129 public int getMemoryUsage() {
130 return memoryUsage;
131 }
132
133 /**
134 * Set a hint about how mujch memory this packet is consuming
135 *
136 * @param newMemoryUsage
137 */
138 public void setMemoryUsage(int newMemoryUsage) {
139 this.memoryUsage = newMemoryUsage;
140 }
141
142 /**
143 * Increment reference count for bounded memory collections
144 *
145 * @return the incremented reference value
146 * @see org.activemq.io.util.MemoryBoundedQueue
147 */
148 public synchronized int incrementMemoryReferenceCount() {
149 return ++memoryUsageReferenceCount;
150 }
151
152 /**
153 * Decrement reference count for bounded memory collections
154 *
155 * @return the decremented reference value
156 * @see org.activemq.io.util.MemoryBoundedQueue
157 */
158 public synchronized int decrementMemoryReferenceCount() {
159 return --memoryUsageReferenceCount;
160 }
161
162 /**
163 * @return the current reference count for bounded memory collections
164 * @see org.activemq.io.util.MemoryBoundedQueue
165 */
166 public synchronized int getMemoryUsageReferenceCount() {
167 return memoryUsageReferenceCount;
168 }
169
170 /**
171 * As the packet passes through the broker add the broker to the visited list
172 *
173 * @param brokerName the name of the broker
174 */
175 public void addBrokerVisited(String brokerName) {
176 if (brokerName == null || brokerName.trim().equals("")) {
177 throw new IllegalArgumentException("Broker name cannot be empty or null");
178 }
179 initializeBrokersVisited();
180 brokersVisited.add(brokerName);
181 }
182
183 /**
184 * clear list of brokers visited
185 */
186 public void clearBrokersVisited(){
187 brokersVisited = null;
188 }
189
190 /**
191 * test to see if the named broker has already seen this packet
192 *
193 * @param brokerName the name of the broker
194 * @return true if the packet has visited the broker
195 */
196 public boolean hasVisited(String brokerName) {
197 if (brokersVisited == null){
198 return false;
199 }
200 return brokersVisited.contains(brokerName);
201 }
202
203 /**
204 * @return Returns the brokersVisited.
205 */
206 public String getBrokersVisitedAsString() {
207 String result = "";
208 if (brokersVisited != null && !brokersVisited.isEmpty()){
209 for (Iterator i = brokersVisited.iterator(); i.hasNext();){
210 result += i.next().toString() + ",";
211 }
212 }
213 return result;
214 }
215
216 public void setBrokersVisitedAsString(String value) {
217 initializeBrokersVisited();
218 StringTokenizer enm = new StringTokenizer(value, ",");
219 while (enm.hasMoreElements()) {
220 brokersVisited.add(enm.nextToken());
221 }
222 }
223
224 /**
225 * @return pretty print of this Packet
226 */
227 public String toString() {
228 return getPacketTypeAsString(getPacketType()) + ": id = " + getId();
229 }
230
231
232 public static String getPacketTypeAsString(int type) {
233 String packetTypeStr = "";
234 switch (type) {
235 case ACTIVEMQ_MESSAGE:
236 packetTypeStr = "ACTIVEMQ_MESSAGE";
237 break;
238 case ACTIVEMQ_TEXT_MESSAGE:
239 packetTypeStr = "ACTIVEMQ_TEXT_MESSAGE";
240 break;
241 case ACTIVEMQ_OBJECT_MESSAGE:
242 packetTypeStr = "ACTIVEMQ_OBJECT_MESSAGE";
243 break;
244 case ACTIVEMQ_BYTES_MESSAGE:
245 packetTypeStr = "ACTIVEMQ_BYTES_MESSAGE";
246 break;
247 case ACTIVEMQ_STREAM_MESSAGE:
248 packetTypeStr = "ACTIVEMQ_STREAM_MESSAGE";
249 break;
250 case ACTIVEMQ_MAP_MESSAGE:
251 packetTypeStr = "ACTIVEMQ_MAP_MESSAGE";
252 break;
253 case ACTIVEMQ_MSG_ACK:
254 packetTypeStr = "ACTIVEMQ_MSG_ACK";
255 break;
256 case RECEIPT_INFO:
257 packetTypeStr = "RECEIPT_INFO";
258 break;
259 case CONSUMER_INFO:
260 packetTypeStr = "CONSUMER_INFO";
261 break;
262 case PRODUCER_INFO:
263 packetTypeStr = "PRODUCER_INFO";
264 break;
265 case TRANSACTION_INFO:
266 packetTypeStr = "TRANSACTION_INFO";
267 break;
268 case XA_TRANSACTION_INFO:
269 packetTypeStr = "XA_TRANSACTION_INFO";
270 break;
271 case ACTIVEMQ_BROKER_INFO:
272 packetTypeStr = "ACTIVEMQ_BROKER_INFO";
273 break;
274 case ACTIVEMQ_CONNECTION_INFO:
275 packetTypeStr = "ACTIVEMQ_CONNECTION_INFO";
276 break;
277 case SESSION_INFO:
278 packetTypeStr = "SESSION_INFO";
279 break;
280 case DURABLE_UNSUBSCRIBE:
281 packetTypeStr = "DURABLE_UNSUBSCRIBE";
282 break;
283 case RESPONSE_RECEIPT_INFO:
284 packetTypeStr = "RESPONSE_RECEIPT_INFO";
285 break;
286 case INT_RESPONSE_RECEIPT_INFO:
287 packetTypeStr = "INT_RESPONSE_RECEIPT_INFO";
288 break;
289 case CAPACITY_INFO:
290 packetTypeStr = "CAPACITY_INFO";
291 break;
292 case CAPACITY_INFO_REQUEST:
293 packetTypeStr = "CAPACITY_INFO_REQUEST";
294 break;
295 case WIRE_FORMAT_INFO:
296 packetTypeStr = "WIRE_FORMAT_INFO";
297 break;
298 case KEEP_ALIVE:
299 packetTypeStr = "KEEP_ALIVE";
300 break;
301 case CACHED_VALUE_COMMAND:
302 packetTypeStr = "CachedValue";
303 break;
304 default :
305 packetTypeStr = "UNKNOWN PACKET TYPE: " + type;
306 }
307 return packetTypeStr;
308 }
309
310 /**
311 * A helper method used when implementing equals() which returns true if the objects are identical or equal handling
312 * nulls properly
313 * @param left
314 * @param right
315 *
316 * @return true if the objects are the same or equal or both null
317 */
318 protected boolean equals(Object left, Object right) {
319 return left == right || (left != null && left.equals(right));
320 }
321
322 /**
323 * Initializes another message with current values from this instance
324 *
325 * @param other the other ActiveMQMessage to initialize
326 */
327 protected void initializeOther(AbstractPacket other) {
328 initializeBrokersVisited();
329 other.id = this.id;
330 other.receiptRequired = this.receiptRequired;
331 other.memoryUsage = this.memoryUsage;
332 CopyOnWriteArraySet set = this.brokersVisited;
333 if (set != null && !set.isEmpty()){
334 other.brokersVisited = new CopyOnWriteArraySet(set);
335 }
336 }
337
338 synchronized void initializeBrokersVisited(){
339 if (this.brokersVisited == null){
340 this.brokersVisited = new CopyOnWriteArraySet();
341 }
342 }
343
344 /**
345 * @return Returns the brokersVisited.
346 */
347 public Object[] getBrokersVisited() {
348 if (brokersVisited == null || brokersVisited.isEmpty()){
349 return null;
350 }
351 return brokersVisited.toArray();
352 }
353
354 /**
355 * @return Returns the bitArray.
356 */
357 public BitArray getBitArray() {
358 return bitArray;
359 }
360 /**
361 * @param bitArray The bitArray to set.
362 */
363 public void setBitArray(BitArray bitArray) {
364 this.bitArray = bitArray;
365 }
366 }