Source code: org/altara/mars/engine/Controller.java
1 /* MARS Network Monitoring Engine
2 Copyright (C) 1999 Brian H. Trammell
3 Copyright (C) 2002 Leapfrog Research & Development, LLC
4
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License
7 as published by the Free Software Foundation; either version 2
8 of the License, or (at your option) any later version.
9
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
14
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, it is available at
17 http://www.gnu.org/copyleft/gpl.html, or by writing to the
18 Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 Boston, MA 02111-1307, USA.
20 */
21
22 package org.altara.mars.engine;
23
24 import org.altara.util.*;
25 import org.altara.mars.*;
26 import java.util.*;
27
28 /** This class controls the MARS monitoring engine. It manages the monitoring
29 work queue, the monitoring threads, and the service status change list.
30 */
31
32 public class Controller implements MarsModelListener {
33
34 private MarsModel model;
35 private Queue runQueue;
36 private HashSet statListeners;
37 private HashSet probeListeners;
38 private HashSet notifyListeners;
39 private ProbeWorker[] workers;
40 private int checkouts;
41 private boolean active;
42
43 public Controller() {
44 model = null;
45 runQueue = new Queue();
46 checkouts = 0;
47 statListeners = new HashSet();
48 probeListeners = new HashSet();
49 notifyListeners = new HashSet();
50 active = false;
51
52 // create a new Notifier to handle notification
53 new Notifier(this);
54 }
55
56 public synchronized void start() {
57 if (active) {
58 // we're already started, so fail silently.
59 return;
60 }
61
62 if (model == null) {
63 // no model means don't start
64 throw new RuntimeException("Can't start with null model");
65 }
66
67 // build the run queue (is this necessary?)
68 runQueue = new Queue();
69 int workerCount = buildRunQueue();
70
71 // create and start a static thread pool
72 workers = new ProbeWorker[workerCount];
73 for (int i = 0; i < workers.length; i++) {
74 workers[i] = new ProbeWorker(this);
75 workers[i].start();
76 }
77
78 // signify successful start
79 this.checkouts = 0;
80 this.active = true;
81
82 Main.getMain().showStatus("Started monitoring...");
83 }
84
85 public synchronized void stop() {
86 if (!active) {
87 // we're already stopped, so fail silently.
88 return;
89 }
90
91 // shut down each thread
92 for (int i = 0; i < workers.length; i++) {
93 workers[i].kill();
94 }
95
96 // signify successful stop
97 this.checkouts = 0;
98 this.active = false;
99
100 Main.getMain().showStatus("Stopped monitoring.");
101 }
102
103 public boolean isActive() {
104 return active;
105 }
106
107 public MarsModel getModel() {
108 return model;
109 }
110
111 public synchronized void setModel(MarsModel model) {
112 if (active) {
113 // programming error - must be stopped before setModel.
114 throw new RuntimeException("Can't change model while running");
115 }
116 this.model = model;
117 model.addMarsModelListener(this);
118 }
119
120 public void addNotificationListener(NotificationListener listener) {
121 notifyListeners.add(listener);
122 }
123
124 public void removeNotificationListener(NotificationListener listener) {
125 notifyListeners.remove(listener);
126 }
127
128 public void addStatusChangeListener(StatusChangeListener listener) {
129 statListeners.add(listener);
130 }
131
132 public void removeStatusChangeListener(StatusChangeListener listener) {
133 statListeners.remove(listener);
134 }
135
136 public void addProbeListener(ProbeListener listener) {
137 probeListeners.add(listener);
138 }
139
140 public void removeProbeListener(ProbeListener listener) {
141 probeListeners.remove(listener);
142 }
143
144 public void notifyStatusChanged(StatusChangeEvent sce) {
145 if (sce.getService().getHost().getModel() != model) {
146 // ignore events not attached to our current model
147 return;
148 }
149
150 // resend the event
151 Iterator listeners = notifyListeners.iterator();
152 while (listeners.hasNext()) {
153 ((NotificationListener)listeners.next()).notifyStatusChanged(sce);
154 }
155 }
156
157 public void statusChanged(Service service,
158 Status oldStatus, Status newStatus) {
159 if (service.getHost().getModel() != model) {
160 // ignore services not attached to our current model
161 return;
162 }
163
164 // construct a StatusChangedEvent to send to all our listeners
165 StatusChangeEvent ev =
166 new StatusChangeEvent(service,oldStatus,newStatus);
167 // send the event
168 Iterator listeners = statListeners.iterator();
169 while (listeners.hasNext()) {
170 ((StatusChangeListener)listeners.next()).statusChanged(ev);
171 }
172 }
173
174 public void probeRun(Service service, Status newStatus) {
175 if (service.getHost().getModel() != model) {
176 // ignore services not attached to our current model
177 return;
178 }
179
180 // construct a ProbeEvent to sent to all our listeners
181 ProbeEvent ev = new ProbeEvent(service, newStatus);
182 // send the event
183 Iterator listeners = probeListeners.iterator();
184 while (listeners.hasNext()) {
185 ((ProbeListener)listeners.next()).probeRun(ev);
186 }
187 }
188
189 private int buildRunQueue() {
190 // run through the tree, depth first.
191 long totalTimeout = 0;
192 long totalPeriod = 0;
193 Iterator hostNames = model.getHostNames();
194 while (hostNames.hasNext()) {
195 Host host = model.getHost(((String)hostNames.next()));
196 Iterator serviceNames = host.getServiceNames();
197 while (serviceNames.hasNext()) {
198 Service service =
199 host.getService(((String)serviceNames.next()));
200 runQueue.enqueue(service.getProbe());
201 totalTimeout += service.getTimeout();
202 totalPeriod += service.getPeriod();
203 }
204 }
205
206 // calculate the number of threads needed to service this queue
207 if (runQueue.size() == 0) {
208 return 1;
209 } else {
210 double avgPeriod = (double)totalPeriod / runQueue.size();
211 return (int)Math.ceil(totalTimeout/avgPeriod);
212 }
213 }
214
215 public void rebuildRunQueue() {
216 // first, flush the queue and block all the workers
217 // lock the queue
218 synchronized (runQueue) {
219 runQueue.lock();
220 // wait for checkouts to fall to zero
221 while (checkouts > 0) {
222 try {
223 runQueue.wait();
224 } catch (InterruptedException ign) {};
225 }
226 }
227
228 // all running threads should be stopped. first, flush the queue
229 runQueue.flush();
230
231 // build the queue (don't change the threadcount)
232 buildRunQueue();
233
234 // ... and unlock it
235 runQueue.unlock();
236 }
237
238 Probe getNextProbe() throws InterruptedException {
239 synchronized (runQueue) {
240 Probe out = (Probe)runQueue.dequeue();
241 checkouts++;
242 return out;
243 }
244 }
245
246 void returnProbe(Probe probe) {
247 synchronized (runQueue) {
248 checkouts--;
249 runQueue.enqueue(probe);
250 }
251 }
252
253 int queueSize() {
254 return runQueue.size();
255 }
256
257 public void hostChanged(Host host) {
258 rebuildRunQueue();
259 }
260
261 public void serviceChanged(Service service) {
262 rebuildRunQueue();
263 }
264
265 public void hostListChanged() {
266 rebuildRunQueue();
267 }
268
269 public void serviceListChanged(Host host) {
270 rebuildRunQueue();
271 }
272 }