1 /* ZNet - Java Compression Layer for a new Socket Factory
2 Copyright (C) 1999, Free Software Rulez
3
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
8
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
17
18 The author of this program may be contacted at morgiaclaudio@yahoo.it. */
19
20 package java.net;
21
22 import java.lang;
23 import java.io;
24 import java.util;
25 import java.util.zip;
26
27 /**
28 * This class manage the compression layer for the datagram scenario.
29 * It's a thread responsible for packet receiving and handling.
30 * It can guess if the packet is compressed and so should be decompressed (or not) or if it's
31 * an ack for a previously sent packet and should be used to compute statistics.
32 * @see #run the main loop
33 * @see java.net.Compressor the compression interface
34 * @see java.net.Decompressor the decompression interface
35 * @see java.net.Interpolator the statistical engine interface
36 * @author <a href="mailto:morgiaclaudio@yahoo.it">Claudio Morgia</a>
37 * @version 1.0
38 */
39 public class DConnector extends Vector implements Runnable {
40 /**
41 * The socket implementation reference through which we send and receive raw packets.
42 */
43 protected ZDatagramSocketImpl impl;
44
45 protected Thread myself;
46
47 /**
48 * The main loop is driven by the value of that field. If it becomes true, the loop ends and the thread
49 * exit.
50 */
51 protected boolean notStop;
52
53 /**
54 * List of objects <i>waiting</i> for ack. This hashtable is indexed by sequence number
55 * and contains objects of {@link java.net.Listener Listener} type.
56 * Every <b>Listener</b> waits for a single ack packet and is responsible for the update
57 * of the statistical interpolator.
58 */
59 protected Hashtable listeners;
60
61 /**
62 * Reference to the real compression engine that must follow the
63 * {@link java.net.Compressor Compressor} interface.
64 * @see java.net.Compressor
65 * @see java.net.DeflatingCompressor
66 */
67 protected Compressor compressor;
68
69 /**
70 * Reference to the real decompression engine that must follow the
71 * {@link java.net.Decompressor Decompressor} interface.
72 * @see java.net.Decompressor
73 * @see java.net.InflatingDecompressor
74 */
75 protected Decompressor decompressor;
76
77 /**
78 * Reference to the real statistical modeler that must follow the
79 * {@link java.net.Interpolator Interpolator} interface.
80 * @see java.net.Interpolator
81 * @see java.net.LinearInterpolator
82 */
83 protected Interpolator interpolator;
84
85 protected ByteArrayOutputStream container;
86 protected DeflaterOutputStream deflater;
87
88 /**
89 * This is the only constructor. It's responsible for command-line options
90 * gathering through the properties set.
91 * Actually recognized property set is:
92 * <ul>
93 * <li>compressor - to load a different compression engine (default: DeflatingCompressor)
94 * <li>decompressor - to load a different decompression engine (default: InflatingDecompressor)
95 * <li>interpolator - to load a different statistical modeler (default: LinearInterpolator)
96 * </li>
97 * Classes referenced through properties must be present into the classpath or must be loadable
98 * through invocation of the standard method {#link java.lang.Class#forName(String) forName}.
99 * The method {@link java.net.Compressor#getLevels() getLevels()} is used to retrieve the number of
100 * supported compression level and to initialize the interpolator using the appropriate number of levels.
101 * Finally, the main loop is started through thread creation.
102 */
103 public DConnector(ZDatagramSocketImpl impl) throws Exception {
104 this.impl=impl;
105
106 String packname=getClass().getPackage().getName();
107 listeners=new Hashtable();
108 compressor=(Compressor)Class.forName(System.getProperty("compressor",packname+".DeflatingCompressor")).newInstance();
109 decompressor=(Decompressor)Class.forName(System.getProperty("decompressor",packname+".InflatingDecompressor")).newInstance();
110 Class interp_class=Class.forName(System.getProperty("interpolator",packname+".LinearInterpolator"));
111 Class[] params={Class.forName("java.lang.Integer")};
112 Object[] realp={new Integer(compressor.getLevels())};
113 interpolator=(Interpolator)interp_class.getConstructor(params).newInstance(realp);
114 notStop=true;
115
116 container=new ByteArrayOutputStream();
117 deflater=new DeflaterOutputStream(container,(Deflater)compressor);
118 myself=new Thread(this);
119 myself.start();
120 }
121
122 /**
123 * This method clones a received packet and adds it to the packet queue so it can be later retrieved.
124 * This method is internal use only.
125 * @see #run look at run to understand why we use a queue
126 */
127 protected void addPacket(DatagramPacket packet) {
128 byte[] buffer=new byte[packet.getLength()];
129 System.arraycopy(packet.getData(),packet.getOffset(),buffer,packet.getOffset(),packet.getLength());
130 addElement(new DatagramPacket(buffer,packet.getOffset(),packet.getLength(),packet.getAddress(),packet.getPort()));
131 }
132
133 /**
134 * This is the main loop and it's responsible for all packet receiving and handling activities.
135 * It can be outlined as follows:
136 * <ul>
137 * <li>the raw packet is received through the <b>real</b> socket implementation,
138 * <li>it's decoded through the class ZObject (that throws AckException in case of an ack packet),
139 * <li>if it's a compressed packet, it's decompressed and queued and an ack is sent back
140 * <li>if it's an uncompressed packet, no ack is required because the packet it's not sent by our
141 * socket implementation so the packet simply gets queued,
142 * <li>if it's an ack packet, the relative listener is awakened,
143 * <li>any other situation causes the thread to yield, letting the scheduler do its job without
144 * overloading the CPU with an empty <i>while</i> loop.
145 * </ul>
146 * @see #decompress decompression method
147 * @see #addPacket method used to queue a packet
148 * @see java.net.Listener the ack listener class
149 */
150 public void run() {
151 ZObject received;
152 byte[] buf=new byte[65536];
153 DatagramPacket packet=new DatagramPacket(buf,buf.length);;
154 while (notStop)
155 try {
156 packet=new DatagramPacket(buf,buf.length);
157 impl._receive(packet);
158 received=new ZObject(packet);
159 decompress(packet);
160 addPacket(packet);
161 sendAck(received,packet);
162 } catch(AckException ae) {
163 ((Listener)listeners.get(new Long(ae.getSeqNum()))).done();
164 } catch(NoMagicException nm) {
165 addPacket(packet);
166 } catch(Exception e) {
167 Thread.yield();
168 }
169 }
170
171 /**
172 * This method is responsible for send ack when a packet is received.
173 * It guess the return address from the second argument, the reference to the received packet and
174 * the sequence number from the decoded ZObject (contained in the same datagram).
175 * It prepares a new ack-type ZObject, ask it to encode itself and send back the so created packet.
176 * @see java.net.ZObject this class represent protocol packets and utility function to encode/decode it
177 * @param received the net object received
178 * @param packet the packet received
179 */
180 public void sendAck(ZObject received,DatagramPacket packet) throws IOException {
181 ZObject zack=new ZObject(received.getSeqNum());
182 byte[] buffer=zack.prepareData();
183 DatagramPacket ack=new DatagramPacket(buffer,buffer.length,packet.getAddress(),packet.getPort());
184 impl._send(ack);
185 }
186
187 /**
188 * This method checks if the system queue is empty or not, so it's used to see if there's some packet
189 * available.
190 */
191 public boolean hasPacket() {
192 return size()!=0;
193 }
194
195 /**
196 * This method remove a packet from the queue and copy data into the passed datagram.
197 * @param p the datagram to fill with data
198 */
199 public void getPacket(DatagramPacket p) {
200 DatagramPacket packet=(DatagramPacket)remove(0);
201 p.setData(packet.getData(),packet.getOffset(),packet.getLength());
202 p.setAddress(packet.getAddress());
203 p.setPort(packet.getPort());
204 }
205
206 /**
207 * This method is invoked by the {@link java.net.ZDatagramSocketImpl#send ZDatagramSocketImpl#send} method
208 * to perform the real data sending.
209 * Basically it performs the following tasks:<ul>
210 * <li>ask the interpolator to compute a suitable compression level based on data buffer
211 * dimension,
212 * <li>set the compression engine to work at that level
213 * <li>compress data and acquire compression time
214 * <li>build ZObject, serialize it and send
215 * <li>install a new Listener object, waiting for ack to update the interpolator
216 * </ul>
217 */
218 public void send(DatagramPacket p) throws IOException {
219 long from=System.currentTimeMillis();
220 byte[] buffer;
221 ZObject zobj;
222 int dim=p.getLength();
223 int level=interpolator.computeLevel(dim);
224
225 container.reset();
226 compressor.reset();
227 compressor.setLevel(level);
228 deflater.write(p.getData(),p.getOffset(),dim);
229 deflater.finish();
230
231 zobj=new ZObject(container.toByteArray());
232 buffer=zobj.prepareData();
233 p.setData(buffer,p.getOffset(),buffer.length);
234
235 listeners.put(new Long(zobj.getSeqNum()),new Listener(interpolator,level,System.currentTimeMillis()-from,dim,buffer.length));
236
237 impl._send(p);
238 }
239
240 /**
241 * This method is responsible for data decompression. It uses the loaded decompression engine
242 * (or {@link java.net.InflatingDecompressor InflatingDecompressor} if not provided) and
243 * a ByteArrayInputStream (that renders a byte array as an input stream) to build an
244 * InflaterInputStream that "reads" compressed data and write "uncompressed data".
245 * @see java.io.ByteArrayInputStream if you want to build an input stream that gets data from a byte array, you got it
246 * @see java.net.Decompressor the decompression engine
247 * @see java.util.zip.InflaterInputStream the decompression stream
248 */
249 public void decompress(DatagramPacket p) throws NoMagicException,IOException {
250 int length;
251 byte[] data;
252 try {
253 data=(new ZObject(p)).getData();
254 } catch(AckException e) {
255 throw new IOException("Spurious ack packet");
256 }
257 byte[] buffer=new byte[8192];
258 ByteArrayOutputStream out=new ByteArrayOutputStream();
259 InflaterInputStream inflater=new InflaterInputStream(new ByteArrayInputStream(data),(Inflater)decompressor);
260
261 while ((length=inflater.read(buffer,0,buffer.length))!=-1)
262 out.write(buffer,0,length);
263
264 inflater.close();
265 decompressor.reset();
266 data=out.toByteArray();
267 p.setData(data,p.getOffset(),data.length);
268 }
269 }