RabbitMQ 之Java API解析 一(创建连接)

作者: poxiao 分类: RabbitMQ 发布时间: 2018-03-23 15:10

本系列使用的RabbitMQ 版本为 3.7.4     Java Client 版本为:amqp-client 5.2.0   JDK:1.8

创建连接

由于创建连接需要的参数过多,所以是采用构建工厂来创建连接实例。

属性设置

首先看一下创建所需的参数:

基本参数:

 

这些不用解释,看一下ConnectionFactory的源码:

其中  用户名密码封装成为了 CredentialsProvider

常用属性:

setAutomaticRecoveryEnabled      启用或禁用自动重连。

setNetworkRecoveryInterval         设置多少毫秒重试一次

客户端和RabbitMQ节点之间的网络连接可能失败。RabbitMQ java客户端支持的连接拓扑自动恢复(队列、交流、绑定和消费者)。许多应用程序的自动恢复过程遵循以下步骤:

重新连接

恢复连接侦听器

重开通道

恢复通道侦听器

恢复通道基础、QoS设置、发布服务器确认和事务设置

拓扑恢复包括为每个通道执行的以下操作

重新声明Exchange(除了预定义的)

重新声明队列

恢复所有绑定

恢复所有消费者

作为4.0.0版的java客户端,自动恢复默认是启用的(因此拓扑恢复好)。

使用setNetworkRecoveryInterval    来设置故障后的重连时间 (默认是5秒)

如果创建实例时设置了地址列表:Address[] addrs  则会遍历这个列表

setTopologyRecoveryEnabled       设置是否重新声明交换器,队列等信息。

如果设置为false  则不会重复声明  交换机,队列信息

setChannelRpcTimeout          设置频道RPC通信连接的超时时间

这个默认是10分钟    设置为0的话表示永不超时

setConnectionTimeout        设置TCP连接的超时时间

这个默认是60秒, 设置为0的话表示永不超时

void setSharedExecutor​(ExecutorService executor)  设置消费者Channel使用的线程池

消费时获取connection时候,consumer线程默认自动分配一个 ExceutorService thread pool(默认8个线程)

如果Channel过多需要指定线程池,否则Channel再多也没用  Channel的数量最好和线程池容量一致

但是需要注意的是:当connection shutdown的时候,默认分配的ExecutorService会自动shutdown,用户自己定义的不会shutdown,需要手动shutdown。而且,只用在Consumer回调有严重的性能瓶颈的时候才用这种方式,在平常的使用中,默认的分配更加高效。

setCredentialsProvider​(CredentialsProvider credentialsProvider)        直接设置用户名密码即可,本方法可忽略

 

部分服务器参数属性可以参照RabbitMQ 服务器端配置:

key 描述
tcp_listeners 监听AMQP连接的端口或主机/对。
Default: [5672]
num_tcp_acceptors Erlang进程的数量,接受TCP监听器的连接数。
Default: 10
handshake_timeout 对AMQP 0-8/0-9/0-9-1握手的最大时间(在套接字连接和SSL握手之后),以毫秒为间隔
Default: 10000
ssl_listeners 如上所述,用于SSL连接。
Default: []
num_ssl_acceptors 用于接受SSL监听连接的Erlang进程的数量。
Default: 1
ssl_options SSL配置参数. 详情请看 SSL documentation.Default: []
ssl_handshake_timeout SSL握手超时,以毫秒为间隔。
Default: 5000
vm_memory_high_watermark 触发流控制的内存阈值。详情请看 memory-based flow control.Default: 0.4
vm_memory_high_watermark_paging_ratio 设置当内存使用超过总内存百分比多少时,队列开始将消息持久化到磁盘以释放内存。 详情请看 memory-based flow control.Default: 0.5
disk_free_limit RabbitMQ存储数据的分区的磁盘空间限制。当可用的磁盘空间低于这个限制时,就会触发流控制。值可以相对于RAM的总数设置(例如,内存比例,1.0)。该值也可以设置为整数的字节数。或者,单位(例如“50 mb”)。默认情况下,空闲磁盘空间必须超过50MB。详情请看 Disk Alarms.Default: 50000000
log_levels 控制日志的粒度。该值是一个日志事件类别和日志级别对的列表。
可设置级别:’none”error”warning”info”debug’以上下一层级别的日志输出均包含上层级别日志输出(如: warning包含warning和error), none为不输出日志另外,当前未分类的事件总是记录在日志中The categories are:

  • channel – 所有与AMQP通道有关的事件
  • connection – 对于所有与网络连接有关的事件
  • federation – 对于所有与federation有关的事件
  • mirroring – 对于与镜像队列相关的所有事件

