Home » axis2-1.4-src » org.apache » axis2 » engine » [javadoc | source]
    1   /*
    2    * Licensed to the Apache Software Foundation (ASF) under one
    3    * or more contributor license agreements. See the NOTICE file
    4    * distributed with this work for additional information
    5    * regarding copyright ownership. The ASF licenses this file
    6    * to you under the Apache License, Version 2.0 (the
    7    * "License"); you may not use this file except in compliance
    8    * with the License. You may obtain a copy of the License at
    9    *
   10    * http://www.apache.org/licenses/LICENSE-2.0
   11    *
   12    * Unless required by applicable law or agreed to in writing,
   13    * software distributed under the License is distributed on an
   14    * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
   15    * KIND, either express or implied. See the License for the
   16    * specific language governing permissions and limitations
   17    * under the License.
   18    */
   19   
   20   
   21   package org.apache.axis2.engine;
   22   
   23   import org.apache.axiom.soap.RolePlayer;
   24   import org.apache.axiom.soap.SOAPEnvelope;
   25   import org.apache.axiom.soap.SOAPHeaderBlock;
   26   import org.apache.axis2.AxisFault;
   27   import org.apache.axis2.Constants;
   28   import org.apache.axis2.client.async.AxisCallback;
   29   import org.apache.axis2.client.async.Callback;
   30   import org.apache.axis2.context.ConfigurationContext;
   31   import org.apache.axis2.context.MessageContext;
   32   import org.apache.axis2.context.OperationContext;
   33   import org.apache.axis2.description.AxisOperation;
   34   import org.apache.axis2.description.TransportOutDescription;
   35   import org.apache.axis2.description.WSDL2Constants;
   36   import org.apache.axis2.engine.Handler.InvocationResponse;
   37   import org.apache.axis2.i18n.Messages;
   38   import org.apache.axis2.transport.TransportSender;
   39   import org.apache.axis2.util.CallbackReceiver;
   40   import org.apache.axis2.util.LoggingControl;
   41   import org.apache.axis2.util.MessageContextBuilder;
   42   import org.apache.axis2.wsdl.WSDLConstants;
   43   import org.apache.commons.logging.Log;
   44   import org.apache.commons.logging.LogFactory;
   45   
   46   import javax.xml.namespace.QName;
   47   import java.util.ArrayList;
   48   import java.util.Iterator;
   49   import java.util.List;
   50   
