八、RabbitMQ-客户端源码之ChannelN 云少 2020-10-08 2020-10-08 ChannelN是整个RabbitMQ客户端最核心的一个类了,其包含的功能点甚多,这里需要分类阐述。 首先来看看ChannelN的成员变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 private final Map<String, Consumer> _consumers = Collections.synchronizedMap(new HashMap <String, Consumer>());private volatile Consumer defaultConsumer = null ;private final ConsumerDispatcher dispatcher;private final Collection<ReturnListener> returnListeners = new CopyOnWriteArrayList <ReturnListener>();private final Collection<FlowListener> flowListeners = new CopyOnWriteArrayList <FlowListener>();private volatile CountDownLatch finishedShutdownFlag = null ;private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList <ConfirmListener>();private long nextPublishSeqNo = 0L ;private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet <Long>());private volatile boolean onlyAcksReceived = true ;
源代码中有关ChannelN的呈现顺序有所不同,这里博主为了区分开来,重新排了序。
processAsync(Command command) 在AMQChannel这个抽象类中唯一的抽象方法即为此方法,这个方法主要用来针对接受到broker的AMQCommand进行进一步的处理,至于怎么接受Socket,怎么封装成帧,怎么确定一个AMQComand已经封装完毕,都已在调用此方法前完成。此方法可以处理:Channel.Close, Basic.Deliver, Basic.Return, Channel.Flow, Basic.Ack, Basic.Nack, Basic.RecoverOk, Basic.Cancel, Channel.CloseOk等这些从broker端回传的AMQComand. 这个方法也比较长,下面也会涉及到这个方法内的内容。
Confirm.Select & Basic.Publish 在RabbitMQ之消息确认机制(事务+Confirm) 这篇文章中,博主就讲到RabbitMQ的producer端确认机制分为事务机制和Confirm机制,这里就来阐述下Confirm机制的内部实现。 和Confirm机制有关的成员变量有:
1 2 3 4 private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList <ConfirmListener>();private long nextPublishSeqNo = 0L ;private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet <Long>());private volatile boolean onlyAcksReceived = true ;
在使用Confirm机制的时候,首先要置Channel为Confirm模式,即向broker端发送Confirm.Select。 业务代码(DEMO实例):
1 2 3 4 5 6 7 8 9 10 11 12 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener () { public void handleAck (long deliveryTag, boolean multiple) throws IOException { } public void handleNack (long deliveryTag, boolean multiple) throws IOException { } }); String message = "RabbitMQ Demo Test:" + System.currentTimeMillis();channel.basicPublish(EXCHANGE_NAME, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); channel.waitForConfirms();
在创建完Channel之后调用channel.confirmSelect()方法即可,confirmSelect()代码如下:
1 2 3 4 5 6 7 public Confirm.SelectOk confirmSelect () throws IOException { if (nextPublishSeqNo == 0 ) nextPublishSeqNo = 1 ; return (Confirm.SelectOk) exnWrappingRpc(new Confirm .Select(false )).getMethod(); }
这里的成员变量nextPublishSeqNo是用来为Confirm机制服务的,当Channel开启Confirm模式的时候,nextPublishSeqNo=1,标记第一条publish的序号,当Publish时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void basicPublish (String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte [] body) throws IOException{ if (nextPublishSeqNo > 0 ) { unconfirmedSet.add(getNextPublishSeqNo()); nextPublishSeqNo++; } BasicProperties useProps = props; if (props == null ) { useProps = MessageProperties.MINIMAL_BASIC; } transmit(new AMQCommand (new Basic .Publish.Builder() .exchange(exchange) .routingKey(routingKey) .mandatory(mandatory) .immediate(immediate) .build(), useProps, body)); }
client端向broker端Basic.Pubish发送消息并将当前的序号加入到unconfirmedSet中,并自加nextPublishSeqNo++等待下一个消息的发送。
有关Confirm.Select的详细用法可以参考:RabbitMQ之消息确认机制(事务+Confirm)
之后等待broker的确认回复(Basic.Ack/.Nack):channel.waitForConfirms()
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 public boolean waitForConfirms (long timeout) throws InterruptedException, TimeoutException { if (nextPublishSeqNo == 0L ) throw new IllegalStateException ("Confirms not selected" ); long startTime = System.currentTimeMillis(); synchronized (unconfirmedSet) { while (true ) { if (getCloseReason() != null ) { throw Utility.fixStackTrace(getCloseReason()); } if (unconfirmedSet.isEmpty()) { boolean aux = onlyAcksReceived; onlyAcksReceived = true ; return aux; } if (timeout == 0L ) { unconfirmedSet.wait(); } else { long elapsed = System.currentTimeMillis() - startTime; if (timeout > elapsed) { unconfirmedSet.wait(timeout - elapsed); } else { throw new TimeoutException (); } } } } }
可以看到waitForConfirms其实本质上是在等待unconfirmedSet变成empty,否则就线程wait()。
当接收到broker端的ACK/NACK回复时,一步步的经过处理到达processAsync(Command command)方法,然后进而处理Basic.Ack/.Nack帧。
1 2 3 4 5 6 7 8 9 10 11 else if (method instanceof Basic.Ack) { Basic.Ack ack = (Basic.Ack) method; callConfirmListeners(command, ack); handleAckNack(ack.getDeliveryTag(), ack.getMultiple(), false ); return true ; } else if (method instanceof Basic.Nack) { Basic.Nack nack = (Basic.Nack) method; callConfirmListeners(command, nack); handleAckNack(nack.getDeliveryTag(), nack.getMultiple(), true ); return true ; }
首先是将相应的Method做一下转换,之后callConfirmListeners(),这个方法是调用成员变量confirmListeners这个list里的所有的ConfirmListener:
1 private final Collection<ConfirmListener> confirmListeners = new CopyOnWriteArrayList <ConfirmListener>();
这个ConfirmListener的list就需要在channel.basicPushlish()调用之前先:
1 2 3 4 5 6 7 8 9 channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener () { public void handleAck (long deliveryTag, boolean multiple) throws IOException { } public void handleNack (long deliveryTag, boolean multiple) throws IOException { } });
在调用完ConfirmListener之后继续调用handleAckNack方法:
1 2 3 4 5 6 7 8 9 10 11 12 private void handleAckNack (long seqNo, boolean multiple, boolean nack) { if (multiple) { unconfirmedSet.headSet(seqNo + 1 ).clear(); } else { unconfirmedSet.remove(seqNo); } synchronized (unconfirmedSet) { onlyAcksReceived = onlyAcksReceived && !nack; if (unconfirmedSet.isEmpty()) unconfirmedSet.notifyAll(); } }
这个方法本意上是对收到某条消息的ACK或者NACK的处理,发送消息时Basic.Publish的nextPublishNo对应于相应的ACK/NACK的deliveryTag,将其从unconfirmedSet中删除即可,如果有NACK帧,则将其相应的标识onlyAcksReceived设置为false,判断此时unconfirmedSet是否为空,如果条件成立则notifyAll(),将waitForConfirm唤起,返回onlyAcksReceived的状态。
如果channel.waitForConfirm()返回为false,则说明broker没有接受client发送的消息,此时需要在业务代码中做进一步处理,比如重发。
Basic.Qos 消费者在开启ACK的情况下,对接受到的消息可以根据业务的需要异步对消息进行确认。 然而在实际使用过程中,由于消费者自身处理能力有限,从RabbitMQ获取一定数量的消息好厚,希望rabbitmq不再将队列中的消息推送过来,当对消息处理完后(即对消息进行了ack,并且有能力处理更多的消息)再接受来自队列的消息。在这种场景下,我们可以设置Basic.Qos中的prefetch_count来达到这个效果。
Basic.Consume 与消费有关的成员变量:
1 2 3 4 private final Map<String, Consumer> _consumers = Collections.synchronizedMap(new HashMap <String, Consumer>()); private volatile Consumer defaultConsumer = null ;private final ConsumerDispatcher dispatcher;
源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 public String basicConsume (String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, final Consumer callback) throws IOException { BlockingRpcContinuation<String> k = new BlockingRpcContinuation <String>() { public String transformReply (AMQCommand replyCommand) { String actualConsumerTag = ((Basic.ConsumeOk) replyCommand.getMethod()).getConsumerTag(); _consumers.put(actualConsumerTag, callback); dispatcher.handleConsumeOk(callback, actualConsumerTag); return actualConsumerTag; } }; rpc(new Basic .Consume.Builder() .queue(queue) .consumerTag(consumerTag) .noLocal(noLocal) .noAck(autoAck) .exclusive(exclusive) .arguments(arguments) .build(), k); try { return k.getReply(); } catch (ShutdownSignalException ex) { throw wrap(ex); } }
这个方法最精简的只要两个参数,即String queue和Consumer callback:public String basicConsume(String queue, Consumer callback)。
方法主要是发送Basic.Consume帧,然后等待Basic.ConsumeOk帧。待收到broker端的Basic.ConsumeOk帧之后,触发BlockingRpcContinuation中的transformReply()方法。有关BlockingRpcContinuation在[五]RabbitMQ-客户端源码之AMQChannel 中有陈述。transformReply()方法先是提取consumerTag,这个consumerTag是在channel.basicConsume()方法中设置的,是其中的一个参数,如果设置了此参数,那么consumerTag就是这个参数的值;如果没有设置这个consumerTag,Broker会返回一个consumerTag,类似:amq.ctag-Mg0eSv2GgfG6UzfncD8E9g。然后作为key和Consumer这个回调函数一起放置到_consumer这个回调函数中以备后面检索调用。这个consumerTag还作为transformReply()方法的返回值,存入到BlockingRpcContinuation对象中,既而在basicConsume这个方法最后调用k.getReply()方法是获取其值,也就是说basicConsume方法的返回值就是consumerTag。
当发送Basic.Consume帧之后,由broker返回的是Basic.ConsumeOk帧+Basic.Deliver帧,Basic.ConsumerOk帧由上面方法处理,Basic.Deliver帧由processAsync处理。
说到basicConsume方法,还有一个重要的就是设置Consumer这个回调函数。一般为了方便直接使用RabbitMQ客户端自带的QueueingConsumer来处理,当然也可以实现一个自定义的Consumer,当然了需要实现Consumer这个接口,可以参考QueueingConsumer的父类DefaultConsumer, 有关Consumer相关的更多细节,可以参考:[九]RabbitMQ-客户端源码之Consumer 。
dispatcher.handleConsumeOk(callback, actualConsumerTag);这段代码实际上就是:callback.handleConsumeOk(actualConsumerTag),这个还是调用到Consumer的方法处理。
Basic.Get 上面的Basic.Consume是基于push模式的,而Basic.Get是基于pull模式的。相关的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 public GetResponse basicGet (String queue, boolean autoAck) throws IOException { AMQCommand replyCommand = exnWrappingRpc(new Basic .Get.Builder() .queue(queue) .noAck(autoAck) .build()); Method method = replyCommand.getMethod(); if (method instanceof Basic.GetOk) { Basic.GetOk getOk = (Basic.GetOk)method; Envelope envelope = new Envelope (getOk.getDeliveryTag(), getOk.getRedelivered(), getOk.getExchange(), getOk.getRoutingKey()); BasicProperties props = (BasicProperties)replyCommand.getContentHeader(); byte [] body = replyCommand.getContentBody(); int messageCount = getOk.getMessageCount(); return new GetResponse (envelope, props, body, messageCount); } else if (method instanceof Basic.GetEmpty) { return null ; } else { throw new UnexpectedMethodError (method); } }
基本上就是客户端发送Basic.Get至Broker,Broker返回Basic.GetOK并携带数据。注意方法最后返回GetResponse对象,这个对象就是包装了一下数据。
事务 和事务有关的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public Tx.SelectOk txSelect () throws IOException { return (Tx.SelectOk) exnWrappingRpc(new Tx .Select()).getMethod(); } public Tx.CommitOk txCommit () throws IOException { return (Tx.CommitOk) exnWrappingRpc(new Tx .Commit()).getMethod(); } public Tx.RollbackOk txRollback () throws IOException { return (Tx.RollbackOk) exnWrappingRpc(new Tx .Rollback()).getMethod(); }
这里可以看到基本对于事务的处理是采用rpc的方法一对一的进行交互,有关RabbitMQ的事务机制可以参考:RabbitMQ之消息确认机制(事务+Confirm) 。
其余 ChannelN还有:
关于Exchange,Queue的申明创建,删除,绑定,解绑 关闭处理 Basic.Return Basic.Flow Basic.Recover Basic.Cancel Basic.Ack/.Nack/.Reject 这些就不做详细介绍了。有兴趣的同学可以继续翻阅源码,这些都比较简单。