/** * Protected API - called by nextCommand to check possibly handle an incoming Command before it is returned to the caller of nextCommand. If this method * returns true, the command is considered handled and is not passed back to nextCommand's caller; if it returns false, nextCommand returns the command as * usual. This is used in subclasses to implement handling of Basic.Return and Basic.Deliver messages, as well as Channel.Close and Connection.Close. * @param command the command to handle asynchronously * @return true if we handled the command; otherwise the caller should consider it "unhandled" */ publicabstractbooleanprocessAsync(Command command)throws IOException;
protectedfinalObject_channelMutex=newObject(); /** The connection this channel is associated with. */ privatefinal AMQConnection _connection; /** This channel's channel number. */ privatefinalint _channelNumber; /** Command being assembled */ privateAMQCommand_command=newAMQCommand(); /** The current outstanding RPC request, if any. (Could become a queue in future.) */ privateRpcContinuation_activeRpc=null; /** Whether transmission of content-bearing methods should be blocked */ publicvolatileboolean_blockContent=false;
/** * Private API - When the Connection receives a Frame for this * channel, it passes it to this method. * @param frame the incoming frame * @throws IOException if an error is encountered */ publicvoidhandleFrame(Frame frame)throws IOException { AMQCommandcommand= _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line _command = newAMQCommand(); // prepare for the next one handleCompleteInboundCommand(command); } }
/** * Private API - handle a command which has been assembled * @throws IOException if there's any problem * * @param command the incoming command * @throws IOException */ publicvoidhandleCompleteInboundCommand(AMQCommand command)throws IOException { // First, offer the command to the asynchronous-command // handling mechanism, which gets to act as a filter on the // incoming command stream. If processAsync() returns true, // the command has been dealt with by the filter and so should // not be processed further. It will return true for // asynchronous commands (deliveries/returns/other events), // and false for commands that should be passed on to some // waiting RPC continuation. if (!processAsync(command)) { // The filter decided not to handle/consume the command, // so it must be some reply to an earlier RPC. nextOutstandingRpc().handleCommand(command); markRpcFinished(); } }
/** * Protected API - sends a {@link Method} to the broker and waits for the * next in-bound Command from the broker: only for use from * non-connection-MainLoop threads! */ public AMQCommand rpc(Method m) throws IOException, ShutdownSignalException { return privateRpc(m); }
public AMQCommand rpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException { return privateRpc(m, timeout); }
private AMQCommand privateRpc(Method m) throws IOException, ShutdownSignalException { SimpleBlockingRpcContinuationk=newSimpleBlockingRpcContinuation(); rpc(m, k); // At this point, the request method has been sent, and we // should wait for the reply to arrive. // // Calling getReply() on the continuation puts us to sleep // until the connection's reader-thread throws the reply over // the fence. return k.getReply(); }
private AMQCommand privateRpc(Method m, int timeout) throws IOException, ShutdownSignalException, TimeoutException { SimpleBlockingRpcContinuationk=newSimpleBlockingRpcContinuation(); rpc(m, k);
// This is to catch a situation when the thread wakes up during // shutdown. Currently, no command that has content is allowed // to send anything in a closing state. ensureIsOpen(); } } c.transmit(this); } }
客户端将Method封装成Connection.StartOk帧之后等待broker返回Connection.Tune帧。 此时调用了AMQChannel的rpc(Method m, int timeout)方法,其间接调用了AMQChannel的privateRpc(Method, int timeout)方法。代码详情上面已经罗列出来。
注意privateRpc(Method, int timeout)方法的最有一句返回:return k.getReply(timeout);这句代码的意思是SimpleBlockingRpcContinuation对象在等待broker的返回,确切的来说是MainLoop线程处理之后返回,即AMQChannel类中handleCompleteInboundCommand方法的nextOutstandingRpc().handleCommand(command)这行代码。