Docjar: A Java Source and Docuemnt Enginecom.*    java.*    javax.*    org.*    all    new    plug-in

Quick Search    Search Deep

Source code: com/go/trove/net/DistributedSocketFactory.java


1   /* ====================================================================
2    * Trove - Copyright (c) 1997-2000 Walt Disney Internet Group
3    * ====================================================================
4    * The Tea Software License, Version 1.1
5    *
6    * Copyright (c) 2000 Walt Disney Internet Group. All rights reserved.
7    *
8    * Redistribution and use in source and binary forms, with or without
9    * modification, are permitted provided that the following conditions
10   * are met:
11   *
12   * 1. Redistributions of source code must retain the above copyright
13   *    notice, this list of conditions and the following disclaimer.
14   *
15   * 2. Redistributions in binary form must reproduce the above copyright
16   *    notice, this list of conditions and the following disclaimer in
17   *    the documentation and/or other materials provided with the
18   *    distribution.
19   *
20   * 3. The end-user documentation included with the redistribution,
21   *    if any, must include the following acknowledgment:
22   *       "This product includes software developed by the
23   *        Walt Disney Internet Group (http://opensource.go.com/)."
24   *    Alternately, this acknowledgment may appear in the software itself,
25   *    if and wherever such third-party acknowledgments normally appear.
26   *
27   * 4. The names "Tea", "TeaServlet", "Kettle", "Trove" and "BeanDoc" must
28   *    not be used to endorse or promote products derived from this
29   *    software without prior written permission. For written
30   *    permission, please contact opensource@dig.com.
31   *
32   * 5. Products derived from this software may not be called "Tea",
33   *    "TeaServlet", "Kettle" or "Trove", nor may "Tea", "TeaServlet",
34   *    "Kettle", "Trove" or "BeanDoc" appear in their name, without prior
35   *    written permission of the Walt Disney Internet Group.
36   *
37   * THIS SOFTWARE IS PROVIDED ``AS IS'' AND ANY EXPRESSED OR IMPLIED
38   * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
39   * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
40   * DISCLAIMED.  IN NO EVENT SHALL THE WALT DISNEY INTERNET GROUP OR ITS
41   * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
42   * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
43   * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR 
44   * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
45   * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 
46   * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
47   * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
48   * ====================================================================
49   *
50   * For more information about Tea, please see http://opensource.go.com/.
51   */
52  
53  package com.go.trove.net;
54  
55  import java.io.*;
56  import java.net.*;
57  import java.util.*;
58  import java.lang.ref.*;
59  import com.go.trove.util.IdentityMap;
60  
61  /******************************************************************************
62   * A SocketFactory implementation for distributing load among several
63   * SocketFactories. If an exception occurs on a socket, its pool is put into
64   * the "dead" list. A special thread will run in the background, trying to
65   * resurrect the dead SocketSocket. As soon as its able to create sockets
66   * again, its added back into the "live" list.
67   * <p>
68   * Consider wrapping with a {@link LazySocketFactory} for automatic checking
69   * against socket factories that may be dead.
70   *
71   * @author Brian S O'Neill
72   * @version
73   * <!--$$Revision:--> 7 <!-- $-->, <!--$$JustDate:--> 01/01/22 <!-- $-->
74   */
75  public class DistributedSocketFactory implements SocketFactory {
76      private final long mTimeout;
77  
78      private int mFactoryIndex;
79  
80      // Contains only the live SocketFactories.
81      private List mFactories;
82  
83      // Maps SocketPools to resurrector threads.
84      private Map mResurrectors;
85  
86      // Maps CheckedSockets to the SocketPools that they came from.
87      private Map mSocketSources;
88  
89      private CheckedSocket.ExceptionListener mListener;
90  
91      /**
92       * @param timeout Maximum time to wait (in milliseconds) for new
93       * connections to be established before throwing an exception
94       */
95      public DistributedSocketFactory(long timeout) {
96          mTimeout = timeout;
97          mFactories = Collections.synchronizedList(new ArrayList());
98          mResurrectors = Collections.synchronizedMap(new HashMap());
99          mSocketSources = Collections.synchronizedMap(new IdentityMap());
100 
101         mListener = new CheckedSocket.ExceptionListener() {
102             public void exceptionOccurred(CheckedSocket s, Exception e, int count) {
103                 if (count == 1) {
104                     deadFactory((SocketFactory)mSocketSources.get(s));
105                 }
106             }
107         };
108     }
109 
110     public void addSocketFactory(SocketFactory factory) {
111         mFactories.add(factory);
112     }
113 
114     public void removeSocketFactory(SocketFactory factory) {
115         mFactories.remove(factory);
116         Thread t = (Thread)mResurrectors.remove(factory);
117         if (t != null) {
118             t.interrupt();
119         }
120     }
121 
122     public InetAddressAndPort getInetAddressAndPort() {
123         try {
124             return getFactory(selectFactory(null)).getInetAddressAndPort();
125         }
126         catch (ConnectException e) {
127             return InetAddressAndPort.UNKNOWN;
128         }
129     }
130 
131     public InetAddressAndPort getInetAddressAndPort(Object session) {
132         try {
133             return getFactory(selectFactory(session))
134                 .getInetAddressAndPort(session);
135         }
136         catch (ConnectException e) {
137             return InetAddressAndPort.UNKNOWN;
138         }
139     }
140 
141     public long getDefaultTimeout() {
142         return mTimeout;
143     }
144 
145     public CheckedSocket createSocket()
146         throws ConnectException, SocketException
147     {
148         return createSocket(null, mTimeout);
149     }
150 
151     public CheckedSocket createSocket(Object session)
152         throws ConnectException, SocketException
153     {
154         return createSocket(session, mTimeout);
155     }
156 
157     public CheckedSocket createSocket(long timeout)
158         throws ConnectException, SocketException
159     {
160         return createSocket(null, timeout);
161     }
162 
163     public CheckedSocket createSocket(Object session, long timeout)
164         throws ConnectException, SocketException
165     {
166         long startTime = timeout > 0 ? System.currentTimeMillis() : 0;
167         int index = selectFactory(session);
168         int count = mFactories.size();
169 
170         for (int i=0; i<count; i++) {
171             SocketFactory factory = null;
172             try {
173                 factory = getFactory(index++);
174                 CheckedSocket socket = factory.createSocket(session, timeout);
175                 socket.addExceptionListener(mListener);
176                 mSocketSources.put(socket, factory);
177                 return socket;
178             }
179             catch (SocketException e) {
180                 deadFactory(factory);
181                 
182                 if (timeout == 0) {
183                     throw e;
184                 }
185                 
186                 if (timeout > 0) {
187                     timeout -= (System.currentTimeMillis() - startTime);
188                     if (timeout < 0) {
189                         throw e;
190                     }
191                 }
192             }
193         }
194 
195         throw new ConnectException("Unable to create socket");
196     }
197 
198     public CheckedSocket getSocket() throws ConnectException, SocketException {
199         return getSocket(null, mTimeout);
200     }
201 
202     public CheckedSocket getSocket(Object session)
203         throws ConnectException, SocketException
204     {
205         return getSocket(session, mTimeout);
206     }
207 
208     public CheckedSocket getSocket(long timeout)
209         throws ConnectException, SocketException
210     {
211         return getSocket(null, timeout);
212     }
213 
214     public CheckedSocket getSocket(Object session, long timeout)
215         throws ConnectException, SocketException
216     {
217         long startTime = timeout > 0 ? System.currentTimeMillis() : 0;
218         int index = selectFactory(session);
219         int count = mFactories.size();
220 
221         for (int i=0; i<count; i++) {
222             SocketFactory factory = null;
223             try {
224                 factory = getFactory(index++);
225                 CheckedSocket socket = factory.getSocket(session, timeout);
226                 socket.addExceptionListener(mListener);
227                 mSocketSources.put(socket, factory);
228                 return socket;
229             }
230             catch (SocketException e) {
231                 deadFactory(factory);
232                 
233                 if (timeout == 0) {
234                     throw e;
235                 }
236                 
237                 if (timeout > 0) {
238                     timeout -= (System.currentTimeMillis() - startTime);
239                     if (timeout < 0) {
240                         throw e;
241                     }
242                 }
243             }
244         }
245 
246         throw new ConnectException("Unable to get socket");
247     }
248 
249     public void recycleSocket(CheckedSocket socket)
250         throws SocketException, IllegalArgumentException
251     {
252         if (socket == null) {
253             return;
254         }
255 
256         SocketFactory source = (SocketFactory)mSocketSources.remove(socket);
257 
258         if (source == null) {
259             throw new IllegalArgumentException
260                 ("Socket did not originate from this pool");
261         }
262 
263         socket.removeExceptionListener(mListener);
264         source.recycleSocket(socket);
265     }
266 
267     public void clear() {
268         synchronized (mFactories) {
269             for (int i = mFactories.size(); --i >= 0; ) {
270                 ((SocketFactory)mFactories.get(i)).clear();
271             }
272         }
273     }
274 
275     public int getAvailableCount() {
276         int count = 0;
277         synchronized (mFactories) {
278             for (int i = mFactories.size(); --i >= 0; ) {
279                 count += ((SocketFactory)mFactories.get(i))
280                     .getAvailableCount();
281             }
282         }
283         return count;
284     }
285 
286     /**
287      * The provided index must be positive, but it can be out of the factory
288      * list bounds.
289      */
290     private SocketFactory getFactory(int index) throws ConnectException {
291         synchronized (mFactories) {
292             int size = mFactories.size();
293             if (size <= 0) {
294                 throw new ConnectException("No SocketFactories available");
295             }
296             return (SocketFactory)mFactories.get(index % size);
297         }
298     }
299 
300     /**
301      * Returns an index which is positive, but may be out of the factory list
302      * bounds.
303      */
304     private int selectFactory(Object session) throws ConnectException {
305         if (session != null) {
306             return session.hashCode() & 0x7fffffff;
307         }
308         else {
309             synchronized (mFactories) {
310                 return mFactoryIndex++ & 0x7fffffff;
311             }
312         }
313     }
314 
315     private void deadFactory(SocketFactory factory) {
316         if (factory == null) {
317             return;
318         }
319 
320         synchronized (mFactories) {
321             // Only remove factory if its not the last one left.
322             if (mFactories.contains(factory) && mFactories.size() > 1) {
323                 mFactories.remove(factory);
324                 
325                 Resurrector r = new Resurrector(this, factory);
326                 Thread t = new Thread(null, r, "Resurrector " +
327                                       factory.getInetAddressAndPort());
328                 t.setDaemon(true);
329                 t.start();
330                 mResurrectors.put(factory, t);
331             }
332         }
333     }
334 
335     private static class Resurrector implements Runnable {
336         // Weakly references owner so that this thread won't prevent it from
337         // being garbage collected.
338         private final Reference mOwner;
339         private final SocketFactory mFactory;
340 
341         public Resurrector(DistributedSocketFactory owner,
342                            SocketFactory factory) {
343             mOwner = new WeakReference(owner);
344             mFactory = factory;
345         }
346 
347         public void run() {
348             DistributedSocketFactory owner = null;
349             try {
350                 while (!Thread.interrupted()) {
351                     owner = (DistributedSocketFactory)mOwner.get();
352                     if (owner == null) {
353                         break;
354                     }
355 
356                     try {
357                         mFactory.recycleSocket(mFactory.createSocket());
358                         owner.mFactories.add(mFactory);
359                         break;
360                     }
361                     catch (IOException e) {
362                     }
363                     
364                     owner = null;
365 
366                     // Wait at 5 seconds before trying again.
367                     try {
368                         Thread.sleep(5000);
369                     }
370                     catch (InterruptedException e) {
371                         break;
372                     }
373                 }
374             }
375             finally {
376                 owner = (DistributedSocketFactory)mOwner.get();
377                 if (owner != null) {
378                     owner.mResurrectors.remove(mFactory);
379                 }
380             }
381         }
382     }
383 }