1 /* ZNet - Java Compression Layer for a new Socket Factory
2 Copyright (C) 1999, Free Software Rulez
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
18 The author of this program may be contacted at morgiaclaudio@yahoo.it. */
19
20 package java.net;
21
22 import java.lang;
23 import java.io;
24 import java.util;
25 import java.util.zip;
26
/**
 * This class manages the compression layer for the stream (connected) scenario.
 * It is run by a thread that is responsible for receiving and handling packets.
 * It can tell whether a packet is compressed, and therefore has to be decompressed, or whether
 * it is an ack for a previously sent packet, which is used to compute statistics.
 * @see #run the main loop
 * @see java.net.Compressor the compression interface
 * @see java.net.Decompressor the decompression interface
 * @see java.net.Interpolator the statistical engine interface
 * @author <a href="mailto:morgiaclaudio@yahoo.it">Claudio Morgia</a>
 * @version 1.0
 */
39 public class Connector implements Runnable {
40
41 /**
42 * Reference to the real compression engine that must follow the
43 * {@link java.net.Compressor Compressor} interface.
44 * @see java.net.Compressor
45 * @see java.net.DeflatingCompressor
46 */
47 protected Compressor compressor;
48
49 /**
50 * Reference to the real decompression engine that must follow the
51 * {@link java.net.Decompressor Decompressor} interface.
52 * @see java.net.Decompressor
53 * @see java.net.InflatingDecompressor
54 */
55 protected Decompressor decompressor;
56
57 /**
58 * Reference to the real statistical modeler that must follow the
59 * {@link java.net.Interpolator Interpolator} interface.
60 * @see java.net.Interpolator
61 * @see java.net.LinearInterpolator
62 */
63 protected Interpolator interpolator;
64
65 /**
66 * Reference to the real input stream as provided by the plain-old
67 * socket factory, tipically {@link java.net.PlainSocketImpl PlainSocketImpl}.
68 */
69 protected InputStream in;
70
71 /**
72 * Reference to the real output stream as provided by the plain-old
73 * socket factory, tipically {@link java.net.PlainSocketImpl PlainSocketImpl}.
74 */
75 protected OutputStream out;
76
77 /**
78 * This input stream represents the input to the compression engine.
79 * It is the reference passed when {@link #getInputStream getInputStream} is invoked.
80 * IMHO it's interesting to see how it is realized because it's an infinite data queue.
81 * @see java.net.ZInputStream
82 */
83 protected ZInputStream zin;
84
85 /**
86 * This output stream represents the output coming from the compression engine.
87 * It is the reference passed when {@link #getOutputStream getOutputStream} is invoked.
88 * @see java.net.ZOutputStream
89 */
90 protected ZOutputStream zout;
91
92 /**
93 * List of objects <i>waiting</i> for ack. This hashtable is indexed by sequence number
94 * and contains objects of {@link java.net.Listener Listener} type.
95 * Every <b>Listener</b> waits for a single ack packet and is responsible for the update
96 * of the statistical interpolator.
97 */
98 protected Hashtable listeners;
99
100 protected Thread myself;
101
102 /**
103 * This variable drive the main loop: a false value will stop the thread and prepare for exit.
104 * The value can be changed using the {@link #stop stop} method.
105 */
106 protected boolean notStop;
107
108 /**
109 * This internal method is the common part of constructors. It's responsible for command-line options
110 * gathering through the properties set.
111 * Actually recognized property set is:
112 * <ul>
113 * <li>compressor - to load a different compression engine (default: DeflatingCompressor)
114 * <li>decompressor - to load a different decompression engine (default: InflatingDecompressor)
115 * <li>interpolator - to load a different statistical modeler (default: LinearInterpolator)
116 * </li>
117 * Classes referenced through properties must be present into the classpath or must be loadable
118 * through invocation of the standard method {#link java.lang.Class#forName(String) forName}.
119 * The method {@link java.net.Compressor#getLevels() getLevels()} is used to retrieve the number of
120 * supported compression level and to initialize the interpolator using the appropriate number of levels.
121 * Finally, the main loop is started through thread creation.
122 */
123 protected synchronized void create() throws Exception {
124 String packname=getClass().getPackage().getName();
125
126 compressor=(Compressor)Class.forName(System.getProperty("compressor",packname+".DeflatingCompressor")).newInstance();
127 decompressor=(Decompressor)Class.forName(System.getProperty("decompressor",packname+".InflatingDecompressor")).newInstance();
128 Class interp_class=Class.forName(System.getProperty("interpolator",packname+".LinearInterpolator"));
129 Class[] params={Class.forName("java.lang.Integer")};
130 Object[] realp={new Integer(compressor.getLevels())};
131 interpolator=(Interpolator)interp_class.getConstructor(params).newInstance(realp);
132 myself=new Thread(this);
133 myself.start();
134 notStop=true;
135 }
136
137 /**
138 * This constructor is used when the connector is not yet started and it is request by
139 * the method {@link java.net.ZSocketImpl#getInputStream() getInputStream} of our socket
140 * implementation.
141 * It's perfectly specular (but non identical) to the next constructor.
142 * Here the question is that, like with Object{Input | Output}Stream class, on client side
143 * you should open output then input streams and on server side you should do the converse,
144 * because the stream opening causes handshaking packet to be exchanged between the ObjectOutputStream
145 * on client and ObjectInputStream on server.
146 * If you open the two output (on both client and server) they'll send handshacking packets that nobody
147 * receive and answer because no input stream is opened yet.
148 * @see java.net.ZInputStream the unlimited input queue
149 * @see java.net.ZOutputSteeam
150 */
151 public Connector(InputStream in,OutputStream out) throws Exception {
152 this.in=in;
153 this.zin=new ZInputStream();
154 this.out=out;
155 this.zout=new ZOutputStream(this);
156 listeners=new Hashtable();
157 create();
158 }
159
160 /**
161 * See the previous constructor keeping in mind this one works for
162 * {@link java.net.ZSocketImpl#getOutputStream() getOutputStream}
163 * @see #Connector(InputStream,OutputStream)
164 * @see java.net.ZInputStream the unlimited input queue
165 * @see java.net.ZOutputSteeam
166 */
167 public Connector(OutputStream out,InputStream in) throws Exception {
168 this.out=out;
169 this.zout=new ZOutputStream(this);
170 this.in=in;
171 this.zin=new ZInputStream();
172 listeners=new Hashtable();
173 create();
174 }
175
176 /**
177 * This method is responsible for data decompression. It uses the loaded decompression engine
178 * (or {@link java.net.InflatingDecompressor InflatingDecompressor} if not provided) and
179 * a ByteArrayInputStream (that renders a byte array as an input stream) to build an
180 * InflaterInputStream that "reads" compressed data and write "uncompressed data".
181 * @see java.io.ByteArrayInputStream if you want to build an input stream that gets data from a byte array, you got it
182 * @see java.net.Decompressor the decompression engine
183 * @see java.util.zip.InflaterInputStream the decompression stream
184 */
185 protected byte[] decompress(byte[] buf) throws IOException {
186 int length;
187 //buf=D.readFile("here.out");
188 byte[] buffer=new byte[65536];
189 ByteArrayOutputStream container=new ByteArrayOutputStream();
190 InflaterInputStream inflater=new InflaterInputStream(new ByteArrayInputStream(buf),(Inflater)decompressor);
191 while ((length=inflater.read(buffer,0,buffer.length))!=-1) {
192 container.write(buffer,0,length);
193 }
194 inflater.close();
195 decompressor.reset();
196 return container.toByteArray();
197 }
198
199 /**
200 * This method is responsible for data compression. It uses the loaded compression engine
201 * (or {@link java.net.DeflatingCompressor DeflatingCompressor} if not provided) and
202 * a ByteArrayOutputStream (that renders an output stream as a byte array) to build a
203 * DeflaterOutputStream that "writes" uncompressed data and read "compressed data".
204 * @see java.io.ByteArrayOutputStream if you want to build an unlimited data queue that can be translated into a byte array, you got it
205 * @see java.net.Compressor the compression engine
206 * @see java.util.zip.DeflaterOutputStream the compression stream
207 */
208 protected byte[] compress(byte[] buf) throws IOException {
209 byte[] tmp;
210 ByteArrayOutputStream container=new ByteArrayOutputStream();
211 DeflaterOutputStream deflater=new DeflaterOutputStream(container,(Deflater)compressor);
212 deflater.write(buf,0,buf.length);
213 deflater.finish();
214 deflater.close();
215 tmp=container.toByteArray();
216 //D.writeFile("here.out",tmp);
217 return tmp;
218 }
219
220 /**
221 * The main loop. Here packets get captured and converted into {@link java.net.ZObject ZObjects}
222 * to see if they are normal compressed or ack. In the first case, data gets decompressed and queued
223 * into the ZInputStream that can be given out as the primary input stream (through the method
224 * {@link java.net.ZSocketImpl#getInputStream getInputStream} and an ack packet is immediately sent back.
225 * In the case of a received ack, the appropriate listener gets notified of the arriving ack and it will
226 * provide to update the interpolator.
227 * @see java.net.ZInputStream our unlimited queue
228 * @see java.net.ZObject
229 * @see java.net.Listener the agent responsible for interpolator update
230 */
231 public void run() {
232 ZObject received;
233 while (notStop) {
234 try {
235 received=new ZObject(in);
236 zin.addBytes(decompress(received.getData()));
237 sendAck(received);
238 } catch(AckException ae) {
239 ((Listener)listeners.get(new Long(ae.getSeqNum()))).done();
240 } catch(Exception e) {
241 Thread.yield();
242 }
243 }
244 }
245
246 /**
247 * An ack is sent through this method, based on the passed ZObject that's the received one.
248 * From that object it gets the sequence number and use it to build an ack-type ZObject.
249 * Then the latter is "serialized" and transformed into a byte array , ready to be sent through the
250 * real output stream (as provided by the previous socket factory).
251 * We don't use standard serialization because the object's type domain is really limited (only one
252 * class) and the resulting overhead would not be justified.
253 */
254 public void sendAck(ZObject zobj) throws IOException {
255 byte[] toSend=(new ZObject(zobj.getSeqNum())).prepareData();
256 out.write(toSend);
257 }
258
259 /**
260 * This method is invoked by the {@link java.net.ZOutputStream#flush() ZoutputStream.flush} method
261 * to perform the real data sending.
262 * Basically it performs the following tasks:<ul>
263 * <li>ask the interpolator to compute a suitable compression level based on data buffer
264 * dimension,
265 * <li>set the compression engine to work at that level
266 * <li>compress data and acquire compression time
267 * <li>build ZObject, serialize it and send
268 * <li>install a new Listener object, waiting for ack to update the interpolator
269 * </ul>
270 */
271 public void send(byte[] data) throws IOException {
272 Listener here;
273 byte[] buf;
274
275 int level=interpolator.computeLevel(data.length);
276 compressor.reset();
277 compressor.setLevel(level);
278
279 long from=System.currentTimeMillis();
280 buf=compress(data);
281 int compressed_length=buf.length;
282 long compress_time=System.currentTimeMillis()-from;
283
284 ZObject toSend=new ZObject(buf);
285
286 from=System.currentTimeMillis();
287 out.write(toSend.prepareData());
288 listeners.put(new Long(toSend.getSeqNum()),new Listener(interpolator,level,compress_time,data.length,compressed_length));
289 }
290
291 public InputStream getInputStream() {
292 return zin;
293 }
294
295 public OutputStream getOutputStream() {
296 return zout;
297 }
298
299 public synchronized void stop() {
300 notStop=false;
301 }
302
303 }