Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: org/altara/mars/engine/Controller.java


1   /* MARS Network Monitoring Engine
2      Copyright (C) 1999 Brian H. Trammell
3      Copyright (C) 2002 Leapfrog Research & Development, LLC
4   
5     This program is free software; you can redistribute it and/or
6     modify it under the terms of the GNU General Public License
7     as published by the Free Software Foundation; either version 2
8     of the License, or (at your option) any later version.
9   
10    This program is distributed in the hope that it will be useful,
11    but WITHOUT ANY WARRANTY; without even the implied warranty of
12    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13    GNU General Public License for more details.
14    
15    You should have received a copy of the GNU General Public License
16    along with this program; if not, it is available at 
17    http:///www.gnu.org/copyleft/gpl.html, or by writing to the
18    Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19    Boston, MA  02111-1307, USA.
20  */
21  
22  package org.altara.mars.engine;
23  
24  import org.altara.util.*;
25  import org.altara.mars.*;
26  import java.util.*;
27  
28  /** This class controls the MARS monitoring engine. It manages the monitoring
29    work queue, the monitoring threads, and the service status change list.
30  */
31  
32  public class Controller implements MarsModelListener {
33  
34    private MarsModel model;
35    private Queue runQueue;
36    private HashSet statListeners;
37    private HashSet probeListeners;
38    private HashSet notifyListeners;
39    private ProbeWorker[] workers;
40    private int checkouts;
41    private boolean active;
42  
43    public Controller() {
44      model = null;
45      runQueue = new Queue();
46      checkouts = 0;
47      statListeners = new HashSet();
48      probeListeners = new HashSet();
49      notifyListeners = new HashSet();
50      active = false;
51  
52          // create a new Notifier to handle notification
53          new Notifier(this);
54    }
55  
56    public synchronized void start() {
57      if (active) {
58        // we're already started, so fail silently.
59        return;
60      }
61  
62      if (model == null) {
63        // no model means don't start
64        throw new RuntimeException("Can't start with null model");
65      }
66  
67      // build the run queue (is this necessary?)
68      runQueue = new Queue();
69      int workerCount = buildRunQueue();
70  
71      // create and start a static thread pool
72      workers = new ProbeWorker[workerCount];
73      for (int i = 0; i < workers.length; i++) {
74        workers[i] = new ProbeWorker(this);
75        workers[i].start();
76      }
77  
78      // signify successful start
79      this.checkouts = 0;
80      this.active = true;
81  
82          Main.getMain().showStatus("Started monitoring...");
83    }
84  
85    public synchronized void stop() {
86      if (!active) {
87        // we're already stopped, so fail silently.
88        return;
89      }
90  
91      // shut down each thread
92      for (int i = 0; i < workers.length; i++) {
93        workers[i].kill();
94      }
95  
96      // signify successful stop
97      this.checkouts = 0;
98      this.active = false;
99  
100         Main.getMain().showStatus("Stopped monitoring.");
101   }
102 
103   public boolean isActive() {
104     return active;
105   }
106 
107   public MarsModel getModel() {
108     return model;
109   }
110 
111   public synchronized void setModel(MarsModel model) {
112     if (active) {
113       // programming error - must be stopped before setModel.
114       throw new RuntimeException("Can't change model while running");
115     }
116     this.model = model;
117     model.addMarsModelListener(this);
118   }
119 
120   public void addNotificationListener(NotificationListener listener) {
121     notifyListeners.add(listener);
122   }
123 
124   public void removeNotificationListener(NotificationListener listener) {
125     notifyListeners.remove(listener);
126   }
127 
128   public void addStatusChangeListener(StatusChangeListener listener) {
129     statListeners.add(listener);
130   }
131 
132   public void removeStatusChangeListener(StatusChangeListener listener) {
133     statListeners.remove(listener);
134   }
135 
136   public void addProbeListener(ProbeListener listener) {
137     probeListeners.add(listener);
138   }
139 
140   public void removeProbeListener(ProbeListener listener) {
141     probeListeners.remove(listener);
142   }
143 
144   public void notifyStatusChanged(StatusChangeEvent sce) {
145     if (sce.getService().getHost().getModel() != model) {
146       // ignore events not attached to our current model
147       return;
148     }
149 
150     // resend the event
151     Iterator listeners = notifyListeners.iterator();
152     while (listeners.hasNext()) {
153       ((NotificationListener)listeners.next()).notifyStatusChanged(sce);
154     }
155   }
156 
157   public void statusChanged(Service service,
158       Status oldStatus, Status newStatus) {
159     if (service.getHost().getModel() != model) {
160       // ignore services not attached to our current model
161       return;
162     }
163 
164     // construct a StatusChangedEvent to send to all our listeners
165     StatusChangeEvent ev =
166       new StatusChangeEvent(service,oldStatus,newStatus);
167     // send the event
168     Iterator listeners = statListeners.iterator();
169     while (listeners.hasNext()) {
170       ((StatusChangeListener)listeners.next()).statusChanged(ev);
171     }
172   }
173 
174   public void probeRun(Service service, Status newStatus) {
175     if (service.getHost().getModel() != model) {
176       // ignore services not attached to our current model
177       return;
178     }
179 
180     // construct a ProbeEvent to sent to all our listeners
181     ProbeEvent ev = new ProbeEvent(service, newStatus);
182     // send the event
183     Iterator listeners = probeListeners.iterator();
184     while (listeners.hasNext()) {
185       ((ProbeListener)listeners.next()).probeRun(ev);
186     }
187   }
188 
189   private int buildRunQueue() {
190     // run through the tree, depth first.
191     long totalTimeout = 0;
192     long totalPeriod = 0;
193     Iterator hostNames = model.getHostNames();
194     while (hostNames.hasNext()) {
195       Host host = model.getHost(((String)hostNames.next()));
196       Iterator serviceNames = host.getServiceNames();
197       while (serviceNames.hasNext()) {
198         Service service =
199           host.getService(((String)serviceNames.next()));
200         runQueue.enqueue(service.getProbe());
201         totalTimeout += service.getTimeout();
202         totalPeriod += service.getPeriod();
203       }
204     }
205 
206     // calculate the number of threads needed to service this queue
207     if (runQueue.size() == 0) {
208       return 1;
209     } else {
210       double avgPeriod = (double)totalPeriod / runQueue.size();
211       return (int)Math.ceil(totalTimeout/avgPeriod);
212     }
213   }
214 
215   public void rebuildRunQueue() {
216     // first, flush the queue and block all the workers
217     // lock the queue
218     synchronized (runQueue) {
219       runQueue.lock();
220       // wait for checkouts to fall to zero
221       while (checkouts > 0) {
222         try {
223           runQueue.wait();
224         } catch (InterruptedException ign) {};
225       }
226     }
227 
228     // all running threads should be stopped. first, flush the queue
229     runQueue.flush();
230 
231     // build the queue (don't change the threadcount)
232     buildRunQueue();
233 
234     // ... and unlock it
235     runQueue.unlock();
236   }
237 
238   Probe getNextProbe() throws InterruptedException {
239     synchronized (runQueue) {
240       Probe out = (Probe)runQueue.dequeue();
241       checkouts++;
242       return out;
243     }
244   }
245 
246   void returnProbe(Probe probe) {
247     synchronized (runQueue) {
248       checkouts--;
249       runQueue.enqueue(probe);
250     }
251   }
252 
253   int queueSize() {
254     return runQueue.size();
255   }
256 
257   public void hostChanged(Host host) {
258     rebuildRunQueue();
259   }
260 
261   public void serviceChanged(Service service) {
262     rebuildRunQueue();
263   }
264 
265   public void hostListChanged() {
266     rebuildRunQueue();
267   }
268 
269   public void serviceListChanged(Host host) {
270     rebuildRunQueue();
271   }
272 }