1 /*
2 * SSHTools - Java SSH2 API
3 *
4 * Copyright (C) 2002-2003 Lee David Painter and Contributors.
5 *
6 * Contributions made by:
7 *
8 * Brett Smith
9 * Richard Pernavas
10 * Erwin Bolwidt
11 *
12 * This program is free software; you can redistribute it and/or
13 * modify it under the terms of the GNU General Public License
14 * as published by the Free Software Foundation; either version 2
15 * of the License, or (at your option) any later version.
16 *
17 * This program is distributed in the hope that it will be useful,
18 * but WITHOUT ANY WARRANTY; without even the implied warranty of
19 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 * GNU General Public License for more details.
21 *
22 * You should have received a copy of the GNU General Public License
23 * along with this program; if not, write to the Free Software
24 * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
25 */
26 package com.sshtools.j2ssh.transport;
27
28 import com.sshtools.j2ssh.io.ByteArrayReader;
29
30 import org.apache.commons.logging.Log;
31 import org.apache.commons.logging.LogFactory;
32
33 import java.util.ArrayList;
34 import java.util.HashMap;
35 import java.util.Iterator;
36 import java.util.List;
37 import java.util.Map;
38 import java.util.Vector;
39
40
41 /**
42 * <p>
43 * This class implements a message store that can be used to provide a blocking
44 * mechanism for transport protocol messages.
45 * </p>
46 *
47 * @author Lee David Painter
48 * @version $Revision: 1.42 $
49 *
50 * @since 0.2.0
51 */
52 public final class SshMessageStore {
53 private static Log log = LogFactory.getLog(SshMessageStore.class);
54
55 // List to hold messages as they are received
56 private List messages = new ArrayList();
57 private Map register = new HashMap();
58 private boolean isClosed = false;
59 private int[] singleIdFilter = new int[1];
60 private int interrupt = 5000;
61 private Vector listeners = new Vector();
62
/**
 * <p>
 * Constructs an open message store that holds no messages, has no
 * registered message types and no listeners.
 * </p>
 *
 * @since 0.2.0
 */
public SshMessageStore() {
    super();
}
72
73 /**
74 * <p>
75 * Evaluate whether the message store is closed.
76 * </p>
77 *
78 * @return
79 *
80 * @since 0.2.0
81 */
82 public boolean isClosed() {
83 return isClosed;
84 }
85
86 public void addMessageListener(SshMessageListener listener) {
87 synchronized (listeners) {
88 listeners.add(listener);
89 }
90 }
91
92 /**
93 * <p>
94 * Get a message from the store. This method will block until a message
95 * with an id matching the supplied filter arrives, or the message store
96 * closes. The message is removed from the store.
97 * </p>
98 *
99 * @param messageIdFilter an array of message ids that are acceptable
100 *
101 * @return the next available message
102 *
103 * @throws MessageStoreEOFException if the message store is closed
104 * @throws InterruptedException if the thread was interrupted
105 *
106 * @since 0.2.0
107 */
108 public synchronized SshMessage getMessage(int[] messageIdFilter)
109 throws MessageStoreEOFException, InterruptedException {
110 try {
111 return getMessage(messageIdFilter, 0);
112 } catch (MessageNotAvailableException e) {
113 // This should never happen but throw just in case
114 throw new MessageStoreEOFException();
115 }
116 }
117
118 /**
119 * <p>
120 * Get a message from the store. This method will block until a message
121 * with an id matching the supplied filter arrives, the specified timeout
122 * is reached or the message store closes. The message is removed from the
123 * store.
124 * </p>
125 *
126 * @param messageIdFilter an array of message ids that are acceptable.
127 * @param timeout the maximum number of milliseconds to block before
128 * returning.
129 *
130 * @return the next available message
131 *
132 * @throws MessageStoreEOFException if the message store is closed
133 * @throws MessageNotAvailableException if the message is not available
134 * after a timeout
135 * @throws InterruptedException if the thread is interrupted
136 *
137 * @since 0.2.0
138 */
139 public synchronized SshMessage getMessage(int[] messageIdFilter, int timeout)
140 throws MessageStoreEOFException, MessageNotAvailableException,
141 InterruptedException {
142 if ((messages.size() <= 0) && isClosed) {
143 throw new MessageStoreEOFException();
144 }
145
146 if (messageIdFilter == null) {
147 return nextMessage();
148 }
149
150 SshMessage msg;
151 boolean firstPass = true;
152
153 if (timeout < 0) {
154 timeout = 0;
155 }
156
157 while ((messages.size() > 0) || !isClosed) {
158 // lookup the message
159 msg = lookupMessage(messageIdFilter, true);
160
161 if (msg != null) {
162 return msg;
163 } else {
164 // If this is the second time and there's no message, then throw
165 if (!firstPass && (timeout > 0)) {
166 throw new MessageNotAvailableException();
167 }
168 }
169
170 // Now wait
171 if (!isClosed) {
172 wait((timeout == 0) ? interrupt : timeout);
173 }
174
175 firstPass = false;
176 }
177
178 throw new MessageStoreEOFException();
179 }
180
181 /**
182 * <p>
183 * Get a message from the store. This method will block until a message
184 * with an id matching the supplied id arrives, or the message store
185 * closes. The message is removed from the store.
186 * </p>
187 *
188 * @param messageId the id of the message requried
189 *
190 * @return the next available message with the id supplied
191 *
192 * @throws MessageStoreEOFException if the message store closed
193 * @throws InterruptedException if the thread is interrupted
194 *
195 * @since 0.2.0
196 */
197 public synchronized SshMessage getMessage(int messageId)
198 throws MessageStoreEOFException, InterruptedException {
199 try {
200 return getMessage(messageId, 0);
201 } catch (MessageNotAvailableException e) {
202 // This should never happen by throw jsut in case
203 throw new MessageStoreEOFException();
204 }
205 }
206
207 /**
208 * <p>
209 * Get a message from the store. This method will block until a message
210 * with an id matching the supplied id arrives,the specified timeout is
211 * reached or the message store closes. The message will be removed from
212 * the store.
213 * </p>
214 *
215 * @param messageId the id of the message requried
216 * @param timeout the maximum number of milliseconds to block before
217 * returning.
218 *
219 * @return the next available message with the id supplied
220 *
221 * @throws MessageStoreEOFException if the message store closed
222 * @throws InterruptedException if the thread is interrupted
223 * @throws InterruptedException
224 *
225 * @since 0.2.0
226 */
227 public synchronized SshMessage getMessage(int messageId, int timeout)
228 throws MessageStoreEOFException, MessageNotAvailableException,
229 InterruptedException {
230 singleIdFilter[0] = messageId;
231
232 return getMessage(singleIdFilter, timeout);
233 }
234
235 /**
236 * <p>
237 * Evaluate whether the store has any messages.
238 * </p>
239 *
240 * @return true if messages exist, otherwise false
241 *
242 * @since 0.2.0
243 */
244 public boolean hasMessages() {
245 return messages.size() > 0;
246 }
247
248 /**
249 * <p>
250 * Returns the number of messages contained within this message store.
251 * </p>
252 *
253 * @return the number of messages
254 *
255 * @since 0.2.0
256 */
257 public int size() {
258 return messages.size();
259 }
260
261 /**
262 * <p>
263 * Determines if the message id is a registered message of this store.
264 * </p>
265 *
266 * @param messageId the message id
267 *
268 * @return true if the message id is registered, otherwise false
269 *
270 * @since 0.2.0
271 */
272 public boolean isRegisteredMessage(Integer messageId) {
273 return register.containsKey(messageId);
274 }
275
276 /**
277 * <p>
278 * Adds a raw message to the store and processes the data into a registered
279 * message.
280 * </p>
281 *
282 * @param msgdata the raw message data to process
283 *
284 * @throws MessageNotRegisteredException if the message id of the raw data
285 * is not a registered message
286 * @throws InvalidMessageException if the message is invalid
287 *
288 * @since 0.2.0
289 */
290 public void addMessage(byte[] msgdata)
291 throws MessageNotRegisteredException, InvalidMessageException {
292 Integer messageId = new Integer(msgdata[5]);
293
294 if (!isRegisteredMessage(messageId)) {
295 throw new MessageNotRegisteredException(messageId);
296 }
297
298 Class cls = (Class) register.get(SshMessage.getMessageId(msgdata));
299
300 try {
301 SshMessage msg = (SshMessage) cls.newInstance();
302 msg.fromByteArray(new ByteArrayReader(msgdata));
303 addMessage(msg);
304 } catch (IllegalAccessException iae) {
305 throw new InvalidMessageException(
306 "Illegal access for implementation class " + cls.getName());
307 } catch (InstantiationException ie) {
308 throw new InvalidMessageException("Instantiation failed for class " +
309 cls.getName());
310 }
311 }
312
313 /**
314 * <p>
315 * Add a formed message to the store.
316 * </p>
317 *
318 * @param msg the message to add to the store
319 *
320 * @throws MessageNotRegisteredException if the message type is not
321 * registered with the store
322 *
323 * @since 0.2.0
324 */
325 public synchronized void addMessage(SshMessage msg)
326 throws MessageNotRegisteredException {
327 // Add the message
328 messages.add(messages.size(), msg);
329
330 synchronized (listeners) {
331 if (listeners.size() > 0) {
332 for (Iterator it = listeners.iterator(); it.hasNext();) {
333 ((SshMessageListener) it.next()).messageReceived(msg);
334 }
335 }
336 }
337
338 // Notify the threads
339 notifyAll();
340 }
341
342 /**
343 * <p>
344 * Closes the store. This will cause any blocking operations on the message
345 * store to return.
346 * </p>
347 *
348 * @since 0.2.0
349 */
350 public synchronized void close() {
351 isClosed = true;
352
353 // We need to notify all anyway as if there are messages still available
354 // it should not affect the waiting threads as they are waiting for their
355 // own messages to be received because non were avaialable in the first place
356 //if (messages.size()<=0) {
357 notifyAll();
358
359 //}
360 }
361
362 /**
363 * <p>
364 * Get the next message in the store or wait until a new message arrives.
365 * The message is removed from the store.
366 * </p>
367 *
368 * @return the next available message.
369 *
370 * @throws MessageStoreEOFException if the message store is closed
371 * @throws InterruptedException if the thread is interrupted
372 *
373 * @since 0.2.0
374 */
375 public synchronized SshMessage nextMessage()
376 throws MessageStoreEOFException, InterruptedException {
377 if ((messages.size() <= 0) && isClosed) {
378 throw new MessageStoreEOFException();
379 }
380
381 // If there are no messages available then wait untill there are.
382 while ((messages.size() <= 0) && !isClosed) {
383 wait(interrupt);
384 }
385
386 if (messages.size() > 0) {
387 return (SshMessage) messages.remove(0);
388 } else {
389 throw new MessageStoreEOFException();
390 }
391 }
392
393 /**
394 *
395 */
396 public synchronized void breakWaiting() {
397 notifyAll();
398 }
399
400 /**
401 * <p>
402 * Get a message from the store without removing or blocking if the message
403 * does not exist.
404 * </p>
405 *
406 * @param messageIdFilter the id of the message requried
407 *
408 * @return the next available message with the id supplied
409 *
410 * @throws MessageStoreEOFException if the message store closed
411 * @throws MessageNotAvailableException if the message is not available
412 * @throws InterruptedException if the thread is interrupted
413 *
414 * @since 0.2.0
415 */
416 public synchronized SshMessage peekMessage(int[] messageIdFilter)
417 throws MessageStoreEOFException, MessageNotAvailableException,
418 InterruptedException {
419 return peekMessage(messageIdFilter, 0);
420 }
421
422 /**
423 * <p>
424 * Get a message from the store without removing it; only blocking for the
425 * number of milliseconds specified in the timeout field. If timeout is
426 * zero, the method will not block.
427 * </p>
428 *
429 * @param messageIdFilter an array of acceptable message ids
430 * @param timeout the number of milliseconds to wait
431 *
432 * @return the next available message of the acceptable message ids
433 *
434 * @throws MessageStoreEOFException if the message store is closed
435 * @throws MessageNotAvailableException if the message is not available
436 * @throws InterruptedException if the thread is interrupted
437 *
438 * @since 0.2.0
439 */
440 public synchronized SshMessage peekMessage(int[] messageIdFilter,
441 int timeout)
442 throws MessageStoreEOFException, MessageNotAvailableException,
443 InterruptedException {
444 SshMessage msg;
445
446 // Do a straight lookup
447 msg = lookupMessage(messageIdFilter, false);
448
449 if (msg != null) {
450 return msg;
451 }
452
453 // If were willing to wait the wait and look again
454 if (timeout > 0) {
455 if (log.isDebugEnabled()) {
456 log.debug("No message so waiting for " +
457 String.valueOf(timeout) + " milliseconds");
458 }
459
460 wait(timeout);
461 msg = lookupMessage(messageIdFilter, false);
462
463 if (msg != null) {
464 return msg;
465 }
466 }
467
468 // Nothing even after a wait so throw the relevant exception
469 if (isClosed) {
470 throw new MessageStoreEOFException();
471 } else {
472 throw new MessageNotAvailableException();
473 }
474 }
475
476 private SshMessage lookupMessage(int[] messageIdFilter, boolean remove) {
477 SshMessage msg;
478
479 for (int x = 0; x < messages.size(); x++) {
480 msg = (SshMessage) messages.get(x);
481
482 // Determine whether its one of the filtered messages
483 for (int i = 0; i < messageIdFilter.length; i++) {
484 if (msg.getMessageId() == messageIdFilter[i]) {
485 if (remove) {
486 messages.remove(msg);
487 }
488
489 return msg;
490 }
491 }
492 }
493
494 return null;
495 }
496
497 /**
498 * <p>
499 * Get a message from the store without removing it.
500 * </p>
501 *
502 * @param messageId the acceptable message id
503 *
504 * @return the next available message.
505 *
506 * @throws MessageStoreEOFException if the message store is closed.
507 * @throws MessageNotAvailableException if the message is not available.
508 * @throws InterruptedException if the thread is interrupted
509 *
510 * @since 0.2.0
511 */
512 public synchronized SshMessage peekMessage(int messageId)
513 throws MessageStoreEOFException, MessageNotAvailableException,
514 InterruptedException {
515 return peekMessage(messageId, 0);
516 }
517
518 /**
519 * <p>
520 * Removes a message from the message store.
521 * </p>
522 *
523 * @param msg the message to remove
524 *
525 * @since 0.2.0
526 */
527 public synchronized void removeMessage(SshMessage msg) {
528 messages.remove(msg);
529 }
530
531 /**
532 * <p>
533 * Get a message from the store without removing it, only blocking for the
534 * number of milliseconds specified in the timeout field.
535 * </p>
536 *
537 * @param messageId the acceptable message id
538 * @param timeout the timeout setting in milliseconds
539 *
540 * @return the next available message
541 *
542 * @throws MessageStoreEOFException if the message store is closed
543 * @throws MessageNotAvailableException if the message is not available
544 * @throws InterruptedException if the thread is interrupted
545 *
546 * @since 0.2.0
547 */
548 public synchronized SshMessage peekMessage(int messageId, int timeout)
549 throws MessageStoreEOFException, MessageNotAvailableException,
550 InterruptedException {
551 singleIdFilter[0] = messageId;
552
553 return peekMessage(singleIdFilter, timeout);
554 }
555
556 /**
557 * <p>
558 * Register a message implementation with the store.
559 * </p>
560 *
561 * @param messageId the id of the message
562 * @param implementor the class of the implementation
563 *
564 * @since 0.2.0
565 */
566 public void registerMessage(int messageId, Class implementor) {
567 Integer id = new Integer(messageId);
568 register.put(id, implementor);
569 }
570
571 /**
572 * <p>
573 * Returns an Object array (Integers) of the registered message ids.
574 * </p>
575 *
576 * @return the registered message id array
577 *
578 * @since 0.2.0
579 */
580 public Object[] getRegisteredMessageIds() {
581 return register.keySet().toArray();
582 }
583
584 /**
585 * <p>
586 * Create a formed message from raw message data.
587 * </p>
588 *
589 * @param msgdata the raw message data
590 *
591 * @return the formed message
592 *
593 * @throws MessageNotRegisteredException if the message is not a registered
594 * message
595 * @throws InvalidMessageException if the message is invalid
596 *
597 * @since 0.2.0
598 */
599 public SshMessage createMessage(byte[] msgdata)
600 throws MessageNotRegisteredException, InvalidMessageException {
601 Integer messageId = SshMessage.getMessageId(msgdata);
602
603 if (!isRegisteredMessage(messageId)) {
604 throw new MessageNotRegisteredException(messageId);
605 }
606
607 Class cls = (Class) register.get(SshMessage.getMessageId(msgdata));
608
609 try {
610 SshMessage msg = (SshMessage) cls.newInstance();
611 msg.fromByteArray(new ByteArrayReader(msgdata));
612
613 return msg;
614 } catch (IllegalAccessException iae) {
615 throw new InvalidMessageException(
616 "Illegal access for implementation class " + cls.getName());
617 } catch (InstantiationException ie) {
618 throw new InvalidMessageException("Instantiation failed for class " +
619 cls.getName());
620 }
621 }
622 }