Source code: com/k_int/IR/ReadAheadEnumeration.java
1 /**
2 * Title: ReadAheadEnumeration
3 * @version: $Id: ReadAheadEnumeration.java,v 1.17 2003/05/09 12:54:44 rob_tice Exp $
4 * Copyright: Copyright (C) 2001 Knowledge Integration Ltd.
5 * @author: Ian Ibbotson (ian.ibbotson@k-int.com)
6 * Company: Knowledge Integration Ltd.
7 * Description:
8 *
9 */
10
11 //
12 // This program is free software; you can redistribute it and/or
13 // modify it under the terms of the GNU General Public License
14 // as published by the Free Software Foundation; either version 2.1 of
15 // the license, or (at your option) any later version.
16 //
17 // This program is distributed in the hope that it will be useful,
18 // but WITHOUT ANY WARRANTY; without even the implied warranty of
19 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 // GNU General Public License for more details.
21 //
22 // You should have received a copy of the GNU General Public License
23 // along with this program; if not, write to the Free Software
24 // Foundation, Inc., 59 Temple Place - Suite
25 // 330, Boston, MA 02111-1307, USA.
26 //
27
28 package com.k_int.IR;
29
30 import com.k_int.IR.InformationFragmentSource;
31 import com.k_int.IR.InformationFragment;
32 import java.lang.ref.*;
33
34 // For logging
35 import com.k_int.util.LoggingFacade.*;
36
37 /**
38 * Enumerate the result records from an InformationFragmentSource.
39 * As normal FragmentSourceEnumeration, but caches chunks of records
40 * as presented by target. E.G. Fetch chunk_size rows, getNext returns 1,2,3,4...
41 * on 11, calls fetch again. And don't I wish I called it iterator instead....
42 *
43 * @version: $Id: ReadAheadEnumeration.java,v 1.17 2003/05/09 12:54:44 rob_tice Exp $
44 * @author: Ian Ibbotson (ian.ibbotson@k-int.com)
45 * @see: com.k_int.IR.FragmentSourceEnumeration
46 * @see: com.k_int.IR.SearchTask
47 *
48 */
49 public class ReadAheadEnumeration implements AsynchronousEnumeration
50 {
51 private InformationFragmentSource source = null;
52 private int current_element = 0; // 0 is before first element...
53 private int min_cache_size = 5;
54 private java.util.LinkedList fragment_cache = new java.util.LinkedList();
55 private boolean fetching = false;
56 private int chunk_size = -1;
57 private RecordFormatSpecification spec = null;
58 private int next_to_request = 1;
59 private int next_record_timeout = 100000;
60 private WeakReference notification_target = null;
61
62 private static LoggingContext cat = LogContextFactory.getContext("ReadAheadEnumeration");
63
64 IFSNotificationTarget rae_present_callback_handler = new IFSNotificationTarget()
65 {
66 public void notifyRecords(InformationFragment[] records)
67 {
68 cat.debug("ReadAheadEnumeration::IFSNotificationTarget::records:"+records.length);
69
70 next_to_request += records.length;
71
72 synchronized ( fragment_cache )
73 {
74 for ( int i=0; i<records.length; i++ )
75 fragment_cache.addLast(records[i]);
76
77 fetching = false;
78
79 fragment_cache.notifyAll();
80 }
81
82 checkCache();
83
84 // Notify anyone waiting on this asynchronous enumeration that new records are available.
85 if ( notification_target != null )
86 {
87 Object o = notification_target.get();
88 if ( o != null )
89 {
90 synchronized(o)
91 {
92 o.notifyAll();
93 }
94 }
95 }
96 }
97
98 public void notifyError( String code_set, // For z3950 read diagnostic set
99 Integer code, // For z3950 read diagnostic
100 String reason,
101 Exception source_exception)
102 {
103 fetching = false;
104 checkCache();
105 }
106 };
107
108 private ReadAheadEnumeration()
109 {
110 }
111
112 public ReadAheadEnumeration(InformationFragmentSource source)
113 {
114 this(source,
115 new RecordFormatSpecification(new IndirectFormatProperty("default_record_syntax"),
116 null,
117 new IndirectFormatProperty("default_element_set_name")),
118 10,
119 100000,
120 null);
121 }
122
123 public ReadAheadEnumeration(InformationFragmentSource source, int chunk_size)
124 {
125 this(source,
126 new RecordFormatSpecification(new IndirectFormatProperty("default_record_syntax"),
127 null,
128 new IndirectFormatProperty("default_element_set_name")),
129 chunk_size,
130 100000,
131 null);
132 }
133
134 public ReadAheadEnumeration(InformationFragmentSource source,
135 RecordFormatSpecification spec,
136 Object notification_target)
137 {
138 this(source, spec, 10, 100000, notification_target);
139 }
140
141 public ReadAheadEnumeration(InformationFragmentSource source,
142 RecordFormatSpecification spec)
143 {
144 this(source, spec, 10, 100000, null);
145 }
146
147 public ReadAheadEnumeration(InformationFragmentSource source,
148 RecordFormatSpecification spec,
149 int chunk_size,
150 int next_record_timeout,
151 Object notification_target)
152 {
153 this.chunk_size = chunk_size;
154 this.source = source;
155 this.spec = spec;
156 this.next_record_timeout = next_record_timeout;
157
158 if ( notification_target != null )
159 this.notification_target = new WeakReference(notification_target);
160
161 cat.debug("ReadAheadEumeration::ReadAheadEumeration(source,"+chunk_size+","+spec+")");
162 }
163
164 public boolean hasMoreElements()
165 {
166 cat.debug("ReadAheadEumeration::hasMoreElements()");
167
168 if ( (current_element) < source.getFragmentCount() )
169 return true;
170
171 return false;
172 }
173
174 public Object nextElement()
175 {
176 cat.debug("ReadAheadEumeration::nextElement()");
177
178 Object retval = null;
179
180 checkCache();
181
182 if ( fragment_cache.size() == 0 )
183 waitForNextRecord(next_record_timeout);
184
185 if ( fragment_cache.size() > 0 )
186 retval = fragment_cache.removeFirst();
187
188 if ( retval != null )
189 current_element++;
190
191 return retval;
192 }
193
194 private void checkCache()
195 {
196 if ( fragment_cache.size() < min_cache_size )
197 {
198 if ( !fetching )
199 {
200 if ( cat.isDebugEnabled() )
201 {
202 cat.debug("Checking cache in ReadAheadEnumeration, current_element="+current_element+
203 " cache.size()="+fragment_cache.size()+" total="+source.getFragmentCount() );
204 }
205
206 // Firstly, check that the cache does not represent the last items in the
207 // result set
208 if ( current_element + fragment_cache.size() < source.getFragmentCount() )
209 {
210 // Is the cache as big as we would like?
211 // Nope, we need to fetch the next chunk of records
212 // It's possible that this should use a thread pool when it looks like
213 // we might be approaching the cache limit... Need a semaphore to block
214 // if we ask for the next record whilst the threaded fetch is still in process.
215 cat.debug("There are still items outstanding....");
216 requestRecords();
217 }
218 }
219 }
220 else
221 {
222 // Cache is fine...
223 }
224 }
225
226 private synchronized void requestRecords()
227 {
228 fetching = true;
229
230 int records_available = source.getFragmentCount();
231
232 int num_to_request = 0;
233
234 if ( ( next_to_request + chunk_size ) > records_available )
235 num_to_request = records_available - next_to_request + 1;
236 else
237 num_to_request = chunk_size;
238
239 if ( cat.isDebugEnabled() )
240 {
241 cat.debug("ReadAheadEumeration::fetchRecords()");
242 cat.debug("Requesting "+num_to_request+" records, current="+current_element+" max="+records_available+" next to request="+next_to_request);
243 }
244
245 // Don't forget getFragment is 1-based!
246 source.asyncGetFragment(next_to_request,num_to_request,spec,rae_present_callback_handler);
247 }
248
249 public boolean nextIsAvailable()
250 {
251 cat.debug("ReadAheadEumeration::nextIsAvailable()");
252
253 checkCache();
254
255 if ( fragment_cache.size() > 0 )
256 return true;
257
258 return false;
259 }
260
261 public IRStatusReport getStatus()
262 {
263 return source.getStatusReport();
264 }
265
266 public void waitForNextRecord(int timeout)
267 {
268 cat.debug("Waiting for up to "+timeout+" ms for next record");
269
270 switch ( timeout )
271 {
272 case -1:
273 {
274 while ( fragment_cache.size() == 0 )
275 {
276 synchronized( fragment_cache )
277 {
278 try
279 {
280 fragment_cache.wait();
281 }
282 catch ( InterruptedException ie )
283 {
284 }
285 }
286 }
287 break;
288 }
289 case 0:
290 {
291 break;
292 }
293 default:
294 {
295 long end_time = System.currentTimeMillis() + timeout;
296
297 while ( ( System.currentTimeMillis() < end_time ) &&
298 ( fragment_cache.size() == 0 ) )
299 {
300 long wait_time = end_time - System.currentTimeMillis();
301
302 if ( wait_time > 0 )
303 {
304 synchronized( fragment_cache )
305 {
306 try
307 {
308 fragment_cache.wait(wait_time);
309 }
310 catch ( InterruptedException ie )
311 {
312 }
313 }
314 }
315 }
316 }
317 }
318 }
319
320 public void registerNotificationTarget(Object o)
321 {
322 notification_target = new WeakReference(o);
323 }
324 }