1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements. See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License. You may obtain a copy of the License at
8 *
9 * http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17
18 package org.apache.jk.common;
19
20 import java.io.IOException;
21
22 import org.apache.coyote.OutputBuffer;
23 import org.apache.coyote.InputBuffer;
24 import org.apache.coyote.Request;
25 import org.apache.coyote.Response;
26
27 import org.apache.jk.core.Msg;
28 import org.apache.jk.core.MsgContext;
29
30 import org.apache.tomcat.util.buf.ByteChunk;
31 import org.apache.tomcat.util.buf.MessageBytes;
32 import org.apache.tomcat.util.buf.C2BConverter;
33 import org.apache.tomcat.util.http.HttpMessages;
34 import org.apache.tomcat.util.http.MimeHeaders;
35
36 /** Generic input stream impl on top of ajp
37 */
38 public class JkInputStream implements InputBuffer, OutputBuffer {
39 private static org.apache.juli.logging.Log log=
40 org.apache.juli.logging.LogFactory.getLog( JkInputStream.class );
41
42 private Msg bodyMsg;
43 private Msg outputMsg;
44 private MsgContext mc;
45
46
47 // Holds incoming chunks of request body data
48 private MessageBytes bodyBuff = MessageBytes.newInstance();
49 private MessageBytes tempMB = MessageBytes.newInstance();
50 private boolean end_of_stream=false;
51 private boolean isEmpty = true;
52 private boolean isFirst = true;
53 private boolean isReplay = false;
54 private boolean isReadRequired = false;
55
56 static {
57 // Make certain HttpMessages is loaded for SecurityManager
58 try {
59 Class.forName("org.apache.tomcat.util.http.HttpMessages");
60 } catch(Exception ex) {
61 // ignore
62 }
63 }
64
65 public JkInputStream(MsgContext context, int bsize) {
66 mc = context;
67 bodyMsg = new MsgAjp(bsize);
68 outputMsg = new MsgAjp(bsize);
69 }
70 /**
71 * @deprecated
72 */
73 public JkInputStream(MsgContext context) {
74 this(context, 8*1024);
75 }
76
77 // -------------------- Jk specific methods --------------------
78
79
80 /**
81 * Set the flag saying that the server is sending a body
82 */
83 public void setIsReadRequired(boolean irr) {
84 isReadRequired = irr;
85 }
86
87 /**
88 * Return the flag saying that the server is sending a body
89 */
90 public boolean isReadRequired() {
91 return isReadRequired;
92 }
93
94
95 /** Must be called before or after each request
96 */
97 public void recycle() {
98 if(isReadRequired && isFirst) {
99 // The Servlet never read the request body, so we need to junk it
100 try {
101 receive();
102 } catch(IOException iex) {
103 log.debug("Error consuming request body",iex);
104 }
105 }
106
107 end_of_stream = false;
108 isEmpty = true;
109 isFirst = true;
110 isReplay = false;
111 isReadRequired = false;
112 bodyBuff.recycle();
113 tempMB.recycle();
114 }
115
116
117 public void endMessage() throws IOException {
118 outputMsg.reset();
119 outputMsg.appendByte(AjpConstants.JK_AJP13_END_RESPONSE);
120 outputMsg.appendByte(1);
121 mc.getSource().send(outputMsg, mc);
122 mc.getSource().flush(outputMsg, mc);
123 }
124
125
126 // -------------------- OutputBuffer implementation --------------------
127
128
129 public int doWrite(ByteChunk chunk, Response res)
130 throws IOException {
131 if (!res.isCommitted()) {
132 // Send the connector a request for commit. The connector should
133 // then validate the headers, send them (using sendHeader) and
134 // set the filters accordingly.
135 res.sendHeaders();
136 }
137
138 int len=chunk.getLength();
139 byte buf[]=outputMsg.getBuffer();
140 // 4 - hardcoded, byte[] marshalling overhead
141 int chunkSize=buf.length - outputMsg.getHeaderLength() - 4;
142 int off=0;
143 while( len > 0 ) {
144 int thisTime=len;
145 if( thisTime > chunkSize ) {
146 thisTime=chunkSize;
147 }
148 len-=thisTime;
149
150 outputMsg.reset();
151 outputMsg.appendByte( AjpConstants.JK_AJP13_SEND_BODY_CHUNK);
152 if( log.isTraceEnabled() )
153 log.trace("doWrite " + off + " " + thisTime + " " + len );
154 outputMsg.appendBytes( chunk.getBytes(), chunk.getOffset() + off, thisTime );
155 off+=thisTime;
156 mc.getSource().send( outputMsg, mc );
157 }
158 return 0;
159 }
160
161 public int doRead(ByteChunk responseChunk, Request req)
162 throws IOException {
163
164 if( log.isDebugEnabled())
165 log.debug( "doRead " + end_of_stream+
166 " " + responseChunk.getOffset()+ " " + responseChunk.getLength());
167 if( end_of_stream ) {
168 return -1;
169 }
170
171 if( isFirst && isReadRequired ) {
172 // Handle special first-body-chunk, but only if httpd expects it.
173 if( !receive() ) {
174 return 0;
175 }
176 } else if(isEmpty) {
177 if ( !refillReadBuffer() ){
178 return -1;
179 }
180 }
181 ByteChunk bc = bodyBuff.getByteChunk();
182 responseChunk.setBytes( bc.getBuffer(), bc.getStart(), bc.getLength() );
183 isEmpty = true;
184 return responseChunk.getLength();
185 }
186
187 /** Receive a chunk of data. Called to implement the
188 * 'special' packet in ajp13 and to receive the data
189 * after we send a GET_BODY packet
190 */
191 public boolean receive() throws IOException {
192 isFirst = false;
193 bodyMsg.reset();
194 int err = mc.getSource().receive(bodyMsg, mc);
195 if( log.isDebugEnabled() )
196 log.info( "Receiving: getting request body chunk " + err + " " + bodyMsg.getLen() );
197
198 if(err < 0) {
199 throw new IOException();
200 }
201
202 // No data received.
203 if( bodyMsg.getLen() == 0 ) { // just the header
204 // Don't mark 'end of stream' for the first chunk.
205 // end_of_stream = true;
206 return false;
207 }
208 int blen = bodyMsg.peekInt();
209
210 if( blen == 0 ) {
211 return false;
212 }
213
214 if( log.isTraceEnabled() ) {
215 bodyMsg.dump("Body buffer");
216 }
217
218 bodyMsg.getBytes(bodyBuff);
219 if( log.isTraceEnabled() )
220 log.trace( "Data:\n" + bodyBuff);
221 isEmpty = false;
222 return true;
223 }
224
225 /**
226 * Get more request body data from the web server and store it in the
227 * internal buffer.
228 *
229 * @return true if there is more data, false if not.
230 */
231 private boolean refillReadBuffer() throws IOException
232 {
233 // If the server returns an empty packet, assume that that end of
234 // the stream has been reached (yuck -- fix protocol??).
235 if(isReplay) {
236 end_of_stream = true; // we've read everything there is
237 }
238 if (end_of_stream) {
239 if( log.isDebugEnabled() )
240 log.debug("refillReadBuffer: end of stream " );
241 return false;
242 }
243
244 // Why not use outBuf??
245 bodyMsg.reset();
246 bodyMsg.appendByte(AjpConstants.JK_AJP13_GET_BODY_CHUNK);
247 bodyMsg.appendInt(AjpConstants.MAX_READ_SIZE);
248
249 if( log.isDebugEnabled() )
250 log.debug("refillReadBuffer " + Thread.currentThread());
251
252 mc.getSource().send(bodyMsg, mc);
253 mc.getSource().flush(bodyMsg, mc); // Server needs to get it
254
255 // In JNI mode, response will be in bodyMsg. In TCP mode, response need to be
256 // read
257
258 boolean moreData=receive();
259 if( !moreData ) {
260 end_of_stream=true;
261 }
262 return moreData;
263 }
264
265 public void appendHead(Response res) throws IOException {
266 if( log.isDebugEnabled() )
267 log.debug("COMMIT sending headers " + res + " " + res.getMimeHeaders() );
268
269 C2BConverter c2b=mc.getConverter();
270
271 outputMsg.reset();
272 outputMsg.appendByte(AjpConstants.JK_AJP13_SEND_HEADERS);
273 outputMsg.appendInt( res.getStatus() );
274
275 String message=res.getMessage();
276 if( message==null ){
277 message= HttpMessages.getMessage(res.getStatus());
278 } else {
279 message = message.replace('\n', ' ').replace('\r', ' ');
280 }
281 tempMB.setString( message );
282 c2b.convert( tempMB );
283 outputMsg.appendBytes(tempMB);
284
285 // XXX add headers
286
287 MimeHeaders headers=res.getMimeHeaders();
288 String contentType = res.getContentType();
289 if( contentType != null ) {
290 headers.setValue("Content-Type").setString(contentType);
291 }
292 String contentLanguage = res.getContentLanguage();
293 if( contentLanguage != null ) {
294 headers.setValue("Content-Language").setString(contentLanguage);
295 }
296 long contentLength = res.getContentLengthLong();
297 if( contentLength >= 0 ) {
298 headers.setValue("Content-Length").setLong(contentLength);
299 }
300 int numHeaders = headers.size();
301 outputMsg.appendInt(numHeaders);
302 for( int i=0; i<numHeaders; i++ ) {
303 MessageBytes hN=headers.getName(i);
304 // no header to sc conversion - there's little benefit
305 // on this direction
306 c2b.convert ( hN );
307 outputMsg.appendBytes( hN );
308
309 MessageBytes hV=headers.getValue(i);
310 c2b.convert( hV );
311 outputMsg.appendBytes( hV );
312 }
313 mc.getSource().send( outputMsg, mc );
314 }
315
316 /**
317 * Set the replay buffer for Form auth
318 */
319 public void setReplay(ByteChunk replay) {
320 isFirst = false;
321 isEmpty = false;
322 isReplay = true;
323 bodyBuff.setBytes(replay.getBytes(), replay.getStart(), replay.getLength());
324 }
325
326
327 }