Default: [{connection, info}]

frame_max 框架最大允许大小(以字节为单位)与消费者进行数据交换。设置为0意味着“无限”,但会在一些QPid客户端触发一个bug。

设置更大的值可能会提高吞吐量;

设置较小的值可能会提高延迟。

Default: 131072

channel_max 与消费者进行谈判的最大允许数量。设置为0意味着“无限”。

使用更多的通道会增加代理的内存占用。

Default: 0

channel_operation_timeout 通道操作超时为毫秒(内部使用,由于消息传递协议的差异和限制而不直接暴露于客户机)。
Default: 15000
heartbeat 该值表示服务器在连接中发送的心跳延迟,在几秒钟内。优化框架。如果设置为0,则会禁用心跳。客户端可能不会遵循服务器的建议,请参阅AMQP参考以了解更多细节。

在有大量连接的情况下,禁用心跳可能改善性能,但可能会导致连接在关闭非活动连接的网络设备的出现。

Default: 60 (580 prior to release 3.5.5)

default_vhost 当RabbitMQ创建一个新的数据库时,创建一个虚拟主机。交换amq.rabbitmq.logwill存在于这个虚拟主机中。Default: <<“/”>>
default_user 当RabbitMQ从头创建一个新数据库时,要创建用户名。
Default: <<“guest”>>
default_pass 默认用户的密码。
Default: <<“guest”>>
default_user_tags 默认用户的标记。
Default: [administrator]
default_permissions 在创建时分配给默认用户的权限。
Default:  [<<“.*”>>, <<“.*”>>, <<“.*”>>]
loopback_users 只允许通过环回接口连接到代理的用户列表(即localhost)。 如果您希望允许缺省的来宾用户远程连接,则需要将其更改为 [].

Default:  [<<“guest”>>]

cluster_nodes 当一个节点开始第一次启动时,将它设置为使集群自动发生。元组的第一个元素是节点试图集群到的节点。第二个元素是磁盘或ram,并确定节点类型。
Default: {[], disc}
server_properties 键值对的列表,在连接上向客户端宣布。
Default: []
collect_statistics 统计数据收集模式。主要与管理插件有关。选项有:

  • none (不要发布统计数据)
  • coarse (发出每个队列/每个通道/每个连接统计信息)
  • fine (发出的每条数据)

通常情况下不需要设置该参数
Default: none

collect_statistics_interval 统计数据收集间隔以毫秒为间隔。 主要相关插件 management plugin.Default: 5000
management_db_cache_multiplier 管理插件将缓存诸如队列清单之类的代价较高的查询的时间。缓存将把最后一个查询的运行时间乘以这个值,并在此时间内缓存结果。
Default: 5
auth_mechanisms SASL authentication mechanisms to offer to clients.Default: [‘PLAIN’, ‘AMQPLAIN’]
auth_backends List of authentication and authorisation backends to use.

Other databases than rabbit_auth_backend_internalare available through plugins.

Default: [rabbit_auth_backend_internal]

reverse_dns_lookups 设置为true,让RabbitMQ对客户端连接执行反向DNS查找,并通过rabbitmqctl和管理插件呈现该信息。
Default: false
delegate_count 用于集群内部通信的委托进程的数量。当为多核CPU时可以考虑设置该值
Default: 16
trace_vhosts Used internally by the tracer. 通常情况下不需要设置该参数Default: []
tcp_listen_options 默认的套接字选项。通常情况下不需要设置该参数
Default:

 

hipe_compile 设置为true,使用HiPE预编译RabbitMQ的部分,这是Erlang的即时编译器。这将增加服务器的吞吐量,以增加启动时间的成本。
您可能会看到,在启动时延迟几分钟,您的性能会提高20-50%。这些数据是高度工作负载和硬件依赖的。HiPE支持可能不会编译到您的Erlang安装中。如果不是这样,启用这个选项只会导致一个警告消息被显示,而启动将照常进行。例如,Debian/Ubuntu用户需要安装erlangbase-base-hipe包。HiPE在某些平台上是不可用的,尤其是Windows。HiPE在17.5之前就已经知道了erlangp/otp版本的问题。HiPE推荐使用最新的erlangp/otp版本Default: false
cluster_partition_handling 如何处理网络分区。可用模式:

  • ignore
  • pause_minority
  • {pause_if_all_down, [nodes], ignore | autoheal}
    (例: [‘rabbit@node1’, ‘rabbit@node2’])
  • autoheal

