Docjar: A Java Source and Document Engine    com.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/apache/derby/impl/services/daemon/BasicDaemon.java


1   /*
2   
3      Derby - Class org.apache.derby.impl.services.daemon.BasicDaemon
4   
5      Copyright 1997, 2004 The Apache Software Foundation or its licensors, as applicable.
6   
7      Licensed under the Apache License, Version 2.0 (the "License");
8      you may not use this file except in compliance with the License.
9      You may obtain a copy of the License at
10  
11        http://www.apache.org/licenses/LICENSE-2.0
12  
13     Unless required by applicable law or agreed to in writing, software
14     distributed under the License is distributed on an "AS IS" BASIS,
15     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16     See the License for the specific language governing permissions and
17     limitations under the License.
18  
19   */
20  
21  package org.apache.derby.impl.services.daemon;
22  
23  import org.apache.derby.iapi.services.context.ContextService;
24  import org.apache.derby.iapi.services.context.ContextManager;
25  import org.apache.derby.iapi.services.daemon.DaemonService;
26  import org.apache.derby.iapi.services.daemon.Serviceable;
27  import org.apache.derby.iapi.services.monitor.Monitor;
28  import org.apache.derby.iapi.services.monitor.ModuleFactory;
29  import org.apache.derby.iapi.services.sanity.SanityManager;
30  
31  import org.apache.derby.iapi.error.StandardException;
32  
33  import java.util.Vector;
34  import java.util.List;
35  
36  /**
37    A BasicDaemon is a background worker thread which does asynchronous I/O and
38    general clean up.  It should not be used as a general worker thread for
39    parallel execution. 
40  
41    One cannot count on the order of requests or on when the daemon will
42    wake up, even with serviceNow requests.  Requests are not persistent and
43    not recoverable; they are all lost when the system crashes or is shut down.
44    System shutdowns, even orderly ones, do not wait for a daemon to finish its
45    work or empty its queues.  Furthermore, any Serviceable subscription,
46    including onDemandOnly, must tolerate spurious services.  The BasicDaemon
47    will set up a context manager with no context on it.  The Serviceable
48    object's performWork must provide useful context on the context manager to
49    do its work.  The BasicDaemon will wrap the performWork call in a try / catch
50    block and will use the ContextManager's error handling to clean up any
51    error.  The BasicDaemon will guarantee a serviceNow request will not be lost
52    as long as the jbms does not crash - however, if N serviceNow requests are
53    made by the same client, it may only be serviced once, not N times.
54  
55    Many Serviceable object will subscribe to the same BasicDaemon.  Their
56    performWork method should be well behaved - in other words, it should not
57    take too long or hog too many resources or deadlock with anyone else.  And
58    it cannot (should not) error out.
59  
60    The BasicDaemon implementation manages the DaemonService's data structures,
61    handles subscriptions, enqueues requests, and determines the service
62    schedule for its Serviceable objects.  The BasicDaemon keeps an array
63    (Vector) of Serviceable subscriptions; it also keeps 2 queues for clients
64    that use it for one time service - the 1st queue is for serviceNow
65    enqueue requests, the 2nd queue is for non serviceNow enqueue requests.
66  
67    This BasicDaemon services its clients in the following order:
68    1. any subscribed client that have made a serviceNow request that has not
69          been fulfilled 
70    2. serviceable clients on the 1st queue
71    3. all subscribed clients that are not onDemandOnly
72    4. serviceable clients 2nd queue
73  
74  */
75  public class BasicDaemon implements DaemonService, Runnable
76  {
77    private int numClients;    // number of clients that needs services
78  
79    private static final int OPTIMAL_QUEUE_SIZE = 100;
80  
81    private final Vector subscription;
82  
83    // the context this daemon should run with
84    protected final ContextService contextService;
85    protected final ContextManager contextMgr;
86  
87    /**
88      Queues for the work to be done.
89      These are synchronized by this object.
90    */
91    private final List highPQ;    // high priority queue
92    private final List normPQ;    // normal priority queue
93  
94    /**
95      which subscribed clients to service next?
96      only accessed by daemon thread
97    */
98    private int nextService;
99  
100   /*
101   ** State for the sleep/wakeup routines.
102   */
103 
104   private boolean awakened;      // a wake up call has been issued 
105                 // MT - synchronized on this
106 
107   /**
108     true if I'm waiting, if this is false then I am running and a notify is not required.
109   */
110   private boolean waiting;
111 
112   private boolean inPause;      // if true, don't do anything
113   private boolean running;      // I am running now
114   private boolean stopRequested;    // thread is requested to die
115   private boolean stopped;      // we have stopped
116 
117   private long lastServiceTime; // when did I last wake up on a timer
118   private int earlyWakeupCount;    // if I am waken up a couple of times, check
119                 // that lastServiceTime to make sure work
120                 // scheduled on a timer gets done once in a
121                 // while
122 
123   /**
124     make a BasicDaemon
125 
126     @param priority the priority of the daemon thread
127     @param delay the number of milliseconds between servcies to its clients
128   */
129   public BasicDaemon(ContextService contextService)
130   {
131     this.contextService = contextService;
132     this.contextMgr = contextService.newContextManager();
133 
134     subscription = new Vector(1, 1);
135     highPQ = new java.util.LinkedList();
136     normPQ = new java.util.LinkedList();
137     
138     lastServiceTime = System.currentTimeMillis();
139   }
140 
141   public int subscribe(Serviceable newClient, boolean onDemandOnly)
142   {
143     int clientNumber;
144 
145     ServiceRecord clientRecord;
146 
147     synchronized(this)
148     {
149       clientNumber = numClients++;
150 
151       clientRecord = new ServiceRecord(newClient, onDemandOnly, true);
152       subscription.insertElementAt(clientRecord, clientNumber);
153     }
154 
155 
156     if (SanityManager.DEBUG)
157     {
158       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
159         SanityManager.DEBUG(DaemonService.DaemonTrace, 
160                 "subscribed client # " + clientNumber + " : " +
161                 clientRecord);
162     }
163 
164     return clientNumber;
165   }
166 
167   public void unsubscribe(int clientNumber)
168   {
169     if (clientNumber < 0 || clientNumber > subscription.size())
170       return;
171 
172     // client number is never reused.  Just null out the vector entry.
173     subscription.setElementAt(null, clientNumber);
174   }
175 
176   public void serviceNow(int clientNumber)
177   {
178     if (clientNumber < 0 || clientNumber > subscription.size())
179       return;
180 
181     ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(clientNumber);
182     if (clientRecord == null)
183       return;
184 
185     clientRecord.called();
186     wakeUp();
187   }
188 
189   public boolean enqueue(Serviceable newClient, boolean serviceNow)
190   {
191     ServiceRecord clientRecord = new ServiceRecord(newClient, false, false);
192 
193     if (SanityManager.DEBUG)
194     {
195       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
196         SanityManager.DEBUG(DaemonService.DaemonTrace,
197                   "enqueing work, urgent = " + serviceNow + ":" + newClient );
198     }
199 
200 
201     List queue = serviceNow ? highPQ : normPQ;
202 
203     int highPQsize;
204     synchronized (this) {
205       queue.add(clientRecord);
206       highPQsize = highPQ.size();
207 
208       if (SanityManager.DEBUG) {
209 
210         if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
211 
212           if (highPQsize > (OPTIMAL_QUEUE_SIZE * 2))
213             System.out.println("memoryLeakTrace:BasicDaemon " + highPQsize);
214         }
215       }
216     }
217 
218     if (serviceNow && !awakened)
219       wakeUp();
220 
221     if (serviceNow) {
222       return highPQsize > OPTIMAL_QUEUE_SIZE;
223     }
224     return false;
225   }
226 
227   /**
228     Get rid of all queued up Serviceable tasks.
229    */
230   public synchronized void clear()
231   {
232     normPQ.clear();
233     highPQ.clear();
234   }
235 
236   /*
237    * class specific methods
238    */
239 
240   protected ServiceRecord nextAssignment(boolean urgent)
241   {
242     // first goes thru the subscription list, then goes thru highPQ;
243     ServiceRecord clientRecord;
244 
245     while (nextService < subscription.size())
246     {
247       clientRecord = (ServiceRecord)subscription.elementAt(nextService++);
248       if (clientRecord != null && (clientRecord.needImmediateService() || (!urgent && clientRecord.needService())))
249         return clientRecord;
250     }
251 
252     clientRecord = null;
253 
254     synchronized(this)
255     {
256       if (!highPQ.isEmpty())
257         clientRecord = (ServiceRecord) highPQ.remove(0);
258     }
259 
260     if (urgent || clientRecord != null)
261     {
262       if (SanityManager.DEBUG)
263       {
264         if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
265           SanityManager.DEBUG(DaemonService.DaemonTrace, 
266                   clientRecord == null ? 
267                   "No more urgent assignment " : 
268                   "Next urgent assignment : " + clientRecord);
269       }
270       
271       return clientRecord;
272     }
273 
274     clientRecord = null;
275     synchronized(this)
276     {
277       if (!normPQ.isEmpty())
278       {
279         clientRecord = (ServiceRecord)normPQ.remove(0);
280 
281         if (SanityManager.DEBUG)
282         {
283           if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
284             SanityManager.DEBUG(DaemonService.DaemonTrace, 
285                     "Next normal enqueued : " + clientRecord);
286         }
287       }
288 
289       // else no more work 
290     }
291 
292     if (SanityManager.DEBUG)
293     {
294       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
295       {
296         if (clientRecord == null)
297           SanityManager.DEBUG(DaemonService.DaemonTrace, "No more assignment");
298       }
299     }
300 
301     return clientRecord;
302   }
303 
304   protected void serviceClient(ServiceRecord clientRecord) 
305   {
306     clientRecord.serviced();
307 
308     Serviceable client = clientRecord.client;
309 
310     // client may have unsubscribed while it had items queued
311     if (client == null)
312       return;
313 
314     ContextManager cm = contextMgr;
315 
316     if (SanityManager.DEBUG)
317     {
318       SanityManager.ASSERT(cm != null, "Context manager is null");
319       SanityManager.ASSERT(client != null, "client is null");
320     }
321 
322     try
323     {
324       int status = client.performWork(cm);
325 
326       if (clientRecord.subscriber)
327         return;
328 
329       if (status == Serviceable.REQUEUE)
330       {
331         List queue = client.serviceASAP() ? highPQ : normPQ;
332         synchronized (this) {
333           queue.add(clientRecord);
334 
335           if (SanityManager.DEBUG) {
336 
337             if (SanityManager.DEBUG_ON("memoryLeakTrace")) {
338 
339               if (queue.size() > (OPTIMAL_QUEUE_SIZE * 2))
340                 System.out.println("memoryLeakTrace:BasicDaemon " + queue.size());
341             }
342           }
343         }
344       }
345 
346       return;
347     }
348     catch (Throwable e)
349     {
350       if (SanityManager.DEBUG)
351         SanityManager.showTrace(e);
352       cm.cleanupOnError(e);
353     }
354   }
355 
356   /*
357    * Runnable methods
358    */
359   public void run()
360   {
361     contextService.setCurrentContextManager(contextMgr);
362 
363     if (SanityManager.DEBUG)
364     {
365       if (SanityManager.DEBUG_ON(DaemonService.DaemonOff))
366       {
367         SanityManager.DEBUG(DaemonService.DaemonTrace, "DaemonOff is set in properties, background Daemon not run");
368         return;
369       }
370       SanityManager.DEBUG(DaemonService.DaemonTrace, "running");
371     }
372 
373     // infinite loop of rest and work
374     while(true)
375     {
376       if (stopRequested())
377         break;
378 
379       // if someone wake me up, only service the urgent requests.
380       // if I wake up by my regular schedule, service all clients
381       boolean urgentOnly = rest();
382 
383       if (stopRequested())
384         break;
385 
386       if (!inPause())
387         work(urgentOnly);
388     }
389 
390     synchronized(this)
391     {
392       running = false;
393       stopped = true;
394     }
395     contextMgr.cleanupOnError(StandardException.normalClose());
396     contextService.resetCurrentContextManager(contextMgr);
397   }
398 
399   /*
400    * Daemon Service method
401    */
402 
403   /*
404    * pause the daemon.  Wait till it is no running before it returns
405    */
406   public void pause()
407   {
408     if (SanityManager.DEBUG)
409     {
410       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
411         SanityManager.DEBUG(DaemonService.DaemonTrace, "pausing daemon");
412     }
413 
414     synchronized(this)
415     {
416       inPause = true;
417       while(running)
418       {
419         if (SanityManager.DEBUG)
420         {
421           if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
422             SanityManager.DEBUG(DaemonService.DaemonTrace, 
423                     "waiting for daemon run to finish");
424         }
425 
426         try
427         {
428           wait();
429         }
430         catch (InterruptedException ie)        
431         {
432           // someone interrrupt us, done running
433         }
434       }
435     }
436 
437     if (SanityManager.DEBUG)
438     {
439       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
440         SanityManager.DEBUG(DaemonService.DaemonTrace, 
441                 "daemon paused");
442     }
443   }
444 
445   public void resume()
446   {
447     synchronized(this)
448     {
449       inPause = false;
450     }
451 
452     if (SanityManager.DEBUG) 
453     {
454       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
455         SanityManager.DEBUG(DaemonService.DaemonTrace, 
456                 "daemon resumed");
457     }
458   }
459 
460   /**
461     Finish what we are doing and at the next convenient moment, get rid of
462     the thread and make the daemon object goes away if possible.
463 
464     remember we are calling from another thread
465    */
466   public void stop()
467   {
468     if (stopped)      // already stopped
469       return;
470 
471     synchronized(this)
472     {
473       stopRequested = true;
474       notifyAll(); // get sleeper to wake up and stop ASAP
475     }
476 
477     pause(); // finish doing what we are doing first
478 
479   }
480 
481   /*
482   **Wait until the work in the high priority queue is done.
483   **Note: Used by tests only to make sure all the work 
484   **assigned to the daemon is completed.
485   **/
486   public void waitUntilQueueIsEmpty()
487   {
488     while(true){
489       synchronized(this)
490       {
491         boolean noSubscriptionRequests = true; 
492         for (int urgentServiced = 0; urgentServiced < subscription.size(); urgentServiced++)
493         {
494           ServiceRecord clientRecord = (ServiceRecord)subscription.elementAt(urgentServiced);
495           if (clientRecord != null &&  clientRecord.needService())
496           {
497             noSubscriptionRequests = false;
498             break;
499           }
500         }
501 
502         if (highPQ.isEmpty() && noSubscriptionRequests &&!running){
503           return;
504         }else{
505 
506           notifyAll(); //wake up the the daemon thread
507           //wait for the raw store daemon to wakeus up   
508           //when it finihes work.
509           try{
510             wait();
511           }catch (InterruptedException ie)
512           {
513             // someone interrupt us, see what's going on
514           }
515         }
516       }
517     }
518   }
519 
520   private synchronized boolean stopRequested()
521   {
522     return stopRequested;
523   }
524 
525   private synchronized boolean inPause()
526   {
527     return inPause;
528   }
529 
530   /*
531    * BasicDaemon method
532    */
533   protected synchronized void wakeUp()
534   {
535     if (!awakened) {
536       awakened = true;  // I am being awakened for urgent work.
537 
538       if (waiting) {
539         notifyAll();
540       }
541     }
542   }
543 
544   /**
545     Returns true if awakened by some notification, false if wake up by timer
546   */
547   private boolean rest()
548   {
549     if (SanityManager.DEBUG)
550     {
551       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
552         SanityManager.DEBUG(DaemonService.DaemonTrace, 
553                 "going back to rest");
554     }
555 
556     boolean urgentOnly;
557     boolean checkWallClock = false;
558     synchronized(this)
559     {
560       try
561       {  
562         if (!awakened) {
563           waiting = true;
564           wait(DaemonService.TIMER_DELAY);
565           waiting = false;
566         }
567       }
568       catch (InterruptedException ie)
569       {
570         // someone interrupt us, see what's going on
571       }
572 
573       nextService = 0;
574 
575       urgentOnly = awakened;
576       if (urgentOnly)  // check wall clock
577       {
578         // take a guess that each early request is services every 500ms.
579         if (earlyWakeupCount++ > (DaemonService.TIMER_DELAY / 500)) {
580           earlyWakeupCount = 0;
581           checkWallClock = true;
582         }
583       }
584       awakened = false;      // reset this for next time
585     }
586 
587     if (SanityManager.DEBUG)
588     {
589       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace)) 
590         SanityManager.DEBUG(DaemonService.DaemonTrace, 
591                 urgentOnly ?
592                 "someone wakes me up" : 
593                 "wakes up by myself");
594     }  
595 
596     if (checkWallClock)
597     {
598       long currenttime = System.currentTimeMillis();
599       if ((currenttime - lastServiceTime) > DaemonService.TIMER_DELAY)
600       {
601         lastServiceTime = currenttime;
602         urgentOnly = false;
603 
604         if (SanityManager.DEBUG)
605         {
606           if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
607             SanityManager.DEBUG(DaemonService.DaemonTrace, 
608                     "wall clock check says service all");
609         }
610       }
611     }
612 
613     return urgentOnly;
614   }
615 
616   private void work(boolean urgentOnly)
617   {
618     if (SanityManager.DEBUG)
619     {
620       if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
621         SanityManager.DEBUG(DaemonService.DaemonTrace, 
622                 "going back to work");
623     }
624 
625     ServiceRecord work;
626 
627 
628     // while I am working, all serviceNow requests that comes in now will
629     // be taken care of when we get the next Assignment.
630     int serviceCount = 0;
631 
632     int yieldFactor = 10;
633     if (urgentOnly && (highPQ.size() > OPTIMAL_QUEUE_SIZE))
634       yieldFactor = 2;
635 
636     int yieldCount = OPTIMAL_QUEUE_SIZE / yieldFactor;
637 
638 
639     for (work = nextAssignment(urgentOnly);
640        work != null;
641        work = nextAssignment(urgentOnly))
642     {
643       if (SanityManager.DEBUG)
644       {
645         if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
646           SanityManager.DEBUG(DaemonService.DaemonTrace, 
647                     "servicing " + work);
648       }
649 
650 
651       synchronized(this)
652       {
653         if (inPause || stopRequested)
654           break;      // don't do anything more
655         running = true;
656       }
657 
658       // do work
659       try
660       {
661         serviceClient(work);
662         serviceCount++;
663       }
664       finally  
665       {
666         // catch run time exceptions
667         synchronized(this)
668         {
669           running = false;
670           notifyAll();
671           if (inPause || stopRequested)
672             break;  // don't do anything more
673         }
674       }
675 
676       if (SanityManager.DEBUG)
677       {
678         if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
679           SanityManager.DEBUG(DaemonService.DaemonTrace, 
680                     "done " + work);
681       }
682 
683       // ensure the subscribed clients get a look in once in a while
684       // when the queues are large.
685       if ((serviceCount % (OPTIMAL_QUEUE_SIZE / 2)) == 0) {
686         nextService = 0;
687       }
688 
689       if ((serviceCount % yieldCount) == 0) {
690 
691         yield();
692       }
693 
694       if (SanityManager.DEBUG)
695       {
696         if (SanityManager.DEBUG_ON(DaemonService.DaemonTrace))
697           SanityManager.DEBUG(DaemonService.DaemonTrace,
698                     "come back from yield");
699       }
700     }
701   }
702 
703 
704   /* let everybody else run first */
705   private void yield()
706   {
707     Thread currentThread = Thread.currentThread();
708     int oldPriority = currentThread.getPriority();
709 
710     if (oldPriority <= Thread.MIN_PRIORITY)
711     {
712       currentThread.yield();
713     }
714     else
715     {
716       ModuleFactory mf = Monitor.getMonitor();
717       if (mf != null)
718         mf.setThreadPriority(Thread.MIN_PRIORITY);
719       currentThread.yield();
720       if (mf != null)
721         mf.setThreadPriority(oldPriority);
722     }
723   }
724 }
725 
726 
727 
728 
729 
730 
731 
732 
733