1 /*
2 * Copyright 1999,2004-2005 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
5 * use this file except in compliance with the License. You may obtain a copy of
6 * the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
12 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
13 * License for the specific language governing permissions and limitations under
14 * the License.
15 */
16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.io.ByteArrayOutputStream;
20 import java.io.IOException;
21 import java.io.ObjectOutputStream;
22 import java.util.HashMap;
23 import java.util.Iterator;
24 import java.util.Map;
25 import java.util.zip.GZIPOutputStream;
26
27 import javax.management.MBeanServer;
28 import javax.management.ObjectName;
29
30 import org.apache.catalina.Container;
31 import org.apache.catalina.cluster.ClusterMessage;
32 import org.apache.catalina.cluster.ClusterSender;
33 import org.apache.catalina.cluster.Member;
34 import org.apache.catalina.cluster.util.IDynamicProperty;
35 import org.apache.catalina.core.StandardHost;
36 import org.apache.catalina.util.StringManager;
37 import org.apache.tomcat.util.IntrospectionUtils;
38
39 /**
40 * Transmit message to ohter cluster members create sender from replicationMode
41 * type
42 * FIXME i18n log messages
43 * FIXME compress data depends on message type and size
44 * FIXME send very big messages at some block see FarmWarDeployer!
45 * TODO pause and resume senders
46 *
47 * @author Peter Rossbach
48 * @author Filip Hanik
49 * @version $Revision: 345567 $ $Date: 2005-11-18 16:07:23 -0500 (Fri, 18 Nov 2005) $
50 */
51 public class ReplicationTransmitter implements ClusterSender,IDynamicProperty {
52 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
53 .getLog(ReplicationTransmitter.class);
54
55 /**
56 * The descriptive information about this implementation.
57 */
58 private static final String info = "ReplicationTransmitter/3.0";
59
60 /**
61 * The string manager for this package.
62 */
63 protected StringManager sm = StringManager.getManager(Constants.Package);
64
65 private Map map = new HashMap();
66
67 public ReplicationTransmitter() {
68 }
69
70 /**
71 * number of transmitted messages>
72 */
73 private long nrOfRequests = 0;
74
75 /**
76 * number of transmitted bytes
77 */
78 private long totalBytes = 0;
79
80 /**
81 * number of failure
82 */
83 private long failureCounter = 0;
84
85 /**
86 * Iteration count for background processing.
87 */
88 private int count = 0;
89
90 /**
91 * Frequency of the check sender keepAlive Socket Status.
92 */
93 protected int processSenderFrequency = 2;
94
95 /**
96 * current sender replication mode
97 */
98 private String replicationMode;
99
100 /**
101 * sender default ackTimeout
102 */
103 private long ackTimeout = 15000; //15 seconds by default
104
105 /**
106 * enabled wait for ack
107 */
108 private boolean waitForAck = true;
109
110 /**
111 * autoConnect sender when next message send
112 */
113 private boolean autoConnect = false;
114
115 /**
116 * Compress message data bytes
117 */
118 private boolean compress = false;
119
120 /**
121 * doTransmitterProcessingStats
122 */
123 protected boolean doTransmitterProcessingStats = false;
124
125 /**
126 * proessingTime
127 */
128 protected long processingTime = 0;
129
130 /**
131 * min proessingTime
132 */
133 protected long minProcessingTime = Long.MAX_VALUE ;
134
135 /**
136 * max proessingTime
137 */
138 protected long maxProcessingTime = 0;
139
140 /**
141 * dynamic sender <code>properties</code>
142 */
143 private Map properties = new HashMap();
144
145 /**
146 * my cluster
147 */
148 private SimpleTcpCluster cluster;
149
150 /**
151 * Transmitter Mbean name
152 */
153 private ObjectName objectName;
154
155 // ------------------------------------------------------------- Properties
156
157 /**
158 * Return descriptive information about this implementation and the
159 * corresponding version number, in the format
160 * <code><description>/<version></code>.
161 */
162 public String getInfo() {
163 return (info);
164 }
165
166 /**
167 * @return Returns the nrOfRequests.
168 */
169 public long getNrOfRequests() {
170 return nrOfRequests;
171 }
172
173 /**
174 * @return Returns the totalBytes.
175 */
176 public long getTotalBytes() {
177 return totalBytes;
178 }
179
180 /**
181 * @return Returns the failureCounter.
182 */
183 public long getFailureCounter() {
184 return failureCounter;
185 }
186
187 /**
188 * current replication mode
189 *
190 * @return The mode
191 */
192 public String getReplicationMode() {
193 return replicationMode;
194 }
195
196 /**
197 * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue)
198 *
199 * @see IDataSenderFactory#validateMode(String)
200 * @param mode
201 */
202 public void setReplicationMode(String mode) {
203 String msg = IDataSenderFactory.validateMode(mode);
204 if (msg == null) {
205 if (log.isDebugEnabled())
206 log.debug("Setting replcation mode to " + mode);
207 this.replicationMode = mode;
208 } else
209 throw new IllegalArgumentException(msg);
210
211 }
212
213 /**
214 * @return Returns the avg processingTime/nrOfRequests.
215 */
216 public double getAvgProcessingTime() {
217 return ((double)processingTime) / nrOfRequests;
218 }
219
220 /**
221 * @return Returns the maxProcessingTime.
222 */
223 public long getMaxProcessingTime() {
224 return maxProcessingTime;
225 }
226
227 /**
228 * @return Returns the minProcessingTime.
229 */
230 public long getMinProcessingTime() {
231 return minProcessingTime;
232 }
233
234 /**
235 * @return Returns the processingTime.
236 */
237 public long getProcessingTime() {
238 return processingTime;
239 }
240
241 /**
242 * @return Returns the doTransmitterProcessingStats.
243 */
244 public boolean isDoTransmitterProcessingStats() {
245 return doTransmitterProcessingStats;
246 }
247
248 /**
249 * @param doProcessingStats The doTransmitterProcessingStats to set.
250 */
251 public void setDoTransmitterProcessingStats(boolean doProcessingStats) {
252 this.doTransmitterProcessingStats = doProcessingStats;
253 }
254
255
256 /**
257 * Transmitter ObjectName
258 *
259 * @param name
260 */
261 public void setObjectName(ObjectName name) {
262 objectName = name;
263 }
264
265 public ObjectName getObjectName() {
266 return objectName;
267 }
268
269 /**
270 * @return Returns the compress.
271 */
272 public boolean isCompress() {
273 return compress;
274 }
275
276 /**
277 * @param compressMessageData
278 * The compress to set.
279 */
280 public void setCompress(boolean compressMessageData) {
281 this.compress = compressMessageData;
282 }
283
284 /**
285 * @return Returns the autoConnect.
286 */
287 public boolean isAutoConnect() {
288 return autoConnect;
289 }
290
291 /**
292 * @param autoConnect
293 * The autoConnect to set.
294 */
295 public void setAutoConnect(boolean autoConnect) {
296 this.autoConnect = autoConnect;
297 setProperty("autoConnect", String.valueOf(autoConnect));
298
299 }
300
301 /**
302 * @return The ack timeout
303 */
304 public long getAckTimeout() {
305 return ackTimeout;
306 }
307
308 /**
309 * @param ackTimeout
310 */
311 public void setAckTimeout(long ackTimeout) {
312 this.ackTimeout = ackTimeout;
313 setProperty("ackTimeout", String.valueOf(ackTimeout));
314 }
315
316 /**
317 * @return Returns the waitForAck.
318 */
319 public boolean isWaitForAck() {
320 return waitForAck;
321 }
322
323 /**
324 * @param waitForAck
325 * The waitForAck to set.
326 */
327 public void setWaitForAck(boolean waitForAck) {
328 this.waitForAck = waitForAck;
329 setProperty("waitForAck", String.valueOf(waitForAck));
330 }
331
332
333 /**
334 * @return Returns the processSenderFrequency.
335 */
336 public int getProcessSenderFrequency() {
337 return processSenderFrequency;
338 }
339
340 /**
341 * @param processSenderFrequency The processSenderFrequency to set.
342 */
343 public void setProcessSenderFrequency(int processSenderFrequency) {
344 this.processSenderFrequency = processSenderFrequency;
345 }
346
347 /*
348 * configured in cluster
349 *
350 * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
351 */
352 public void setCatalinaCluster(SimpleTcpCluster cluster) {
353 this.cluster = cluster;
354
355 }
356
357 /**
358 * @return True if synchronized sender
359 * @deprecated since version 5.5.7
360 */
361 public boolean getIsSenderSynchronized() {
362 return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
363 || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
364 }
365
366 // ------------------------------------------------------------- dynamic
367 // sender property handling
368
369 /**
370 * set config attributes with reflect
371 *
372 * @param name
373 * @param value
374 */
375 public void setProperty(String name, Object value) {
376 if (log.isTraceEnabled())
377 log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
378 value, properties.get(name)));
379
380 properties.put(name, value);
381 }
382
383 /**
384 * get current config
385 *
386 * @param key
387 * @return The property
388 */
389 public Object getProperty(String key) {
390 if (log.isTraceEnabled())
391 log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
392 return properties.get(key);
393 }
394
395 /**
396 * Get all properties keys
397 *
398 * @return An iterator over the propery name set
399 */
400 public Iterator getPropertyNames() {
401 return properties.keySet().iterator();
402 }
403
404 /**
405 * remove a configured property.
406 *
407 * @param key
408 */
409 public void removeProperty(String key) {
410 properties.remove(key);
411 }
412
413 // ------------------------------------------------------------- public
414
415 /**
416 * Send data to one member
417 * FIXME set filtering messages
418 * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage, org.apache.catalina.cluster.Member)
419 */
420 public void sendMessage(ClusterMessage message, Member member)
421 throws java.io.IOException {
422 long time = 0 ;
423 if(doTransmitterProcessingStats) {
424 time = System.currentTimeMillis();
425 }
426 try {
427 ClusterData data = serialize(message);
428 String key = getKey(member);
429 IDataSender sender = (IDataSender) map.get(key);
430 sendMessageData(data, sender);
431 } finally {
432 if (doTransmitterProcessingStats) {
433 addProcessingStats(time);
434 }
435 }
436 }
437
438 /**
439 * Send to all senders at same cluster domain as message from address
440 * @param message Cluster message to send
441 * @since 5.5.10
442 */
443 public void sendMessageClusterDomain(ClusterMessage message)
444 throws java.io.IOException {
445 long time = 0;
446 if (doTransmitterProcessingStats) {
447 time = System.currentTimeMillis();
448 }
449 try {
450 String domain = message.getAddress().getDomain();
451 if(domain == null)
452 throw new RuntimeException("Domain at member not set");
453 ClusterData data = serialize(message);
454 IDataSender[] senders = getSenders();
455 for (int i = 0; i < senders.length; i++) {
456
457 IDataSender sender = senders[i];
458 if(domain.equals(sender.getDomain())) {
459 try {
460 sendMessageData(data, sender);
461 } catch (Exception x) {
462 if (!sender.getSuspect()) {
463 log.warn("Unable to send replicated message to "
464 + sender + ", is server down?", x);
465 sender.setSuspect(true);
466 }
467 }
468 }
469 }
470 } finally {
471 if (doTransmitterProcessingStats) {
472 addProcessingStats(time);
473 }
474 }
475
476 }
477
478 /**
479 * send message to all senders (broadcast)
480 * @see org.apache.catalina.cluster.ClusterSender#sendMessage(org.apache.catalina.cluster.ClusterMessage)
481 */
482 public void sendMessage(ClusterMessage message)
483 throws java.io.IOException {
484 long time = 0;
485 if (doTransmitterProcessingStats) {
486 time = System.currentTimeMillis();
487 }
488 try {
489 ClusterData data = serialize(message);
490 IDataSender[] senders = getSenders();
491 for (int i = 0; i < senders.length; i++) {
492
493 IDataSender sender = senders[i];
494 try {
495 sendMessageData(data, sender);
496 } catch (Exception x) {
497 if (!sender.getSuspect()) {
498 log.warn("Unable to send replicated message to "
499 + sender + ", is server down?", x);
500 sender.setSuspect(true);
501 }
502 }
503 }
504 } finally {
505 if (doTransmitterProcessingStats) {
506 addProcessingStats(time);
507 }
508 }
509 }
510
511 /**
512 * start the sender and register transmitter mbean
513 *
514 * @see org.apache.catalina.cluster.ClusterSender#start()
515 */
516 public void start() throws java.io.IOException {
517 if (cluster != null) {
518 ObjectName clusterName = cluster.getObjectName();
519 ObjectName transmitterName = null ;
520 try {
521 MBeanServer mserver = cluster.getMBeanServer();
522 Container container = cluster.getContainer();
523 String name = clusterName.getDomain() + ":type=ClusterSender";
524 if (container instanceof StandardHost) {
525 name += ",host=" + clusterName.getKeyProperty("host");
526 }
527 transmitterName = new ObjectName(name);
528 if (mserver.isRegistered(transmitterName)) {
529 if (log.isWarnEnabled())
530 log.warn(sm.getString(
531 "cluster.mbean.register.allready",
532 transmitterName));
533 return;
534 }
535 setObjectName(transmitterName);
536 mserver.registerMBean(cluster.getManagedBean(this),
537 getObjectName());
538 if(log.isInfoEnabled())
539 log.info(sm.getString("ReplicationTransmitter.started",
540 clusterName, transmitterName));
541
542 } catch (Exception e) {
543 log.warn(e);
544 }
545 }
546 }
547
548 /*
549 * stop the sender and deregister mbeans (transmitter, senders)
550 *
551 * @see org.apache.catalina.cluster.ClusterSender#stop()
552 */
553 public synchronized void stop() {
554 Iterator i = map.entrySet().iterator();
555 while (i.hasNext()) {
556 IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
557 .getValue();
558 try {
559 unregisterSenderMBean(sender);
560 sender.disconnect();
561 } catch (Exception x) {
562 }
563 i.remove();
564 }
565 if (cluster != null && getObjectName() != null) {
566 try {
567 MBeanServer mserver = cluster.getMBeanServer();
568 mserver.unregisterMBean(getObjectName());
569 } catch (Exception e) {
570 log.error(e);
571 }
572 if(log.isInfoEnabled())
573 log.info(sm.getString("ReplicationTransmitter.stopped",
574 cluster.getObjectName(), getObjectName()));
575 }
576
577 }
578
579 /**
580 * Call transmitter to check for sender socket status
581 *
582 * @see SimpleTcpCluster#backgroundProcess()
583 */
584 public void backgroundProcess() {
585 count = (count + 1) % processSenderFrequency;
586 if (count == 0) {
587 checkKeepAlive();
588 }
589 }
590
591 /**
592 * Check all DataSender Socket to close socket at keepAlive mode
593 * @see DataSender#checkKeepAlive()
594 */
595 public void checkKeepAlive() {
596 if (map.size() > 0) {
597 java.util.Iterator iter = map.entrySet().iterator();
598 while (iter.hasNext()) {
599 IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
600 .next()).getValue();
601 if (sender != null)
602 sender.checkKeepAlive();
603 }
604 }
605 }
606
607 /**
608 * get all current senders
609 *
610 * @return The senders
611 */
612 public IDataSender[] getSenders() {
613 java.util.Iterator iter = map.entrySet().iterator();
614 IDataSender[] array = new IDataSender[map.size()];
615 int i = 0;
616 while (iter.hasNext()) {
617 IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
618 .next()).getValue();
619 if (sender != null)
620 array[i] = sender;
621 i++;
622 }
623 return array;
624 }
625
626 /**
627 * get all current senders
628 *
629 * @return The sender object names
630 */
631 public ObjectName[] getSenderObjectNames() {
632 java.util.Iterator iter = map.entrySet().iterator();
633 ObjectName array[] = new ObjectName[map.size()];
634 int i = 0;
635 while (iter.hasNext()) {
636 IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
637 .next()).getValue();
638 if (sender != null)
639 array[i] = getSenderObjectName(sender);
640 i++;
641 }
642 return array;
643 }
644
645 /**
646 * Reset sender statistics
647 */
648 public synchronized void resetStatistics() {
649 nrOfRequests = 0;
650 totalBytes = 0;
651 failureCounter = 0;
652 processingTime = 0;
653 minProcessingTime = Long.MAX_VALUE;
654 maxProcessingTime = 0;
655 }
656
657 /**
658 * add new cluster member and create sender ( s. replicationMode) transfer
659 * current properties to sender
660 *
661 * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
662 */
663 public synchronized void add(Member member) {
664 try {
665 String key = getKey(member);
666 if (!map.containsKey(key)) {
667 IDataSender sender = IDataSenderFactory.getIDataSender(
668 replicationMode, member);
669 transferSenderProperty(sender);
670 map.put(key, sender);
671 registerSenderMBean(member, sender);
672 }
673 } catch (java.io.IOException x) {
674 log.error("Unable to create and add a IDataSender object.", x);
675 }
676 }
677
678 /**
679 * remove sender from transmitter. ( deregister mbean and disconnect sender )
680 *
681 * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
682 */
683 public synchronized void remove(Member member) {
684 String key = getKey(member);
685 IDataSender toberemoved = (IDataSender) map.get(key);
686 if (toberemoved == null)
687 return;
688 unregisterSenderMBean(toberemoved);
689 toberemoved.disconnect();
690 map.remove(key);
691
692 }
693
694 // ------------------------------------------------------------- protected
695
696 /**
697 * calc number of requests and transfered bytes. Log stats all 100 requets
698 *
699 * @param length
700 */
701 protected synchronized void addStats(int length) {
702 nrOfRequests++;
703 totalBytes += length;
704 if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
705 log.debug("Nr of bytes sent=" + totalBytes + " over "
706 + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
707 + " bytes/request; failures=" + failureCounter);
708 }
709
710 }
711
712 /**
713 * Transfer all properties from transmitter to concrete sender
714 *
715 * @param sender
716 */
717 protected void transferSenderProperty(IDataSender sender) {
718 for (Iterator iter = getPropertyNames(); iter.hasNext();) {
719 String pkey = (String) iter.next();
720 Object value = getProperty(pkey);
721 IntrospectionUtils.setProperty(sender, pkey, value.toString());
722 }
723 }
724
725 /**
726 * set unique key to find sender
727 *
728 * @param member
729 * @return concat member.host:member.port
730 */
731 protected String getKey(Member member) {
732 return member.getHost() + ":" + member.getPort();
733 }
734
735 /**
736 * unregsister sendern Mbean
737 *
738 * @see #getSenderObjectName(IDataSender)
739 * @param sender
740 */
741 protected void unregisterSenderMBean(IDataSender sender) {
742 try {
743 MBeanServer mserver = cluster.getMBeanServer();
744 if (mserver != null) {
745 mserver.unregisterMBean(getSenderObjectName(sender));
746 }
747 } catch (Exception e) {
748 log.warn(e);
749 }
750 }
751
752 /**
753 * register MBean and check it exist (big problem!)
754 *
755 * @param member
756 * @param sender
757 */
758 protected void registerSenderMBean(Member member, IDataSender sender) {
759 if (member != null && cluster != null) {
760 try {
761 MBeanServer mserver = cluster.getMBeanServer();
762 ObjectName senderName = getSenderObjectName(sender);
763 if (mserver.isRegistered(senderName)) {
764 if (log.isWarnEnabled())
765 log.warn(sm.getString(
766 "cluster.mbean.register.allready", senderName));
767 return;
768 }
769 mserver.registerMBean(cluster.getManagedBean(sender),
770 senderName);
771 } catch (Exception e) {
772 log.warn(e);
773 }
774 }
775 }
776
777
778 /**
779 * build sender ObjectName (
780 * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" )
781 *
782 * @param sender
783 * @return The sender object name
784 */
785 protected ObjectName getSenderObjectName(IDataSender sender) {
786 ObjectName senderName = null;
787 try {
788 ObjectName clusterName = cluster.getObjectName();
789 Container container = cluster.getContainer();
790 String name = clusterName.getDomain() + ":type=IDataSender";
791 if (container instanceof StandardHost) {
792 name += ",host=" + clusterName.getKeyProperty("host");
793 }
794 senderName = new ObjectName(name + ",senderAddress="
795 + sender.getAddress().getHostAddress() + ",senderPort="
796 + sender.getPort());
797 } catch (Exception e) {
798 log.warn(e);
799 }
800 return senderName;
801 }
802
803 /**
804 * serialize message and add timestamp
805 * @see GZIPOutputStream
806 * @param msg cluster message
807 * @return cluster message as byte array
808 * @throws IOException
809 * @since 5.5.10
810 */
811 protected ClusterData serialize(ClusterMessage msg) throws IOException {
812 msg.setTimestamp(System.currentTimeMillis());
813 ByteArrayOutputStream outs = new ByteArrayOutputStream();
814 ObjectOutputStream out;
815 GZIPOutputStream gout = null;
816 ClusterData data = new ClusterData();
817 data.setType(msg.getClass().getName());
818 data.setUniqueId(msg.getUniqueId());
819 data.setTimestamp(msg.getTimestamp());
820 data.setCompress(msg.getCompress());
821 data.setResend(msg.getResend());
822 // FIXME add Stats how much comress and uncompress messages and bytes are transfered
823 if ((isCompress() && msg.getCompress() != ClusterMessage.FLAG_FORBIDDEN)
824 || msg.getCompress() == ClusterMessage.FLAG_ALLOWED) {
825 gout = new GZIPOutputStream(outs);
826 out = new ObjectOutputStream(gout);
827 } else {
828 out = new ObjectOutputStream(outs);
829 }
830 out.writeObject(msg);
831 // flush out the gzip stream to byte buffer
832 if(gout != null) {
833 gout.flush();
834 gout.close();
835 }
836 data.setMessage(outs.toByteArray());
837 return data;
838 }
839
840
841 /**
842 * Send message to concrete sender. If autoConnect is true, check is
843 * connection broken and the reconnect the complete sender.
844 * <ul>
845 * <li>failure the suspect flag is set true. After successfully sending the
846 * suspect flag is set to false.</li>
847 * <li>Stats is only update after sussesfull sending</li>
848 * </ul>
849 *
850 * @param data message Data
851 * @param sender concrete message sender
852 * @throws java.io.IOException If an error occurs
853 */
854 protected void sendMessageData(ClusterData data,
855 IDataSender sender) throws java.io.IOException {
856 if (sender == null)
857 throw new java.io.IOException(
858 "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
859 try {
860 // deprecated not needed DataSender#pushMessage can handle connection
861 if (autoConnect) {
862 synchronized(sender) {
863 if(!sender.isConnected())
864 sender.connect();
865 }
866 }
867 sender.sendMessage(data);
868 sender.setSuspect(false);
869 addStats(data.getMessage().length);
870 } catch (Exception x) {
871 if (log.isWarnEnabled()) {
872 if (!sender.getSuspect()) {
873 log.warn("Unable to send replicated message, is server down?",x);
874 }
875 }
876 sender.setSuspect(true);
877 failureCounter++;
878 }
879
880 }
881 /**
882 * Add processing stats times
883 * @param startTime
884 */
885 protected void addProcessingStats(long startTime) {
886 long time = System.currentTimeMillis() - startTime ;
887 if(time < minProcessingTime)
888 minProcessingTime = time ;
889 if( time > maxProcessingTime)
890 maxProcessingTime = time ;
891 processingTime += time ;
892 }
893
894
895 }