/**
 * Start up the connection, including the MainLoop thread.
 * Sends the protocol
 * version negotiation header, and runs through
 * Connection.Start/.StartOk, Connection.Tune/.TuneOk, and then
 * calls Connection.Open and waits for the OpenOk. Sets heart-beat
 * and frame max values after tuning has taken place.
 * @throws IOException if an error is encountered
 * either before, or during, protocol negotiation;
 * sub-classes {@link ProtocolVersionMismatchException} and
 * {@link PossibleAuthenticationFailureException} will be thrown in the
 * corresponding circumstances. {@link AuthenticationFailureException}
 * will be thrown if the broker closes the connection with ACCESS_REFUSED.
 * If an exception is thrown, connection resources allocated can all be
 * garbage collected when the connection object is no longer referenced.
 */
首先来看看方法上的注释说了什么:
该方法的作用是启动连接(start up the connection),其中包括启动MainLoop线程。MainLoop线程负责与broker进行通信交互,读取并处理通信帧(Frame),是一个非常重要的线程。
publicvoidstart() throws IOException, TimeoutException { initializeConsumerWorkService(); initializeHeartbeatSender(); this._running = true; // Make sure that the first thing we do is to send the header, // which should cause any socket errors to show up for us, rather // than risking them pop out in the MainLoop AMQChannel.SimpleBlockingRpcContinuationconnStartBlocker= newAMQChannel.SimpleBlockingRpcContinuation(); // We enqueue an RPC continuation here without sending an RPC // request, since the protocol specifies that after sending // the version negotiation header, the client (connection // initiator) is to wait for a connection.start method to // arrive. _channel0.enqueueRpc(connStartBlocker);
try { // The following two lines are akin to AMQChannel's // transmit() method for this pseudo-RPC. _frameHandler.setTimeout(HANDSHAKE_TIMEOUT); _frameHandler.sendHeader(); } catch (IOException ioe) { _frameHandler.close(); throw ioe; }
// start the main loop going MainLooploop=newMainLoop(); finalStringname="AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); // after this point clear-up of MainLoop is triggered by closing the frameHandler.
// We can now respond to errors having finished tailoring the connection this._inConnectionNegotiation = false;
return; }
接着回顾MainLoop, 在start()方法中关于MainLoop的代码主要有:
1 2 3 4 5 6
// start the main loop going MainLooploop=newMainLoop(); finalStringname="AMQP Connection " + getHostAddress() + ":" + getPort(); mainLoopThread = Environment.newThread(threadFactory, loop, name); mainLoopThread.start(); // after this point clear-up of MainLoop is triggered by closing the frameHandler.
privateclassMainLoopimplementsRunnable { /** * Channel reader thread main loop. Reads a frame, and if it is * not a heartbeat frame, dispatches it to the channel it refers to. * Continues running until the "running" flag is set false by * shutdown(). */ publicvoidrun() { try { while (_running) { Frameframe= _frameHandler.readFrame();
if (frame != null) { _missedHeartbeats = 0; if (frame.type == AMQP.FRAME_HEARTBEAT) { // Ignore it: we've already just reset the heartbeat counter. } else { if (frame.channel == 0) { // the special channel _channel0.handleFrame(frame); } else { if (isOpen()) { // If we're still _running, but not isOpen(), then we // must be quiescing, which means any inbound frames // for non-zero channels (and any inbound commands on // channel zero that aren't Connection.CloseOk) must // be discarded. ChannelManagercm= _channelManager; if (cm != null) { cm.getChannel(frame.channel).handleFrame(frame); } } } } } else { // Socket timeout waiting for a frame. // Maybe missed heartbeat. handleSocketTimeout(); } } } catch (EOFException ex) { if (!_brokerInitiatedShutdown) shutdown(null, false, ex, true); } catch (Throwable ex) { _exceptionHandler.handleUnexpectedConnectionDriverException(AMQConnection.this, ex); shutdown(null, false, ex, true); } finally { // Finally, shut down our underlying data connection. _frameHandler.close(); _appContinuation.set(null); notifyListeners(); } } }
publicvoidhandleFrame(Frame frame)throws IOException { AMQCommandcommand= _command; if (command.handleFrame(frame)) { // a complete command has rolled off the assembly line _command = newAMQCommand(); // prepare for the next one handleCompleteInboundCommand(command); } }
publicvoidhandleCompleteInboundCommand(AMQCommand command)throws IOException { // First, offer the command to the asynchronous-command // handling mechanism, which gets to act as a filter on the // incoming command stream. If processAsync() returns true, // the command has been dealt with by the filter and so should // not be processed further. It will return true for // asynchronous commands (deliveries/returns/other events), // and false for commands that should be passed on to some // waiting RPC continuation. if (!processAsync(command)) { // The filter decided not to handle/consume the command, // so it must be some reply to an earlier RPC. nextOutstandingRpc().handleCommand(command); markRpcFinished(); } }
/** * Handles incoming control commands on channel zero. * @see ChannelN#processAsync */ @SuppressWarnings("unused") publicbooleanprocessControlCommand(Command c)throws IOException { // Similar trick to ChannelN.processAsync used here, except // we're interested in whole-connection quiescing.
// See the detailed comments in ChannelN.processAsync.
Methodmethod= c.getMethod();
if (isOpen()) { if (method instanceof AMQP.Connection.Close) { handleConnectionClose(c); returntrue; } elseif (method instanceof AMQP.Connection.Blocked) { AMQP.Connection.Blockedblocked= (AMQP.Connection.Blocked) method; try { for (BlockedListener l : this.blockedListeners) { l.handleBlocked(blocked.getReason()); } } catch (Throwable ex) { getExceptionHandler().handleBlockedListenerException(this, ex); } returntrue; } elseif (method instanceof AMQP.Connection.Unblocked) { try { for (BlockedListener l : this.blockedListeners) { l.handleUnblocked(); } } catch (Throwable ex) { getExceptionHandler().handleBlockedListenerException(this, ex); } returntrue; } else { returnfalse; } } else { if (method instanceof AMQP.Connection.Close) { // Already shutting down, so just send back a CloseOk. try { _channel0.quiescingTransmit(newAMQP.Connection.CloseOk.Builder().build()); } catch (IOException _e) { } // ignore returntrue; } elseif (method instanceof AMQP.Connection.CloseOk) { // It's our final "RPC". Time to shut down. _running = false; // If Close was sent from within the MainLoop we // will not have a continuation to return to, so // we treat this as processed in that case. return !_channel0.isOutstandingRpc(); } else { // Ignore all others. returntrue; } } }