1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.jk.common;
19
20 import java.net.URLEncoder;
21 import java.io.File;
22 import java.io.FileOutputStream;
23 import java.io.IOException;
24 import javax.management.ObjectName;
25
26 import org.apache.jk.core.JkHandler;
27 import org.apache.jk.core.Msg;
28 import org.apache.jk.core.MsgContext;
29 import org.apache.jk.core.JkChannel;
30 import org.apache.jk.core.WorkerEnv;
31 import org.apache.coyote.Request;
32 import org.apache.coyote.RequestGroupInfo;
33 import org.apache.coyote.RequestInfo;
34 import org.apache.tomcat.util.modeler.Registry;
35 import org.apache.tomcat.util.threads.ThreadPool;
36 import org.apache.tomcat.util.threads.ThreadPoolRunnable;
37
38
39 /** Pass messages using unix domain sockets.
40 *
41 * @author Costin Manolache
42 */
43 public class ChannelUn extends JniHandler implements JkChannel {
44 static final int CH_OPEN=4;
45 static final int CH_CLOSE=5;
46 static final int CH_READ=6;
47 static final int CH_WRITE=7;
48
49 String file;
50 ThreadPool tp = ThreadPool.createThreadPool(true);
51
52 /* ==================== Tcp socket options ==================== */
53
54 public ThreadPool getThreadPool() {
55 return tp;
56 }
57
58 public void setFile( String f ) {
59 file=f;
60 }
61
62 public String getFile() {
63 return file;
64 }
65
66 /* ==================== ==================== */
67 int socketNote=1;
68 int isNote=2;
69 int osNote=3;
70
71 int localId=0;
72
73 public void init() throws IOException {
74 if( file==null ) {
75 log.debug("No file, disabling unix channel");
76 return;
77 //throw new IOException( "No file for the unix socket channel");
78 }
79 if( wEnv!=null && wEnv.getLocalId() != 0 ) {
80 localId=wEnv.getLocalId();
81 }
82
83 if( localId != 0 ) {
84 file=file+ localId;
85 }
86 File socketFile=new File( file );
87 if( !socketFile.isAbsolute() ) {
88 String home=wEnv.getJkHome();
89 if( home==null ) {
90 log.debug("No jkhome");
91 } else {
92 File homef=new File( home );
93 socketFile=new File( homef, file );
94 log.debug( "Making the file absolute " +socketFile);
95 }
96 }
97
98 if( ! socketFile.exists() ) {
99 try {
100 FileOutputStream fos=new FileOutputStream(socketFile);
101 fos.write( 1 );
102 fos.close();
103 } catch( Throwable t ) {
104 log.error("Attempting to create the file failed, disabling channel"
105 + socketFile);
106 return;
107 }
108 }
109 // The socket file cannot be removed ...
110 if (!socketFile.delete()) {
111 log.error( "Can't remove socket file " + socketFile);
112 return;
113 }
114
115
116 super.initNative( "channel.un:" + file );
117
118 if( apr==null || ! apr.isLoaded() ) {
119 log.debug("Apr is not available, disabling unix channel ");
120 apr=null;
121 return;
122 }
123
124 // Set properties and call init.
125 setNativeAttribute( "file", file );
126 // unixListenSocket=apr.unSocketListen( file, 10 );
127
128 setNativeAttribute( "listen", "10" );
129 // setNativeAttribute( "debug", "10" );
130
131 // Initialize the thread pool and execution chain
132 if( next==null && wEnv!=null ) {
133 if( nextName!=null )
134 setNext( wEnv.getHandler( nextName ) );
135 if( next==null )
136 next=wEnv.getHandler( "dispatch" );
137 if( next==null )
138 next=wEnv.getHandler( "request" );
139 }
140
141 super.initJkComponent();
142 JMXRequestNote =wEnv.getNoteId( WorkerEnv.ENDPOINT_NOTE, "requestNote");
143 // Run a thread that will accept connections.
144 if( this.domain != null ) {
145 try {
146 tpOName=new ObjectName(domain + ":type=ThreadPool,name=" +
147 getChannelName());
148
149 Registry.getRegistry(null, null)
150 .registerComponent(tp, tpOName, null);
151
152 rgOName = new ObjectName
153 (domain+":type=GlobalRequestProcessor,name=" + getChannelName());
154 Registry.getRegistry(null, null)
155 .registerComponent(global, rgOName, null);
156 } catch (Exception e) {
157 log.error("Can't register threadpool" );
158 }
159 }
160 tp.start();
161 AprAcceptor acceptAjp=new AprAcceptor( this );
162 tp.runIt( acceptAjp);
163 log.info("JK: listening on unix socket: " + file );
164
165 }
166
167 ObjectName tpOName;
168 ObjectName rgOName;
169 RequestGroupInfo global=new RequestGroupInfo();
170 int count = 0;
171 int JMXRequestNote;
172
173 public void start() throws IOException {
174 }
175
176 public void destroy() throws IOException {
177 if( apr==null ) return;
178 try {
179 if( tp != null )
180 tp.shutdown();
181
182 //apr.unSocketClose( unixListenSocket,3);
183 super.destroyJkComponent();
184
185 if(tpOName != null) {
186 Registry.getRegistry(null, null).unregisterComponent(tpOName);
187 }
188 if(rgOName != null) {
189 Registry.getRegistry(null, null).unregisterComponent(rgOName);
190 }
191 } catch(Exception e) {
192 log.error("Error in destroy",e);
193 }
194 }
195
196 public void registerRequest(Request req, MsgContext ep, int count) {
197 if(this.domain != null) {
198 try {
199
200 RequestInfo rp=req.getRequestProcessor();
201 rp.setGlobalProcessor(global);
202 ObjectName roname = new ObjectName
203 (getDomain() + ":type=RequestProcessor,worker="+
204 getChannelName()+",name=JkRequest" +count);
205 ep.setNote(JMXRequestNote, roname);
206
207 Registry.getRegistry(null, null).registerComponent( rp, roname, null);
208 } catch( Exception ex ) {
209 log.warn("Error registering request");
210 }
211 }
212 }
213
214
215 /** Open a connection - since we're listening that will block in
216 accept
217 */
218 public int open(MsgContext ep) throws IOException {
219 // Will associate a jk_endpoint with ep and call open() on it.
220 // jk_channel_un will accept a connection and set the socket info
221 // in the endpoint. MsgContext will represent an active connection.
222 return super.nativeDispatch( ep.getMsg(0), ep, CH_OPEN, 1 );
223 }
224
225 public void close(MsgContext ep) throws IOException {
226 super.nativeDispatch( ep.getMsg(0), ep, CH_CLOSE, 1 );
227 }
228
229 public int send( Msg msg, MsgContext ep)
230 throws IOException
231 {
232 return super.nativeDispatch( msg, ep, CH_WRITE, 0 );
233 }
234
235 public int receive( Msg msg, MsgContext ep )
236 throws IOException
237 {
238 int rc=super.nativeDispatch( msg, ep, CH_READ, 1 );
239
240 if( rc!=0 ) {
241 log.error("receive error: " + rc, new Throwable());
242 return -1;
243 }
244
245 msg.processHeader();
246
247 if (log.isDebugEnabled())
248 log.debug("receive: total read = " + msg.getLen());
249
250 return msg.getLen();
251 }
252
253 public int flush( Msg msg, MsgContext ep) throws IOException {
254 return OK;
255 }
256
257 public boolean isSameAddress( MsgContext ep ) {
258 return false; // Not supporting shutdown on this channel.
259 }
260
261 boolean running=true;
262
263 /** Accept incoming connections, dispatch to the thread pool
264 */
265 void acceptConnections() {
266 if( apr==null ) return;
267
268 if( log.isDebugEnabled() )
269 log.debug("Accepting ajp connections on " + file);
270
271 while( running ) {
272 try {
273 MsgContext ep=this.createMsgContext();
274
275 // blocking - opening a server connection.
276 int status=this.open(ep);
277 if( status != 0 && status != 2 ) {
278 log.error( "Error acceptin connection on " + file );
279 break;
280 }
281
282 // if( log.isDebugEnabled() )
283 // log.debug("Accepted ajp connections ");
284
285 AprConnection ajpConn= new AprConnection(this, ep);
286 tp.runIt( ajpConn );
287 } catch( Exception ex ) {
288 ex.printStackTrace();
289 }
290 }
291 }
292
293 /** Process a single ajp connection.
294 */
295 void processConnection(MsgContext ep) {
296 if( log.isDebugEnabled() )
297 log.debug( "New ajp connection ");
298 try {
299 MsgAjp recv=new MsgAjp();
300 while( running ) {
301 int res=this.receive( recv, ep );
302 if( res<0 ) {
303 // EOS
304 break;
305 }
306 ep.setType(0);
307 log.debug( "Process msg ");
308 int status=next.invoke( recv, ep );
309 }
310 if( log.isDebugEnabled() )
311 log.debug( "Closing un channel");
312 try{
313 Request req = (Request)ep.getRequest();
314 if( req != null ) {
315 ObjectName roname = (ObjectName)ep.getNote(JMXRequestNote);
316 if( roname != null ) {
317 Registry.getRegistry(null, null).unregisterComponent(roname);
318 }
319 req.getRequestProcessor().setGlobalProcessor(null);
320 }
321 } catch( Exception ee) {
322 log.error( "Error, releasing connection",ee);
323 }
324 this.close( ep );
325 } catch( Exception ex ) {
326 ex.printStackTrace();
327 }
328 }
329
330 public int invoke( Msg msg, MsgContext ep ) throws IOException {
331 int type=ep.getType();
332
333 switch( type ) {
334 case JkHandler.HANDLE_RECEIVE_PACKET:
335 return receive( msg, ep );
336 case JkHandler.HANDLE_SEND_PACKET:
337 return send( msg, ep );
338 case JkHandler.HANDLE_FLUSH:
339 return flush( msg, ep );
340 }
341
342 // return next.invoke( msg, ep );
343 return OK;
344 }
345
346 public String getChannelName() {
347 String encodedAddr = "";
348 String address = file;
349 if (address != null) {
350 encodedAddr = "" + address;
351 if (encodedAddr.startsWith("/"))
352 encodedAddr = encodedAddr.substring(1);
353 encodedAddr = URLEncoder.encode(encodedAddr) ;
354 }
355 return ("jk-" + encodedAddr);
356 }
357
358 private static org.apache.juli.logging.Log log=
359 org.apache.juli.logging.LogFactory.getLog( ChannelUn.class );
360 }
361
362 class AprAcceptor implements ThreadPoolRunnable {
363 ChannelUn wajp;
364
365 AprAcceptor(ChannelUn wajp ) {
366 this.wajp=wajp;
367 }
368
369 public Object[] getInitData() {
370 return null;
371 }
372
373 public void runIt(Object thD[]) {
374 wajp.acceptConnections();
375 }
376 }
377
378 class AprConnection implements ThreadPoolRunnable {
379 ChannelUn wajp;
380 MsgContext ep;
381
382 AprConnection(ChannelUn wajp, MsgContext ep) {
383 this.wajp=wajp;
384 this.ep=ep;
385 }
386
387
388 public Object[] getInitData() {
389 return null;
390 }
391
392 public void runIt(Object perTh[]) {
393 wajp.processConnection(ep);
394 }
395 }