RabbitMQ 之Java API解析 五(生产消息)
生产者相关API解析
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException; void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException; void addReturnListener(ReturnListener listener); ReturnListener addReturnListener(ReturnCallback returnCallback); boolean removeReturnListener(ReturnListener listener); void clearReturnListeners(); void addConfirmListener(ConfirmListener listener); ConfirmListener addConfirmListener(ConfirmCallback ackCallback, ConfirmCallback nackCallback); boolean removeConfirmListener(ConfirmListener listener); void clearConfirmListeners(); Confirm.SelectOk confirmSelect() throws IOException; long getNextPublishSeqNo(); boolean waitForConfirms() throws InterruptedException; boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException; void waitForConfirmsOrDie() throws IOException, InterruptedException; void waitForConfirmsOrDie(long timeout) throws IOException, InterruptedException, TimeoutException; Tx.SelectOk txSelect() throws IOException; Tx.CommitOk txCommit() throws IOException; Tx.RollbackOk txRollback() throws IOException; |
basicPublish
生产者中最重要的就是basicPublish 发送消息,它的参数分别为:
exchange 交换机名称
routingKey 路由Key
mandatory 当mandatory标志位设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,那么会调用basic.return方法将消息返回给生产者(Basic.Return + Content-Header + Content-Body);当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
immediate 当immediate标志位设置为true时,如果exchange在将消息路由到queue(s)时发现对于的queue上么有消费者,那么这条消息不会放入队列中。当与消息routeKey关联的所有queue(一个或者多个)都没有消费者时,该消息会通过basic.return方法返还给生产者。
props 消息属性字段,比如消息头部信息等等
body 消息内容
exchange、routingKey和body不用多说
mandatory 和immediate 概括来说,mandatory标志告诉服务器至少将该消息route到一个队列中,否则将消息返还给生产者;immediate标志告诉服务器如果该消息关联的queue上有消费者,则马上将消息投递给它,如果所有queue都没有消费者,直接把消息返还给生产者,不用将消息入队列等待消费者了。immediate 已经不再推荐使用,影响镜像队列的性能。那么如何获取mandatory 标记返回的消息呢?
使用 addReturnListener 获取投递失败的消息
|
1 2 3 4 5 6 7 8 9 |
channel.addReturnListener(new ReturnListener() { @Override public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, BasicProperties properties, byte[] body) throws IOException { String bodyStr = new String(body, "UTF-8"); System.out.println("返回" + bodyStr); } }); |
重点看一下props 参数吧
props 通过建造者模式构建
|
1 2 3 4 5 |
Builder properties=new BasicProperties.Builder(); properties.expiration("10000"); properties.deliveryMode(2);//持久化 BasicProperties props = properties.build(); |
BasicProperties定义如下:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
public BasicProperties( String contentType,//消息mime类型如:text/plain String contentEncoding,//编码 如UTF-8 Map<String,Object> headers, Integer deliveryMode,//1:nonpersistent 2:persistent(持久化) Integer priority,//优先级 String correlationId,//此次请求的标识号,服务器处理完成后需要将此属性返还,客户端将根据这个id了解哪条请求被成功执行了或执行失败 String replyTo,//反馈队列:一个Queue名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中 String expiration,//消息过期时间 String messageId, // 消息ID Date timestamp,//时间戳 String type, String userId, String appId, String clusterId) |
下文引自:RabbitMQ之消息确认机制(事务+Confirm)
消息确认事务机制
RabbitMQ中与事务机制有关的方法有三个:txSelect(), txCommit()以及txRollback(), txSelect用于将当前channel设置成transaction模式,txCommit用于提交事务,txRollback用于回滚事务,在通过txSelect开启事务之后,我们便可以发布消息给broker代理服务器了,如果txCommit提交成功了,则消息一定到达了broker了,如果在txCommit执行之前broker异常崩溃或者由于其他原因抛出异常,这个时候我们便可以捕获异常通过txRollback回滚事务了。
示例代码:
|
1 2 3 4 5 6 7 |
try{ channel.txSelect(); channel.basicPublish("e1", "testKey", props, body); channel.txCommit(); } catch (Exception e) { channel.txRollback(); } |
代码中先是发送了消息至broker中但是这时候发生了异常,之后在捕获异常的过程中进行事务回滚。
事务确实能够解决producer与broker之间消息确认的问题,只有消息成功被broker接受,事务提交才能成功,否则我们便可以在捕获异常进行事务回滚操作同时进行消息重发,但是使用事务机制的话会降低RabbitMQ的性能,那么有没有更好的方法既能保障producer知道消息已经正确送到,又能基本上不带来性能上的损失呢?从AMQP协议的层面看是没有更好的方法,但是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。
Confirm模式
概述
上面我们介绍了RabbitMQ可能会遇到的一个问题,即生成者不知道消息是否真正到达broker,随后通过AMQP协议层面为我们提供了事务机制解决了这个问题,但是采用事务机制实现会降低RabbitMQ的消息吞吐量,那么有没有更加高效的解决方式呢?答案是采用Confirm模式。
producer端confirm模式的实现原理
生产者将信道设置成confirm模式,一旦信道进入confirm模式,所有在该信道上面发布的消息都会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,broker就会发送一个确认给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会将消息写入磁盘之后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也可以设置basic.ack的multiple域,表示到这个序列号之前的所有消息都已经得到了处理。
confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就可以在等信道返回确认的同时继续发送下一条消息,当消息最终得到确认之后,生产者应用便可以通过回调方法来处理该确认消息,如果RabbitMQ因为自身内部错误导致消息丢失,就会发送一条nack消息,生产者应用程序同样可以在回调方法中处理该nack消息。
在channel 被设置成 confirm 模式之后,所有被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。但是没有对消息被 confirm 的快慢做任何保证,并且同一条消息不会既被 confirm又被nack 。
开启confirm模式的方法
生产者通过调用channel的confirmSelect方法将channel设置为confirm模式,如果没有设置no-wait标志的话,broker会返回confirm.select-ok表示同意发送者将当前channel信道设置为confirm模式(从目前RabbitMQ最新版本3.6来看,如果调用了channel.confirmSelect方法,默认情况下是直接将no-wait设置成false的,也就是默认情况下broker是必须回传confirm.select-ok的)。
编程模式
对于固定消息体大小和线程数,如果消息持久化,生产者confirm(或者采用事务机制),消费者ack那么对性能有很大的影响.
消息持久化的优化没有太好方法,用更好的物理存储(SAS, SSD, RAID卡)总会带来改善。生产者confirm这一环节的优化则主要在于客户端程序的优化之上。归纳起来,客户端实现生产者confirm有三种编程方式:
- 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。实际上是一种串行confirm了。
- 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
- 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。
从编程实现的复杂度上来看:
第一种
普通confirm模式最简单,publish一条消息后,等待服务器端confirm,如果服务端返回false或者超时时间内未返回,客户端进行消息重传。
关键代码如下:
|
1 2 3 4 |
channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); if(!channel.waitForConfirms()){ System.out.println("send message failed."); } |
第二种
批量confirm模式稍微复杂一点,客户端程序需要定期(每隔多少秒)或者定量(达到多少条)或者两则结合起来publish消息,然后等待服务器端confirm, 相比普通confirm模式,批量极大提升confirm效率,但是问题在于一旦出现confirm返回false或者超时的情况时,客户端需要将这一批次的消息全部重发,这会带来明显的重复消息数量,并且,当消息经常丢失时,批量confirm性能应该是不升反降的。
关键代码:
|
1 2 3 4 5 6 7 |
channel.confirmSelect(); for(int i=0;i<batchCount;i++){ channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); } if(!channel.waitForConfirms()){ System.out.println("send message failed."); } |
第三种
异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),我们需要自己为每一个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是通过SortedSet维护消息序号的。
关键代码:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 |
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } }); while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); } |
SDK中waitForConfirms方法实现:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
/** Set of currently unconfirmed messages (i.e. messages that have * not been ack'd or nack'd by the server yet. */ private final SortedSet<Long> unconfirmedSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); public boolean waitForConfirms(long timeout) throws InterruptedException, TimeoutException { if (nextPublishSeqNo == 0L) throw new IllegalStateException("Confirms not selected"); long startTime = System.currentTimeMillis(); synchronized (unconfirmedSet) { while (true) { if (getCloseReason() != null) { throw Utility.fixStackTrace(getCloseReason()); } if (unconfirmedSet.isEmpty()) { boolean aux = onlyAcksReceived; onlyAcksReceived = true; return aux; } if (timeout == 0L) { unconfirmedSet.wait(); } else { long elapsed = System.currentTimeMillis() - startTime; if (timeout > elapsed) { unconfirmedSet.wait(timeout - elapsed); } else { throw new TimeoutException(); } } } } } |
性能测试
Client端机器和RabbitMQ机器配置:CPU:24核,2600MHZ, 64G内存,1TB硬盘。
Client端发送消息体大小10B,线程数为1即单线程,消息都持久化处理(deliveryMode:2)。
分别采用事务模式、普通confirm模式,批量confirm模式和异步confirm模式进行producer实验,比对各个模式下的发送性能。