/**
 * There is one engine for both the server and the client. The send() and receive()
 * methods are the basic operations on top of which synchronous and asynchronous
 * messaging are built.
 */
   55   public class AxisEngine {
   56   
   57       /**
   58        * Field log
   59        */
   60       private static final Log log = LogFactory.getLog(AxisEngine.class);
   61   
   62       private static boolean RESUMING_EXECUTION = true;
   63       private static boolean NOT_RESUMING_EXECUTION = false;
   64   
   65       /**
   66        * Constructor AxisEngine
   67        */
   68       public AxisEngine(ConfigurationContext engineContext) {
   69       }
   70   
   71       private static void checkMustUnderstand(MessageContext msgContext) throws AxisFault {
   72           List unprocessed = null;
   73           SOAPEnvelope envelope = msgContext.getEnvelope();
   74           if (envelope.getHeader() == null) {
   75               return;
   76           }
   77           // Get all the headers targeted to us
   78           Iterator headerBlocks = envelope.getHeader().getHeadersToProcess((RolePlayer)msgContext.getConfigurationContext().getAxisConfiguration().getParameterValue("rolePlayer"));
   79           while (headerBlocks.hasNext()) {
   80               SOAPHeaderBlock headerBlock = (SOAPHeaderBlock) headerBlocks.next();
   81               QName headerName = headerBlock.getQName();
   82               // if this header block has been processed or mustUnderstand isn't
   83               // turned on then its cool
   84               if (headerBlock.isProcessed() || !headerBlock.getMustUnderstand()) {
   85                   continue;
   86               }
   87   
   88               if(LoggingControl.debugLoggingAllowed && log.isDebugEnabled()){
   89                   log.debug("MustUnderstand header not processed or registered as understood"+headerName);
   90               }
   91               if(isReceiverMustUnderstandProcessor(msgContext)){
   92                   if(unprocessed == null){
   93                       unprocessed = new ArrayList();
   94                   }
   95                   if(!unprocessed.contains(headerName)){
   96                       unprocessed.add(headerName);
   97                   }
   98                   continue;
   99               }
  100               // Oops, throw an appropriate MustUnderstand fault!!
  101               QName faultQName = headerBlock.getVersion().getMustUnderstandFaultCode();
  102               throw new AxisFault(Messages.getMessage("mustunderstandfailed",
  103                   headerBlock.getNamespace().getNamespaceURI(),
  104                   headerBlock.getLocalName()), faultQName);
  105           }
  106           if(unprocessed !=null && unprocessed.size()>0){
  107               //Adding HeaderQNames that failed MU check as AxisService Parameter
  108               //They will be examined later by MessageReceivers.
  109               if(log.isDebugEnabled()){
  110                   log.debug("Adding Unprocessed headers to MessageContext.");
  111               }
  112               msgContext.setProperty(Constants.UNPROCESSED_HEADER_QNAMES, unprocessed);           
  113           }       
  114       }
  115   
  116       private static boolean isReceiverMustUnderstandProcessor(MessageContext msgContext){
  117           MessageReceiver receiver = null;
  118           if(msgContext.isServerSide()){
  119               receiver = msgContext.getAxisOperation().getMessageReceiver();
  120           }
  121           return (receiver!=null && receiver.getClass().getName().endsWith("JAXWSMessageReceiver"));
  122       }
  123       /**
  124        * This method is called to handle any error that occurs at inflow or outflow. But if the
  125        * method is called twice, it implies that sending the error handling has failed, in which case
  126        * the method logs the error and exists.
  127        *
  128        * @deprecated (post 1.1 branch)
  129        */
  130       public static MessageContext createFaultMessageContext(MessageContext processingContext, Throwable e)
  131               throws AxisFault {
  132           return MessageContextBuilder.createFaultMessageContext(processingContext, e);
  133       }
  134   
  135       /**
  136        * This methods represents the inflow of the Axis, this could be either at the server side or the client side.
  137        * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
  138        * deployment time by the deployment module
  139        *
  140        * @throws AxisFault
  141        * @see MessageContext
  142        * @see Phase
  143        * @see Handler
  144        */
  145       public static InvocationResponse receive(MessageContext msgContext) throws AxisFault {
  146           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  147               log.trace(msgContext.getLogIDString() + " receive:" + msgContext.getMessageID());
  148           }
  149           ConfigurationContext confContext = msgContext.getConfigurationContext();
  150           ArrayList preCalculatedPhases;
  151           if (msgContext.isFault() || msgContext.isProcessingFault()) {
  152               preCalculatedPhases = confContext.getAxisConfiguration().getInFaultFlowPhases();
  153               msgContext.setFLOW(MessageContext.IN_FAULT_FLOW);
  154           } else {
  155               preCalculatedPhases = confContext.getAxisConfiguration().getInFlowPhases();
  156               msgContext.setFLOW(MessageContext.IN_FLOW);
  157           }
  158           // Set the initial execution chain in the MessageContext to a *copy* of what
  159           // we got above.  This allows individual message processing to change the chain without
  160           // affecting later messages.
  161           msgContext.setExecutionChain((ArrayList) preCalculatedPhases.clone());
  162           try {
  163               InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  164   
  165               if (pi.equals(InvocationResponse.CONTINUE)) {
  166                   checkMustUnderstand(msgContext);
  167                   if (msgContext.isServerSide()) {
  168                       // invoke the Message Receivers
  169   
  170                       MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
  171                       if (receiver == null) {
  172                           throw new AxisFault(Messages.getMessage(
  173                                   "nomessagereciever",
  174                                   msgContext.getAxisOperation().getName().toString()));
  175                       }
  176                       receiver.receive(msgContext);
  177                   }
  178                   flowComplete(msgContext);
  179               } else if (pi.equals(InvocationResponse.SUSPEND)) {
  180                   return pi;
  181               } else if (pi.equals(InvocationResponse.ABORT)) {
  182                   flowComplete(msgContext);
  183                   // Undo any partial work.
  184                   // Remove the incoming message context
  185                   if (log.isDebugEnabled()) {
  186                       log.debug("InvocationResponse is aborted.  " +
  187                                   "The incoming MessageContext is removed, " +
  188                                   "and the OperationContext is marked as incomplete");
  189                   }
  190   				AxisOperation axisOp = msgContext.getAxisOperation();
  191                   if(axisOp!=null){
  192   					String mepURI  = axisOp.getMessageExchangePattern();
  193   					if (WSDL2Constants.MEP_URI_OUT_IN.equals(mepURI)) {
  194   						OperationContext opCtx = msgContext.getOperationContext();
  195   						if (opCtx != null) {
  196   							opCtx.removeMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
  197   						}
  198   					}
  199   				}
  200   				else{
  201   					log.debug("Could not clean up op ctx for " + msgContext);
  202   				}
  203                   return pi;
  204               } else {
  205                   String errorMsg =
  206                           "Unrecognized InvocationResponse encountered in AxisEngine.receive()";
  207                   log.error(msgContext.getLogIDString() + " " + errorMsg);
  208                   throw new AxisFault(errorMsg);
  209               }
  210           }
  211           catch (AxisFault e) {
  212               log.error(e.getMessage(), e);
  213               msgContext.setFailureReason(e);
  214               flowComplete(msgContext);
  215               throw e;
  216           }
  217   
  218           return InvocationResponse.CONTINUE;
  219       }
  220   
  221       private static void processFault(MessageContext msgContext, AxisFault e) {
  222           try {
  223               MessageContext faultMC = MessageContextBuilder.createFaultMessageContext(msgContext, e);
  224   
  225               // Figure out where this goes
  226               sendFault(faultMC);
  227           } catch (AxisFault axisFault) {
  228               log.error(axisFault.getMessage(), axisFault);
  229           }
  230       }
  231   
  232       /**
  233        * Take the execution chain from the msgContext , and then take the current Index
  234        * and invoke all the phases in the arraylist
  235        * if the msgContext is pauesd then the execution will be breaked
  236        *
  237        * @param msgContext
  238        * @return An InvocationResponse that indicates what
  239        *         the next step in the message processing should be.
  240        * @throws AxisFault
  241        */
  242       private static InvocationResponse invoke(MessageContext msgContext, boolean resuming)
  243               throws AxisFault {
  244   
  245           if (msgContext.getCurrentHandlerIndex() == -1) {
  246               msgContext.setCurrentHandlerIndex(0);
  247           }
  248   
  249           InvocationResponse pi = InvocationResponse.CONTINUE;
  250   
  251           while (msgContext.getCurrentHandlerIndex() < msgContext.getExecutionChain().size()) {
  252               Handler currentHandler = (Handler) msgContext.getExecutionChain().
  253                       get(msgContext.getCurrentHandlerIndex());
  254   
  255               try {
  256                   if (!resuming) {
  257                       msgContext.addExecutedPhase(currentHandler);
  258                   } else {
  259                       /* If we are resuming the flow, we don't want to add the phase
  260                       * again, as it has already been added.
  261                       */
  262                       resuming = false;
  263                   }
  264                   pi = currentHandler.invoke(msgContext);
  265               }
  266               catch (AxisFault e) {
  267                   if (msgContext.getCurrentPhaseIndex() == 0) {
  268                       /* If we got a fault, we still want to add the phase to the
  269                       list to be executed for flowComplete(...) unless this was
  270                       the first handler, as then the currentPhaseIndex will be
  271                       set to 0 and this will look like we've executed all of the
  272                       handlers.  If, at some point, a phase really needs to get
  273                       notification of flowComplete, then we'll need to introduce
  274                       some more complex logic to keep track of what has been
  275                       executed.*/
  276                       msgContext.removeFirstExecutedPhase();
  277                   }
  278                   throw e;
  279               }
  280   
  281               if (pi.equals(InvocationResponse.SUSPEND) ||
  282                       pi.equals(InvocationResponse.ABORT)) {
  283                   break;
  284               }
  285   
  286               msgContext.setCurrentHandlerIndex(msgContext.getCurrentHandlerIndex() + 1);
  287           }
  288   
  289           return pi;
  290       }
  291   
  292       private static void flowComplete(MessageContext msgContext) {
  293           Iterator invokedPhaseIterator = msgContext.getExecutedPhases();
  294   
  295           while (invokedPhaseIterator.hasNext()) {
  296               Handler currentHandler = ((Handler) invokedPhaseIterator.next());
  297               currentHandler.flowComplete(msgContext);
  298           }
  299   
  300           /*This is needed because the OutInAxisOperation currently invokes
  301           * receive() even when a fault occurs, and we will have already executed
  302           * the flowComplete on those before receiveFault() is called.
  303           */
  304           msgContext.resetExecutedPhases();
  305       }
  306   
  307       /**
  308        * If the msgConetext is puased and try to invoke then
  309        * first invoke the phase list and after the message receiver
  310        *
  311        * @param msgContext
  312        * @return An InvocationResponse allowing the invoker to perhaps determine
  313        *         whether or not the message processing will ever succeed.
  314        * @throws AxisFault
  315        */
  316       public static InvocationResponse resumeReceive(MessageContext msgContext) throws AxisFault {
  317           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  318               log.trace(msgContext.getLogIDString() + " resumeReceive:" + msgContext.getMessageID());
  319           }
  320   
  321           //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeReceiveFault as well, when, in fact, this does both
  322           //REVIEW: Unlike with receive, there is no wrapping try/catch clause which would
  323           //fire off the flowComplete on an error, as we have to assume that the
  324           //message will be resumed again, but perhaps we need to unwind back to
  325           //the point at which the message was resumed and provide another API
  326           //to allow the full unwind if the message is going to be discarded.
  327           //invoke the phases
  328           InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
  329           //invoking the MR
  330   
  331           if (pi.equals(InvocationResponse.CONTINUE)) {
  332               checkMustUnderstand(msgContext);
  333               if (msgContext.isServerSide()) {
  334                   // invoke the Message Receivers
  335                   MessageReceiver receiver = msgContext.getAxisOperation().getMessageReceiver();
  336                   if (receiver == null) {
  337                       throw new AxisFault(Messages.getMessage(
  338                               "nomessagereciever",
  339                               msgContext.getAxisOperation().getName().toString()));
  340                   }
  341                   receiver.receive(msgContext);
  342               }
  343               flowComplete(msgContext);
  344           }
  345   
  346           return pi;
  347       }
  348   
  349       /**
  350        * To resume the invocation at the send path , this is neened since it is require to call
  351        * TransportSender at the end
  352        *
  353        * @param msgContext
  354        * @return An InvocationResponse allowing the invoker to perhaps determine
  355        *         whether or not the message processing will ever succeed.
  356        * @throws AxisFault
  357        */
  358       public static InvocationResponse resumeSend(MessageContext msgContext) throws AxisFault {
  359           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  360               log.trace(msgContext.getLogIDString() + " resumeSend:" + msgContext.getMessageID());
  361           }
  362   
  363           //REVIEW: This name is a little misleading, as it seems to indicate that there should be a resumeSendFault as well, when, in fact, this does both
  364           //REVIEW: Unlike with send, there is no wrapping try/catch clause which would
  365           //fire off the flowComplete on an error, as we have to assume that the
  366           //message will be resumed again, but perhaps we need to unwind back to
  367           //the point at which the message was resumed and provide another API
  368           //to allow the full unwind if the message is going to be discarded.
  369           //invoke the phases
  370           InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
  371           //Invoking Transport Sender
  372           if (pi.equals(InvocationResponse.CONTINUE)) {
  373               // write the Message to the Wire
  374               TransportOutDescription transportOut = msgContext.getTransportOut();
  375               TransportSender sender = transportOut.getSender();
  376               sender.invoke(msgContext);
  377               flowComplete(msgContext);
  378           }
  379   
  380           return pi;
  381       }
  382   
  383       /**
  384        * Resume processing of a message.
  385        *
  386        * @param msgctx
  387        * @return An InvocationResponse allowing the invoker to perhaps determine
  388        *         whether or not the message processing will ever succeed.
  389        * @throws AxisFault
  390        */
  391       public static InvocationResponse resume(MessageContext msgctx) throws AxisFault {
  392           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  393               log.trace(msgctx.getLogIDString() + " resume:" + msgctx.getMessageID());
  394           }
  395   
  396           msgctx.setPaused(false);
  397           if (msgctx.getFLOW() == MessageContext.IN_FLOW) {
  398               return resumeReceive(msgctx);
  399           } else {
  400               return resumeSend(msgctx);
  401           }
  402       }
  403   
  404       /**
  405        * This methods represents the outflow of the Axis, this could be either at the server side or the client side.
  406        * Here the <code>ExecutionChain</code> is created using the Phases. The Handlers at the each Phases is ordered in
  407        * deployment time by the deployment module
  408        *
  409        * @param msgContext
  410        * @throws AxisFault
  411        * @see MessageContext
  412        * @see Phase
  413        * @see Handler
  414        */
  415       public static void send(MessageContext msgContext) throws AxisFault {
  416           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  417               log.trace(msgContext.getLogIDString() + " send:" + msgContext.getMessageID());
  418           }
  419           // find and invoke the Phases
  420           OperationContext operationContext = msgContext.getOperationContext();
  421           ArrayList executionChain = operationContext.getAxisOperation().getPhasesOutFlow();
  422           //rather than having two steps added both oparation and global chain together
  423           ArrayList outPhases = new ArrayList();
  424           outPhases.addAll(executionChain);
  425           outPhases.addAll(msgContext.getConfigurationContext().getAxisConfiguration().getOutFlowPhases());
  426           msgContext.setExecutionChain(outPhases);
  427           msgContext.setFLOW(MessageContext.OUT_FLOW);
  428           try {
  429               InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  430   
  431               if (pi.equals(InvocationResponse.CONTINUE)) {
  432                   // write the Message to the Wire
  433                   TransportOutDescription transportOut = msgContext.getTransportOut();
  434                   if (transportOut == null) {
  435                       throw new AxisFault("Transport out has not been set");
  436                   }
  437                   TransportSender sender = transportOut.getSender();
  438                   // This boolean property only used in client side fireAndForget invocation
  439                   //It will set a property into message context and if some one has set the
  440                   //property then transport sender will invoke in a diffrent thread
  441                   Object isTransportNonBlocking = msgContext.getProperty(
  442                           MessageContext.TRANSPORT_NON_BLOCKING);
  443                   if (isTransportNonBlocking != null &&
  444                           ((Boolean) isTransportNonBlocking).booleanValue()) {
  445                       msgContext.getConfigurationContext().getThreadPool().execute(
  446                               new TransportNonBlockingInvocationWorker(msgContext, sender));
  447                   } else {
  448                       sender.invoke(msgContext);
  449                   }
  450                   //REVIEW: In the case of the TransportNonBlockingInvocationWorker, does this need to wait until that finishes?
  451                   flowComplete(msgContext);
  452               } else if (pi.equals(InvocationResponse.SUSPEND)) {
  453               } else if (pi.equals(InvocationResponse.ABORT)) {
  454                   flowComplete(msgContext);
  455               } else {
  456                   String errorMsg =
  457                           "Unrecognized InvocationResponse encountered in AxisEngine.send()";
  458                   log.error(msgContext.getLogIDString() + " " + errorMsg);
  459                   throw new AxisFault(errorMsg);
  460               }
  461           } catch (AxisFault e) {
  462               msgContext.setFailureReason(e);
  463               flowComplete(msgContext);
  464               throw e;
  465           }
  466       }
  467   
  468       /**
  469        * Sends the SOAP Fault to another SOAP node.
  470        *
  471        * @param msgContext
  472        * @throws AxisFault
  473        */
  474       public static void sendFault(MessageContext msgContext) throws AxisFault {
  475           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  476               log.trace(msgContext.getLogIDString() + " sendFault:" + msgContext.getMessageID());
  477           }
  478           OperationContext opContext = msgContext.getOperationContext();
  479   
  480           //FIXME: If this gets paused in the operation-specific phases, the resume is not going to function correctly as the phases will not have all been set
  481   
  482           // find and execute the Fault Out Flow Handlers
  483           if (opContext != null) {
  484               AxisOperation axisOperation = opContext.getAxisOperation();
  485               ArrayList faultExecutionChain = axisOperation.getPhasesOutFaultFlow();
  486   
  487               //adding both operation specific and global out fault flows.
  488   
  489               ArrayList outFaultPhases = new ArrayList();
  490               outFaultPhases.addAll((ArrayList) faultExecutionChain.clone());
  491               msgContext.setExecutionChain((ArrayList) outFaultPhases.clone());
  492               msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
  493               try {
  494                   InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  495   
  496                   if (pi.equals(InvocationResponse.SUSPEND)) {
  497                       log.warn(msgContext.getLogIDString() +
  498                               " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
  499                       return;
  500                   } else if (pi.equals(InvocationResponse.ABORT)) {
  501                       flowComplete(msgContext);
  502                       return;
  503                   } else if (!pi.equals(InvocationResponse.CONTINUE)) {
  504                       String errorMsg =
  505                               "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  506                       log.error(msgContext.getLogIDString() + " " + errorMsg);
  507                       throw new AxisFault(errorMsg);
  508                   }
  509               }
  510               catch (AxisFault e) {
  511                   msgContext.setFailureReason(e);
  512                   flowComplete(msgContext);
  513                   throw e;
  514               }
  515           }
  516   
  517           msgContext.setExecutionChain((ArrayList) msgContext.getConfigurationContext()
  518                   .getAxisConfiguration().getOutFaultFlowPhases().clone());
  519           msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
  520           InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  521   
  522           if (pi.equals(InvocationResponse.CONTINUE)) {
  523               // Actually send the SOAP Fault
  524               TransportOutDescription transportOut = msgContext.getTransportOut();
  525               if (transportOut == null) {
  526                   throw new AxisFault("Transport out has not been set");
  527               }
  528               TransportSender sender = transportOut.getSender();
  529   
  530               sender.invoke(msgContext);
  531               flowComplete(msgContext);
  532           } else if (pi.equals(InvocationResponse.SUSPEND)) {
  533           } else if (pi.equals(InvocationResponse.ABORT)) {
  534               flowComplete(msgContext);
  535           } else {
  536               String errorMsg =
  537                       "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  538               log.error(msgContext.getLogIDString() + " " + errorMsg);
  539               throw new AxisFault(errorMsg);
  540           }
  541       }
  542   
  543       /**
  544        * here we assume that it is resume from an operation level handler
  545        * @param msgContext
  546        * @throws AxisFault
  547        */
  548       public static void resumeSendFault(MessageContext msgContext) throws AxisFault{
  549           if (LoggingControl.debugLoggingAllowed && log.isTraceEnabled()) {
  550               log.trace(msgContext.getLogIDString() + " resumeSendFault:" + msgContext.getMessageID());
  551           }
  552           OperationContext opContext = msgContext.getOperationContext();
  553   
  554           if (opContext != null) {
  555   
  556               try {
  557                   InvocationResponse pi = invoke(msgContext, RESUMING_EXECUTION);
  558   
  559                   if (pi.equals(InvocationResponse.SUSPEND)) {
  560                       log.warn(msgContext.getLogIDString() +
  561                               " The resumption of this flow may function incorrectly, as the OutFaultFlow will not be used");
  562                       return;
  563                   } else if (pi.equals(InvocationResponse.ABORT)) {
  564                       flowComplete(msgContext);
  565                       return;
  566                   } else if (!pi.equals(InvocationResponse.CONTINUE)) {
  567                       String errorMsg =
  568                               "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  569                       log.error(msgContext.getLogIDString() + " " + errorMsg);
  570                       throw new AxisFault(errorMsg);
  571                   }
  572               } catch (AxisFault e) {
  573                   msgContext.setFailureReason(e);
  574                   flowComplete(msgContext);
  575                   throw e;
  576               }
  577           }
  578   
  579           msgContext.setExecutionChain((ArrayList) msgContext.getConfigurationContext()
  580                   .getAxisConfiguration().getOutFaultFlowPhases().clone());
  581           msgContext.setFLOW(MessageContext.OUT_FAULT_FLOW);
  582           InvocationResponse pi = invoke(msgContext, NOT_RESUMING_EXECUTION);
  583   
  584           if (pi.equals(InvocationResponse.CONTINUE)) {
  585               // Actually send the SOAP Fault
  586               TransportOutDescription transportOut = msgContext.getTransportOut();
  587               if (transportOut == null) {
  588                   throw new AxisFault("Transport out has not been set");
  589               }
  590               TransportSender sender = transportOut.getSender();
  591   
  592               sender.invoke(msgContext);
  593               flowComplete(msgContext);
  594           } else if (pi.equals(InvocationResponse.SUSPEND)) {
  595           } else if (pi.equals(InvocationResponse.ABORT)) {
  596               flowComplete(msgContext);
  597           } else {
  598               String errorMsg =
  599                       "Unrecognized InvocationResponse encountered in AxisEngine.sendFault()";
  600               log.error(msgContext.getLogIDString() + " " + errorMsg);
  601               throw new AxisFault(errorMsg);
  602           }
  603       }
  604   
  605   
  606       /**
  607        * This class is used when someone invoke a service invocation with two transports
  608        * If we dont create a new thread then the main thread will block untill it gets the
  609        * response . In the case of HTTP transportsender will block untill it gets HTTP 200
  610        * So , main thread also block till transport sender rereases the tread. So there is no
  611        * actual non-blocking. That is why when sending we creat a new thead and send the
  612        * requset via that.
  613        * <p/>
  614        * So whole porpose of this class to send the requset via a new thread
  615        * <p/>
  616        * way transport.
  617        */
  618       private static class TransportNonBlockingInvocationWorker implements Runnable {
  619           private MessageContext msgctx;
  620           private TransportSender sender;
  621   
  622           public TransportNonBlockingInvocationWorker(MessageContext msgctx,
  623               TransportSender sender) {
  624               this.msgctx = msgctx;
  625               this.sender = sender;
  626           }
  627   
  628           public void run() {
  629               try {
  630                   sender.invoke(msgctx);
  631               } catch (Exception e) {
  632                   log.info(msgctx.getLogIDString() + " " + e.getMessage());
  633                   if (msgctx.getProperty(MessageContext.DISABLE_ASYNC_CALLBACK_ON_TRANSPORT_ERROR) ==
  634                           null) {
  635                       AxisOperation axisOperation = msgctx.getAxisOperation();
  636                       if (axisOperation != null) {
  637                           MessageReceiver msgReceiver = axisOperation.getMessageReceiver();
  638                           if ((msgReceiver != null) && (msgReceiver instanceof CallbackReceiver)) {
  639                               Object callback = ((CallbackReceiver) msgReceiver)
  640                                       .lookupCallback(msgctx.getMessageID());
  641                               if (callback == null) return; // TODO: should we log this??
  642   
  643                               if (callback instanceof Callback) {
  644                                   ((Callback)callback).onError(e);
  645                               } else {
  646                                   ((AxisCallback)callback).onError(e);
  647                               }
  648                           }
  649                       }
  650                   }
  651               }
  652           }
  653       }
  654   }

Save This Page
Home » axis2-1.4-src » org.apache » axis2 » engine » [javadoc | source]