Source code: com/iborg/net/CommunicationManager.java
1 /*
2 * CommunicationManager.java
3 *
4 * Created on March 18, 2002, 5:31 PM
5 */
6
7 package com.iborg.net;
8 import java.util.*;
9 import java.io.*;
10
11 /**
12 *
13 * @author Umax Customer
14 * @version
15 */
16 public class CommunicationManager {
17
18 public static long OPEN_WAIT=60000L;
19
20 //static HashMap accepted = new HashMap();
21 private static Map users = new HashMap();
22 private static Map currentSockets = new HashMap();
23 private static Map comBuffer = new HashMap();
24
25 private static Map socketInfo = new HashMap();
26
27 private static Thread cleanupThread;
28 static class Cleanup implements Runnable {
29 public void run() {
30 Set toClose = new TreeSet();
31 while(true) {
32 toClose.clear();
33 Map map = CommunicationManager.getCurrentSockets();
34 long timeStamp = (new Date()).getTime();
35 Set keys = map.keySet();
36 Iterator iterator = keys.iterator();
37 while( iterator.hasNext()) {
38 String socketName = (String)iterator.next();
39 SocketInfo socketInfo = (SocketInfo) CommunicationManager.getSocketInfo().get(socketName);
40 if(socketInfo.lastRead == 0)
41 socketInfo.lastRead = timeStamp;
42 if(socketInfo.lastWrite == 0)
43 socketInfo.lastWrite = timeStamp;
44 if( timeStamp - socketInfo.lastRead > 180000L &&
45 timeStamp - socketInfo.lastWrite > 180000L) {
46 toClose.add(socketName);
47 }
48 }
49 iterator = toClose.iterator();
50 while( iterator.hasNext()) {
51 String socketName = (String)iterator.next();
52 close(socketName);
53 }
54
55 try {
56 Thread.currentThread().sleep(60000L);
57 } catch (Exception e) {
58 }
59 }
60 }
61 }
62
63 static {
64 Cleanup cleanup = new Cleanup();
65 cleanupThread = new Thread(cleanup);
66 cleanupThread.start();
67 }
68
69 public static Map getUsers() {
70 return users;
71 }
72
73 public static Map getCurrentSockets() {
74 return currentSockets;
75 }
76
77 public static Map getSocketInfo() {
78 return socketInfo;
79 }
80
81 public static Set getUserShares(String userId) {
82 Set shares = null;
83 UserRecord userRecord = (UserRecord) users.get(userId);
84
85 if(userRecord != null) {
86 shares = userRecord.advertised;
87 }
88
89 return shares;
90 }
91
92 public static String accept(String userId, String key) {
93 String socket = null;
94 UserRecord userRecord = (UserRecord) users.get(userId);
95
96 if(userRecord == null) {
97 userRecord = new UserRecord();
98 users.put(userId, userRecord);
99 }
100
101 userRecord.advertised.add(key);
102
103 HashMap accepted = userRecord.accepted;
104
105 Object o = accepted.get(key);
106 if(o != null) {
107 List sockets = (List) o;
108 if(sockets.size() > 0) {
109 socket = (String)sockets.get(0);
110 synchronized(socket) {
111 sockets.remove(0);
112 socket.notify();
113 }
114 }
115 synchronized(accepted) {
116 if(sockets.size() > 0) {
117 accepted.remove(key);
118 }
119 }
120 }
121 return socket;
122 }
123
124 public static String open(String userId, String key) {
125 String clientSocket = null;
126 UserRecord userRecord = (UserRecord) users.get(userId);
127
128 if(userRecord != null) {
129 HashMap accepted = userRecord.accepted;
130
131 synchronized(accepted) {
132 List sockets = (List) accepted.get(key);
133 if(sockets == null) {
134 sockets = new ArrayList();
135 }
136 String socket = com.iborg.util.UniqueObject.createUniqueString();
137 String serverSocket = socket + "S";
138 clientSocket = socket + "C";
139
140 sockets.add(serverSocket);
141
142 accepted.put(key, sockets);
143
144 currentSockets.put(serverSocket, clientSocket);
145 currentSockets.put(clientSocket, serverSocket);
146
147 socketInfo.put(serverSocket, new SocketInfo());
148 socketInfo.put(clientSocket, new SocketInfo());
149
150 synchronized(serverSocket) {
151 try {
152 serverSocket.wait(OPEN_WAIT);
153 } catch (InterruptedException ie) {
154 }
155 if(sockets.contains(serverSocket)) {
156 sockets.remove(serverSocket);
157 currentSockets.remove(serverSocket);
158 currentSockets.remove(clientSocket);
159 return null;
160 }
161 }
162 comBuffer.put(serverSocket, new ArrayList());
163 comBuffer.put(clientSocket, new ArrayList());
164 }
165 }
166 return clientSocket;
167 }
168
169 public static void close(String key) {
170 comBuffer.remove(key);
171 currentSockets.remove(key);
172 socketInfo.remove(key);
173 }
174
175 public static int write(String socket, byte [] buffer, int length) throws IOException {
176 int len = 0;
177
178 Object o = currentSockets.get(socket);
179 if(o != null) {
180 List messages = (List)comBuffer.get(o);
181 if(messages == null) {
182 throw new IOException("Socket is being closed");
183 }
184 byte [] data = new byte[length];
185 for(int i = 0; i < length; i++) {
186 data[i] = buffer[i];
187 }
188 messages.add(data);
189 len = length;
190 } else {
191 throw new IOException("Socket is closed");
192 }
193
194 o = socketInfo.get(socket);
195 if(o != null) {
196 SocketInfo socketInfo = (SocketInfo) o;
197 socketInfo.updateWriteStats(len);
198 }
199
200 return len;
201 }
202
203 public static byte [] read(String socket) throws IOException {
204 byte [] data = null;
205 Object o = currentSockets.get(socket);
206 if(o != null) {
207 List messages = (List)comBuffer.get(socket);
208 if(messages.size() > 0) {
209 data = (byte [])messages.get(0);
210 messages.remove(0);
211 } else {
212 if(currentSockets.get(o) == null) {
213 throw new IOException("Socket is being closed");
214 }
215 }
216 } else {
217 throw new IOException("Socket is closed");
218 }
219 o = socketInfo.get(socket);
220 if(o != null) {
221 SocketInfo socketInfo = (SocketInfo) o;
222 int len = 0;
223 if(data != null)
224 len = data.length;
225 socketInfo.updateReadStats(len);
226 }
227 return data;
228 }
229
230 }
231