Save This Page
Home » axis2-1.5-src » org.apache » axis2 » engine » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements. See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership. The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License. You may obtain a copy of the License at
    9    *
   10    * http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing,
   13    * software distributed under the License is distributed on an
   14    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    * KIND, either express or implied. See the License for the
   16    * specific language governing permissions and limitations
   17    * under the License.
   18    */
   19   
   20   
   21   package org.apache.axis2.engine;
   22   
   23   import java.util.ArrayList;
   24   import java.util.Iterator;
   25   import java.util.List;
   26   
   27   import javax.xml.namespace.QName;
   28   
   29   import org.apache.axiom.soap.RolePlayer;
   30   import org.apache.axiom.soap.SOAPEnvelope;
   31   import org.apache.axiom.soap.SOAPHeaderBlock;
   32   import org.apache.axis2.AxisFault;
   33   import org.apache.axis2.Constants;
   34   import org.apache.axis2.client.async.AxisCallback;
   35   import org.apache.axis2.client.async.Callback;
   36   import org.apache.axis2.context.ConfigurationContext;
   37   import org.apache.axis2.context.MessageContext;
   38   import org.apache.axis2.context.OperationContext;
   39   import org.apache.axis2.description.AxisOperation;
   40   import org.apache.axis2.description.TransportOutDescription;
   41   import org.apache.axis2.description.WSDL2Constants;
   42   import org.apache.axis2.engine.Handler.InvocationResponse;
   43   import org.apache.axis2.i18n.Messages;
   44   import org.apache.axis2.transport.TransportSender;
   45   import org.apache.axis2.util.CallbackReceiver;
   46   import org.apache.axis2.util.LoggingControl;
   47   import org.apache.axis2.util.MessageContextBuilder;
   48   import org.apache.axis2.wsdl.WSDLConstants;
   49   import org.apache.commons.logging.Log;
   50   import org.apache.commons.logging.LogFactory;
   51   
   52   /**
   53    * There is one engine for both the server and the client. The send() and receive()
   54    * methods are the basic operations on which sync and async messaging are built.
   55    */
   56   public class AxisEngine {
   57   
   58       /**
   59        * Field log
   60        */
   61       private static final Log log = LogFactory.getLog(AxisEngine.class);
   62   
   63       private static boolean RESUMING_EXECUTION = true;
   64       private static boolean NOT_RESUMING_EXECUTION = false;
   65   
   66       private static void checkMustUnderstand(MessageContext msgContext) throws AxisFault {
   67           List<QName> unprocessed = null;
   68           SOAPEnvelope envelope = msgContext.getEnvelope();
   69           if (envelope.getHeader() == null) {
   70               return;
   71           }
   72           // Get all the headers targeted to us
   73           Iterator headerBlocks = envelope.getHeader().getHeadersToProcess((RolePlayer)msgContext.getConfigurationContext().getAxisConfiguration().getParameterValue("rolePlayer"));
   74           while (headerBlocks.hasNext()) {
   75               SOAPHeaderBlock headerBlock = (SOAPHeaderBlock) headerBlocks.next();
   76               QName headerName = headerBlock.getQName();
   77               // if this header block has been processed or mustUnderstand isn't
   78               // turned on then its cool
   79               if (headerBlock.isProcessed() || !headerBlock.getMustUnderstand()) {
   80                   continue;
   81               }
   82   
   83               if(LoggingControl.debugLoggingAllowed && log.isDebugEnabled()){
   84                   log.debug("MustUnderstand header not processed or registered as understood"+headerName);
   85               }
   86               if(isReceiverMustUnderstandProcessor(msgContext)){
   87                   if(unprocessed == null){
   88                       unprocessed = new ArrayList<QName>();
   89                   }
   90                   if(!unprocessed.contains(headerName)){
   91                       unprocessed.add(headerName);
   92                   }
   93                   continue;
   94               }
   95               // Oops, throw an appropriate MustUnderstand fault!!
   96               QName faultQName = headerBlock.getVersion().getMustUnderstandFaultCode();
   97               throw new AxisFault(Messages.getMessage("mustunderstandfailed",
   98                   headerBlock.getNamespace().getNamespaceURI(),
   99                   headerBlock.getLocalName()), faultQName);
  100           }
  101           if(unprocessed !=null && unprocessed.size()>0){
  102               //Adding HeaderQNames that failed MU check as AxisService Parameter
  103               //They will be examined later by MessageReceivers.
  104               if(log.isDebugEnabled()){
  105                   log.debug("Adding Unprocessed headers to MessageContext.");
  106               }
  107               msgContext.setProperty(Constants.UNPROCESSED_HEADER_QNAMES, unprocessed);           
  108           }       
  109       }
  110   
  111       private static boolean isReceiverMustUnderstandProcessor(MessageContext msgContext){
  112           MessageReceiver receiver = null;
  113           if(msgContext.isServerSide()){
  114               receiver = msgContext.getAxisOperation().getMessageReceiver();
  115           }
  116           return (receiver!=null && receiver.getClass().getName().endsWith("JAXWSMessageReceiver"));
  117       }
  118       /**
  119        * This method is called to handle any error that occurs at inflow or outflow. But if the
  120        * method is called twice, it implies that sending the error handling has failed, in which case
  121        * the method logs the error and exists.
  122        *
  123        * @deprecated (post 1.1 branch)
  124        */
  125       public static MessageContext createFaultMessageContext(MessageContext processingContext, Throwable e)
  126               throws AxisFault {
  127           return MessageContextBuilder.createFaultMessageContext(processingContext, e);
  128       }
  129   
  130       /**
  131        * This methods represents the inflow of the Axis, this could be either at the server side or the client side.
  132        * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
  133        * deployment time by the deployment module
  134        *
  135        * @throws AxisFault
  136        * @see MessageContext
  137        * @see Phase
  138        * @see Handler
  139        */
  140       public static InvocationResponse receive(MessageContext msgContext) throws AxisFault {
  141           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  142               log.trace(msgContext.getLogIDString() + " receive:" + msgContext.getMessageID());
  143           }
  144           ConfigurationContext confContext = msgContext.getConfigurationContext();
  145           List<Phase> preCalculatedPhases;
  146           if (msgContext.isFault() || msgContext.isProcessingFault()) {
  147               preCalculatedPhases = confContext.getAxisConfiguration().getInFaultFlowPhases();
  148               msgContext.setFLOW(MessageContext.IN_FAULT_FLOW);
  149           } else {
  150               preCalculatedPhases = confContext.getAxisConfiguration().getInFlowPhases();
  151               msgContext.setFLOW(MessageContext.IN_FLOW);
  152           }
  153           // Set the initial execution chain in the MessageContext to a *copy* of what
  154           // we got above.  This allows individual message processing to change the chain without
  155           // affecting later messages.
  156           ArrayList<Handler> executionChain = new ArrayList<Handler>();
  157           executionChain.addAll(preCalculatedPhases);
  158           msgContext.setExecutionChain(executionChain);
  159           try {
  160               InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  161   
  162               if (pi.equals(InvocationResponse.CONTINUE)) {
  163                   checkMustUnderstand(msgContext);
  164                   if (msgContext.isServerSide()) {
  165                       // invoke the Message Receivers
  166   
  167                       MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
  168                       if (receiver == null) {
  169                           throw new AxisFault(Messages.getMessage(
  170                                   "nomessagereciever",
  171                                   msgContext.getAxisOperation().getName().toString()));
  172                       }
  173                       receiver.receive(msgContext);
  174                   }
  175                   flowComplete(msgContext);
  176               } else if (pi.equals(InvocationResponse.SUSPEND)) {
  177                   return pi;
  178               } else if (pi.equals(InvocationResponse.ABORT)) {
  179                   flowComplete(msgContext);
  180                   // Undo any partial work.
  181                   // Remove the incoming message context
  182                   if (log.isDebugEnabled()) {
  183                       log.debug("InvocationResponse is aborted.  " +
  184                                   "The incoming MessageContext is removed, " +
  185                                   "and the OperationContext is marked as incomplete");
  186                   }
  187   				AxisOperation axisOp = msgContext.getAxisOperation();
  188                   if(axisOp!=null){
  189   					String mepURI  = axisOp.getMessageExchangePattern();
  190   					if (WSDL2Constants.MEP_URI_OUT_IN.equals(mepURI)) {
  191   						OperationContext opCtx = msgContext.getOperationContext();
  192   						if (opCtx != null) {
  193   							opCtx.removeMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
  194   						}
  195   					}
  196   				}
  197   				else{
  198   					log.debug("Could not clean up op ctx for " + msgContext);
  199   				}
  200                   return pi;
  201               } else {
  202                   String errorMsg =
  203                           "Unrecognized InvocationResponse encountered in AxisEngine.receive()";
  204                   log.error(msgContext.getLogIDString() + " " + errorMsg);
  205                   throw new AxisFault(errorMsg);
  206               }
  207           }
  208           catch (AxisFault e) {
  209               log.error(e.getMessage(), e);
  210               msgContext.setFailureReason(e);
  211               flowComplete(msgContext);
  212               throw e;
  213           }
  214   
  215           return InvocationResponse.CONTINUE;
  216       }
  217   
  218   
  219       /**
  220        * Take the execution chain from the msgContext , and then take the current Index
  221        * and invoke all the phases in the arraylist
  222        * if the msgContext is pauesd then the execution will be breaked
  223        *
  224        * @param msgContext
  225        * @return An InvocationResponse that indicates what
  226        *         the next step in the message processing should be.
  227        * @throws AxisFault
  228        */
  229       private static InvocationResponse invoke(MessageContext msgContext, boolean resuming)
  230               throws AxisFault {
  231   
  232           if (msgContext.getCurrentHandlerIndex() == -1) {
  233               msgContext.setCurrentHandlerIndex(0);
  234           }
  235   
  236           InvocationResponse pi = InvocationResponse.CONTINUE;
  237   
  238           while (msgContext.getCurrentHandlerIndex() < msgContext.getExecutionChain().size()) {
  239               Handler currentHandler = (Handler) msgContext.getExecutionChain().
  240                       get(msgContext.getCurrentHandlerIndex());
  241   
  242               try {
  243                   if (!resuming) {
  244                       msgContext.addExecutedPhase(currentHandler);
  245                   } else {
  246                       /* If we are resuming the flow, we don't want to add the phase
  247                       * again, as it has already been added.
  248                       */
  249                       resuming = false;
  250                   }
  251                   pi = currentHandler.invoke(msgContext);
  252               }
  253               catch (AxisFault e) {
  254                   if (msgContext.getCurrentPhaseIndex() == 0) {
  255                       /* If we got a fault, we still want to add the phase to the
  256                       list to be executed for flowComplete(...) unless this was
  257                       the first handler, as then the currentPhaseIndex will be
  258                       set to 0 and this will look like we've executed all of the
  259                       handlers.  If, at some point, a phase really needs to get
  260                       notification of flowComplete, then we'll need to introduce
  261                       some more complex logic to keep track of what has been
  262                       executed.*/
  263                       msgContext.removeFirstExecutedPhase();
  264                   }
  265                   throw e;
  266               }
  267   
  268               if (pi.equals(InvocationResponse.SUSPEND) ||
  269                       pi.equals(InvocationResponse.ABORT)) {
  270                   break;
  271               }
  272   
  273               msgContext.setCurrentHandlerIndex(msgContext.getCurrentHandlerIndex() + 1);
  274           }
  275   
  276           return pi;
  277       }
  278   
  279       private static void flowComplete(MessageContext msgContext) {
  280           Iterator<Handler> invokedPhaseIterator = msgContext.getExecutedPhases();
  281   
  282           while (invokedPhaseIterator.hasNext()) {
  283               Handler currentHandler = ((Handler) invokedPhaseIterator.next());
  284               currentHandler.flowComplete(msgContext);
  285           }
  286   
  287           /*This is needed because the OutInAxisOperation currently invokes
  288           * receive() even when a fault occurs, and we will have already executed
  289           * the flowComplete on those before receiveFault() is called.
  290           */
  291           msgContext.resetExecutedPhases();
  292       }
  293   
  294       /**
  295        * If the msgConetext is puased and try to invoke then
  296        * first invoke the phase list and after the message receiver
  297        *
  298        * @param msgContext
  299        * @return An InvocationResponse allowing the invoker to perhaps determine
  300        *         whether or not the message processing will ever succeed.
  301        * @throws AxisFault
  302        */
  303       public static InvocationResponse resumeReceive(MessageContext msgContext) throws AxisFault {
  304           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  305               log.trace(msgContext.getLogIDString() + " resumeReceive:" + msgContext.getMessageID());
  306           }
  307   
  308           //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeReceiveFault as well, when, in fact, this does both
  309           //REVIEW: Unlike with receive, there is no wrapping try/catch clause which would
  310           //fire off the flowComplete on an error, as we have to assume that the
  311           //message will be resumed again, but perhaps we need to unwind back to
  312           //the point at which the message was resumed and provide another API
  313           //to allow the full unwind if the message is going to be discarded.
  314           //invoke the phases
  315           InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
  316           //invoking the MR
  317   
  318           if (pi.equals(InvocationResponse.CONTINUE)) {
  319               checkMustUnderstand(msgContext);
  320               if (msgContext.isServerSide()) {
  321                   // invoke the Message Receivers
  322                   MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
  323                   if (receiver == null) {
  324                       throw new AxisFault(Messages.getMessage(
  325                               "nomessagereciever",
  326                               msgContext.getAxisOperation().getName().toString()));
  327                   }
  328                   receiver.receive(msgContext);
  329               }
  330               flowComplete(msgContext);
  331           }
  332   
  333           return pi;
  334       }
  335   
  336       /**
  337        * To resume the invocation at the send path , this is neened since it is require to call
  338        * TransportSender at the end
  339        *
  340        * @param msgContext
  341        * @return An InvocationResponse allowing the invoker to perhaps determine
  342        *         whether or not the message processing will ever succeed.
  343        * @throws AxisFault
  344        */
  345       public static InvocationResponse resumeSend(MessageContext msgContext) throws AxisFault {
  346           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  347               log.trace(msgContext.getLogIDString() + " resumeSend:" + msgContext.getMessageID());
  348           }
  349   
  350           //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeSendFault as well, when, in fact, this does both
  351           //REVIEW: Unlike with send, there is no wrapping try/catch clause which would
  352           //fire off the flowComplete on an error, as we have to assume that the
  353           //message will be resumed again, but perhaps we need to unwind back to
  354           //the point at which the message was resumed and provide another API
  355           //to allow the full unwind if the message is going to be discarded.
  356           //invoke the phases
  357           InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
  358           //Invoking Transport Sender
  359           if (pi.equals(InvocationResponse.CONTINUE)) {
  360               // write the Message to the Wire
  361               TransportOutDescription transportOut = msgContext.getTransportOut();
  362               TransportSender sender = transportOut.getSender();
  363               sender.invoke(msgContext);
  364               flowComplete(msgContext);
  365           }
  366   
  367           return pi;
  368       }
  369   
  370       /**
  371        * Resume processing of a message.
  372        *
  373        * @param msgctx
  374        * @return An InvocationResponse allowing the invoker to perhaps determine
  375        *         whether or not the message processing will ever succeed.
  376        * @throws AxisFault
  377        */
  378       public static InvocationResponse resume(MessageContext msgctx) throws AxisFault {
  379           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  380               log.trace(msgctx.getLogIDString() + " resume:" + msgctx.getMessageID());
  381           }
  382   
  383           msgctx.setPaused(false);
  384           if (msgctx.getFLOW() == MessageContext.IN_FLOW) {
  385               return resumeReceive(msgctx);
  386           } else {
  387               return resumeSend(msgctx);
  388           }
  389       }
  390   
  391       /**
  392        * This methods represents the outflow of the Axis, this could be either at the server side or the client side.
  393        * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
  394        * deployment time by the deployment module
  395        *
  396        * @param msgContext
  397        * @throws AxisFault
  398        * @see MessageContext
  399        * @see Phase
  400        * @see Handler
  401        */
  402       public static void send(MessageContext msgContext) throws AxisFault {
  403           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  404               log.trace(msgContext.getLogIDString() + " send:" + msgContext.getMessageID());
  405           }
  406           // find and invoke the Phases
  407           OperationContext operationContext = msgContext.getOperationContext();
  408           ArrayList executionChain = operationContext.getAxisOperation().getPhasesOutFlow();
  409           //rather than having two steps added both oparation and global chain together
  410           ArrayList outPhases = new ArrayList();
  411           outPhases.addAll(executionChain);
  412           outPhases.addAll(msgContext.getConfigurationContext().getAxisConfiguration().getOutFlowPhases());
  413           msgContext.setExecutionChain(outPhases);
  414           msgContext.setFLOW(MessageContext.OUT_FLOW);
  415           try {
  416               InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  417   
  418               if (pi.equals(InvocationResponse.CONTINUE)) {
  419                   // write the Message to the Wire
  420                   TransportOutDescription transportOut = msgContext.getTransportOut();
  421                   if (transportOut == null) {
  422                       throw new AxisFault("Transport out has not been set");
  423                   }
  424                   TransportSender sender = transportOut.getSender();
  425                   // This boolean property only used in client side fireAndForget invocation
  426                   //It will set a property into message context and if some one has set the
  427                   //property then transport sender will invoke in a diffrent thread
  428                   Object isTransportNonBlocking = msgContext.getProperty(
  429                           MessageContext.TRANSPORT_NON_BLOCKING);
  430                   if (isTransportNonBlocking != null &&
  431                           ((Boolean) isTransportNonBlocking).booleanValue()) {
  432                       msgContext.getConfigurationContext().getThreadPool().execute(
  433                               new TransportNonBlockingInvocationWorker(msgContext, sender));
  434                   } else {
  435                       sender.invoke(msgContext);
  436                   }
  437                   //REVIEW: In the case of the TransportNonBlockingInvocationWorker, does this need to wait until that finishes?
  438                   flowComplete(msgContext);
  439               } else if (pi.equals(InvocationResponse.SUSPEND)) {
  440               } else if (pi.equals(InvocationResponse.ABORT)) {
  441                   flowComplete(msgContext);
  442               } else {
  443                   String errorMsg =
  444                           "Unrecognized InvocationResponse encountered in AxisEngine.send()";
  445                   log.error(msgContext.getLogIDString() + " " + errorMsg);
  446                   throw new AxisFault(errorMsg);
  447               }
  448           } catch (AxisFault e) {
  449               msgContext.setFailureReason(e);
  450               flowComplete(msgContext);
  451               throw e;
  452           }
  453       }
  454   
  455       /**
  456        * Sends the SOAP Fault to another SOAP node.
  457        *
  458        * @param msgContext
  459        * @throws AxisFault
  460        */
  461       public static void sendFault(MessageContext msgContext) throws AxisFault {
  462           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  463               log.trace(msgContext.getLogIDString() + " sendFault:" + msgContext.getMessageID());
  464           }
  465           OperationContext opContext = msgContext.getOperationContext();
  466   
  467           //FIXME: If this gets paused in the operation-specific phases, the resume is not going to function correctly as the phases will not have all been set
  468   
  469           // find and execute the Fault Out Flow Handlers
  470           if (opContext != null) {
  471               AxisOperation axisOperation = opContext.getAxisOperation();
  472               ArrayList faultExecutionChain = axisOperation.getPhasesOutFaultFlow();
  473   
  474               //adding both operation specific and global out fault flows.
  475   
  476               ArrayList outFaultPhases = new ArrayList();
  477               outFaultPhases.addAll((ArrayList) faultExecutionChain.clone());
  478               msgContext.setExecutionChain((ArrayList) outFaultPhases.clone());
  479               msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
  480               try {
  481                   InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  482   
  483                   if (pi.equals(InvocationResponse.SUSPEND)) {
  484                       log.warn(msgContext.getLogIDString() +
  485                               " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
  486                       return;
  487                   } else if (pi.equals(InvocationResponse.ABORT)) {
  488                       flowComplete(msgContext);
  489                       return;
  490                   } else if (!pi.equals(InvocationResponse.CONTINUE)) {
  491                       String errorMsg =
  492                               "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  493                       log.error(msgContext.getLogIDString() + " " + errorMsg);
  494                       throw new AxisFault(errorMsg);
  495                   }
  496               }
  497               catch (AxisFault e) {
  498                   msgContext.setFailureReason(e);
  499                   flowComplete(msgContext);
  500                   throw e;
  501               }
  502           }
  503   
  504           ArrayList<Handler> executionChain = new ArrayList<Handler>(msgContext.getConfigurationContext()
  505                   .getAxisConfiguration().getOutFaultFlowPhases());
  506           msgContext.setExecutionChain(executionChain);
  507           msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
  508           InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  509   
  510           if (pi.equals(InvocationResponse.CONTINUE)) {
  511               // Actually send the SOAP Fault
  512               TransportOutDescription transportOut = msgContext.getTransportOut();
  513               if (transportOut == null) {
  514                   throw new AxisFault("Transport out has not been set");
  515               }
  516               TransportSender sender = transportOut.getSender();
  517   
  518               sender.invoke(msgContext);
  519               flowComplete(msgContext);
  520           } else if (pi.equals(InvocationResponse.SUSPEND)) {
  521           } else if (pi.equals(InvocationResponse.ABORT)) {
  522               flowComplete(msgContext);
  523           } else {
  524               String errorMsg =
  525                       "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  526               log.error(msgContext.getLogIDString() + " " + errorMsg);
  527               throw new AxisFault(errorMsg);
  528           }
  529       }
  530   
  531       /**
  532        * here we assume that it is resume from an operation level handler
  533        * @param msgContext
  534        * @throws AxisFault
  535        */
  536       public static void resumeSendFault(MessageContext msgContext) throws AxisFault{
  537           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  538               log.trace(msgContext.getLogIDString() + " resumeSendFault:" + msgContext.getMessageID());
  539           }
  540           OperationContext opContext = msgContext.getOperationContext();
  541   
  542           if (opContext != null) {
  543   
  544               try {
  545                   InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
  546   
  547                   if (pi.equals(InvocationResponse.SUSPEND)) {
  548                       log.warn(msgContext.getLogIDString() +
  549                               " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
  550                       return;
  551                   } else if (pi.equals(InvocationResponse.ABORT)) {
  552                       flowComplete(msgContext);
  553                       return;
  554                   } else if (!pi.equals(InvocationResponse.CONTINUE)) {
  555                       String errorMsg =
  556                               "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  557                       log.error(msgContext.getLogIDString() + " " + errorMsg);
  558                       throw new AxisFault(errorMsg);
  559                   }
  560               } catch (AxisFault e) {
  561                   msgContext.setFailureReason(e);
  562                   flowComplete(msgContext);
  563                   throw e;
  564               }
  565           }
  566   
  567           ArrayList<Handler> executionChain = new ArrayList<Handler>(msgContext.getConfigurationContext()
  568                   .getAxisConfiguration().getOutFaultFlowPhases());
  569           msgContext.setExecutionChain(executionChain);
  570           msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
  571           InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  572   
  573           if (pi.equals(InvocationResponse.CONTINUE)) {
  574               // Actually send the SOAP Fault
  575               TransportOutDescription transportOut = msgContext.getTransportOut();
  576               if (transportOut == null) {
  577                   throw new AxisFault("Transport out has not been set");
  578               }
  579               TransportSender sender = transportOut.getSender();
  580   
  581               sender.invoke(msgContext);
  582               flowComplete(msgContext);
  583           } else if (pi.equals(InvocationResponse.SUSPEND)) {
  584           } else if (pi.equals(InvocationResponse.ABORT)) {
  585               flowComplete(msgContext);
  586           } else {
  587               String errorMsg =
  588                       "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  589               log.error(msgContext.getLogIDString() + " " + errorMsg);
  590               throw new AxisFault(errorMsg);
  591           }
  592       }
  593   
  594   
  595       /**
  596        * This class is used when someone invoke a service invocation with two transports
  597        * If we dont create a new thread then the main thread will block untill it gets the
  598        * response . In the case of HTTP transportsender will block untill it gets HTTP 200
  599        * So , main thread also block till transport sender rereases the tread. So there is no
  600        * actual non-blocking. That is why when sending we creat a new thead and send the
  601        * requset via that.
  602        * <p/>
  603        * So whole porpose of this class to send the requset via a new thread
  604        * <p/>
  605        * way transport.
  606        */
  607       private static class TransportNonBlockingInvocationWorker implements Runnable {
  608           private MessageContext msgctx;
  609           private TransportSender sender;
  610   
  611           public TransportNonBlockingInvocationWorker(MessageContext msgctx,
  612               TransportSender sender) {
  613               this.msgctx = msgctx;
  614               this.sender = sender;
  615           }
  616   
  617           public void run() {
  618               try {
  619                   sender.invoke(msgctx);
  620               } catch (Exception e) {
  621                   log.info(msgctx.getLogIDString() + " " + e.getMessage());
  622                   if (msgctx.getProperty(MessageContext.DISABLE_ASYNC_CALLBACK_ON_TRANSPORT_ERROR) ==
  623                           null) {
  624                       AxisOperation axisOperation = msgctx.getAxisOperation();
  625                       if (axisOperation != null) {
  626                           MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
  627                           if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) {
  628                               Object callback = ((CallbackReceiver) msgReceiver)
  629                                       .lookupCallback(msgctx.getMessageID());
  630                               if (callback == null) return; // TODO: should we log this??
  631                               
  632                               if (callback instanceof Callback) {
  633                                   // Instances of Callback only expect onComplete to be called
  634                                   // for a successful MEP.  Errors are reported through the
  635                                   // Async Response object, which the Callback implementations 
  636                                   // all use.
  637                                   ((Callback)callback).onError(e);
  638                               } else {
  639                                   // The AxisCallback (which is OutInAxisOperationClient$SyncCallBack
  640                                   // used to support async-on-the-wire under a synchronous API 
  641                                   // operation) need to be told the MEP is complete after being told
  642                                   // of the error.
  643                                   ((AxisCallback)callback).onError(e);
  644                                   ((AxisCallback)callback).onComplete();
  645                               }
  646                           }
  647                       }
  648                   }
  649               }
  650           }
  651       }
  652   }

Save This Page
Home » axis2-1.5-src » org.apache » axis2 » engine » [javadoc | source]