九、RabbitMQ-客户端源码之Consumer
云少在[八]RabbitMQ-客户端源码之ChannelN中讲述basicConsume的方法时设计到Consumer这个回调函数,Consumer其实是一个接口,真正实现它的是QueueingConsumer和DefaultConsumer,且DefaultConsumer是QueueingConsumer的父类,里面都是空方法。在用户使用时可以简单的采用QueueingConsumer或者采用DefaultConsumer来重写某些方法。
这里先来看下消费者客户端的关键代码:
1 2 3 4 5 6 7 8 9 10
| QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicQos(32); channel.basicConsume(QUEUE_NAME, false, "consumer_zzh",consumer)
while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [X] Received '" + message + "'"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); }
|
可以看到QueueingConsumer作为channel.basicConsume的回调函数,之后再进行处理。
在AMQConnection中有关MainLoop的主线程,专门用来”第一线”的处理Broker发送回客户端从帧。当Basic.Consume/.ConsumeOk开启消费模式之后,Broker主动的向客户端发送Basic.Delivery帧,MainLoop线程一步步的调用,最后到ChannelN的processAsync()方法中有:
1 2 3 4
| if (method instanceof Basic.Deliver) { processDelivery(command, (Basic.Deliver) method); return true; }
|
之后调用processDelivery方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| protected void processDelivery(Command command, Basic.Deliver method) { Basic.Deliver m = method;
Consumer callback = _consumers.get(m.getConsumerTag()); if (callback == null) { if (defaultConsumer == null) { throw new IllegalStateException("Unsolicited delivery -" + " see Channel.setDefaultConsumer to handle this" + " case."); } else { callback = defaultConsumer; } }
Envelope envelope = new Envelope(m.getDeliveryTag(), m.getRedelivered(),m.getExchange(),m.getRoutingKey()); try { this.dispatcher.handleDelivery(callback, m.getConsumerTag(),envelope, (BasicProperties) command.getContentHeader(),command.getContentBody()); } catch (Throwable ex) { getConnection().getExceptionHandler().handleConsumerException(this, ex,callback,m.getConsumerTag(), "handleDelivery"); } }
|
这个方法首先根据consumerTag从ChannelN中的_consumer这个HashMap中获取相应的Consumer回调函数,然后调用这个回调函数的handleDeliver()方法进行处理,这里有些同学会有疑问,明明是调用ConsumerDispatcher dispatcher的handleDeliver()方法,其实这里只是包了一层皮,ConsumerDispatcher的handleDeliver()方法就是调用了Consumer的handleDeliver()方法。
我们接下去看看QueueingConsumer这个实现Consumer接口的类是怎么处理的:
1 2 3 4 5 6 7 8 9
| @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { checkShutdown(); this._queue.add(new Delivery(envelope, properties, body)); }
|
这里的queue就是一个LinkedBlockingQueue,客户端程序通过调用nextDelivery()方法来获取数据:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| public Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException { return handle(_queue.take()); }
private Delivery handle(Delivery delivery) { if (delivery == POISON || delivery == null && (_shutdown != null || _cancelled != null)) { if (delivery == POISON) { _queue.add(POISON); if (_shutdown == null && _cancelled == null) { throw new IllegalStateException( "POISON in queue, but null _shutdown and null _cancelled. " + "This should never happen, please report as a BUG"); } } if (null != _shutdown) throw Utility.fixStackTrace(_shutdown); if (null != _cancelled) throw Utility.fixStackTrace(_cancelled); } return delivery; }
|
这个nextDelivery方法说白就是一个LinkedBlockingQueue的take()操作,也就是一个可能会阻塞等待的操作。