发送平均速率:
- 事务模式(tx):1637.484
- 普通confirm模式(common):1936.032
- 批量confirm模式(batch):10432.45
- 异步confirm模式(async):10542.06
可以看到事务模式性能是最差的,普通confirm模式性能比事务模式稍微好点,但是和批量confirm模式还有异步confirm模式相比,还是小巫见大巫。批量confirm模式的问题在于confirm之后返回false之后进行重发这样会使性能降低,异步confirm模式(async)编程模型较为复杂,至于采用哪种方式,那是仁者见仁智者见智了。
消息确认(Consumer端)
为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,可以指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,如果是持久化消息的话)中移去消息。否则,RabbitMQ会在队列中消息被消费后立即删除它。
采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题,因为RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。
当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分成了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,但是还没有收到消费者ack信号的消息。如果服务器端一直没有收到消费者的ack信号,并且消费此消息的消费者已经断开连接,则服务器端会安排该消息重新进入队列,等待投递给下一个消费者(也可能还是原来的那个消费者)。
RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否需要重新投递给消费者的唯一依据是消费该消息的消费者连接是否已经断开。这么设计的原因是RabbitMQ允许消费者消费一条消息的时间可以很久很久。
RabbitMQ管理平台界面上可以看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者但是未收到ack信号的消息数。也可以通过命令行来查看上述信息:

代码示例(关闭自动消息确认,进行手动ack):
|
1 2 3 4 5 6 7 8 9 |
QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(ConfirmConfig.queueName, false, consumer); while(true){ QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String msg = new String(delivery.getBody()); // do something with msg. channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } |
broker将在下面的情况中对消息进行confirm:
broker发现当前消息无法被路由到指定的queues中(如果设置了mandatory属性,则broker会发送basic.return)
非持久属性的消息到达了其所应该到达的所有queue中(和镜像queue中)
持久消息到达了其所应该到达的所有queue中(和镜像中),并被持久化到了磁盘(fsync)
持久消息从其所在的所有queue中被consume了(如果必要则会被ack)
basicRecover:是路由不成功的消息可以使用recovery重新发送到队列中。
basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,可以设置是否放回到队列中还是丢掉,而且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。
basicNack:可以一次拒绝N条消息,客户端可以设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的所有未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。
参考资料:
https://blog.csdn.net/u013256816/article/details/54914525
https://blog.csdn.net/whycold/article/details/41119807

