Home » jackrabbit-2.1.0-src » org.apache.jackrabbit.core.observation » [javadoc | source]

    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one or more
    3    * contributor license agreements.  See the NOTICE file distributed with
    4    * this work for additional information regarding copyright ownership.
    5    * The ASF licenses this file to You under the Apache License, Version 2.0
    6    * (the "License"); you may not use this file except in compliance with
    7    * the License.  You may obtain a copy of the License at
    8    *
    9    *      http://www.apache.org/licenses/LICENSE-2.0
   10    *
   11    * Unless required by applicable law or agreed to in writing, software
   12    * distributed under the License is distributed on an "AS IS" BASIS,
   13    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   14    * See the License for the specific language governing permissions and
   15    * limitations under the License.
   16    */
   17   package org.apache.jackrabbit.core.observation;
   18   
   19   import org.apache.commons.collections.Buffer;
   20   import org.apache.commons.collections.BufferUtils;
   21   import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
   22   import org.apache.jackrabbit.core.state.ChangeLog;
   23   import org.slf4j.Logger;
   24   import org.slf4j.LoggerFactory;
   25   
   26   import java.util.Collections;
   27   import java.util.HashSet;
   28   import java.util.Iterator;
   29   import java.util.Set;
   30   import java.util.concurrent.atomic.AtomicInteger;
   31   
   32   /**
   33    * Dispatcher for dispatching events to listeners within a single workspace.
   34    */
   35   public final class ObservationDispatcher extends EventDispatcher
   36           implements Runnable {
   37   
   38       /**
   39        * Logger instance for this class
   40        */
   41       private static final Logger log
   42               = LoggerFactory.getLogger(ObservationDispatcher.class);
   43   
   44       /**
   45        * Dummy DispatchAction indicating the notification thread to end
   46        */
   47       private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null);
   48   
   49       /**
   50        * The maximum number of queued asynchronous events. To avoid of of memory
   51        * problems, the default value is 200'000. To change the default, set the
   52        * system property jackrabbit.maxQueuedEvents to the required value. If more
   53        * events are in the queue, the current thread waits, unless the current thread is
   54        * the observation dispatcher itself (in which case only a warning is logged
   55        * - usually observation listeners shouldn't cause new events).
   56        */
   57       private static final int MAX_QUEUED_EVENTS = Integer.parseInt(System.getProperty("jackrabbit.maxQueuedEvents", "200000"));
   58   
   59       /**
   60        * Currently active <code>EventConsumer</code>s for notification.
   61        */
   62       private Set<EventConsumer> activeConsumers = new HashSet<EventConsumer>();
   63   
   64       /**
   65        * Currently active synchronous <code>EventConsumer</code>s for notification.
   66        */
   67       private Set<EventConsumer> synchronousConsumers = new HashSet<EventConsumer>();
   68   
   69       /**
   70        * Set of <code>EventConsumer</code>s for read only Set access
   71        */
   72       private Set<EventConsumer> readOnlyConsumers;
   73   
   74       /**
   75        * Set of synchronous <code>EventConsumer</code>s for read only Set access.
   76        */
   77       private Set<EventConsumer> synchronousReadOnlyConsumers;
   78   
   79       /**
   80        * synchronization monitor for listener changes
   81        */
   82       private Object consumerChange = new Object();
   83   
   84       /**
   85        * Contains the pending events that will be delivered to event listeners
   86        */
   87       private Buffer eventQueue
   88               = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
   89   
   90       private AtomicInteger eventQueueSize = new AtomicInteger();
   91   
   92       /**
   93        * The background notification thread
   94        */
   95       private Thread notificationThread;
   96   
   97       private long lastError;
   98   
   99       /**
  100        * Creates a new <code>ObservationDispatcher</code> instance
  101        * and starts the notification thread daemon.
  102        */
  103       public ObservationDispatcher() {
  104           notificationThread = new Thread(this, "ObservationManager");
  105           notificationThread.setDaemon(true);
  106           notificationThread.start();
  107       }
  108   
  109       /**
  110        * Disposes this <code>ObservationManager</code>. This will
  111        * effectively stop the background notification thread.
  112        */
  113       public void dispose() {
  114           // dispatch dummy event to mark end of notification
  115           eventQueue.add(DISPOSE_MARKER);
  116           try {
  117               notificationThread.join();
  118           } catch (InterruptedException e) {
  119               // FIXME log exception ?
  120           }
  121           log.info("Notification of EventListeners stopped.");
  122       }
  123   
  124       /**
  125        * Returns an unmodifieable <code>Set</code> of <code>EventConsumer</code>s.
  126        *
  127        * @return <code>Set</code> of <code>EventConsumer</code>s.
  128        */
  129       Set<EventConsumer> getAsynchronousConsumers() {
  130           synchronized (consumerChange) {
  131               if (readOnlyConsumers == null) {
  132                   readOnlyConsumers = Collections.unmodifiableSet(new HashSet<EventConsumer>(activeConsumers));
  133               }
  134               return readOnlyConsumers;
  135           }
  136       }
  137   
  138       Set<EventConsumer> getSynchronousConsumers() {
  139           synchronized (consumerChange) {
  140               if (synchronousReadOnlyConsumers == null) {
  141                   synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet<EventConsumer>(synchronousConsumers));
  142               }
  143               return synchronousReadOnlyConsumers;
  144           }
  145       }
  146   
  147       /**
  148        * Implements the run method of the background notification
  149        * thread.
  150        */
  151       public void run() {
  152           DispatchAction action;
  153           while ((action = (DispatchAction) eventQueue.remove()) != DISPOSE_MARKER) {
  154   
  155               eventQueueSize.getAndAdd(-action.getEventStates().size());
  156               log.debug("got EventStateCollection");
  157               log.debug("event delivery to " + action.getEventConsumers().size() + " consumers started...");
  158               for (Iterator<EventConsumer> it = action.getEventConsumers().iterator(); it.hasNext();) {
  159                   EventConsumer c = it.next();
  160                   try {
  161                       c.consumeEvents(action.getEventStates());
  162                   } catch (Throwable t) {
  163                       log.warn("EventConsumer threw exception: " + t.toString());
  164                       log.debug("Stacktrace: ", t);
  165                       // move on to the next consumer
  166                   }
  167               }
  168               log.debug("event delivery finished.");
  169   
  170           }
  171       }
  172   
  173       /**
  174        * {@inheritDoc}
  175        * <p/>
  176        * Gives this observation manager the oportunity to
  177        * prepare the events for dispatching.
  178        */
  179       void prepareEvents(EventStateCollection events) {
  180           Set<EventConsumer> consumers = new HashSet<EventConsumer>();
  181           consumers.addAll(getSynchronousConsumers());
  182           consumers.addAll(getAsynchronousConsumers());
  183           for (EventConsumer c : consumers) {
  184               c.prepareEvents(events);
  185           }
  186       }
  187   
  188       /**
  189        * {@inheritDoc}
  190        */
  191       void prepareDeleted(EventStateCollection events, ChangeLog changes) {
  192           Set<EventConsumer> consumers = new HashSet<EventConsumer>();
  193           consumers.addAll(getSynchronousConsumers());
  194           consumers.addAll(getAsynchronousConsumers());
  195           for (EventConsumer c : consumers) {
  196               c.prepareDeleted(events, changes.deletedStates());
  197           }
  198       }
  199   
  200       /**
  201        * {@inheritDoc}
  202        * <p/>
  203        * Dispatches the {@link EventStateCollection events} to all
  204        * registered {@link javax.jcr.observation.EventListener}s.
  205        */
  206       void dispatchEvents(EventStateCollection events) {
  207           // notify synchronous listeners
  208           Set<EventConsumer> synchronous = getSynchronousConsumers();
  209           if (log.isDebugEnabled()) {
  210               log.debug("notifying " + synchronous.size() + " synchronous listeners.");
  211           }
  212           for (EventConsumer c : synchronous) {
  213               try {
  214                   c.consumeEvents(events);
  215               } catch (Throwable t) {
  216                   log.error("Synchronous EventConsumer threw exception.", t);
  217                   // move on to next consumer
  218               }
  219           }
  220           eventQueue.add(new DispatchAction(events, getAsynchronousConsumers()));
  221           int size = eventQueueSize.addAndGet(events.size());
  222           if (size > MAX_QUEUED_EVENTS) {
  223               boolean logWarning = false;
  224               long now = System.currentTimeMillis();
  225               // log a warning at most every 5 seconds (to avoid filling the log file)
  226               if (lastError == 0 || now > lastError + 5000) {
  227                   logWarning = true;
  228                   log.warn("More than " + MAX_QUEUED_EVENTS + " events in the queue", new Exception("Stack Trace"));
  229                   lastError = now;
  230               }
  231               if (Thread.currentThread() == notificationThread) {
  232                   if (logWarning) {
  233                       log.warn("Recursive notification?");
  234                   }
  235               } else {
  236                   if (logWarning) {
  237                       log.warn("Waiting");
  238                   }
  239                   while (eventQueueSize.get() > MAX_QUEUED_EVENTS) {
  240                       try {
  241                           Thread.sleep(100);
  242                       } catch (InterruptedException e) {
  243                           // ignore
  244                       }
  245                   }
  246               }
  247           }
  248       }
  249   
  250       /**
  251        * Adds or replaces an event consumer.
  252        * @param consumer the <code>EventConsumer</code> to add or replace.
  253        */
  254       void addConsumer(EventConsumer consumer) {
  255           synchronized (consumerChange) {
  256               if (consumer.getEventListener() instanceof SynchronousEventListener) {
  257                   // remove existing if any
  258                   synchronousConsumers.remove(consumer);
  259                   // re-add it
  260                   synchronousConsumers.add(consumer);
  261                   // reset read only consumer set
  262                   synchronousReadOnlyConsumers = null;
  263               } else {
  264                   // remove existing if any
  265                   activeConsumers.remove(consumer);
  266                   // re-add it
  267                   activeConsumers.add(consumer);
  268                   // reset read only consumer set
  269                   readOnlyConsumers = null;
  270               }
  271           }
  272       }
  273   
  274       /**
  275        * Unregisters an event consumer from event notification.
  276        * @param consumer the consumer to deregister.
  277        */
  278       void removeConsumer(EventConsumer consumer) {
  279           synchronized (consumerChange) {
  280               if (consumer.getEventListener() instanceof SynchronousEventListener) {
  281                   synchronousConsumers.remove(consumer);
  282                   // reset read only listener set
  283                   synchronousReadOnlyConsumers = null;
  284               } else {
  285                   activeConsumers.remove(consumer);
  286                   // reset read only listener set
  287                   readOnlyConsumers = null;
  288               }
  289           }
  290       }
  291   
  292   }

Home » jackrabbit-2.1.0-src » org.apache.jackrabbit.core.observation » [javadoc | source]