1 /*
2 * Copyright 1999,2004-2005 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.catalina.cluster.util;
18
19 /**
20 * A fast queue that remover thread lock the adder thread. <br/>Limit the queue
21 * length when you have strange producer thread problemes.
22 *
23 * FIXME add i18n support to log messages
24 * @author Rainer Jung
25 * @author Peter Rossbach
26 * @version $Revision: 345567 $ $Date: 2005-11-18 16:07:23 -0500 (Fri, 18 Nov 2005) $
27 */
28 public class FastQueue implements IQueue {
29
30 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
31 .getLog(FastQueue.class);
32
33 /**
34 * This is the actual queue
35 */
36 private SingleRemoveSynchronizedAddLock lock = null;
37
38 /**
39 * First Object at queue (consumer message)
40 */
41 private LinkObject first = null;
42
43 /**
44 * Last object in queue (producer Object)
45 */
46 private LinkObject last = null;
47
48 /**
49 * Current Queue elements size
50 */
51 private int size = 0;
52
53 /**
54 * check lock to detect strange threadings things
55 */
56 private boolean checkLock = false;
57
58 /**
59 * protocol the thread wait times
60 */
61 private boolean timeWait = false;
62
63 /**
64 * calc stats data
65 */
66 private boolean doStats = false;
67
68 private boolean inAdd = false;
69
70 private boolean inRemove = false;
71
72 private boolean inMutex = false;
73
74 /**
75 * limit the queue legnth ( default is unlimited)
76 */
77 private int maxQueueLength = 0;
78
79 /**
80 * addWaitTimeout for producer
81 */
82 private long addWaitTimeout = 10000L;
83
84
85 /**
86 * removeWaitTimeout for consumer
87 */
88 private long removeWaitTimeout = 30000L;
89
90 /**
91 * enabled the queue
92 */
93 private boolean enabled = true;
94
95 /**
96 * calc all add objects
97 */
98 private long addCounter = 0;
99
100 /**
101 * calc all add objetcs in error state ( see limit queue length)
102 */
103 private long addErrorCounter = 0;
104
105 /**
106 * calc all remove objects
107 */
108 private long removeCounter = 0;
109
110 /**
111 * calc all remove objects failures (hupps probleme detection)
112 */
113 private long removeErrorCounter = 0;
114
115 /**
116 * Calc wait time thread
117 */
118 private long addWait = 0;
119
120 /**
121 * Calc remove time threads
122 */
123 private long removeWait = 0;
124
125 /**
126 * max queue size
127 */
128 private int maxSize = 0;
129
130 /**
131 * avg queue size
132 */
133 private long avgSize = 0;
134
135 private int maxSizeSample = 0;
136
137 private long avgSizeSample = 0;
138
139 /**
140 * avg size sample interval
141 */
142 private int sampleInterval = 100;
143
144 /**
145 * Generate Queue SingleRemoveSynchronizedAddLock and set add and wait
146 * Timeouts
147 */
148 public FastQueue() {
149 lock = new SingleRemoveSynchronizedAddLock();
150 lock.setAddWaitTimeout(addWaitTimeout);
151 lock.setRemoveWaitTimeout(removeWaitTimeout);
152 }
153
154 /**
155 * get current add wait timeout
156 *
157 * @return current wait timeout
158 */
159 public long getAddWaitTimeout() {
160 addWaitTimeout = lock.getAddWaitTimeout();
161 return addWaitTimeout;
162 }
163
164 /**
165 * Set add wait timeout (default 10000 msec)
166 *
167 * @param timeout
168 */
169 public void setAddWaitTimeout(long timeout) {
170 addWaitTimeout = timeout;
171 lock.setAddWaitTimeout(addWaitTimeout);
172 }
173
174 /**
175 * get current remove wait timeout
176 *
177 * @return The timeout
178 */
179 public long getRemoveWaitTimeout() {
180 removeWaitTimeout = lock.getRemoveWaitTimeout();
181 return removeWaitTimeout;
182 }
183
184 /**
185 * set remove wait timeout ( default 30000 msec)
186 *
187 * @param timeout
188 */
189 public void setRemoveWaitTimeout(long timeout) {
190 removeWaitTimeout = timeout;
191 lock.setRemoveWaitTimeout(removeWaitTimeout);
192 }
193
194 /**
195 * get Max Queue length
196 *
197 * @see org.apache.catalina.cluster.util.IQueue#getMaxQueueLength()
198 */
199 public int getMaxQueueLength() {
200 return maxQueueLength;
201 }
202
203 public void setMaxQueueLength(int length) {
204 maxQueueLength = length;
205 }
206
207 public boolean isEnabled() {
208 return enabled;
209 }
210
211 public void setEnabled(boolean enable) {
212 enabled = enable;
213 if (!enabled) {
214 lock.abortRemove();
215 }
216 }
217
218 /**
219 * @return Returns the checkLock.
220 */
221 public boolean isCheckLock() {
222 return checkLock;
223 }
224
225 /**
226 * @param checkLock The checkLock to set.
227 */
228 public void setCheckLock(boolean checkLock) {
229 this.checkLock = checkLock;
230 }
231
232 /**
233 * @return Returns the doStats.
234 */
235 public boolean isDoStats() {
236 return doStats;
237 }
238
239 /**
240 * @param doStats The doStats to set.
241 */
242 public void setDoStats(boolean doStats) {
243 this.doStats = doStats;
244 }
245
246 /**
247 * @return Returns the timeWait.
248 */
249 public boolean isTimeWait() {
250 return timeWait;
251 }
252
253 /**
254 * @param timeWait The timeWait to set.
255 */
256 public void setTimeWait(boolean timeWait) {
257 this.timeWait = timeWait;
258 }
259
260 public int getSampleInterval() {
261 return sampleInterval;
262 }
263
264 public void setSampleInterval(int interval) {
265 sampleInterval = interval;
266 }
267
268 public long getAddCounter() {
269 return addCounter;
270 }
271
272 public void setAddCounter(long counter) {
273 addCounter = counter;
274 }
275
276 public long getAddErrorCounter() {
277 return addErrorCounter;
278 }
279
280 public void setAddErrorCounter(long counter) {
281 addErrorCounter = counter;
282 }
283
284 public long getRemoveCounter() {
285 return removeCounter;
286 }
287
288 public void setRemoveCounter(long counter) {
289 removeCounter = counter;
290 }
291
292 public long getRemoveErrorCounter() {
293 return removeErrorCounter;
294 }
295
296 public void setRemoveErrorCounter(long counter) {
297 removeErrorCounter = counter;
298 }
299
300 public long getAddWait() {
301 return addWait;
302 }
303
304 public void setAddWait(long wait) {
305 addWait = wait;
306 }
307
308 public long getRemoveWait() {
309 return removeWait;
310 }
311
312 public void setRemoveWait(long wait) {
313 removeWait = wait;
314 }
315
316 /**
317 * @return The max size
318 */
319 public int getMaxSize() {
320 return maxSize;
321 }
322
323 /**
324 * @param size
325 */
326 public void setMaxSize(int size) {
327 maxSize = size;
328 }
329
330
331 /**
332 * Avg queue size
333 * @return The average queue size
334 */
335 public long getAvgSize() {
336 if (addCounter > 0) {
337 return avgSize / addCounter;
338 } else {
339 return 0;
340 }
341 }
342
343 /**
344 * reset all stats data
345 */
346 public void resetStatistics() {
347 addCounter = 0;
348 addErrorCounter = 0;
349 removeCounter = 0;
350 removeErrorCounter = 0;
351 avgSize = 0;
352 maxSize = 0;
353 addWait = 0;
354 removeWait = 0;
355 }
356
357 /**
358 * unlock queue for next add
359 */
360 public void unlockAdd() {
361 lock.unlockAdd(size > 0 ? true : false);
362 }
363
364 /**
365 * unlock queue for next remove
366 */
367 public void unlockRemove() {
368 lock.unlockRemove();
369 }
370
371 /**
372 * start queuing
373 */
374 public void start() {
375 setEnabled(true);
376 }
377
378 /**
379 * start queuing
380 */
381 public void stop() {
382 setEnabled(false);
383 }
384
385 public long getSample() {
386 return addCounter % sampleInterval;
387 }
388
389 public int getMaxSizeSample() {
390 return maxSizeSample;
391 }
392
393 public void setMaxSizeSample(int size) {
394 maxSizeSample = size;
395 }
396
397 public long getAvgSizeSample() {
398 long sample = addCounter % sampleInterval;
399 if (sample > 0) {
400 return avgSizeSample / sample;
401 } else if (addCounter > 0) {
402 return avgSizeSample / sampleInterval;
403 } else {
404 return 0;
405 }
406 }
407
408 public int getSize() {
409 int sz;
410 sz = size;
411 return sz;
412 }
413
414 /**
415 * Add new data to the queue
416 * @see org.apache.catalina.cluster.util.IQueue#add(java.lang.String, java.lang.Object)
417 * FIXME extract some method
418 */
419 public boolean add(String key, Object data) {
420 boolean ok = true;
421 long time = 0;
422
423 if (!enabled) {
424 if (log.isInfoEnabled())
425 log.info("FastQueue.add: queue disabled, add aborted");
426 return false;
427 }
428
429 if (timeWait) {
430 time = System.currentTimeMillis();
431 }
432 lock.lockAdd();
433 try {
434 if (timeWait) {
435 addWait += (System.currentTimeMillis() - time);
436 }
437
438 if (log.isTraceEnabled()) {
439 log.trace("FastQueue.add: starting with size " + size);
440 }
441 if (checkLock) {
442 if (inAdd)
443 log.warn("FastQueue.add: Detected other add");
444 inAdd = true;
445 if (inMutex)
446 log.warn("FastQueue.add: Detected other mutex in add");
447 inMutex = true;
448 }
449
450 if ((maxQueueLength > 0) && (size >= maxQueueLength)) {
451 ok = false;
452 if (log.isTraceEnabled()) {
453 log.trace("FastQueue.add: Could not add, since queue is full ("
454 + size + ">=" + maxQueueLength + ")");
455 }
456
457 } else {
458 LinkObject element = new LinkObject(key, data);
459 if (size == 0) {
460 first = last = element;
461 size = 1;
462 } else {
463 if (last == null) {
464 ok = false;
465 log
466 .error("FastQueue.add: Could not add, since last is null although size is "
467 + size + " (>0)");
468 } else {
469 last.append(element);
470 last = element;
471 size++;
472 }
473 }
474
475 }
476
477 if (doStats) {
478 if (ok) {
479 if (addCounter % sampleInterval == 0) {
480 maxSizeSample = 0;
481 avgSizeSample = 0;
482 }
483 addCounter++;
484 if (size > maxSize) {
485 maxSize = size;
486 }
487 if (size > maxSizeSample) {
488 maxSizeSample = size;
489 }
490 avgSize += size;
491 avgSizeSample += size;
492 } else {
493 addErrorCounter++;
494 }
495 }
496
497 if (first == null) {
498 log.error("FastQueue.add: first is null, size is " + size
499 + " at end of add");
500 }
501 if (last == null) {
502 log.error("FastQueue.add: last is null, size is " + size
503 + " at end of add");
504 }
505
506 if (checkLock) {
507 if (!inMutex)
508 log.warn("FastQueue.add: Cancelled by other mutex in add");
509 inMutex = false;
510 if (!inAdd)
511 log.warn("FastQueue.add: Cancelled by other add");
512 inAdd = false;
513 }
514 if (log.isTraceEnabled()) {
515 log.trace("FastQueue.add: add ending with size " + size);
516 }
517
518 if (timeWait) {
519 time = System.currentTimeMillis();
520 }
521 } finally {
522 lock.unlockAdd(true);
523 }
524 if (timeWait) {
525 addWait += (System.currentTimeMillis() - time);
526 }
527 return ok;
528 }
529
530 /**
531 * remove the complete queued object list
532 * @see org.apache.catalina.cluster.util.IQueue#remove()
533 * FIXME extract some method
534 */
535 public LinkObject remove() {
536 LinkObject element;
537 boolean gotLock;
538 long time = 0;
539
540 if (!enabled) {
541 if (log.isInfoEnabled())
542 log.info("FastQueue.remove: queue disabled, remove aborted");
543 return null;
544 }
545
546 if (timeWait) {
547 time = System.currentTimeMillis();
548 }
549 gotLock = lock.lockRemove();
550 try {
551
552 if (!gotLock) {
553 if (enabled) {
554 if (timeWait) {
555 removeWait += (System.currentTimeMillis() - time);
556 }
557 if (doStats) {
558 removeErrorCounter++;
559 }
560 if (log.isInfoEnabled())
561 log.info("FastQueue.remove: Remove aborted although queue enabled");
562 } else {
563 if (log.isInfoEnabled())
564 log.info("FastQueue.remove: queue disabled, remove aborted");
565 }
566 return null;
567 }
568
569 if (timeWait) {
570 removeWait += (System.currentTimeMillis() - time);
571 }
572
573 if (log.isTraceEnabled()) {
574 log.trace("FastQueue.remove: remove starting with size " + size);
575 }
576 if (checkLock) {
577 if (inRemove)
578 log.warn("FastQueue.remove: Detected other remove");
579 inRemove = true;
580 if (inMutex)
581 log.warn("FastQueue.remove: Detected other mutex in remove");
582 inMutex = true;
583 }
584
585 element = first;
586
587 if (doStats) {
588 if (element != null) {
589 removeCounter++;
590 } else {
591 removeErrorCounter++;
592 log
593 .error("FastQueue.remove: Could not remove, since first is null although size is "
594 + size + " (>0)");
595 }
596 }
597
598 first = last = null;
599 size = 0;
600
601 if (checkLock) {
602 if (!inMutex)
603 log.warn("FastQueue.remove: Cancelled by other mutex in remove");
604 inMutex = false;
605 if (!inRemove)
606 log.warn("FastQueue.remove: Cancelled by other remove");
607 inRemove = false;
608 }
609 if (log.isTraceEnabled()) {
610 log.trace("FastQueue.remove: remove ending with size " + size);
611 }
612
613 if (timeWait) {
614 time = System.currentTimeMillis();
615 }
616 } finally {
617 lock.unlockRemove();
618 }
619 if (timeWait) {
620 removeWait += (System.currentTimeMillis() - time);
621 }
622 return element;
623 }
624
625 }