Source code: org/ematgine/utils/concurrent/Channel.java
1 /**
2 * Ematgine server source file
3 *
4 * Copyright (C) 2000-2001 <Mathieu Beauvais>
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * any later version.
10 *
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
15
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
19 *
20 * Concurrent Versions System
21 * $Id: Channel.java,v 1.2 2002/11/10 21:00:52 none Exp $
22 */
23 /*
24 File: Channel.java
25
26 Originally written by Doug Lea and released into the public domain.
27 This may be used for any purposes whatsoever without acknowledgment.
28 Thanks for the assistance and support of Sun Microsystems Labs,
29 and everyone contributing, testing, and using this code.
30
31 History:
32 Date Who What
33 11Jun1998 dl Create public version
34 25aug1998 dl added peek
35 */
36
37 package org.ematgine.utils.concurrent;
38
39 /**
40 * Main interface for buffers, queues, pipes, conduits, etc.
41 * <p>
42 * A Channel represents anything that you can put items
43 * into and take them out of. As with the Sync
44 * interface, both
45 * blocking (put(x), take),
46 * and timeouts (offer(x, msecs), poll(msecs)) policies
47 * are provided. Using a
48 * zero timeout for offer and poll results in a pure balking policy.
49 * <p>
50 * To aid in efforts to use Channels in a more typesafe manner,
51 * this interface extends Puttable and Takable. You can restrict
52 * arguments or instance variables to these types as a way of
53 * guaranteeing that producers never try to take, or consumers put.
54 * For example:
55 * <pre>
56 * class Producer implements Runnable {
57 * final Puttable chan;
58 * Producer(Puttable channel) { chan = channel; }
59 * public void run() {
60 * try {
61 * for(;;) { chan.put(produce()); }
62 * }
63 * catch (InterruptedException ex) {}
64 * }
65 * Object produce() { ... }
66 * }
67 *
68 *
69 * class Consumer implements Runnable {
70 * final Takable chan;
71 * Consumer(Takable channel) { chan = channel; }
72 * public void run() {
73 * try {
74 * for(;;) { consume(chan.take()); }
75 * }
76 * catch (InterruptedException ex) {}
77 * }
78 * void consume(Object x) { ... }
79 * }
80 *
81 * class Setup {
82 * void main() {
83 * Channel chan = new SomeChannelImplementation();
84 * Producer p = new Producer(chan);
85 * Consumer c = new Consumer(chan);
86 * new Thread(p).start();
87 * new Thread(c).start();
88 * }
89 * }
90 * </pre>
91 * <p>
92 * A given channel implementation might or might not have bounded
93 * capacity or other insertion constraints, so in general, you cannot tell if
94 * a given put will block. However,
95 * Channels that are designed to
96 * have an element capacity (and so always block when full)
97 * should implement the
98 * BoundedChannel
99 * subinterface.
100 * <p>
101 * Channels may hold any kind of item. However,
102 * insertion of null is not in general supported. Implementations
103 * may (all currently do) throw IllegalArgumentExceptions upon attempts to
104 * insert null.
105 * <p>
106 * By design, the Channel interface does not support any methods to determine
107 * the current number of elements being held in the channel, or
108 * whether it is empty or full.
109 * This decision reflects the fact that in
110 * concurrent programming, such methods are so rarely useful
111 * that including them invites misuse; at best they could
112 * provide a snapshot of current
113 * state, that could change immediately after being reported.
114 * It is better practice to instead use poll and offer to try
115 * to take and put elements without blocking. For example,
116 * to empty out the current contents of a channel, you could write:
117 * <pre>
118 * try {
119 * for (;;) {
120 * Object item = channel.poll(0);
121 * if (item != null)
122 * process(item);
123 * else
124 * break;
125 * }
126 * }
127 * catch(InterruptedException ex) { ... }
128 * </pre>
129 * <p>
130 * However, it is possible to determine whether an item
131 * exists in a Channel via <code>peek</code>, which returns
132 * but does NOT remove the next item that can be taken (or null
133 * if there is no such item). The peek operation has a limited
134 * range of applicability, and must be used with care. Unless it
135 * is known that a given thread is the only possible consumer
136 * of a channel, and that no time-out-based <code>offer</code> operations
137 * are ever invoked, there is no guarantee that the item returned
138 * by peek will be available for a subsequent take.
139 * <p>
140 * Also, as a compromise, even though it does not appear in the interface,
141 * implementation classes that can readily compute the number
142 * of elements support a <code>size()</code> method. This allows careful
143 * use, for example in queue length monitors, appropriate to the
144 * particular implementation constraints and properties.
145 * <p>
146 * All channels allow multiple producers and/or consumers.
147 * They do not support any kind of <em>close</em> method
148 * to shut down operation or indicate completion of particular
149 * producer or consumer threads.
150 * If you need to signal completion, one way to do it is to
151 * create a class such as
152 * <pre>
153 * class EndOfStream {
154 * // Application-dependent field/methods
155 * }
156 * </pre>
157 * And to have producers put an instance of this class into
158 * the channel when they are done. The consumer side can then
159 * check this via
160 * <pre>
161 * Object x = aChannel.take();
162 * if (x instanceof EndOfStream)
163 * // special actions; perhaps terminate
164 * else
165 * // process normally
166 * </pre>
167 * <p>
168 * In time-out based methods (poll(msecs) and offer(x, msecs)),
169 * time bounds are interpreted in
170 * a coarse-grained, best-effort fashion. Since there is no
171 * way in Java to escape out of a wait for a synchronized
172 * method/block, time bounds can sometimes be exceeded when
173 * there is a lot of contention for the channel. Additionally,
174 * some Channel semantics entail a ``point of
175 * no return'' where, once some parts of the operation have completed,
176 * others must follow, regardless of time bound.
177 * <p>
178 * Interruptions are in general handled as early as possible
179 * in all methods. Normally, InterruptedExceptions are thrown
180 * in put/take and offer(msec)/poll(msec) if interruption
181 * is detected upon entry to the method, as well as in any
182 * later context surrounding waits.
183 * <p>
184 * If a put returns normally, an offer
185 * returns true, or a take or poll returns non-null, the operation
186 * completed successfully.
187 * In all other cases, the operation fails cleanly -- the
188 * element is not put or taken.
189 * <p>
190 * As with Sync classes, spinloops are not directly supported,
191 * are not particularly recommended for routine use, but are not hard
192 * to construct. For example, here is an exponential backoff version:
193 * <pre>
194 * Object backOffTake(Channel q) throws InterruptedException {
195 * long waitTime = 0;
196 * for (;;) {
197 * Object x = q.poll(0);
198 * if (x != null)
199 * return x;
200 * else {
201 * Thread.sleep(waitTime);
202 * waitTime = 3 * waitTime / 2 + 1;
203 * }
204 * }
205 * </pre>
206 * <p>
207 * <b>Sample Usage</b>. Here is a producer/consumer design
208 * where the channel is used to hold Runnable commands representing
209 * background tasks.
210 * <pre>
211 * class Service {
212 * private final Channel channel = ... some Channel implementation;
213 *
214 * private void backgroundTask(int taskParam) { ... }
215 *
216 * public void action(final int arg) {
217 * Runnable command =
218 * new Runnable() {
219 * public void run() { backgroundTask(arg); }
220 * };
221 * try { channel.put(command); }
222 * catch (InterruptedException ex) {
223 * Thread.currentThread().interrupt(); // ignore but propagate
224 * }
225 * }
226 *
227 * public Service() {
228 * Runnable backgroundLoop =
229 * new Runnable() {
230 * public void run() {
231 * for (;;) {
232 * try {
233 * Runnable task = (Runnable)(channel.take());
234 * task.run();
235 * }
236 * catch (InterruptedException ex) { return; }
237 * }
238 * }
239 * };
240 * new Thread(backgroundLoop).start();
241 * }
242 * }
243 *
244 * </pre>
245 * <p>[<a href="http://gee.cs.oswego.edu/dl/classes/EDU/oswego/cs/dl/util/concurrent/intro.html"> Introduction to this package. </a>]
246 * @see Sync
247 * @see BoundedChannel
248 **/
249
250 public interface Channel extends Puttable, Takable {
251
252 /**
253 * Place item in the channel, possibly waiting indefinitely until
254 * it can be accepted. Channels implementing the BoundedChannel
255 * subinterface are generally guaranteed to block on puts upon
256 * reaching capacity, but other implementations may or may not block.
257 * @param item the element to be inserted. Should be non-null.
258 * @exception InterruptedException if the current thread has
259 * been interrupted at a point at which interruption
260 * is detected, in which case the element is guaranteed not
261 * to be inserted. Otherwise, on normal return, the element is guaranteed
262 * to have been inserted.
263 **/
264 public void put(Object item) throws InterruptedException;
265
266 /**
267 * Place item in channel only if it can be accepted within
268 * msecs milliseconds. The time bound is interpreted in
269 * a coarse-grained, best-effort fashion.
270 * @param item the element to be inserted. Should be non-null.
271 * @param msecs the number of milliseconds to wait. If less than
272 * or equal to zero, the method does not perform any timed waits,
273 * but might still require
274 * access to a synchronization lock, which can impose unbounded
275 * delay if there is a lot of contention for the channel.
276 * @return true if accepted, else false
277 * @exception InterruptedException if the current thread has
278 * been interrupted at a point at which interruption
279 * is detected, in which case the element is guaranteed not
280 * to be inserted (i.e., is equivalent to a false return).
281 **/
282 public boolean offer(Object item, long msecs) throws InterruptedException;
283
284 /**
285 * Return and remove an item from channel,
286 * possibly waiting indefinitely until
287 * such an item exists.
288 * @return some item from the channel. Different implementations
289 * may guarantee various properties (such as FIFO) about that item.
290 * @exception InterruptedException if the current thread has
291 * been interrupted at a point at which interruption
292 * is detected, in which case the state of the channel is unchanged.
293 *
294 **/
295 public Object take() throws InterruptedException;
296
297
298 /**
299 * Return and remove an item from channel only if one is available within
300 * msecs milliseconds. The time bound is interpreted in a
301 * coarse-grained, best-effort fashion.
302 * @param msecs the number of milliseconds to wait. If less than
303 * or equal to zero, the operation does not perform any timed waits,
304 * but might still require
305 * access to a synchronization lock, which can impose unbounded
306 * delay if there is a lot of contention for the channel.
307 * @return some item, or null if none is available within the time bound.
308 * @exception InterruptedException if the current thread has
309 * been interrupted at a point at which interruption
310 * is detected, in which case the state of the channel is unchanged
311 * (i.e., equivalent to a null return).
312 **/
313
314 public Object poll(long msecs) throws InterruptedException;
315
316 /**
317 * Return, but do not remove, the object at the head of the Channel.
318 * @return the object at the head, or null if the Channel is empty.
319 **/
320
321 public Object peek();
322
323
324 }
325