1 /*
2 * Copyright 1999,2004 The Apache Software Foundation.
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17 package org.apache.catalina.cluster.tcp;
18
19 import java.io.IOException;
20 import java.net.InetAddress;
21 import java.util.LinkedList;
22
23 /**
24 * Send cluster messages with a pool of sockets (25).
25 *
26 * FIXME support processing stats
27 *
28 * @author Filip Hanik
29 * @author Peter Rossbach
30 * @version 1.2
31 */
32
33 public class PooledSocketSender extends DataSender {
34
35 private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
36 .getLog(org.apache.catalina.cluster.tcp.PooledSocketSender.class);
37
38 /**
39 * The descriptive information about this implementation.
40 */
41 private static final String info = "PooledSocketSender/2.0";
42
43 // ----------------------------------------------------- Instance Variables
44
45 private int maxPoolSocketLimit = 25;
46
47 private SenderQueue senderQueue = null;
48
49 // ----------------------------------------------------- Constructor
50
51 /**
52 * @param domain replication cluster domain (session domain)
53 * @param host replication node tcp address
54 * @param port replication node tcp port
55 */
56 public PooledSocketSender(String domain,InetAddress host, int port) {
57 super(domain,host, port);
58 senderQueue = new SenderQueue(this, maxPoolSocketLimit);
59 }
60
61 // ----------------------------------------------------- Public Properties
62
63 /**
64 * Return descriptive information about this implementation and the
65 * corresponding version number, in the format
66 * <code><description>/<version></code>.
67 */
68 public String getInfo() {
69
70 return (info);
71
72 }
73
74 public void setMaxPoolSocketLimit(int limit) {
75 maxPoolSocketLimit = limit;
76 senderQueue.setLimit(limit);
77 }
78
79 public int getMaxPoolSocketLimit() {
80 return maxPoolSocketLimit;
81 }
82
83 public int getInPoolSize() {
84 return senderQueue.getInPoolSize();
85 }
86
87 public int getInUsePoolSize() {
88 return senderQueue.getInUsePoolSize();
89 }
90
91 // ----------------------------------------------------- Public Methode
92
93 public synchronized void connect() throws java.io.IOException {
94 //do nothing, happens in the socket sender itself
95 senderQueue.open();
96 setSocketConnected(true);
97 connectCounter++;
98 }
99
100 public synchronized void disconnect() {
101 senderQueue.close();
102 setSocketConnected(false);
103 disconnectCounter++;
104 }
105
106 /**
107 * send message and use a pool of SocketSenders
108 *
109 * @param messageId Message unique identifier
110 * @param data Message data
111 * @throws java.io.IOException
112 */
113 public void sendMessage(ClusterData data) throws IOException {
114 //get a socket sender from the pool
115 if(!isConnected()) {
116 synchronized(this) {
117 if(!isConnected())
118 connect();
119 }
120 }
121 SocketSender sender = senderQueue.getSender(0);
122 if (sender == null) {
123 log.warn(sm.getString("PoolSocketSender.noMoreSender", this.getAddress(), new Integer(this.getPort())));
124 return;
125 }
126 //send the message
127 try {
128 sender.sendMessage(data);
129 } finally {
130 //return the connection to the pool
131 senderQueue.returnSender(sender);
132 }
133 addStats(data.getMessage().length);
134 }
135
136 public String toString() {
137 StringBuffer buf = new StringBuffer("PooledSocketSender[");
138 buf.append(getAddress()).append(":").append(getPort()).append("]");
139 return buf.toString();
140 }
141
142 // ----------------------------------------------------- Inner Class
143
144 private class SenderQueue {
145 private int limit = 25;
146
147 PooledSocketSender parent = null;
148
149 private LinkedList queue = new LinkedList();
150
151 private LinkedList inuse = new LinkedList();
152
153 private Object mutex = new Object();
154
155 private boolean isOpen = true;
156
157 public SenderQueue(PooledSocketSender parent, int limit) {
158 this.limit = limit;
159 this.parent = parent;
160 }
161
162 /**
163 * @return Returns the limit.
164 */
165 public int getLimit() {
166 return limit;
167 }
168 /**
169 * @param limit The limit to set.
170 */
171 public void setLimit(int limit) {
172 this.limit = limit;
173 }
174 /**
175 * @return
176 */
177 public int getInUsePoolSize() {
178 return inuse.size();
179 }
180
181 /**
182 * @return
183 */
184 public int getInPoolSize() {
185 return queue.size();
186 }
187
188 public SocketSender getSender(long timeout) {
189 SocketSender sender = null;
190 long start = System.currentTimeMillis();
191 long delta = 0;
192 do {
193 synchronized (mutex) {
194 if (!isOpen)
195 throw new IllegalStateException(
196 "Socket pool is closed.");
197 if (queue.size() > 0) {
198 sender = (SocketSender) queue.removeFirst();
199 } else if (inuse.size() < limit) {
200 sender = getNewSocketSender();
201 } else {
202 try {
203 mutex.wait(timeout);
204 } catch (Exception x) {
205 PooledSocketSender.log.warn(sm.getString("PoolSocketSender.senderQueue.sender.failed",parent.getAddress(),new Integer(parent.getPort())),x);
206 }//catch
207 }//end if
208 if (sender != null) {
209 inuse.add(sender);
210 }
211 }//synchronized
212 delta = System.currentTimeMillis() - start;
213 } while ((isOpen) && (sender == null)
214 && (timeout == 0 ? true : (delta < timeout)));
215 //to do
216 return sender;
217 }
218
219 public void returnSender(SocketSender sender) {
220 //to do
221 synchronized (mutex) {
222 queue.add(sender);
223 inuse.remove(sender);
224 mutex.notify();
225 }
226 }
227
228 private SocketSender getNewSocketSender() {
229 //new SocketSender(
230 SocketSender sender = new SocketSender(getDomain(),parent.getAddress(), parent
231 .getPort());
232 sender.setKeepAliveMaxRequestCount(parent
233 .getKeepAliveMaxRequestCount());
234 sender.setKeepAliveTimeout(parent.getKeepAliveTimeout());
235 sender.setAckTimeout(parent.getAckTimeout());
236 sender.setWaitForAck(parent.isWaitForAck());
237 sender.setResend(parent.isResend());
238 return sender;
239
240 }
241
242 public void close() {
243 synchronized (mutex) {
244 for (int i = 0; i < queue.size(); i++) {
245 SocketSender sender = (SocketSender) queue.get(i);
246 sender.disconnect();
247 }//for
248 for (int i = 0; i < inuse.size(); i++) {
249 SocketSender sender = (SocketSender) inuse.get(i);
250 sender.disconnect();
251 }//for
252 queue.clear();
253 inuse.clear();
254 isOpen = false;
255 mutex.notifyAll();
256 }
257 }
258
259 public void open() {
260 synchronized (mutex) {
261 isOpen = true;
262 mutex.notifyAll();
263 }
264 }
265 }
266 }