RabbitMQ 之Java API解析 六(消费消息)

作者: poxiao 分类: RabbitMQ 发布时间: 2018-05-02 14:06

消费者相关API解析

basicConsume

消费者最主要的方法:basicConsume(开始订阅消息),下面是参数介绍:

queue   队列名称

autoAck  是否自动确认消息

consumerTag      该消费者别名(别名不能重复,否则调用此方法会有异常)

noLocal    是否忽略同一个Connection递送的消息;同一个客户端的通信一般不需要通过消息队列,就好像把左手的东西递给右手,不需要先把东西放在桌上,右手再去拿一样;所以,设计者认为有必要提供这样一个属性,让消费者自己确认是否忽略来自同一个客户端的消息)

exclusive    “排他消费者”,此参数仅对Queue有效。即如果Queue通道中,有多个consumer同时活跃时只会有一个consumer能够获取消息,对于broker而言,如果Queue中,有任意一个Consumer是“排他的”,那么消息只会转发给“exclusiveConsumer=true”的消费者;如果全部的消费者都是“排他的”,那么最新创建的consumer将会获取消息。我们通常在分布式环境中,为了避免对某些重要数据并发操作时使用此特性,比如:订单中心修改订单状态(来自各个系统的消息,都想修改订单状态,但是它们必须串行操作)。

arguments   附加参数

Consumer callback   消费者实现,消费逻辑类

至于DeliverCallback deliverCallback, CancelCallback cancelCallback, ConsumerShutdownSignalCallback shutdownSignalCallback  这些则是通过内部函数构造成Consumer :

关于Consumer 的使用,官方已经不再建议使用QueueingConsumer来进行消费,因为它使用死循环的方式获取消息,容易引起死锁;

现在建议使用实现Consumer 接口的方式,一般使用默认的DefaultConsumer,这种方式通过消息监听获得消息,设计更合理

如果同一个Channel有多个消费者监听的话会通过轮询的方式调用消费者,调用过程是阻塞的,如果想要并行执行,建议使用多个Channel,前面章节提到的创建连接时使用的线程池就是做这个用的

Channel数量最好和线程数量一致,要不然处理不过来。

 

消费确认

如果autoAck  为false的话 ,需要手动确认

1.basicAck   确认消费成功

第一个参数是 Delivery Tag。 Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。

RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。

第二个参数 multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认;如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认。(批量确认针对的是整个信道,参考gordon.study.rabbitmq.ack.TestBatchAckInOneChannel.java。)

对同一消息的重复确认,或者对不存在的消息的确认,会产生 IO 异常,导致信道关闭。

如果忘了确认,只要程序还在运行,消息就一直是 Unacked 状态,无法被 RabbitMQ 重新投递。更厉害的是,RabbitMQ 消息消费并没有超时机制,也就是说,程序不重启,消息就永远是 Unacked 状态。处理运维事件时不要忘了这些 Unacked 状态的消息

当程序关闭时(实际只要 Consumer 关闭就行),消息会恢复为 Ready 状态。

2. basicReject  取消确认

当消费消息出现异常时,我们需要取消确认,这时我们可以使用 Channel 的 basicReject 方法。

第一个参数指定 delivery tag,第二个参数说明如何处理这个失败消息。requeue 值为 true 表示该消息重新放回队列头,值为 false 表示放弃这条消息

一般来说,如果是系统无法处理的异常,我们一般是将 requeue 设为 false,例如消息格式错误,再处理多少次也是异常。调用第三方接口超时这类异常 requeue 应该设为 true。

3. basicNack   批量取消确认

从 basicReject 方法参数可见,取消确认不支持批量操作(类似于 basicAck 的 multiple 参数)。

所以,RabbitMQ 增加了 basicNack 方法以提供批量取消能力。参考 https://www.rabbitmq.com/nack.html

PS:Reject 的消息重新推送来时,delivery tag 就是新的值了。

4.basicRecover  重新投递

重新投递并没有所谓的像basicReject中的basicReject的deliveryTag参数,所以重新投递好像是将消费者还没有处理的所有的消息都重新放入到队列中,而不是将某一条消息放入到队列中,与basicReject不同的是,重新投递可以指定投递的消息是否允许当前消费者消费。

 

basicGet

与basicConsume不同,该方式是主动从队列中获取消息,获得一个GetResponse实例,里面包含消息的具体内容

 

basicCancel

该方法与basicConsume相反,用于取消订阅,消费者不再订阅该队列,

如果这个队列不想获取服务器推送可以直接取消订阅,可以改为使用basicGet主动拉取消息

basicQos

RabbitMQ提供了一种qos(服务质量保证)功能,即在非自动确认消息的前提下,如果一定数目的消息(通过基于consume或者channel设置Qos的值)未被确认前,不进行消费新的消息。

这种机制一方面可以实现限速(将消息暂存到RabbitMQ内存中)的作用,一方面可以保证消息确认质量(比如确认了但是处理有异常的情况)。

消费确认模式必须是非自动ACK机制(这个是使用baseQos的前提条件,否则会Qos不生效),然后设置basicQos的值;另外,还可以基于consume和channel的粒度进行设置(global);

prefetchSize 控制流量

prefetchCount 控制消息数量,未消费确认的消息超过该值,这不会向该消费者投递消息

global  是否应用于整个channel

 

参考资料:

https://blog.csdn.net/asdfsadfasdfsa/article/details/53501723

http://www.cnblogs.com/gordonkong/p/6952957.html

https://blog.csdn.net/vbirdbest/article/details/78699913

http://www.rabbitmq.com/consumer-prefetch.html

 

 

本文链接:RabbitMQ 之Java API解析 六(消费消息)

转载声明:本站文章若无特别说明,皆为原创,转载请注明来源:破晓(http://www.code2048.net),谢谢!^^


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

邮箱地址不会被公开。 必填项已用*标注