Source code: org/mule/transformers/JMSTransformer.java
1 /*
2 * $Header: /cvsroot/mule/mule/src/java/org/mule/transformers/JMSTransformer.java,v 1.6 2003/10/20 21:44:38 rossmason Exp $
3 * $Revision: 1.6 $
4 * $Date: 2003/10/20 21:44:38 $
5 * ------------------------------------------------------------------------------------------------------
6 *
7 * Copyright (c) Cubis Limited. All rights reserved.
8 * http://www.cubis.co.uk
9 *
10 * The software in this package is published under the terms of the BSD
11 * style license a copy of which has been included with this distribution in
12 * the LICENSE.txt file.
13 *
14 */
15
16 package org.mule.transformers;
17
18 import java.util.Hashtable;
19
20 import javax.jms.BytesMessage;
21 import javax.jms.Message;
22 import javax.jms.QueueConnection;
23 import javax.jms.QueueConnectionFactory;
24 import javax.jms.QueueSession;
25 import javax.jms.Session;
26 import javax.naming.Context;
27 import javax.naming.InitialContext;
28
29 import org.apache.commons.logging.Log;
30 import org.apache.commons.logging.LogFactory;
31 import org.mule.MuleConstants;
32 import org.mule.umo.transformer.TransformerException;
33 import org.mule.util.MessageUtils;
34 import org.mule.util.compression.CompressionHelper;
35
36 /**
37 * <p><code>JMSTransformer</code> is an abstract class the should be used for all transformers where a JMS message
38 * will be the transformed or transformee object. It provides services for compressing and uncompressing messages.
39 *
40 * @author Ross Mason
41 * @version 1.2
42 */
43
44 public abstract class JMSTransformer extends CompressionTransformer
45 {
46
47 public static final String JMS_PROPERTY_COMPRESSED = "isCompressed";
48
49 /** logger used by this class */
50 private static transient Log log = LogFactory.getLog(JMSTransformer.class);
51
52 private Session session = null;
53
54 public JMSTransformer()
55 {
56 }
57 /**
58 * We create a jms session here so that we can create JMS messages.
59 * The session is not used for any other purpose
60 */
61 public void processProperties() throws TransformerException
62 {
63 super.processProperties();
64 if (props != null && props.size() > 0)
65 {
66 String cnnFactoryName = (String) props.get(MuleConstants.JNDI_CONNECTION_FACTORY_NAME);
67 if (cnnFactoryName != null)
68 {
69 //throw new TransformerException("You must set the property " +
70 // MuleConstants.JNDI_CONNECTION_FACTORY_NAME + " in the properties");
71
72 try
73 {
74 Context ctx = new InitialContext(new Hashtable(props));
75
76 QueueConnectionFactory qcf = (QueueConnectionFactory) ctx.lookup(cnnFactoryName);
77 QueueConnection cnn = qcf.createQueueConnection();
78 session = cnn.createQueueSession(false, QueueSession.AUTO_ACKNOWLEDGE);
79
80 }
81 catch (Exception e)
82 {
83 throw new TransformerException("Failed to create a JMS session for the JMSTransformer" + e, e);
84 }
85 }
86 }
87 }
88
89 /**
90 * Transforms the object.
91 *
92 * @param src The source object to transform.
93 * @param doCompression determines weather the data should be compressed or not
94 * @return The transformed object as an XMLMessage
95 */
96 public Object transform(Object src, Session session) throws TransformerException
97 {
98 if (session == null && this.session == null)
99 throw new TransformerException("Session cannot be null when transforming to a JMS payload");
100 if (session != null)
101 {
102 this.session = session;
103 }
104 if (src == null)
105 {
106 throw new TransformerException("Cannot transform 'null' JMS Message into an object");
107 }
108 Object ret = transform(src);
109 if (log.isDebugEnabled())
110 log.debug(
111 "Transformed message from type: " + src.getClass().getName() + " to type: " + ret.getClass().getName());
112
113 return ret;
114 }
115
116 /**
117 *
118 * @param src The source data to compress
119 * @return
120 * @throws TransformerException
121 */
122 protected Message transformToMessage(Object src) throws TransformerException
123 {
124 try
125 {
126 if (session == null)
127 {
128 throw new TransformerException("You must set the JMS Session on a JMSTransformer before using it");
129 }
130 if (getDoCompression())
131 {
132 byte[] buffer = compressMessage(src);
133 if (log.isDebugEnabled())
134 log.debug("Compressing message in transformation");
135
136 BytesMessage bMsg = session.createBytesMessage();
137 bMsg.clearBody();
138 bMsg.setBooleanProperty(JMS_PROPERTY_COMPRESSED, true);
139
140 //was getting strange results with the unit tests if I used
141 //bMsg.writeObject(buffer) or bMsg.writeBytes(buffer) to do with an
142 //optimisation in the server I am testing with.
143 //Doing it byte by byte for now
144 for (int i = 0; i < buffer.length; i++)
145 {
146 bMsg.writeByte(buffer[i]);
147 }
148
149 return bMsg;
150 }
151 else
152 {
153 return MessageUtils.getMessageForObject(src, session);
154 }
155 }
156 catch (Exception e)
157 {
158 throw new TransformerException("Failed to transform message: " + e, e);
159 }
160 }
161
162 protected Object transformFromMessage(Message source) throws TransformerException
163 {
164 Object result = null;
165 try
166 {
167 if (log.isDebugEnabled())
168 log.debug("Message type received is: " + source.getClass().getName());
169
170 if (source instanceof BytesMessage)
171 {
172 //If this bytes Message is not compressed it will throw a NotGZipFormatException
173 //It would be nice if we could check the custom JMS compression property here. However
174 //When jms bridging other, non-JMS-compliant message servers occur, there is no guarantee that
175 //Custom properties will be proporgated
176 if (CompressionHelper.isMessageCompressed((BytesMessage) source))
177 {
178
179 if (log.isDebugEnabled())
180 log.debug("Message recieved is compressed");
181 result = CompressionHelper.uncompressBytesMessage((BytesMessage) source);
182 }
183 else
184 {
185 //If the message is not compressed, handle it the standard way
186 result = MessageUtils.getObjectForMessage(source);
187 }
188 }
189 else
190 {
191 result = MessageUtils.getObjectForMessage(source);
192 }
193 }
194 catch (Exception e)
195 {
196 throw new TransformerException("Failed to transform message: " + e.getMessage());
197 }
198 return result;
199 }
200
201 public Session getSession()
202 {
203 return session;
204 }
205
206 public void setSession(Session session)
207 {
208 this.session = session;
209 }
210
211 }