RabbitMQ 之Java API解析 二(声明队列)
概览
首先看一下Channel中队列相关API:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
队列声明: Queue.DeclareOk queueDeclare() throws IOException; Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; void queueDeclareNoWait(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; Queue.DeclareOk queueDeclarePassive(String queue) throws IOException; 队列删除: Queue.DeleteOk queueDelete(String queue) throws IOException; Queue.DeleteOk queueDelete(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; void queueDeleteNoWait(String queue, boolean ifUnused, boolean ifEmpty) throws IOException; 队列绑定: Queue.BindOk queueBind(String queue, String exchange, String routingKey) throws IOException; Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; void queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; 队列解绑: Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey) throws IOException; Queue.UnbindOk queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException; 队列清除: Queue.PurgeOk queuePurge(String queue) throws IOException; |
队列声明
在Rabbit MQ中,无论是生产者发送消息还是消费者接受消息,都首先需要声明一个MessageQueue。这就存在一个问题,是生产者声明还是消费者声明呢?要解决这个问题,首先需要明确:
a)消费者是无法订阅或者获取不存在的MessageQueue中信息。
b)消息被Exchange接受以后,如果没有匹配的Queue,则会被丢弃。
在明白了上述两点以后,就容易理解如果是消费者去声明Queue,就有可能会出现在声明Queue之前,生产者已发送的消息被丢弃的隐患。如果应用能够通过消息重发的机制允许消息丢失,则使用此方案没有任何问题。但是如果不能接受该方案,这就需要无论是生产者还是消费者,在发送或者接受消息前,都需要去尝试建立消息队列。这里有一点需要明确,如果客户端尝试建立一个已经存在的消息队列,Rabbit MQ不会做任何事情,并返回客户端建立成功的。
如果一个消费者在一个信道中正在监听某一个队列的消息,Rabbit MQ是不允许该消费者在同一个channel去声明其他队列的。就是说消费者只需声明其正在消费的队列即可。
需要注意的是不能用不同的参数声明同一队列,如果要改参数重新声明已存在的队列,需要把原队列删除。
队列声明的四种方式,首先看第一种:
queueDeclare()
1 2 3 4 |
@Override public AMQP.Queue.DeclareOk queueDeclare() throws IOException { return queueDeclare("", false, true, true, null); } |
看一下源码:是对第二种声明方式的封装,创建一个非持久的、唯一的、自动删除的队列且队列名称由服务器随机产生。
最常用的声明方式是第二种:
1 |
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException; |
看一下他的几个参数:
String queue 这没什么好说的,队列名称。
boolean durable 是否持久化,那么问题来了,这是什么意思?持久化,指的是队列持久化到数据库中。在之前的博文中也说过,如果RabbitMQ服务挂了怎么办,队列丢失了自然是不希望发生的。持久化设置为true的话,即使服务崩溃也不会丢失队列。
boolean exclusive 是否排外,what? 这又是什么呢。设置了排外为true的队列只可以在本次的连接中被访问,也就是说在当前连接创建多少个channel访问都没有关系,但是如果是一个新的连接来访问,对不起,不可以。还有一个需要说一下的是,排外的queue在当前连接被断开的时候会自动消失(清除)无论是否设置了持久化。
如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。
boolean autoDelete 这个就很简单了,是否自动删除。也就是说queue会清理自己。但是是在最后一个connection断开的时候
如果该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
Map<String, Object> arguments 这个说来话就长了,放后面说。
第三种声明方式:异步声明
1 |
queueDeclareNoWait(queue, durable, exclusive, autoDelete, arguments) |
参数同第二种方式,区别是相当于一个async版的声明队列,可以看到是没有返回的。调完方法就结束。也不等队列创建结果。
第四种声明方式:消极创建
1 |
queueDeclarePassive(queue) |
消极的声明创建?这是什么鬼,事实上它没有去声明队列,所谓消极,去看看有没有名为xxx的queue,如果有我就把名字什么的信息告诉你,没有就直接报错也不管。那么问题来了,这个方法如此鸡肋,我要它有何用? 。。。其实你可以用来确认queue是否存在嘛
arguments 解析
上面讲参数的一些作用的时候,忽略了最后一个字典类型的参数,因为这个参数是大有文章的,值得单独进出来说道说道。
这时,就不得不打开我们的 Web UI管理系统了,可以看到在这边添加queue的时候,有Arguments下面有一些说明,Message TTL,Auto expire…….
Message TTL
可以看到,点击一下Message TTL,它的参数名是 x-message-ttl 类型是number,那么这个应该怎么用呢?
一起来看看官方解释,嗯。。。其实也就是在说,在声明队列的时候可以添加这个参数,那么它的作用是让发布的message在队列中可以存活多长时间,以毫秒为单位。更通俗点就是,设置了这个参数,发布的消息在queue时间超过了你设定的时间就会被删除掉。
送上代码,不多说,先跑起来,这时就可以看到,在features也可以看到queue是ttl的
1 2 3 |
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 5000); channel.queueDeclare("messagettlqueue", true, false, false, arguments); //////////// 声明队列 |
这个是针对queue的,当然更加定制化的针对message也是有的,也就是说可以让queue里的某条message在多久消失
1 2 3 4 |
Builder properties=new BasicProperties.Builder(); properties.expiration("5000"); byte[] messageBodyBytes = "我五秒后就会消失".getBytes(); channel.basicPublish("exchange", "queue", properties.build(), messageBodyBytes); |
如果队列 和消息同时设置了 TTL 则以少的为准,
比如
队列TTL=30秒
消息1 TTL=5秒
消息2 TTL=10秒
消息3 TTL=45秒
消息4 TTL=55秒
执行顺序为:5秒后消息1过期; 10秒后消息2过期; 30秒后消息4、5过期(队列中所有消息过期)
Auto expire
之前是针对 queue中的消息或者消息本身,而这个才是针对queue,这里是官方解释,也就是说,当前的queue在指定的时间内,没有consumer、basic.get也就是未被访问,就会被删除。
设置起来也是非常简单的
1 2 3 4 |
//声明一个queue,queue五秒内而且未被任何形式的消费,则被删除 Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-expires", 5000); channel.queueDeclare("messagettlqueue", true, false, false, arguments); |
MaxLength与MaxLength bytes
相信到这里大家已经可以根据名字去猜测它的作用了,没错它们是设置queue的消息最大条数与消息最大占用大小
并不是说,设置了最大长度为10,第11条数据插入的时候就会报错,而是在超过了最大长度后进行插入会删除之前插入的消息为本次的留出空间,也就是说无论什么时候,queue中的消息始终都是十条,相应的最大占用大小也是这个道理,当超过了这个大小的时候,会删除之前插入的消息为本次的留出空间。
1 2 3 4 5 |
////声明一个queue,最大长度10,最大大小2048bytes Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-max-length", 10); arguments.put("x-max-length-bytes", 2048); channel.queueDeclare("messagettlqueue", true, false, false, arguments); |
Dead letter exchagne 与 Dead letter routing key
Dead letter 死信,可能有些人对这个词比较陌生,那么我们首先来了解什么叫死信,也就是说那些没有被投递出去的信件
就像上面的messagettl,maxlength等。消息因为超时或超过限制在队列里消失,这样我们就丢失了一些消息,也许里面就有一些是我们做需要获知的。而rabbitmq的死信功能则为我们带来了解决方案。设置了dead letter exchange与dead letter routingkey(要么都设定,要么都不设定)那些因为超时或超出限制而被删除的消息会被推动到我们设置的exchange中,再根据routingkey推到queue中
这里是生产端,声明了一个testqueue,它里面的消息会在5秒后被删除,然后又设置了死信的exchange与routingkey。
就是说 testqueue 内的消息5秒后会进入dlexqueue队列
1 2 3 4 5 6 7 8 9 10 |
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-message-ttl", 5000); arguments.put("x-dead-letter-exchange", "dlexchange"); arguments.put("x-dead-letter-routing-key", "dlexqueue"); channel.queueDeclare("testqueue", true, false, false, arguments); channel.queueDeclare("dlexqueue", true, false, false, null); channel.exchangeDeclare("dlexchange", "direct", true); channel.queueBind("dlexqueue", "dlexchange", "dlexqueue"); byte[] messageBodyBytes = "我五秒后就会消失".getBytes(); channel.basicPublish("", "testqueue", null, messageBodyBytes); |
这时候运行程序可以看到这里已经有了两个queue,一个是正常的queue一个是负责死信的queue
生产一条消息后会看到 testqueue 内有了一条消息
5秒后 会发现 testqueue 内的消息已被删除,被转发到 dlexqueue队列
Maximum priority
不知不觉到了最后一个,这里先说一个场景,当我们打开社交软件的时候,假如这里同时有十个未读消息,但是其中有一条消息是你的女票发来的,肯定优先会看女票发来的消息,剩下的才根据其重要程度决定查看的顺序。
而Maximnum priority也是为我们的queue内的消息进行分级,根据级别来决定其重要程度。
闲话不多说,直接走代码,这里是发布者端代码,设置了五个级别,5最高, 0最低
1 2 3 4 5 6 7 8 9 10 11 |
Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-max-priority", 5); int i = 0; while(i<5) { i++; byte[] messageBodyBytes = (i + "级别的消息").getBytes(); Builder properties=new BasicProperties.Builder(); properties.priority(i); channel.basicPublish("", "priorityQueue", true, false, properties.build(), messageBodyBytes); } |
直接在WEB UI工具中拿数据,然后看看顺序,可以直观的发现最后发布的一条消息是第一个拿出来的,这就是因为我们设置了级别,而它的优先级是最高的
Lazy Queue
RabbitMQ 3.6.0版本又增加了延迟加载功能,queue中的消息存在磁盘只有在consumer来找它要了,才会加载到内存。
下面是官方文档,可以看到去lazy queue的介绍,lazy queue是在3.6.0版本被引入的,lazy queue的信息尽可能的都保存在磁盘上,仅在消费者请求的时候才会加载到RAM中。
使用默认的queue,并且消息不持久化的话,都是放在RAM中的。当消息峰值的时候,大量的消息在RAM导致rabbitmq服务器压力过大,当RAM使用量到一定数字的时候就会因为压力把数据移到硬盘中,但是不要嗨皮,rabbitmq服务器重启后消息一样会丢失。而且过大的压力可能会出现各种各样的异常情况,这并不是我们想要看到的
使用lazy queue会有以下几种搭配
lazy queue 消息不持久化 , 但是这种模式还是会把消息放到硬盘里,RAM的使用率会一直很稳定,但是重启后一样会丢失消息
lazy queue 消息持久化,这种方式无疑是最佳搭配,消息放到硬盘并且不会因为服务器重启而丢失,面对高并发也是从容不已
设置lazy queue也很简单,下面代码是一个示例,x-queue-model : lazy
1 2 3 4 |
//声明一个懒队列 Map<String, Object> arguments = new HashMap<String, Object>(); arguments.put("x-queue-mode", "lazy"); channel.QueueDeclare("lazyqueue", true, false, false, arguments); |
参考资料:
https://www.cnblogs.com/LiangSW/p/6218886.html
http://www.cnblogs.com/LiangSW/p/6224333.html
https://www.cnblogs.com/LiangSW/p/6231178.html
https://blog.csdn.net/samxx8/article/details/47417133