Save This Page
Home » openjdk-7 » java » net » [javadoc | source]
    1   /*  ZNet - Java Compression Layer for a new Socket Factory
    2       Copyright (C) 1999, Free Software Rulez
    3   
    4       This program is free software; you can redistribute it and/or modify
    5       it under the terms of the GNU General Public License as published by
    6       the Free Software Foundation; either version 2 of the License, or
    7       (at your option) any later version.
    8   
    9       This program is distributed in the hope that it will be useful,
   10       but WITHOUT ANY WARRANTY; without even the implied warranty of
   11       MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   12       GNU General Public License for more details.
   13   
   14       You should have received a copy of the GNU General Public License
   15       along with this program; if not, write to the Free Software
   16       Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
   17   
   18           The author of this program may be contacted at morgiaclaudio@yahoo.it. */
   19   
   20   package java.net;
   21   
   22   import java.lang;
   23   import java.io;
   24   import java.util;
   25   import java.util.zip;
   26   
   27   /**
   28    * This class manage the compression layer for the stream (connected) scenario.
   29    * It's a thread responsible for packet receiving and handling.
   30    * It can guess if the packet is compressed and so should be decompressed or if it's
   31    * an ack for a previously sent packet and should be used to compute statistics.
   32    * @see #run the main loop
   33    * @see java.net.Compressor the compression interface
   34    * @see java.net.Decompressor the decompression interface
   35    * @see java.net.Interpolator the statistical engine interface
   36    * @author <a href="mailto:morgiaclaudio@yahoo.it">Claudio Morgia</a>
   37    * @version 1.0
   38    */
   39   public class Connector implements Runnable {
   40   
   41       /**
   42        * Reference to the real compression engine that must follow the
   43        * {@link java.net.Compressor Compressor} interface.
   44        * @see java.net.Compressor
   45        * @see java.net.DeflatingCompressor
   46        */
   47       protected Compressor compressor;
   48   
   49       /**
   50        * Reference to the real decompression engine that must follow the
   51        * {@link java.net.Decompressor Decompressor} interface.
   52        * @see java.net.Decompressor
   53        * @see java.net.InflatingDecompressor
   54        */
   55       protected Decompressor decompressor;
   56   
   57       /**
   58        * Reference to the real statistical modeler that must follow the
   59        * {@link java.net.Interpolator Interpolator} interface.
   60        * @see java.net.Interpolator
   61        * @see java.net.LinearInterpolator
   62        */
   63       protected Interpolator interpolator;
   64   
   65       /**
   66        * Reference to the real input stream as provided by the plain-old
   67        * socket factory, tipically {@link java.net.PlainSocketImpl PlainSocketImpl}.
   68        */
   69       protected InputStream in;
   70   
   71       /**
   72        * Reference to the real output stream as provided by the plain-old
   73        * socket factory, tipically {@link java.net.PlainSocketImpl PlainSocketImpl}.
   74        */
   75       protected OutputStream out;
   76   
   77       /**
   78        * This input stream represents the input to the compression engine.
   79        * It is the reference passed when {@link #getInputStream getInputStream} is invoked.
   80        * IMHO it's interesting to see how it is realized because it's an infinite data queue.
   81        * @see java.net.ZInputStream
   82        */
   83       protected ZInputStream zin;
   84   
   85       /**
   86        * This output stream represents the output coming from the compression engine.
   87        * It is the reference passed when {@link #getOutputStream getOutputStream} is invoked.
   88        * @see java.net.ZOutputStream
   89        */
   90       protected ZOutputStream zout;
   91   
   92       /**
   93        * List of objects <i>waiting</i> for ack. This hashtable is indexed by sequence number
   94        * and contains objects of {@link java.net.Listener Listener} type.
   95        * Every <b>Listener</b> waits for a single ack packet and is responsible for the update
   96        * of the statistical interpolator.
   97        */
   98       protected Hashtable listeners;
   99   
  100       protected Thread myself;
  101   
  102       /**
  103        * This variable drive the main loop: a false value will stop the thread and prepare for exit.
  104        * The value can be changed using the {@link #stop stop} method.
  105        */
  106       protected boolean notStop;
  107   
  108       /**
  109        * This internal method is the common part of constructors. It's responsible for command-line options
  110        * gathering through the properties set.
  111        * Actually recognized property set is:
  112        * <ul>
  113        * <li>compressor   - to load a different compression engine (default: DeflatingCompressor)
  114        * <li>decompressor - to load a different decompression engine (default: InflatingDecompressor)
  115        * <li>interpolator - to load a different statistical modeler (default: LinearInterpolator)
  116        * </li>
  117        * Classes referenced through properties must be present into the classpath or must be loadable
  118        * through invocation of the standard method {#link java.lang.Class#forName(String) forName}.
  119        * The method {@link java.net.Compressor#getLevels() getLevels()} is used to retrieve the number of
  120        * supported compression level and to initialize the interpolator using the appropriate number of levels.
  121        * Finally, the main loop is started through thread creation.
  122        */
  123       protected synchronized void create() throws Exception {
  124   	String packname=getClass().getPackage().getName();
  125   	    
  126   	compressor=(Compressor)Class.forName(System.getProperty("compressor",packname+".DeflatingCompressor")).newInstance();
  127   	decompressor=(Decompressor)Class.forName(System.getProperty("decompressor",packname+".InflatingDecompressor")).newInstance();
  128   	Class interp_class=Class.forName(System.getProperty("interpolator",packname+".LinearInterpolator"));
  129   	Class[] params={Class.forName("java.lang.Integer")};
  130   	Object[] realp={new Integer(compressor.getLevels())};
  131   	interpolator=(Interpolator)interp_class.getConstructor(params).newInstance(realp);
  132   	myself=new Thread(this);
  133   	myself.start();
  134   	notStop=true;
  135       }
  136   
  137       /**
  138        * This constructor is used when the connector is not yet started and it is request by
  139        * the method {@link java.net.ZSocketImpl#getInputStream() getInputStream} of our socket
  140        * implementation.
  141        * It's perfectly specular (but non identical) to the next constructor.
  142        * Here the question is that, like with Object{Input | Output}Stream class, on client side
  143        * you should open output then input streams and on server side you should do the converse,
  144        * because the stream opening causes handshaking packet to be exchanged between the ObjectOutputStream
  145        * on client and ObjectInputStream on server.
  146        * If you open the two output (on both client and server) they'll send handshacking packets that nobody
  147        * receive and answer because no input stream is opened yet.
  148        * @see java.net.ZInputStream the unlimited input queue
  149        * @see java.net.ZOutputSteeam
  150        */
  151       public Connector(InputStream in,OutputStream out) throws Exception {
  152   	this.in=in;
  153   	this.zin=new ZInputStream();
  154   	this.out=out;
  155   	this.zout=new ZOutputStream(this);
  156   	listeners=new Hashtable();
  157   	create();
  158       }
  159   
  160       /**
  161        * See the previous constructor keeping in mind this one works for
  162        * {@link java.net.ZSocketImpl#getOutputStream() getOutputStream}
  163        * @see #Connector(InputStream,OutputStream)
  164        * @see java.net.ZInputStream the unlimited input queue
  165        * @see java.net.ZOutputSteeam
  166        */
  167       public Connector(OutputStream out,InputStream in) throws Exception {
  168   	this.out=out;
  169   	this.zout=new ZOutputStream(this);
  170   	this.in=in;
  171   	this.zin=new ZInputStream();
  172   	listeners=new Hashtable();
  173   	create();
  174       }
  175   
  176       /**
  177        * This method is responsible for data decompression. It uses the loaded decompression engine
  178        * (or {@link java.net.InflatingDecompressor InflatingDecompressor} if not provided) and
  179        * a ByteArrayInputStream (that renders a byte array as an input stream) to build an
  180        * InflaterInputStream that "reads" compressed data and write "uncompressed data".
  181        * @see java.io.ByteArrayInputStream if you want to build an input stream that gets data from a byte array, you got it
  182        * @see java.net.Decompressor the decompression engine
  183        * @see java.util.zip.InflaterInputStream the decompression stream
  184        */
  185       protected byte[] decompress(byte[] buf) throws IOException {
  186   	int length;
  187   	//buf=D.readFile("here.out");
  188   	byte[] buffer=new byte[65536];
  189   	ByteArrayOutputStream container=new ByteArrayOutputStream();
  190   	InflaterInputStream inflater=new InflaterInputStream(new ByteArrayInputStream(buf),(Inflater)decompressor);
  191   	while ((length=inflater.read(buffer,0,buffer.length))!=-1) {
  192   	    container.write(buffer,0,length);
  193   	}
  194   	inflater.close();
  195   	decompressor.reset();
  196   	return container.toByteArray();	
  197       }
  198   
  199       /**
  200        * This method is responsible for data compression. It uses the loaded compression engine
  201        * (or {@link java.net.DeflatingCompressor DeflatingCompressor} if not provided) and
  202        * a ByteArrayOutputStream (that renders an output stream as a byte array) to build a
  203        * DeflaterOutputStream that "writes" uncompressed data and read "compressed data".
  204        * @see java.io.ByteArrayOutputStream if you want to build an unlimited data queue that can be translated into a byte array, you got it
  205        * @see java.net.Compressor the compression engine
  206        * @see java.util.zip.DeflaterOutputStream the compression stream
  207        */
  208       protected byte[] compress(byte[] buf) throws IOException {
  209   	byte[] tmp;
  210   	ByteArrayOutputStream container=new ByteArrayOutputStream();
  211   	DeflaterOutputStream deflater=new DeflaterOutputStream(container,(Deflater)compressor);
  212   	deflater.write(buf,0,buf.length);
  213   	deflater.finish();
  214   	deflater.close();
  215   	tmp=container.toByteArray();
  216   	//D.writeFile("here.out",tmp);
  217   	return tmp;
  218       }
  219   
  220       /**
  221        * The main loop. Here packets get captured and converted into {@link java.net.ZObject ZObjects}
  222        * to see if they are normal compressed or ack. In the first case, data gets decompressed and queued
  223        * into the ZInputStream that can be given out as the primary input stream (through the method
  224        * {@link java.net.ZSocketImpl#getInputStream getInputStream} and an ack packet is immediately sent back.
  225        * In the case of a received ack, the appropriate listener gets notified of the arriving ack and it will
  226        * provide to update the interpolator.
  227        * @see java.net.ZInputStream our unlimited queue
  228        * @see java.net.ZObject
  229        * @see java.net.Listener the agent responsible for interpolator update
  230        */
  231       public void run() {
  232   	ZObject received;
  233   	while (notStop) {
  234   	    try {
  235   		received=new ZObject(in);
  236   		zin.addBytes(decompress(received.getData()));
  237   		sendAck(received);
  238   	    } catch(AckException ae) {
  239   		((Listener)listeners.get(new Long(ae.getSeqNum()))).done();
  240   	    } catch(Exception e) {
  241   		Thread.yield();
  242   	    }
  243   	}
  244       }
  245   
  246       /**
  247        * An ack is sent through this method, based on the passed ZObject that's the received one.
  248        * From that object it gets the sequence number and use it to build an ack-type ZObject.
  249        * Then the latter is "serialized" and transformed into a byte array , ready to be sent through the
  250        * real output stream (as provided by the previous socket factory).
  251        * We don't use standard serialization because the object's type domain is really limited (only one
  252        * class) and the resulting overhead would not be justified.
  253        */
  254       public void sendAck(ZObject zobj) throws IOException {
  255   	byte[] toSend=(new ZObject(zobj.getSeqNum())).prepareData();
  256   	out.write(toSend);
  257       }
  258   
  259       /**
  260        * This method is invoked by the {@link java.net.ZOutputStream#flush() ZoutputStream.flush} method
  261        * to perform the real data sending.
  262        * Basically it performs the following tasks:<ul>
  263        * <li>ask the interpolator to compute a suitable compression level based on data buffer
  264        * dimension,
  265        * <li>set the compression engine to work at that level
  266        * <li>compress data and acquire compression time
  267        * <li>build ZObject, serialize it and send
  268        * <li>install a new Listener object, waiting for ack to update the interpolator
  269        * </ul>
  270        */
  271       public void send(byte[] data) throws IOException {
  272    	Listener here;
  273    	byte[] buf;
  274   
  275    	int level=interpolator.computeLevel(data.length);
  276    	compressor.reset();
  277    	compressor.setLevel(level);
  278   	
  279    	long from=System.currentTimeMillis();
  280    	buf=compress(data);
  281    	int compressed_length=buf.length;
  282    	long compress_time=System.currentTimeMillis()-from;
  283   
  284    	ZObject toSend=new ZObject(buf);
  285   
  286    	from=System.currentTimeMillis();
  287    	out.write(toSend.prepareData());
  288    	listeners.put(new Long(toSend.getSeqNum()),new Listener(interpolator,level,compress_time,data.length,compressed_length));
  289        }
  290   
  291       public InputStream getInputStream() {
  292   	return zin;
  293       }
  294   
  295       public OutputStream getOutputStream() {
  296   	return zout;
  297       }
  298   
  299       public synchronized void stop() {
  300   	notStop=false;
  301       }
  302   
  303   }

Save This Page
Home » openjdk-7 » java » net » [javadoc | source]