Source code: org/apache/derby/impl/services/daemon/BasicDaemon.java
1 /*
2
3 Derby - Class org.apache.derby.impl.services.daemon.BasicDaemon
4
5 Copyright 1997, 2004 The Apache Software Foundation or its licensors, as applicable.
6
7 Licensed under the Apache License, Version 2.0 (the "License");
8 you may not use this file except in compliance with the License.
9 You may obtain a copy of the License at
10
11 http://www.apache.org/licenses/LICENSE-2.0
12
13 Unless required by applicable law or agreed to in writing, software
14 distributed under the License is distributed on an "AS IS" BASIS,
15 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 See the License for the specific language governing permissions and
17 limitations under the License.
18
19 */
20
21 package org.apache.derby.impl.services.daemon;
22
23 import org.apache.derby.iapi.services.context.ContextService;
24 import org.apache.derby.iapi.services.context.ContextManager;
25 import org.apache.derby.iapi.services.daemon.DaemonService;
26 import org.apache.derby.iapi.services.daemon.Serviceable;
27 import org.apache.derby.iapi.services.monitor.Monitor;
28 import org.apache.derby.iapi.services.monitor.ModuleFactory;
29 import org.apache.derby.iapi.services.sanity.SanityManager;
30
31 import org.apache.derby.iapi.error.StandardException;
32
33 import java.util.Vector;
34 import java.util.List;
35
36 /**
37 A BasicDaemon is a background worker thread which does asynchronous I/O and
38 general clean up. It should not be used as a general worker thread for
39 parallel execution.
40
41 One cannot count on the order of requests or on when the daemon will
42 wake up, even with serviceNow requests. Requests are not persistent and not
43 recoverable; they are all lost when the system crashes or is shut down.
44 A system shutdown, even an orderly one, does not wait for a daemon to finish
45 its work or empty its queue. Furthermore, any Serviceable subscription,
46 including onDemandOnly, must tolerate spurious services. The BasicDaemon
47 will set up a context manager with no context on it. The Serviceable
48 object's performWork must provide useful context on the context manager to
49 do its work. The BasicDaemon will wrap each performWork call in a try / catch
50 block and will use the ContextManager's error handling to clean up any
51 error. The BasicDaemon will guarantee a serviceNow request will not be lost
52 as long as the jbms does not crash - however, if N serviceNow requests are
53 made by the same client, it may be serviced only once, not N times.
54
55 Many Serviceable object will subscribe to the same BasicDaemon. Their
56 performWork method should be well behaved - in other words, it should not
57 take too long or hog too many resources or deadlock with anyone else. And
58 it cannot (should not) error out.
59
60 The BasicDaemon implementation manages the DaemonService's data structure,
61 handles subscriptions and enqueues requests, and determine the service
62 schedule for its Serviceable objects. The BasicDaemon keeps an array
63 (Vector) of Serviceable subscriptions it also keeps 2 queues for clients
64 that uses it for one time service - the 1st queue is for a serviceNow
65 enqueue request, the 2nd queue is for non serviceNow enqueue request.
66
67 This BasicDaemon services its clients in the following order:
68 1. any subscribed client that have made a serviceNow request that has not
69 been fulfilled
70 2. serviceable clients on the 1st queue
71 3. all subscribed clients that are not onDemandOnly
72 4. serviceable clients 2nd queue
73
74 */
75 public class BasicDaemon implements DaemonService, Runnable
76 {
77 private int numClients; // number of clients that needs services
78
79 private static final int OPTIMAL_QUEUE_SIZE = 100;
80
81 private final Vector subscription;
82
83 // the context this daemon should run with
84 protected final ContextService contextService;
85 protected final ContextManager contextMgr;
86
87 /**
88 Queues for the work to be done.
89 These are synchronized by this object.
90 */
91 private final List highPQ; // high priority queue
92 private final List normPQ; // normal priority queue
93
94 /**
95 which subscribed clients to service next?
96 only accessed by daemon thread
97 */
98 private int nextService;
99
100 /*
101 ** State for the sleep/wakeup routines.
102 */
103
104 private boolean awakened; // a wake up call has been issued
105 // MT - synchronized on this
106
107 /**
108 true if I'm waiting, if this is false then I am running and a notify is not required.
109 */
110 private boolean waiting;
111
112 private boolean inPause; // if true, don't do anything
113 private boolean running; // I am running now
114 private boolean stopRequested; // thread is requested to die
115 private boolean stopped; // we have stopped
116
117 private long lastServiceTime; // when did I last wake up on a timer
118 private int earlyWakeupCount; // if I am waken up a couple of times, check
119 // that lastServiceTime to make sure work
120 // scheduled on a timer gets done once in a
121 // while
122
/**
    Make a BasicDaemon.
    <P>
    The daemon obtains a new context manager from the supplied context
    service, starts with an empty subscription table and two empty work
    queues, and records the current time as its last service time.

    @param contextService the context service this daemon runs with; it is
    asked for the daemon's context manager
*/
public BasicDaemon(ContextService contextService)
{
    // containers for the work: the subscriber table and the two one-shot queues
    subscription = new Vector(1, 1);
    highPQ = new java.util.LinkedList();
    normPQ = new java.util.LinkedList();

    // the context this daemon should run with
    this.contextService = contextService;
    contextMgr = contextService.newContextManager();

    lastServiceTime = System.currentTimeMillis();
}
140
141 public int subscribe(Serviceable newClient, boolean onDemandOnly)
142 {
143 int clientNumber;
144
145 ServiceRecord clientRecord;
146
147 synchronized(this)
148 {
149 clientNumber = numClients++;
150
151 clientRecord = new ServiceRecord(newClient, onDemandOnly, true);
152 subscription.insertElementAt(clientRecord, clientNumber);
153 }
154
155
156 if (SanityManager.DEBUG)
157 {
158 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
159 SanityManager.DEBUG(DaemonService.DaemonTrace,
160 "subscribed client # " + clientNumber + " : " +
161 clientRecord);
162 }
163
164 return clientNumber;
165 }
166
167 public void unsubscribe(int clientNumber)
168 {
169 if (clientNumber < 0 || clientNumber > subscription.size())
170 return;
171
172 // client number is never reused. Just null out the vector entry.
173 subscription.setElementAt(null, clientNumber);
174 }
175
176 public void serviceNow(int clientNumber)
177 {
178 if (clientNumber < 0 || clientNumber > subscription.size())
179 return;
180
181 ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(clientNumber);
182 if (clientRecord == null)
183 return;
184
185 clientRecord.called();
186 wakeUp();
187 }
188
189 public boolean enqueue(Serviceable newClient, boolean serviceNow)
190 {
191 ServiceRecord clientRecord = new ServiceRecord(newClient, false, false);
192
193 if (SanityManager.DEBUG)
194 {
195 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
196 SanityManager.DEBUG(DaemonService.DaemonTrace,
197 "enqueing work, urgent = " + serviceNow + ":" + newClient );
198 }
199
200
201 List queue = serviceNow ? highPQ : normPQ;
202
203 int highPQsize;
204 synchronized (this) {
205 queue.add(clientRecord);
206 highPQsize = highPQ.size();
207
208 if (SanityManager.DEBUG) {
209
210 if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
211
212 if (highPQsize > (OPTIMAL_QUEUE_SIZE * 2))
213 System.out.println("memoryLeakTrace:BasicDaemon " + highPQsize);
214 }
215 }
216 }
217
218 if (serviceNow && !awakened)
219 wakeUp();
220
221 if (serviceNow) {
222 return highPQsize > OPTIMAL_QUEUE_SIZE;
223 }
224 return false;
225 }
226
227 /**
228 Get rid of all queued up Serviceable tasks.
229 */
230 public synchronized void clear()
231 {
232 normPQ.clear();
233 highPQ.clear();
234 }
235
236 /*
237 * class specific methods
238 */
239
240 protected ServiceRecord nextAssignment(boolean urgent)
241 {
242 // first goes thru the subscription list, then goes thru highPQ;
243 ServiceRecord clientRecord;
244
245 while (nextService < subscription.size())
246 {
247 clientRecord = (ServiceRecord)subscription.elementAt(nextService++);
248 if (clientRecord != null && (clientRecord.needImmediateService() || (!urgent && clientRecord.needService())))
249 return clientRecord;
250 }
251
252 clientRecord = null;
253
254 synchronized(this)
255 {
256 if (!highPQ.isEmpty())
257 clientRecord = (ServiceRecord) highPQ.remove(0);
258 }
259
260 if (urgent || clientRecord != null)
261 {
262 if (SanityManager.DEBUG)
263 {
264 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
265 SanityManager.DEBUG(DaemonService.DaemonTrace,
266 clientRecord == null ?
267 "No more urgent assignment " :
268 "Next urgent assignment : " + clientRecord);
269 }
270
271 return clientRecord;
272 }
273
274 clientRecord = null;
275 synchronized(this)
276 {
277 if (!normPQ.isEmpty())
278 {
279 clientRecord = (ServiceRecord)normPQ.remove(0);
280
281 if (SanityManager.DEBUG)
282 {
283 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
284 SanityManager.DEBUG(DaemonService.DaemonTrace,
285 "Next normal enqueued : " + clientRecord);
286 }
287 }
288
289 // else no more work
290 }
291
292 if (SanityManager.DEBUG)
293 {
294 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
295 {
296 if (clientRecord == null)
297 SanityManager.DEBUG(DaemonService.DaemonTrace, "No more assignment");
298 }
299 }
300
301 return clientRecord;
302 }
303
304 protected void serviceClient(ServiceRecord clientRecord)
305 {
306 clientRecord.serviced();
307
308 Serviceable client = clientRecord.client;
309
310 // client may have unsubscribed while it had items queued
311 if (client == null)
312 return;
313
314 ContextManager cm = contextMgr;
315
316 if (SanityManager.DEBUG)
317 {
318 SanityManager.ASSERT(cm != null, "Context manager is null");
319 SanityManager.ASSERT(client != null, "client is null");
320 }
321
322 try
323 {
324 int status = client.performWork(cm);
325
326 if (clientRecord.subscriber)
327 return;
328
329 if (status == Serviceable.REQUEUE)
330 {
331 List queue = client.serviceASAP() ? highPQ : normPQ;
332 synchronized (this) {
333 queue.add(clientRecord);
334
335 if (SanityManager.DEBUG) {
336
337 if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
338
339 if (queue.size() > (OPTIMAL_QUEUE_SIZE * 2))
340 System.out.println("memoryLeakTrace:BasicDaemon " + queue.size());
341 }
342 }
343 }
344 }
345
346 return;
347 }
348 catch (Throwable e)
349 {
350 if (SanityManager.DEBUG)
351 SanityManager.showTrace(e);
352 cm.cleanupOnError(e);
353 }
354 }
355
356 /*
357 * Runnable methods
358 */
359 public void run()
360 {
361 contextService.setCurrentContextManager(contextMgr);
362
363 if (SanityManager.DEBUG)
364 {
365 if (SanityManager.DEBUG_ON(DaemonService.DaemonOff))
366 {
367 SanityManager.DEBUG(DaemonService.DaemonTrace, "DaemonOff is set in properties, background Daemon not run");
368 return;
369 }
370 SanityManager.DEBUG(DaemonService.DaemonTrace, "running");
371 }
372
373 // infinite loop of rest and work
374 while(true)
375 {
376 if (stopRequested())
377 break;
378
379 // if someone wake me up, only service the urgent requests.
380 // if I wake up by my regular schedule, service all clients
381 boolean urgentOnly = rest();
382
383 if (stopRequested())
384 break;
385
386 if (!inPause())
387 work(urgentOnly);
388 }
389
390 synchronized(this)
391 {
392 running = false;
393 stopped = true;
394 }
395 contextMgr.cleanupOnError(StandardException.normalClose());
396 contextService.resetCurrentContextManager(contextMgr);
397 }
398
399 /*
400 * Daemon Service method
401 */
402
403 /*
404 * pause the daemon. Wait till it is no running before it returns
405 */
406 public void pause()
407 {
408 if (SanityManager.DEBUG)
409 {
410 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
411 SanityManager.DEBUG(DaemonService.DaemonTrace, "pausing daemon");
412 }
413
414 synchronized(this)
415 {
416 inPause = true;
417 while(running)
418 {
419 if (SanityManager.DEBUG)
420 {
421 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
422 SanityManager.DEBUG(DaemonService.DaemonTrace,
423 "waiting for daemon run to finish");
424 }
425
426 try
427 {
428 wait();
429 }
430 catch (InterruptedException ie)
431 {
432 // someone interrrupt us, done running
433 }
434 }
435 }
436
437 if (SanityManager.DEBUG)
438 {
439 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
440 SanityManager.DEBUG(DaemonService.DaemonTrace,
441 "daemon paused");
442 }
443 }
444
445 public void resume()
446 {
447 synchronized(this)
448 {
449 inPause = false;
450 }
451
452 if (SanityManager.DEBUG)
453 {
454 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
455 SanityManager.DEBUG(DaemonService.DaemonTrace,
456 "daemon resumed");
457 }
458 }
459
460 /**
461 Finish what we are doing and at the next convenient moment, get rid of
462 the thread and make the daemon object goes away if possible.
463
464 remember we are calling from another thread
465 */
466 public void stop()
467 {
468 if (stopped) // already stopped
469 return;
470
471 synchronized(this)
472 {
473 stopRequested = true;
474 notifyAll(); // get sleeper to wake up and stop ASAP
475 }
476
477 pause(); // finish doing what we are doing first
478
479 }
480
481 /*
482 **Wait until the work in the high priority queue is done.
483 **Note: Used by tests only to make sure all the work
484 **assigned to the daemon is completed.
485 **/
486 public void waitUntilQueueIsEmpty()
487 {
488 while(true){
489 synchronized(this)
490 {
491 boolean noSubscriptionRequests = true;
492 for (int urgentServiced = 0; urgentServiced < subscription.size(); urgentServiced++)
493 {
494 ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(urgentServiced);
495 if (clientRecord != null && clientRecord.needService())
496 {
497 noSubscriptionRequests = false;
498 break;
499 }
500 }
501
502 if (highPQ.isEmpty() && noSubscriptionRequests &&!running){
503 return;
504 }else{
505
506 notifyAll(); //wake up the the daemon thread
507 //wait for the raw store daemon to wakeus up
508 //when it finihes work.
509 try{
510 wait();
511 }catch (InterruptedException ie)
512 {
513 // someone interrupt us, see what's going on
514 }
515 }
516 }
517 }
518 }
519
520 private synchronized boolean stopRequested()
521 {
522 return stopRequested;
523 }
524
525 private synchronized boolean inPause()
526 {
527 return inPause;
528 }
529
530 /*
531 * BasicDaemon method
532 */
533 protected synchronized void wakeUp()
534 {
535 if (!awakened) {
536 awakened = true; // I am being awakened for urgent work.
537
538 if (waiting) {
539 notifyAll();
540 }
541 }
542 }
543
544 /**
545 Returns true if awakened by some notification, false if wake up by timer
546 */
547 private boolean rest()
548 {
549 if (SanityManager.DEBUG)
550 {
551 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
552 SanityManager.DEBUG(DaemonService.DaemonTrace,
553 "going back to rest");
554 }
555
556 boolean urgentOnly;
557 boolean checkWallClock = false;
558 synchronized(this)
559 {
560 try
561 {
562 if (!awakened) {
563 waiting = true;
564 wait(DaemonService.TIMER_DELAY);
565 waiting = false;
566 }
567 }
568 catch (InterruptedException ie)
569 {
570 // someone interrupt us, see what's going on
571 }
572
573 nextService = 0;
574
575 urgentOnly = awakened;
576 if (urgentOnly) // check wall clock
577 {
578 // take a guess that each early request is services every 500ms.
579 if (earlyWakeupCount++ > (DaemonService.TIMER_DELAY / 500)) {
580 earlyWakeupCount = 0;
581 checkWallClock = true;
582 }
583 }
584 awakened = false; // reset this for next time
585 }
586
587 if (SanityManager.DEBUG)
588 {
589 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
590 SanityManager.DEBUG(DaemonService.DaemonTrace,
591 urgentOnly ?
592 "someone wakes me up" :
593 "wakes up by myself");
594 }
595
596 if (checkWallClock)
597 {
598 long currenttime = System.currentTimeMillis();
599 if ((currenttime - lastServiceTime) > DaemonService.TIMER_DELAY)
600 {
601 lastServiceTime = currenttime;
602 urgentOnly = false;
603
604 if (SanityManager.DEBUG)
605 {
606 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
607 SanityManager.DEBUG(DaemonService.DaemonTrace,
608 "wall clock check says service all");
609 }
610 }
611 }
612
613 return urgentOnly;
614 }
615
616 private void work(boolean urgentOnly)
617 {
618 if (SanityManager.DEBUG)
619 {
620 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
621 SanityManager.DEBUG(DaemonService.DaemonTrace,
622 "going back to work");
623 }
624
625 ServiceRecord work;
626
627
628 // while I am working, all serviceNow requests that comes in now will
629 // be taken care of when we get the next Assignment.
630 int serviceCount = 0;
631
632 int yieldFactor = 10;
633 if (urgentOnly && (highPQ.size() > OPTIMAL_QUEUE_SIZE))
634 yieldFactor = 2;
635
636 int yieldCount = OPTIMAL_QUEUE_SIZE / yieldFactor;
637
638
639 for (work = nextAssignment(urgentOnly);
640 work != null;
641 work = nextAssignment(urgentOnly))
642 {
643 if (SanityManager.DEBUG)
644 {
645 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
646 SanityManager.DEBUG(DaemonService.DaemonTrace,
647 "servicing " + work);
648 }
649
650
651 synchronized(this)
652 {
653 if (inPause || stopRequested)
654 break; // don't do anything more
655 running = true;
656 }
657
658 // do work
659 try
660 {
661 serviceClient(work);
662 serviceCount++;
663 }
664 finally
665 {
666 // catch run time exceptions
667 synchronized(this)
668 {
669 running = false;
670 notifyAll();
671 if (inPause || stopRequested)
672 break; // don't do anything more
673 }
674 }
675
676 if (SanityManager.DEBUG)
677 {
678 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
679 SanityManager.DEBUG(DaemonService.DaemonTrace,
680 "done " + work);
681 }
682
683 // ensure the subscribed clients get a look in once in a while
684 // when the queues are large.
685 if ((serviceCount % (OPTIMAL_QUEUE_SIZE / 2)) == 0) {
686 nextService = 0;
687 }
688
689 if ((serviceCount % yieldCount) == 0) {
690
691 yield();
692 }
693
694 if (SanityManager.DEBUG)
695 {
696 if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
697 SanityManager.DEBUG(DaemonService.DaemonTrace,
698 "come back from yield");
699 }
700 }
701 }
702
703
704 /* let everybody else run first */
705 private void yield()
706 {
707 Thread currentThread = Thread.currentThread();
708 int oldPriority = currentThread.getPriority();
709
710 if (oldPriority <= Thread.MIN_PRIORITY)
711 {
712 currentThread.yield();
713 }
714 else
715 {
716 ModuleFactory mf = Monitor.getMonitor();
717 if (mf != null)
718 mf.setThreadPriority(Thread.MIN_PRIORITY);
719 currentThread.yield();
720 if (mf != null)
721 mf.setThreadPriority(oldPriority);
722 }
723 }
724 }
725
726
727
728
729
730
731
732
733