Source code: org/activemq/ra/MessageEndpointProxy.java
1 /**
2 *
3 * Copyright 2004 Michael Gaffney
4 *
5 * Licensed under the Apache License, Version 2.0 (the "License");
6 * you may not use this file except in compliance with the License.
7 * You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 *
17 **/
18 package org.activemq.ra;
19
20 import org.apache.commons.logging.Log;
21 import org.apache.commons.logging.LogFactory;
22
23 import javax.jms.Message;
24 import javax.jms.MessageListener;
25 import javax.resource.ResourceException;
26 import javax.resource.spi.endpoint.MessageEndpoint;
27 import java.lang.reflect.Method;
28
29 /**
30 * @author <a href="mailto:michael.gaffney@panacya.com">Michael Gaffney </a>
31 */
32 public class MessageEndpointProxy implements MessageListener, MessageEndpoint {
33
34 private static final MessageEndpointState ALIVE = new MessageEndpointAlive();
35 private static final MessageEndpointState GOING_TO_DIE = new MessageEndpointInTheElectricChair();
36 private static final MessageEndpointState DEAD = new MessageEndpointDead();
37
38 private static final Log log = LogFactory.getLog(MessageEndpointProxy.class);
39
40 private static int proxyCount = 0;
41
42 private static int deliveryCount = 0;
43
44 private final int proxyID;
45
46 private MessageEndpoint endpoint;
47
48 private boolean alive = true;
49
50 private MessageEndpointState state = ALIVE;
51
52 private static int getID() {
53 return ++proxyCount;
54 }
55
56 private static int getNextDeliveryCount() {
57 return ++deliveryCount;
58 }
59
60
61 public MessageEndpointProxy(MessageEndpoint endpoint) {
62 if (!(endpoint instanceof MessageListener)) {
63 throw new IllegalArgumentException("MessageEndpoint is not a MessageListener");
64 }
65 proxyID = getID();
66 this.endpoint = endpoint;
67 }
68
69 public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
70 state.beforeDelivery(this, method);
71 }
72
73 public void onMessage(Message message) {
74 // log.warn("Delivery Count: " + getNextDeliveryCount() );
75 state.onMessage(this, message);
76 }
77
78 public void afterDelivery() throws ResourceException {
79 state.afterDelivery(this);
80 }
81
82 public void release() {
83 state.release(this);
84 }
85
86 public String toString() {
87 return "MessageEndpointProxy{ " +
88 "proxyID: " + proxyID +
89 ", endpoint: " + endpoint +
90 " }";
91 }
92
93 private abstract static class MessageEndpointState {
94
95 public void beforeDelivery(MessageEndpointProxy proxy, Method method) throws NoSuchMethodException, ResourceException {
96 throw new IllegalStateException();
97 }
98
99 public void onMessage(MessageEndpointProxy proxy, Message message) {
100 throw new IllegalStateException();
101 }
102
103 public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
104 throw new IllegalStateException();
105 }
106
107 public void release(MessageEndpointProxy proxy) {
108 throw new IllegalStateException();
109 }
110
111 protected final void transition(MessageEndpointProxy proxy, MessageEndpointState nextState) {
112 proxy.state = nextState;
113 nextState.enter(proxy);
114 }
115
116 protected void enter(MessageEndpointProxy proxy) {
117 }
118 }
119
120 private static class MessageEndpointAlive extends MessageEndpointState {
121
122 public void beforeDelivery(MessageEndpointProxy proxy, Method method) throws NoSuchMethodException, ResourceException {
123 try {
124 proxy.endpoint.beforeDelivery(method);
125 } catch (NoSuchMethodException e) {
126 transition(proxy, DEAD);
127 throw e;
128 } catch (ResourceException e) {
129 transition(proxy, DEAD);
130 throw e;
131 }
132 }
133
134 public void onMessage(MessageEndpointProxy proxy, Message message) {
135 try {
136 ((MessageListener) proxy.endpoint).onMessage(message);
137 } catch (RuntimeException e) {
138 transition(proxy, GOING_TO_DIE);
139 throw e;
140 }
141 }
142
143 public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
144 try {
145 proxy.endpoint.afterDelivery();
146 } catch (ResourceException e) {
147 transition(proxy, DEAD);
148 throw e;
149 }
150 }
151
152 public void release(MessageEndpointProxy proxy) {
153 transition(proxy, DEAD);
154 }
155 }
156
157 private static class MessageEndpointInTheElectricChair extends MessageEndpointState {
158
159 public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
160 try {
161 proxy.endpoint.afterDelivery();
162 } catch (ResourceException e) {
163 throw e;
164 } finally {
165 transition(proxy, DEAD);
166 }
167 }
168
169 public void release(MessageEndpointProxy proxy) {
170 transition(proxy, DEAD);
171 }
172 }
173
174 private static class MessageEndpointDead extends MessageEndpointState {
175
176 protected void enter(MessageEndpointProxy proxy) {
177 proxy.endpoint.release();
178 proxy.endpoint = null;
179 }
180
181 public void beforeDelivery(MessageEndpointProxy proxy, Method method) throws NoSuchMethodException, ResourceException {
182 throw new InvalidMessageEndpointException();
183 }
184
185 public void onMessage(MessageEndpointProxy proxy, Message message) {
186 throw new InvalidMessageEndpointException();
187 }
188
189 public void afterDelivery(MessageEndpointProxy proxy) throws ResourceException {
190 throw new InvalidMessageEndpointException();
191 }
192
193 public void release(MessageEndpointProxy proxy) {
194 throw new InvalidMessageEndpointException();
195 }
196 }
197 }