Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/k_int/IR/ReadAheadEnumeration.java


1   /**
2    * Title:       ReadAheadEnumeration
3    * @version:    $Id: ReadAheadEnumeration.java,v 1.17 2003/05/09 12:54:44 rob_tice Exp $
4    * Copyright:   Copyright (C) 2001 Knowledge Integration Ltd.
5    * @author:     Ian Ibbotson (ian.ibbotson@k-int.com)
6    * Company:     Knowledge Integration Ltd.
7    * Description: 
8    *              
9    */
10  
11  //
12  // This program is free software; you can redistribute it and/or
13  // modify it under the terms of the GNU General Public License
14  // as published by the Free Software Foundation; either version 2.1 of
15  // the license, or (at your option) any later version.
16  // 
17  // This program is distributed in the hope that it will be useful,
18  // but WITHOUT ANY WARRANTY; without even the implied warranty of
19  // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
20  // GNU General Public License for more details.
21  // 
22  // You should have received a copy of the GNU General Public License
23  // along with this program; if not, write to the Free Software
24  // Foundation, Inc., 59 Temple Place - Suite
25  // 330, Boston, MA  02111-1307, USA.
26  // 
27  
28  package com.k_int.IR;
29  
30  import com.k_int.IR.InformationFragmentSource;
31  import com.k_int.IR.InformationFragment;
32  import java.lang.ref.*;
33  
34  // For logging
35  import com.k_int.util.LoggingFacade.*;
36  
37  /**
38   * Enumerate the result records from an InformationFragmentSource.
39   * As normal FragmentSourceEnumeration, but caches chunks of records
40   * as presented by target. E.G. Fetch chunk_size rows, getNext returns 1,2,3,4...
41   * on 11, calls fetch again. And don't I wish I called it iterator instead....
42   *
43   * @version:    $Id: ReadAheadEnumeration.java,v 1.17 2003/05/09 12:54:44 rob_tice Exp $
44   * @author:     Ian Ibbotson (ian.ibbotson@k-int.com)
45   * @see:  com.k_int.IR.FragmentSourceEnumeration
46   * @see:  com.k_int.IR.SearchTask
47   *
48   */
49  public class ReadAheadEnumeration implements AsynchronousEnumeration
50  {
51    private InformationFragmentSource source = null;
52    private int current_element = 0;  // 0 is before first element...
53    private int min_cache_size = 5;
54    private java.util.LinkedList fragment_cache = new java.util.LinkedList();
55    private boolean fetching = false;
56    private int chunk_size = -1;
57    private RecordFormatSpecification spec = null;
58    private int next_to_request = 1;
59    private int next_record_timeout = 100000;
60    private WeakReference notification_target = null;
61  
62    private static LoggingContext cat = LogContextFactory.getContext("ReadAheadEnumeration");
63  
64    IFSNotificationTarget rae_present_callback_handler = new IFSNotificationTarget()
65    {
66      public void notifyRecords(InformationFragment[] records)
67      {
68        cat.debug("ReadAheadEnumeration::IFSNotificationTarget::records:"+records.length);
69  
70        next_to_request += records.length;
71  
72        synchronized ( fragment_cache )
73        {
74          for ( int i=0; i<records.length; i++ )
75            fragment_cache.addLast(records[i]);
76  
77          fetching = false;
78  
79          fragment_cache.notifyAll();
80        }
81  
82        checkCache();
83  
84        // Notify anyone waiting on this asynchronous enumeration that new records are available.
85        if ( notification_target != null )
86        {
87          Object o = notification_target.get();
88          if ( o != null )
89          {
90            synchronized(o)
91            {
92              o.notifyAll();
93            }
94          }
95        }
96      }
97  
98      public void notifyError(   String code_set,  // For z3950 read diagnostic set
99                                Integer code,      // For z3950 read diagnostic
100                                String reason,
101                             Exception source_exception)
102     {
103       fetching = false;
104       checkCache();
105     }
106   };
107 
108   private ReadAheadEnumeration()
109   {
110   }
111 
112   public ReadAheadEnumeration(InformationFragmentSource source)
113   {
114     this(source, 
115          new RecordFormatSpecification(new IndirectFormatProperty("default_record_syntax"),
116                                        null,
117                                        new IndirectFormatProperty("default_element_set_name")), 
118          10,
119          100000,
120          null);
121   }
122 
123   public ReadAheadEnumeration(InformationFragmentSource source, int chunk_size)
124   {
125     this(source,
126          new RecordFormatSpecification(new IndirectFormatProperty("default_record_syntax"),
127                                        null,
128                                        new IndirectFormatProperty("default_element_set_name")), 
129          chunk_size,
130          100000,
131          null);
132   }
133 
134   public ReadAheadEnumeration(InformationFragmentSource source,
135                   RecordFormatSpecification spec,
136                               Object notification_target)
137   {
138     this(source, spec, 10, 100000, notification_target);
139   }
140 
141   public ReadAheadEnumeration(InformationFragmentSource source,
142                   RecordFormatSpecification spec)
143   {
144     this(source, spec, 10, 100000, null);
145   }
146 
147   public ReadAheadEnumeration(InformationFragmentSource source, 
148                   RecordFormatSpecification spec,
149             int chunk_size,
150                               int next_record_timeout,
151                               Object notification_target)
152   {
153     this.chunk_size = chunk_size;
154     this.source = source;
155     this.spec = spec;
156     this.next_record_timeout = next_record_timeout;
157 
158     if ( notification_target != null )
159       this.notification_target = new WeakReference(notification_target);
160 
161     cat.debug("ReadAheadEumeration::ReadAheadEumeration(source,"+chunk_size+","+spec+")");
162   }
163 
164   public boolean hasMoreElements()
165   {
166     cat.debug("ReadAheadEumeration::hasMoreElements()");
167 
168     if ( (current_element) < source.getFragmentCount() )
169       return true;
170 
171     return false;
172   }
173 
174   public Object nextElement()
175   {
176     cat.debug("ReadAheadEumeration::nextElement()");
177 
178     Object retval = null;
179 
180     checkCache();
181 
182     if ( fragment_cache.size() == 0 )
183       waitForNextRecord(next_record_timeout);
184 
185     if ( fragment_cache.size() > 0 )
186       retval = fragment_cache.removeFirst();
187 
188     if ( retval != null )
189       current_element++;
190 
191     return retval;
192   }
193 
194   private void checkCache()
195   {
196     if ( fragment_cache.size() < min_cache_size )
197     {
198       if ( !fetching )
199       {
200         if ( cat.isDebugEnabled() )
201         {
202           cat.debug("Checking cache in ReadAheadEnumeration, current_element="+current_element+
203           " cache.size()="+fragment_cache.size()+" total="+source.getFragmentCount() );
204         }
205 
206         // Firstly, check that the cache does not represent the last items in the
207         // result set
208         if ( current_element + fragment_cache.size() < source.getFragmentCount() )
209         {
210           // Is the cache as big as we would like?
211           // Nope, we need to fetch the next chunk of records
212           // It's possible that this should use a thread pool when it looks like
213           // we might be approaching the cache limit... Need a semaphore to block
214           // if we ask for the next record whilst the threaded fetch is still in process.
215           cat.debug("There are still items outstanding....");
216           requestRecords();
217         }
218       }
219     }
220     else
221     {
222       // Cache is fine...
223     }
224   }
225 
226   private synchronized void requestRecords()
227   {
228     fetching = true;
229 
230     int records_available = source.getFragmentCount();
231 
232     int num_to_request = 0;
233 
234     if ( ( next_to_request + chunk_size ) > records_available )
235       num_to_request = records_available - next_to_request + 1;
236     else
237       num_to_request =  chunk_size;
238 
239     if ( cat.isDebugEnabled() )
240     {
241       cat.debug("ReadAheadEumeration::fetchRecords()");
242       cat.debug("Requesting "+num_to_request+" records, current="+current_element+" max="+records_available+" next to request="+next_to_request);
243     }
244 
245     // Don't forget getFragment is 1-based!
246     source.asyncGetFragment(next_to_request,num_to_request,spec,rae_present_callback_handler); 
247   }
248 
249   public boolean nextIsAvailable()
250   {
251     cat.debug("ReadAheadEumeration::nextIsAvailable()");
252 
253     checkCache();
254 
255     if ( fragment_cache.size() > 0 )
256       return true;
257 
258     return false;
259   }
260 
261   public IRStatusReport getStatus()
262   {
263     return source.getStatusReport();
264   }
265 
266   public void waitForNextRecord(int timeout)
267   {
268     cat.debug("Waiting for up to "+timeout+" ms for next record");
269 
270     switch ( timeout )
271     {
272       case -1:
273       {
274         while ( fragment_cache.size() == 0 )
275         {
276           synchronized( fragment_cache )
277           {
278             try
279             {
280               fragment_cache.wait();
281             }
282             catch ( InterruptedException ie )
283             {
284             }
285           }
286         }
287         break;
288       }
289       case 0:
290       {
291         break;
292       }
293       default:
294       {
295         long end_time = System.currentTimeMillis() + timeout;
296 
297         while ( ( System.currentTimeMillis() < end_time ) &&
298                 ( fragment_cache.size() == 0 ) )
299         {
300           long wait_time = end_time - System.currentTimeMillis();
301 
302           if ( wait_time > 0 )
303           {
304             synchronized( fragment_cache )
305             {
306               try
307               {
308                 fragment_cache.wait(wait_time);
309               }
310               catch ( InterruptedException ie )
311               {
312               }
313             }
314           }
315         }
316       }
317     }
318   }
319 
320   public void registerNotificationTarget(Object o)
321   {
322     notification_target = new WeakReference(o);
323   }
324 }