1 /*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19
20
21 package org.apache.axis2.engine;
22
23 import java.util.ArrayList;
24 import java.util.Iterator;
25 import java.util.List;
26
27 import javax.xml.namespace.QName;
28
29 import org.apache.axiom.soap.RolePlayer;
30 import org.apache.axiom.soap.SOAPEnvelope;
31 import org.apache.axiom.soap.SOAPHeaderBlock;
32 import org.apache.axis2.AxisFault;
33 import org.apache.axis2.Constants;
34 import org.apache.axis2.client.async.AxisCallback;
35 import org.apache.axis2.client.async.Callback;
36 import org.apache.axis2.context.ConfigurationContext;
37 import org.apache.axis2.context.MessageContext;
38 import org.apache.axis2.context.OperationContext;
39 import org.apache.axis2.description.AxisOperation;
40 import org.apache.axis2.description.TransportOutDescription;
41 import org.apache.axis2.description.WSDL2Constants;
42 import org.apache.axis2.engine.Handler.InvocationResponse;
43 import org.apache.axis2.i18n.Messages;
44 import org.apache.axis2.transport.TransportSender;
45 import org.apache.axis2.util.CallbackReceiver;
46 import org.apache.axis2.util.LoggingControl;
47 import org.apache.axis2.util.MessageContextBuilder;
48 import org.apache.axis2.wsdl.WSDLConstants;
49 import org.apache.commons.logging.Log;
50 import org.apache.commons.logging.LogFactory;
51
/**
 * There is one engine for the Server and the Client. The send() and receive()
 * methods are the basic operations that Sync and Async messaging are built on top of.
 */
