Save This Page
Home » openjdk-7 » java » net » [javadoc | source]
    1   /*  ZNet - Java Compression Layer for a new Socket Factory
    2       Copyright (C) 1999, Free Software Rulez
    3   
    4       This program is free software; you can redistribute it and/or modify
    5       it under the terms of the GNU General Public License as published by
    6       the Free Software Foundation; either version 2 of the License, or
    7       (at your option) any later version.
    8   
    9       This program is distributed in the hope that it will be useful,
   10       but WITHOUT ANY WARRANTY; without even the implied warranty of
   11       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   12       GNU General Public License for more details.
   13   
   14       You should have received a copy of the GNU General Public License
   15       along with this program; if not, write to the Free Software
   16       Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
   17   
   18           The author of this program may be contacted at morgiaclaudio@yahoo.it. */
   19   
   20   package java.net;
   21   
   22   import java.lang;
   23   import java.io;
   24   import java.util;
   25   import java.util.zip;
   26   
   27   /**
   28    * This class manage the compression layer for the datagram scenario.
   29    * It's a thread responsible for packet receiving and handling.
   30    * It can guess if the packet is compressed and so should be decompressed (or not) or if it's
   31    * an ack for a previously sent packet and should be used to compute statistics.
   32    * @see #run the main loop
   33    * @see java.net.Compressor the compression interface
   34    * @see java.net.Decompressor the decompression interface
   35    * @see java.net.Interpolator the statistical engine interface
   36    * @author <a href="mailto:morgiaclaudio@yahoo.it">Claudio Morgia</a>
   37    * @version 1.0
   38    */
   39   public class DConnector extends Vector implements Runnable {
   40       /**
   41        * The socket implementation reference through which we send and receive raw packets.
   42        */
   43       protected ZDatagramSocketImpl impl;
   44   
   45       protected Thread myself;
   46   
   47       /**
   48        * The main loop is driven by the value of that field. If it becomes true, the loop ends and the thread
   49        * exit.
   50        */
   51       protected boolean notStop;
   52   
   53       /**
   54        * List of objects <i>waiting</i> for ack. This hashtable is indexed by sequence number
   55        * and contains objects of {@link java.net.Listener Listener} type.
   56        * Every <b>Listener</b> waits for a single ack packet and is responsible for the update
   57        * of the statistical interpolator.
   58        */
   59       protected Hashtable listeners;
   60   
   61       /**
   62        * Reference to the real compression engine that must follow the
   63        * {@link java.net.Compressor Compressor} interface.
   64        * @see java.net.Compressor
   65        * @see java.net.DeflatingCompressor
   66        */
   67       protected Compressor compressor;
   68   
   69       /**
   70        * Reference to the real decompression engine that must follow the
   71        * {@link java.net.Decompressor Decompressor} interface.
   72        * @see java.net.Decompressor
   73        * @see java.net.InflatingDecompressor
   74        */
   75       protected Decompressor decompressor;
   76   
   77       /**
   78        * Reference to the real statistical modeler that must follow the
   79        * {@link java.net.Interpolator Interpolator} interface.
   80        * @see java.net.Interpolator
   81        * @see java.net.LinearInterpolator
   82        */
   83       protected Interpolator interpolator;
   84   
   85       protected ByteArrayOutputStream container;
   86       protected DeflaterOutputStream deflater;
   87   
   88       /**
   89        * This is the only constructor. It's responsible for command-line options
   90        * gathering through the properties set.
   91        * Actually recognized property set is:
   92        * <ul>
   93        * <li>compressor   - to load a different compression engine (default: DeflatingCompressor)
   94        * <li>decompressor - to load a different decompression engine (default: InflatingDecompressor)
   95        * <li>interpolator - to load a different statistical modeler (default: LinearInterpolator)
   96        * </li>
   97        * Classes referenced through properties must be present into the classpath or must be loadable
   98        * through invocation of the standard method {#link java.lang.Class#forName(String) forName}.
   99        * The method {@link java.net.Compressor#getLevels() getLevels()} is used to retrieve the number of
  100        * supported compression level and to initialize the interpolator using the appropriate number of levels.
  101        * Finally, the main loop is started through thread creation.
  102        */
  103       public DConnector(ZDatagramSocketImpl impl) throws Exception {
  104   	this.impl=impl;
  105   
  106   	String packname=getClass().getPackage().getName();
  107   	listeners=new Hashtable();
  108   	compressor=(Compressor)Class.forName(System.getProperty("compressor",packname+".DeflatingCompressor")).newInstance();
  109   	decompressor=(Decompressor)Class.forName(System.getProperty("decompressor",packname+".InflatingDecompressor")).newInstance();
  110   	Class interp_class=Class.forName(System.getProperty("interpolator",packname+".LinearInterpolator"));
  111   	Class[] params={Class.forName("java.lang.Integer")};
  112   	Object[] realp={new Integer(compressor.getLevels())};
  113   	interpolator=(Interpolator)interp_class.getConstructor(params).newInstance(realp);
  114   	notStop=true;
  115   
  116   	container=new ByteArrayOutputStream();
  117   	deflater=new DeflaterOutputStream(container,(Deflater)compressor);
  118   	myself=new Thread(this);
  119   	myself.start();
  120       }
  121   
  122       /**
  123        * This method clones a received packet and adds it to the packet queue so it can be later retrieved.
  124        * This method is internal use only.
  125        * @see #run look at run to understand why we use a queue
  126        */
  127       protected void addPacket(DatagramPacket packet) {
  128   	byte[] buffer=new byte[packet.getLength()];
  129   	System.arraycopy(packet.getData(),packet.getOffset(),buffer,packet.getOffset(),packet.getLength());
  130   	addElement(new DatagramPacket(buffer,packet.getOffset(),packet.getLength(),packet.getAddress(),packet.getPort()));
  131       }
  132   
  133       /**
  134        * This is the main loop and it's responsible for all packet receiving and handling activities.
  135        * It can be outlined as follows:
  136        * <ul>
  137        * <li>the raw packet is received through the <b>real</b> socket implementation,
  138        * <li>it's decoded through the class ZObject (that throws AckException in case of an ack packet),
  139        * <li>if it's a compressed packet, it's decompressed and queued and an ack is sent back
  140        * <li>if it's an uncompressed packet, no ack is required because the packet it's not sent by our
  141        * socket implementation so the packet simply gets queued,
  142        * <li>if it's an ack packet, the relative listener is awakened,
  143        * <li>any other situation causes the thread to yield, letting the scheduler do its job without
  144        * overloading the CPU with an empty <i>while</i> loop.
  145        * </ul>
  146        * @see #decompress decompression method
  147        * @see #addPacket method used to queue a packet
  148        * @see java.net.Listener the ack listener class
  149        */
  150       public void run() {
  151   	ZObject received;
  152   	byte[] buf=new byte[65536];
  153   	DatagramPacket packet=new DatagramPacket(buf,buf.length);;
  154   	while (notStop)
  155   	    try {
  156   		packet=new DatagramPacket(buf,buf.length);
  157   		impl._receive(packet);
  158   		received=new ZObject(packet);
  159   		decompress(packet);
  160   		addPacket(packet);
  161   		sendAck(received,packet);
  162   	    } catch(AckException ae) {
  163   		((Listener)listeners.get(new Long(ae.getSeqNum()))).done();
  164   	    } catch(NoMagicException nm) {
  165   		addPacket(packet);
  166   	    } catch(Exception e) {
  167   		Thread.yield();
  168   	    }
  169       }
  170   
  171       /**
  172        * This method is responsible for send ack when a packet is received.
  173        * It guess the return address from the second argument, the reference to the received packet and
  174        * the sequence number from the decoded ZObject (contained in the same datagram).
  175        * It prepares a new ack-type ZObject, ask it to encode itself and send back the so created packet.
  176        * @see java.net.ZObject this class represent protocol packets and utility function to encode/decode it
  177        * @param received the net object received
  178        * @param packet the packet received
  179        */
  180       public void sendAck(ZObject received,DatagramPacket packet) throws IOException {
  181   	ZObject zack=new ZObject(received.getSeqNum());
  182   	byte[] buffer=zack.prepareData();
  183   	DatagramPacket ack=new DatagramPacket(buffer,buffer.length,packet.getAddress(),packet.getPort());
  184   	impl._send(ack);
  185       }
  186   
  187       /**
  188        * This method checks if the system queue is empty or not, so it's used to see if there's some packet
  189        * available.
  190        */
  191       public boolean hasPacket() {
  192   	return size()!=0;
  193       }
  194   
  195       /**
  196        * This method remove a packet from the queue and copy data into the passed datagram.
  197        * @param p the datagram to fill with data
  198        */
  199       public void getPacket(DatagramPacket p) {
  200   	DatagramPacket packet=(DatagramPacket)remove(0);
  201   	p.setData(packet.getData(),packet.getOffset(),packet.getLength());
  202   	p.setAddress(packet.getAddress());
  203   	p.setPort(packet.getPort());
  204       }
  205   
  206       /**
  207        * This method is invoked by the {@link java.net.ZDatagramSocketImpl#send ZDatagramSocketImpl#send} method
  208        * to perform the real data sending.
  209        * Basically it performs the following tasks:<ul>
  210        * <li>ask the interpolator to compute a suitable compression level based on data buffer
  211        * dimension,
  212        * <li>set the compression engine to work at that level
  213        * <li>compress data and acquire compression time
  214        * <li>build ZObject, serialize it and send
  215        * <li>install a new Listener object, waiting for ack to update the interpolator
  216        * </ul>
  217        */
  218       public void send(DatagramPacket p) throws IOException {
  219   	long from=System.currentTimeMillis();
  220   	byte[] buffer;
  221   	ZObject zobj;
  222   	int dim=p.getLength();
  223   	int level=interpolator.computeLevel(dim);
  224   	
  225   	container.reset();
  226   	compressor.reset();
  227   	compressor.setLevel(level);
  228   	deflater.write(p.getData(),p.getOffset(),dim);
  229   	deflater.finish();
  230   	
  231   	zobj=new ZObject(container.toByteArray());
  232   	buffer=zobj.prepareData();
  233   	p.setData(buffer,p.getOffset(),buffer.length);
  234   	
  235   	listeners.put(new Long(zobj.getSeqNum()),new Listener(interpolator,level,System.currentTimeMillis()-from,dim,buffer.length));
  236   	
  237   	impl._send(p);
  238       }
  239   
  240       /**
  241        * This method is responsible for data decompression. It uses the loaded decompression engine
  242        * (or {@link java.net.InflatingDecompressor InflatingDecompressor} if not provided) and
  243        * a ByteArrayInputStream (that renders a byte array as an input stream) to build an
  244        * InflaterInputStream that "reads" compressed data and write "uncompressed data".
  245        * @see java.io.ByteArrayInputStream if you want to build an input stream that gets data from a byte array, you got it
  246        * @see java.net.Decompressor the decompression engine
  247        * @see java.util.zip.InflaterInputStream the decompression stream
  248        */
  249       public void decompress(DatagramPacket p) throws NoMagicException,IOException {
  250   	int length;
  251   	byte[] data;
  252   	try {
  253   	    data=(new ZObject(p)).getData();
  254   	} catch(AckException e) {
  255   	    throw new IOException("Spurious ack packet");
  256   	}
  257   	byte[] buffer=new byte[8192];
  258   	ByteArrayOutputStream out=new ByteArrayOutputStream();
  259   	InflaterInputStream inflater=new InflaterInputStream(new ByteArrayInputStream(data),(Inflater)decompressor);
  260   
  261   	while ((length=inflater.read(buffer,0,buffer.length))!=-1)
  262   	    out.write(buffer,0,length);
  263   
  264   	inflater.close();
  265   	decompressor.reset();
  266   	data=out.toByteArray();
  267   	p.setData(data,p.getOffset(),data.length);
  268       }
  269   }

Save This Page
Home » openjdk-7 » java » net » [javadoc | source]