详情请看documentation on partitions
Default: ignore

cluster_keepalive_interval 节点应该多频繁地将keepalive消息发送到其他节点(以毫秒为单位)。请注意,这与netticktime不一样; 错过的keepalive消息不会导致节点被认为挂机。
Default: 10000
queue_index_embed_msgs_below 在消息的字节数中,消息将被直接嵌入到队列索引中。详情请看 persister tuning
Default: 4096
msg_store_index_module 用于队列索引的实现模块。 详情请看 persister tuning
Default: rabbit_msg_store_ets_index
backing_queue_module 队列内容的实现模块。通常情况下不需要设置该参数
Default: rabbit_variable_queue
msg_store_file_size_limit Tunable value for the persister.  通常情况下不需要设置该参数Default: 16777216
mnesia_table_loading_retry_limit 在等待集群中的Mnesia tables可用时,需要重试的次数。
Default: 10
mnesia_table_loading_retry_timeout 在集群中等待每个重试的时间,以便可用
Default: 30000
queue_index_max_ journal_entries Tunable value for the persister.  通常情况下不需要设置该参数Default: 65536
queue_master_locator Queue master定位策略

可用策略:

  • <<“min-masters”>>
  • <<“client-local”>>
  • <<“random”>>

详情请看 documentation on queue master location
Default: <<“client-local”>>

lazy_queue_explicit_gc_run_operation_threshold 调优: 只有在内存压力下有延迟队列时。

这是触发垃圾收集器和其他内存减少活动的阈值。一个低的值可以降低性能,一个高的值可以提高性能,但是会导致更高的内存消耗。通常情况下不需要设置该参数

Default: 1000

queue_explicit_gc_run_operation_threshold 调优: 在内存压力较大时。

这是触发垃圾收集器和其他内存减少活动的阈值。一个低的值可以降低性能,一个高的值可以提高性能,但是会导致更高的内存消耗。通常情况下不需要设置该参数

Default: 1000

 

其他属性:

void setChannelShouldCheckRpcResponseType​(boolean channelShouldCheckRpcResponseType)     是否校验RPC相应类型
void setClientProperties​(Map<String,Object> clientProperties)   设置客户端基本属性,没啥用

void setErrorOnWriteListener​(ErrorOnWriteListener errorOnWriteListener)     套接字写入IO异常监听器
void setExceptionHandler​(ExceptionHandler exceptionHandler)         创建连接异常处理器

void setRecoveryDelayHandler​(RecoveryDelayHandler recoveryDelayHandler)    延时自动重连处理器
void setHeartbeatExecutor​(ScheduledExecutorService executor)      设置心跳发送处理器

void setShutdownExecutor​(ExecutorService executor)   服务关闭时的处理器

void setMetricsCollector​(MetricsCollector metricsCollector)
void setNioParams​(NioParams nioParams)

void setSaslConfig​(SaslConfig saslConfig)

void setShutdownTimeout​(int shutdownTimeout)
void setSocketConfigurator​(SocketConfigurator socketConfigurator)    设置套接字配置器。
void setSocketFactory​(SocketFactory factory)     设置套接字工厂实例
void setSslContextFactory​(SslContextFactory sslContextFactory)
void setThreadFactory​(ThreadFactory threadFactory)    设置多线程工厂
void setUri​(String uriString)     设置连接字符串
void setUri​(URI uri)     设置连接URI
void setWorkPoolTimeout​(int workPoolTimeout)

其他功能

useBlockingIo​()      采用阻塞式交互模型

useNio​()               采用非阻塞式交互模型

useSslProtocol​  使用SSL协议以及协议设置

 

Connection  API

Connection提供的API不多:

close    关闭连接

abort​    此方法实现并不优雅,是直接调用的close

isOpen  连接是否打开

createChannel   创建频道

addBlockedListener   连接被阻塞时dejianting

addShutdownListener  节点关闭时的回调

 

 

参考资料:

http://www.rabbitmq.com/api-guide.html

https://www.cnblogs.com/zhen-rh/p/6884297.html

https://blog.csdn.net/weinianjie1/article/details/50611379

本文链接:RabbitMQ 之Java API解析 一(创建连接)

转载声明:本站文章若无特别说明,皆为原创,转载请注明来源:破晓(http://www.code2048.net),谢谢!^^


如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!

发表评论

电子邮件地址不会被公开。 必填项已用*标注