56 public class AxisEngine {
57
58 /**
59 * Field log
60 */
61 private static final Log log = LogFactory.getLog(AxisEngine.class);
62
63 private static boolean RESUMING_EXECUTION = true;
64 private static boolean NOT_RESUMING_EXECUTION = false;
65
66 private static void checkMustUnderstand(MessageContext msgContext) throws AxisFault {
67 List<QName> unprocessed = null;
68 SOAPEnvelope envelope = msgContext.getEnvelope();
69 if (envelope.getHeader() == null) {
70 return;
71 }
72 // Get all the headers targeted to us
73 Iterator headerBlocks = envelope.getHeader().getHeadersToProcess((RolePlayer)msgContext.getConfigurationContext().getAxisConfiguration().getParameterValue("rolePlayer"));
74 while (headerBlocks.hasNext()) {
75 SOAPHeaderBlock headerBlock = (SOAPHeaderBlock) headerBlocks.next();
76 QName headerName = headerBlock.getQName();
77 // if this header block has been processed or mustUnderstand isn't
78 // turned on then its cool
79 if (headerBlock.isProcessed() || !headerBlock.getMustUnderstand()) {
80 continue;
81 }
82
83 if(LoggingControl.debugLoggingAllowed && log.isDebugEnabled()){
84 log.debug("MustUnderstand header not processed or registered as understood"+headerName);
85 }
86 if(isReceiverMustUnderstandProcessor(msgContext)){
87 if(unprocessed == null){
88 unprocessed = new ArrayList<QName>();
89 }
90 if(!unprocessed.contains(headerName)){
91 unprocessed.add(headerName);
92 }
93 continue;
94 }
95 // Oops, throw an appropriate MustUnderstand fault!!
96 QName faultQName = headerBlock.getVersion().getMustUnderstandFaultCode();
97 throw new AxisFault(Messages.getMessage("mustunderstandfailed",
98 headerBlock.getNamespace().getNamespaceURI(),
99 headerBlock.getLocalName()), faultQName);
100 }
101 if(unprocessed !=null && unprocessed.size()>0){
102 //Adding HeaderQNames that failed MU check as AxisService Parameter
103 //They will be examined later by MessageReceivers.
104 if(log.isDebugEnabled()){
105 log.debug("Adding Unprocessed headers to MessageContext.");
106 }
107 msgContext.setProperty(Constants.UNPROCESSED_HEADER_QNAMES, unprocessed);
108 }
109 }
110
111 private static boolean isReceiverMustUnderstandProcessor(MessageContext msgContext){
112 MessageReceiver receiver = null;
113 if(msgContext.isServerSide()){
114 receiver = msgContext.getAxisOperation().getMessageReceiver();
115 }
116 return (receiver!=null && receiver.getClass().getName().endsWith("JAXWSMessageReceiver"));
117 }
118 /**
119 * This method is called to handle any error that occurs at inflow or outflow. But if the
120 * method is called twice, it implies that sending the error handling has failed, in which case
121 * the method logs the error and exists.
122 *
123 * @deprecated (post 1.1 branch)
124 */
125 public static MessageContext createFaultMessageContext(MessageContext processingContext, Throwable e)
126 throws AxisFault {
127 return MessageContextBuilder.createFaultMessageContext(processingContext, e);
128 }
129
130 /**
131 * This methods represents the inflow of the Axis, this could be either at the server side or the client side.
132 * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
133 * deployment time by the deployment module
134 *
135 * @throws AxisFault
136 * @see MessageContext
137 * @see Phase
138 * @see Handler
139 */
140 public static InvocationResponse receive(MessageContext msgContext) throws AxisFault {
141 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
142 log.trace(msgContext.getLogIDString() + " receive:" + msgContext.getMessageID());
143 }
144 ConfigurationContext confContext = msgContext.getConfigurationContext();
145 List<Phase> preCalculatedPhases;
146 if (msgContext.isFault() || msgContext.isProcessingFault()) {
147 preCalculatedPhases = confContext.getAxisConfiguration().getInFaultFlowPhases();
148 msgContext.setFLOW(MessageContext.IN_FAULT_FLOW);
149 } else {
150 preCalculatedPhases = confContext.getAxisConfiguration().getInFlowPhases();
151 msgContext.setFLOW(MessageContext.IN_FLOW);
152 }
153 // Set the initial execution chain in the MessageContext to a *copy* of what
154 // we got above. This allows individual message processing to change the chain without
155 // affecting later messages.
156 ArrayList<Handler> executionChain = new ArrayList<Handler>();
157 executionChain.addAll(preCalculatedPhases);
158 msgContext.setExecutionChain(executionChain);
159 try {
160 InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
161
162 if (pi.equals(InvocationResponse.CONTINUE)) {
163 checkMustUnderstand(msgContext);
164 if (msgContext.isServerSide()) {
165 // invoke the Message Receivers
166
167 MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
168 if (receiver == null) {
169 throw new AxisFault(Messages.getMessage(
170 "nomessagereciever",
171 msgContext.getAxisOperation().getName().toString()));
172 }
173 receiver.receive(msgContext);
174 }
175 flowComplete(msgContext);
176 } else if (pi.equals(InvocationResponse.SUSPEND)) {
177 return pi;
178 } else if (pi.equals(InvocationResponse.ABORT)) {
179 flowComplete(msgContext);
180 // Undo any partial work.
181 // Remove the incoming message context
182 if (log.isDebugEnabled()) {
183 log.debug("InvocationResponse is aborted. " +
184 "The incoming MessageContext is removed, " +
185 "and the OperationContext is marked as incomplete");
186 }
187 AxisOperation axisOp = msgContext.getAxisOperation();
188 if(axisOp!=null){
189 String mepURI = axisOp.getMessageExchangePattern();
190 if (WSDL2Constants.MEP_URI_OUT_IN.equals(mepURI)) {
191 OperationContext opCtx = msgContext.getOperationContext();
192 if (opCtx != null) {
193 opCtx.removeMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
194 }
195 }
196 }
197 else{
198 log.debug("Could not clean up op ctx for " + msgContext);
199 }
200 return pi;
201 } else {
202 String errorMsg =
203 "Unrecognized InvocationResponse encountered in AxisEngine.receive()";
204 log.error(msgContext.getLogIDString() + " " + errorMsg);
205 throw new AxisFault(errorMsg);
206 }
207 }
208 catch (AxisFault e) {
209 log.error(e.getMessage(), e);
210 msgContext.setFailureReason(e);
211 flowComplete(msgContext);
212 throw e;
213 }
214
215 return InvocationResponse.CONTINUE;
216 }
217
218
219 /**
220 * Take the execution chain from the msgContext , and then take the current Index
221 * and invoke all the phases in the arraylist
222 * if the msgContext is pauesd then the execution will be breaked
223 *
224 * @param msgContext
225 * @return An InvocationResponse that indicates what
226 * the next step in the message processing should be.
227 * @throws AxisFault
228 */
229 private static InvocationResponse invoke(MessageContext msgContext, boolean resuming)
230 throws AxisFault {
231
232 if (msgContext.getCurrentHandlerIndex() == -1) {
233 msgContext.setCurrentHandlerIndex(0);
234 }
235
236 InvocationResponse pi = InvocationResponse.CONTINUE;
237
238 while (msgContext.getCurrentHandlerIndex() < msgContext.getExecutionChain().size()) {
239 Handler currentHandler = (Handler) msgContext.getExecutionChain().
240 get(msgContext.getCurrentHandlerIndex());
241
242 try {
243 if (!resuming) {
244 msgContext.addExecutedPhase(currentHandler);
245 } else {
246 /* If we are resuming the flow, we don't want to add the phase
247 * again, as it has already been added.
248 */
249 resuming = false;
250 }
251 pi = currentHandler.invoke(msgContext);
252 }
253 catch (AxisFault e) {
254 if (msgContext.getCurrentPhaseIndex() == 0) {
255 /* If we got a fault, we still want to add the phase to the
256 list to be executed for flowComplete(...) unless this was
257 the first handler, as then the currentPhaseIndex will be
258 set to 0 and this will look like we've executed all of the
259 handlers. If, at some point, a phase really needs to get
260 notification of flowComplete, then we'll need to introduce
261 some more complex logic to keep track of what has been
262 executed.*/
263 msgContext.removeFirstExecutedPhase();
264 }
265 throw e;
266 }
267
268 if (pi.equals(InvocationResponse.SUSPEND) ||
269 pi.equals(InvocationResponse.ABORT)) {
270 break;
271 }
272
273 msgContext.setCurrentHandlerIndex(msgContext.getCurrentHandlerIndex() + 1);
274 }
275
276 return pi;
277 }
278
279 private static void flowComplete(MessageContext msgContext) {
280 Iterator<Handler> invokedPhaseIterator = msgContext.getExecutedPhases();
281
282 while (invokedPhaseIterator.hasNext()) {
283 Handler currentHandler = ((Handler) invokedPhaseIterator.next());
284 currentHandler.flowComplete(msgContext);
285 }
286
287 /*This is needed because the OutInAxisOperation currently invokes
288 * receive() even when a fault occurs, and we will have already executed
289 * the flowComplete on those before receiveFault() is called.
290 */
291 msgContext.resetExecutedPhases();
292 }
293
294 /**
295 * If the msgConetext is puased and try to invoke then
296 * first invoke the phase list and after the message receiver
297 *
298 * @param msgContext
299 * @return An InvocationResponse allowing the invoker to perhaps determine
300 * whether or not the message processing will ever succeed.
301 * @throws AxisFault
302 */
303 public static InvocationResponse resumeReceive(MessageContext msgContext) throws AxisFault {
304 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
305 log.trace(msgContext.getLogIDString() + " resumeReceive:" + msgContext.getMessageID());
306 }
307
308 //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeReceiveFault as well, when, in fact, this does both
309 //REVIEW: Unlike with receive, there is no wrapping try/catch clause which would
310 //fire off the flowComplete on an error, as we have to assume that the
311 //message will be resumed again, but perhaps we need to unwind back to
312 //the point at which the message was resumed and provide another API
313 //to allow the full unwind if the message is going to be discarded.
314 //invoke the phases
315 InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
316 //invoking the MR
317
318 if (pi.equals(InvocationResponse.CONTINUE)) {
319 checkMustUnderstand(msgContext);
320 if (msgContext.isServerSide()) {
321 // invoke the Message Receivers
322 MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
323 if (receiver == null) {
324 throw new AxisFault(Messages.getMessage(
325 "nomessagereciever",
326 msgContext.getAxisOperation().getName().toString()));
327 }
328 receiver.receive(msgContext);
329 }
330 flowComplete(msgContext);
331 }
332
333 return pi;
334 }
335
336 /**
337 * To resume the invocation at the send path , this is neened since it is require to call
338 * TransportSender at the end
339 *
340 * @param msgContext
341 * @return An InvocationResponse allowing the invoker to perhaps determine
342 * whether or not the message processing will ever succeed.
343 * @throws AxisFault
344 */
345 public static InvocationResponse resumeSend(MessageContext msgContext) throws AxisFault {
346 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
347 log.trace(msgContext.getLogIDString() + " resumeSend:" + msgContext.getMessageID());
348 }
349
350 //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeSendFault as well, when, in fact, this does both
351 //REVIEW: Unlike with send, there is no wrapping try/catch clause which would
352 //fire off the flowComplete on an error, as we have to assume that the
353 //message will be resumed again, but perhaps we need to unwind back to
354 //the point at which the message was resumed and provide another API
355 //to allow the full unwind if the message is going to be discarded.
356 //invoke the phases
357 InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
358 //Invoking Transport Sender
359 if (pi.equals(InvocationResponse.CONTINUE)) {
360 // write the Message to the Wire
361 TransportOutDescription transportOut = msgContext.getTransportOut();
362 TransportSender sender = transportOut.getSender();
363 sender.invoke(msgContext);
364 flowComplete(msgContext);
365 }
366
367 return pi;
368 }
369
370 /**
371 * Resume processing of a message.
372 *
373 * @param msgctx
374 * @return An InvocationResponse allowing the invoker to perhaps determine
375 * whether or not the message processing will ever succeed.
376 * @throws AxisFault
377 */
378 public static InvocationResponse resume(MessageContext msgctx) throws AxisFault {
379 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
380 log.trace(msgctx.getLogIDString() + " resume:" + msgctx.getMessageID());
381 }
382
383 msgctx.setPaused(false);
384 if (msgctx.getFLOW() == MessageContext.IN_FLOW) {
385 return resumeReceive(msgctx);
386 } else {
387 return resumeSend(msgctx);
388 }
389 }
390
391 /**
392 * This methods represents the outflow of the Axis, this could be either at the server side or the client side.
393 * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
394 * deployment time by the deployment module
395 *
396 * @param msgContext
397 * @throws AxisFault
398 * @see MessageContext
399 * @see Phase
400 * @see Handler
401 */
402 public static void send(MessageContext msgContext) throws AxisFault {
403 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
404 log.trace(msgContext.getLogIDString() + " send:" + msgContext.getMessageID());
405 }
406 // find and invoke the Phases
407 OperationContext operationContext = msgContext.getOperationContext();
408 ArrayList executionChain = operationContext.getAxisOperation().getPhasesOutFlow();
409 //rather than having two steps added both oparation and global chain together
410 ArrayList outPhases = new ArrayList();
411 outPhases.addAll(executionChain);
412 outPhases.addAll(msgContext.getConfigurationContext().getAxisConfiguration().getOutFlowPhases());
413 msgContext.setExecutionChain(outPhases);
414 msgContext.setFLOW(MessageContext.OUT_FLOW);
415 try {
416 InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
417
418 if (pi.equals(InvocationResponse.CONTINUE)) {
419 // write the Message to the Wire
420 TransportOutDescription transportOut = msgContext.getTransportOut();
421 if (transportOut == null) {
422 throw new AxisFault("Transport out has not been set");
423 }
424 TransportSender sender = transportOut.getSender();
425 // This boolean property only used in client side fireAndForget invocation
426 //It will set a property into message context and if some one has set the
427 //property then transport sender will invoke in a diffrent thread
428 Object isTransportNonBlocking = msgContext.getProperty(
429 MessageContext.TRANSPORT_NON_BLOCKING);
430 if (isTransportNonBlocking != null &&
431 ((Boolean) isTransportNonBlocking).booleanValue()) {
432 msgContext.getConfigurationContext().getThreadPool().execute(
433 new TransportNonBlockingInvocationWorker(msgContext, sender));
434 } else {
435 sender.invoke(msgContext);
436 }
437 //REVIEW: In the case of the TransportNonBlockingInvocationWorker, does this need to wait until that finishes?
438 flowComplete(msgContext);
439 } else if (pi.equals(InvocationResponse.SUSPEND)) {
440 } else if (pi.equals(InvocationResponse.ABORT)) {
441 flowComplete(msgContext);
442 } else {
443 String errorMsg =
444 "Unrecognized InvocationResponse encountered in AxisEngine.send()";
445 log.error(msgContext.getLogIDString() + " " + errorMsg);
446 throw new AxisFault(errorMsg);
447 }
448 } catch (AxisFault e) {
449 msgContext.setFailureReason(e);
450 flowComplete(msgContext);
451 throw e;
452 }
453 }
454
455 /**
456 * Sends the SOAP Fault to another SOAP node.
457 *
458 * @param msgContext
459 * @throws AxisFault
460 */
461 public static void sendFault(MessageContext msgContext) throws AxisFault {
462 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
463 log.trace(msgContext.getLogIDString() + " sendFault:" + msgContext.getMessageID());
464 }
465 OperationContext opContext = msgContext.getOperationContext();
466
467 //FIXME: If this gets paused in the operation-specific phases, the resume is not going to function correctly as the phases will not have all been set
468
469 // find and execute the Fault Out Flow Handlers
470 if (opContext != null) {
471 AxisOperation axisOperation = opContext.getAxisOperation();
472 ArrayList faultExecutionChain = axisOperation.getPhasesOutFaultFlow();
473
474 //adding both operation specific and global out fault flows.
475
476 ArrayList outFaultPhases = new ArrayList();
477 outFaultPhases.addAll((ArrayList) faultExecutionChain.clone());
478 msgContext.setExecutionChain((ArrayList) outFaultPhases.clone());
479 msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
480 try {
481 InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
482
483 if (pi.equals(InvocationResponse.SUSPEND)) {
484 log.warn(msgContext.getLogIDString() +
485 " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
486 return;
487 } else if (pi.equals(InvocationResponse.ABORT)) {
488 flowComplete(msgContext);
489 return;
490 } else if (!pi.equals(InvocationResponse.CONTINUE)) {
491 String errorMsg =
492 "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
493 log.error(msgContext.getLogIDString() + " " + errorMsg);
494 throw new AxisFault(errorMsg);
495 }
496 }
497 catch (AxisFault e) {
498 msgContext.setFailureReason(e);
499 flowComplete(msgContext);
500 throw e;
501 }
502 }
503
504 ArrayList<Handler> executionChain = new ArrayList<Handler>(msgContext.getConfigurationContext()
505 .getAxisConfiguration().getOutFaultFlowPhases());
506 msgContext.setExecutionChain(executionChain);
507 msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
508 InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
509
510 if (pi.equals(InvocationResponse.CONTINUE)) {
511 // Actually send the SOAP Fault
512 TransportOutDescription transportOut = msgContext.getTransportOut();
513 if (transportOut == null) {
514 throw new AxisFault("Transport out has not been set");
515 }
516 TransportSender sender = transportOut.getSender();
517
518 sender.invoke(msgContext);
519 flowComplete(msgContext);
520 } else if (pi.equals(InvocationResponse.SUSPEND)) {
521 } else if (pi.equals(InvocationResponse.ABORT)) {
522 flowComplete(msgContext);
523 } else {
524 String errorMsg =
525 "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
526 log.error(msgContext.getLogIDString() + " " + errorMsg);
527 throw new AxisFault(errorMsg);
528 }
529 }
530
531 /**
532 * here we assume that it is resume from an operation level handler
533 * @param msgContext
534 * @throws AxisFault
535 */
536 public static void resumeSendFault(MessageContext msgContext) throws AxisFault{
537 if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
538 log.trace(msgContext.getLogIDString() + " resumeSendFault:" + msgContext.getMessageID());
539 }
540 OperationContext opContext = msgContext.getOperationContext();
541
542 if (opContext != null) {
543
544 try {
545 InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
546
547 if (pi.equals(InvocationResponse.SUSPEND)) {
548 log.warn(msgContext.getLogIDString() +
549 " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
550 return;
551 } else if (pi.equals(InvocationResponse.ABORT)) {
552 flowComplete(msgContext);
553 return;
554 } else if (!pi.equals(InvocationResponse.CONTINUE)) {
555 String errorMsg =
556 "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
557 log.error(msgContext.getLogIDString() + " " + errorMsg);
558 throw new AxisFault(errorMsg);
559 }
560 } catch (AxisFault e) {
561 msgContext.setFailureReason(e);
562 flowComplete(msgContext);
563 throw e;
564 }
565 }
566
567 ArrayList<Handler> executionChain = new ArrayList<Handler>(msgContext.getConfigurationContext()
568 .getAxisConfiguration().getOutFaultFlowPhases());
569 msgContext.setExecutionChain(executionChain);
570 msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
571 InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
572
573 if (pi.equals(InvocationResponse.CONTINUE)) {
574 // Actually send the SOAP Fault
575 TransportOutDescription transportOut = msgContext.getTransportOut();
576 if (transportOut == null) {
577 throw new AxisFault("Transport out has not been set");
578 }
579 TransportSender sender = transportOut.getSender();
580
581 sender.invoke(msgContext);
582 flowComplete(msgContext);
583 } else if (pi.equals(InvocationResponse.SUSPEND)) {
584 } else if (pi.equals(InvocationResponse.ABORT)) {
585 flowComplete(msgContext);
586 } else {
587 String errorMsg =
588 "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
589 log.error(msgContext.getLogIDString() + " " + errorMsg);
590 throw new AxisFault(errorMsg);
591 }
592 }
593
594
595 /**
596 * This class is used when someone invoke a service invocation with two transports
597 * If we dont create a new thread then the main thread will block untill it gets the
598 * response . In the case of HTTP transportsender will block untill it gets HTTP 200
599 * So , main thread also block till transport sender rereases the tread. So there is no
600 * actual non-blocking. That is why when sending we creat a new thead and send the
601 * requset via that.
602 * <p/>
603 * So whole porpose of this class to send the requset via a new thread
604 * <p/>
605 * way transport.
606 */
607 private static class TransportNonBlockingInvocationWorker implements Runnable {
608 private MessageContext msgctx;
609 private TransportSender sender;
610
611 public TransportNonBlockingInvocationWorker(MessageContext msgctx,
612 TransportSender sender) {
613 this.msgctx = msgctx;
614 this.sender = sender;
615 }
616
617 public void run() {
618 try {
619 sender.invoke(msgctx);
620 } catch (Exception e) {
621 log.info(msgctx.getLogIDString() + " " + e.getMessage());
622 if (msgctx.getProperty(MessageContext.DISABLE_ASYNC_CALLBACK_ON_TRANSPORT_ERROR) ==
623 null) {
624 AxisOperation axisOperation = msgctx.getAxisOperation();
625 if (axisOperation != null) {
626 MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
627 if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) {
628 Object callback = ((CallbackReceiver) msgReceiver)
629 .lookupCallback(msgctx.getMessageID());
630 if (callback == null) return; // TODO: should we log this??
631
632 if (callback instanceof Callback) {
633 // Instances of Callback only expect onComplete to be called
634 // for a successful MEP. Errors are reported through the
635 // Async Response object, which the Callback implementations
636 // all use.
637 ((Callback)callback).onError(e);
638 } else {
639 // The AxisCallback (which is OutInAxisOperationClient$SyncCallBack
640 // used to support async-on-the-wire under a synchronous API
641 // operation) need to be told the MEP is complete after being told
642 // of the error.
643 ((AxisCallback)callback).onError(e);
644 ((AxisCallback)callback).onComplete();
645 }
646 }
647 }
648 }
649 }
650 }
651 }
652 }