RabbitMQ 之Java API解析 六(消费消息)
消费者相关API解析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
Consumer getDefaultConsumer(); void setDefaultConsumer(Consumer consumer); void basicQos(int prefetchCount) throws IOException; void basicQos(int prefetchCount, boolean global) throws IOException; void basicQos(int prefetchSize, int prefetchCount, boolean global) throws IOException; String basicConsume(String queue, Consumer callback) throws IOException; String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; String basicConsume(String queue, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException; String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, Consumer callback) throws IOException; String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, Consumer callback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, Consumer callback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; String basicConsume(String queue, boolean autoAck, String consumerTag, boolean noLocal, boolean exclusive, Map<String, Object> arguments, DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback) throws IOException; GetResponse basicGet(String queue, boolean autoAck) throws IOException; void basicAck(long deliveryTag, boolean multiple) throws IOException; void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; void basicReject(long deliveryTag, boolean requeue) throws IOException; void basicCancel(String consumerTag) throws IOException; Basic.RecoverOk basicRecover() throws IOException; Basic.RecoverOk basicRecover(boolean requeue) throws IOException; |
basicConsume
消费者最主要的方法:basicConsume(开始订阅消息),下面是参数介绍:
queue 队列名称
autoAck 是否自动确认消息
consumerTag 该消费者别名(别名不能重复,否则调用此方法会有异常)
noLocal 是否忽略同一个Connection递送的消息;同一个客户端的通信一般不需要通过消息队列,就好像把左手的东西递给右手,不需要先把东西放在桌上,右手再去拿一样;所以,设计者认为有必要提供这样一个属性,让消费者自己确认是否忽略来自同一个客户端的消息)
exclusive “排他消费者”,此参数仅对Queue有效。即如果Queue通道中,有多个consumer同时活跃时只会有一个consumer能够获取消息,对于broker而言,如果Queue中,有任意一个Consumer是“排他的”,那么消息只会转发给“exclusiveConsumer=true”的消费者;如果全部的消费者都是“排他的”,那么最新创建的consumer将会获取消息。我们通常在分布式环境中,为了避免对某些重要数据并发操作时使用此特性,比如:订单中心修改订单状态(来自各个系统的消息,都想修改订单状态,但是它们必须串行操作)。
arguments 附加参数
Consumer callback 消费者实现,消费逻辑类
至于DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback 这些则是通过内部函数构造成Consumer :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 |
private Consumer consumerFromDeliverCancelShutdownCallbacks(final DeliverCallback deliverCallback, final CancelCallback cancelCallback, final ConsumerShutdownSignalCallback shutdownSignalCallback) { return new Consumer() { @Override public void handleConsumeOk(String consumerTag) { } @Override public void handleCancelOk(String consumerTag) { } @Override public void handleCancel(String consumerTag) throws IOException { cancelCallback.handle(consumerTag); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { shutdownSignalCallback.handleShutdownSignal(consumerTag, sig); } @Override public void handleRecoverOk(String consumerTag) { } @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { deliverCallback.handle(consumerTag, new Delivery(envelope, properties, body)); } }; } |
关于Consumer 的使用,官方已经不再建议使用QueueingConsumer来进行消费,因为它使用死循环的方式获取消息,容易引起死锁;
现在建议使用实现Consumer 接口的方式,一般使用默认的DefaultConsumer,这种方式通过消息监听获得消息,设计更合理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 |
public class TestConsumer extends DefaultConsumer { public TestConsumer(Channel channel) { super(channel); // TODO Auto-generated constructor stub } @Override public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { // TODO Auto-generated method stub String routingKey = envelope.getRoutingKey(); String contentType = properties.getContentType(); System.out.println("消费的路由键:" + routingKey); System.out.println("消费的内容类型:" + contentType); long deliveryTag = envelope.getDeliveryTag(); //确认消息 getChannel().basicAck(deliveryTag, false); System.out.println("消费的消息体内容:"); try { System.out.println(consumerTag + "=======" + ParamDecoder.getInstance().decoder(body)); Thread.sleep(5000); } catch (Exception e) { getChannel().basicNack(deliveryTag, false, true); e.printStackTrace(); } } } |
1 |
channel.basicConsume(queue, false, consumerTag, new TestConsumer(channel)); |
如果同一个Channel有多个消费者监听的话会通过轮询的方式调用消费者,调用过程是阻塞的,如果想要并行执行,建议使用多个Channel,前面章节提到的创建连接时使用的线程池就是做这个用的
1 2 3 |
ExecutorService service = Executors.newFixedThreadPool(10); factory.setSharedExecutor(service); Connection connection = factory.newConnection(); |
Channel数量最好和线程数量一致,要不然处理不过来。
消费确认
如果autoAck 为false的话 ,需要手动确认
1.basicAck 确认消费成功
第一个参数是 Delivery Tag。 Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。
RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。
第二个参数 multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认。(批量确认针对的是整个信道,参考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。)
对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。
如果忘了确认,只要程序还在运行,消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。更厉害的是,RabbitMQ 消息消费并没有超时机制,也就是说,程序不重启,消息就永远是 Unacked 状态。处理运维事件时不要忘了这些 Unacked 状态的消息。
当程序关闭时(实际只要 Consumer 关闭就行),消息会恢复为 Ready 状态。
2. basicReject 取消确认
当消费消息出现异常时,我们需要取消确认,这时我们可以使用 Channel 的 basicReject 方法。
第一个参数指定 delivery tag,第二个参数说明如何处理这个失败消息。requeue 值为 true 表示该消息重新放回队列头,值为 false 表示放弃这条消息。
一般来说,如果是系统无法处理的异常,我们一般是将 requeue 设为 false,例如消息格式错误,再处理多少次也是异常。调用第三方接口超时这类异常 requeue 应该设为 true。
3. basicNack 批量取消确认
从 basicReject 方法参数可见,取消确认不支持批量操作(类似于 basicAck 的 multiple 参数)。
所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。参考 https://www.rabbitmq.com/nack.html
PS:Reject 的消息重新推送来时,delivery tag 就是新的值了。
4.basicRecover 重新投递
重新投递并没有所谓的像basicReject中的basicReject的deliveryTag参数,所以重新投递好像是将消费者还没有处理的所有的消息都重新放入到队列中,而不是将某一条消息放入到队列中,与basicReject不同的是,重新投递可以指定投递的消息是否允许当前消费者消费。
basicGet
与basicConsume不同,该方式是主动从队列中获取消息,获得一个GetResponse实例,里面包含消息的具体内容
1 2 3 4 5 6 7 |
public GetResponse(Envelope envelope, BasicProperties props, byte[] body, int messageCount) { this.envelope = envelope; this.props = props; this.body = body; this.messageCount = messageCount; } |
basicCancel
该方法与basicConsume相反,用于取消订阅,消费者不再订阅该队列,
如果这个队列不想获取服务器推送可以直接取消订阅,可以改为使用basicGet主动拉取消息
basicQos
RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。
这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。
消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于consume和channel的粒度进行设置(global);
prefetchSize 控制流量
prefetchCount 控制消息数量,未消费确认的消息超过该值,这不会向该消费者投递消息
global 是否应用于整个channel
参考资料:
https://blog.csdn.net/asdfsadfasdfsa/article/details/53501723
http://www.cnblogs.com/gordonkong/p/6952957.html
https://blog.csdn.net/vbirdbest/article/details/78699913
http://www.rabbitmq.com/consumer-prefetch.html