key | 描述 | ||
---|---|---|---|
tcp_listeners | 监听AMQP连接的端口或主机/对。 Default: [5672] |
||
num_tcp_acceptors | Erlang进程的数量,接受TCP监听器的连接数。 Default: 10 |
||
handshake_timeout | 对AMQP 0-8/0-9/0-9-1握手的最大时间(在套接字连接和SSL握手之后),以毫秒为间隔 Default: 10000 |
||
ssl_listeners | 如上所述,用于SSL连接。 Default: [] |
||
num_ssl_acceptors | 用于接受SSL监听连接的Erlang进程的数量。 Default: 1 |
||
ssl_options | SSL配置参数. 详情请看 SSL documentation.Default: [] | ||
ssl_handshake_timeout | SSL握手超时,以毫秒为间隔。 Default: 5000 |
||
vm_memory_high_watermark | 触发流控制的内存阈值。详情请看 memory-based flow control.Default: 0.4 | ||
vm_memory_high_watermark_paging_ratio | 设置当内存使用超过总内存百分比多少时,队列开始将消息持久化到磁盘以释放内存。 详情请看 memory-based flow control.Default: 0.5 | ||
disk_free_limit | RabbitMQ存储数据的分区的磁盘空间限制。当可用的磁盘空间低于这个限制时,就会触发流控制。值可以相对于RAM的总数设置(例如,内存比例,1.0)。该值也可以设置为整数的字节数。或者,单位(例如“50 mb”)。默认情况下,空闲磁盘空间必须超过50MB。详情请看 Disk Alarms.Default: 50000000 | ||
log_levels | 控制日志的粒度。该值是一个日志事件类别和日志级别对的列表。 可设置级别:’none”error”warning”info”debug’以上下一层级别的日志输出均包含上层级别日志输出(如: warning包含warning和error), none为不输出日志另外,当前未分类的事件总是记录在日志中The categories are:
Default: [{connection, info}] |
||
frame_max | 框架最大允许大小(以字节为单位)与消费者进行数据交换。设置为0意味着“无限”,但会在一些QPid客户端触发一个bug。
设置更大的值可能会提高吞吐量; 设置较小的值可能会提高延迟。 Default: 131072 |
||
channel_max | 与消费者进行谈判的最大允许数量。设置为0意味着“无限”。
使用更多的通道会增加代理的内存占用。 Default: 0 |
||
channel_operation_timeout | 通道操作超时为毫秒(内部使用,由于消息传递协议的差异和限制而不直接暴露于客户机)。 Default: 15000 |
||
heartbeat | 该值表示服务器在连接中发送的心跳延迟,在几秒钟内。优化框架。如果设置为0,则会禁用心跳。客户端可能不会遵循服务器的建议,请参阅AMQP参考以了解更多细节。
在有大量连接的情况下,禁用心跳可能改善性能,但可能会导致连接在关闭非活动连接的网络设备的出现。 Default: 60 (580 prior to release 3.5.5) |
||
default_vhost | 当RabbitMQ创建一个新的数据库时,创建一个虚拟主机。交换amq.rabbitmq.logwill存在于这个虚拟主机中。Default: <<“/”>> | ||
default_user | 当RabbitMQ从头创建一个新数据库时,要创建用户名。 Default: <<“guest”>> |
||
default_pass | 默认用户的密码。 Default: <<“guest”>> |
||
default_user_tags | 默认用户的标记。 Default: [administrator] |
||
default_permissions | 在创建时分配给默认用户的权限。 Default: [<<“.*”>>, <<“.*”>>, <<“.*”>>] |
||
loopback_users | 只允许通过环回接口连接到代理的用户列表(即localhost)。 如果您希望允许缺省的来宾用户远程连接,则需要将其更改为 [].
Default: [<<“guest”>>] |
||
cluster_nodes | 当一个节点开始第一次启动时,将它设置为使集群自动发生。元组的第一个元素是节点试图集群到的节点。第二个元素是磁盘或ram,并确定节点类型。 Default: {[], disc} |
||
server_properties | 键值对的列表,在连接上向客户端宣布。 Default: [] |
||
collect_statistics | 统计数据收集模式。主要与管理插件有关。选项有:
通常情况下不需要设置该参数 |
||
collect_statistics_interval | 统计数据收集间隔以毫秒为间隔。 主要相关插件 management plugin.Default: 5000 | ||
management_db_cache_multiplier | 管理插件将缓存诸如队列清单之类的代价较高的查询的时间。缓存将把最后一个查询的运行时间乘以这个值,并在此时间内缓存结果。 Default: 5 |
||
auth_mechanisms | SASL authentication mechanisms to offer to clients.Default: [‘PLAIN’, ‘AMQPLAIN’] | ||
auth_backends | List of authentication and authorisation backends to use.
Other databases than rabbit_auth_backend_internalare available through plugins. Default: [rabbit_auth_backend_internal] |
||
reverse_dns_lookups | 设置为true,让RabbitMQ对客户端连接执行反向DNS查找,并通过rabbitmqctl和管理插件呈现该信息。 Default: false |
||
delegate_count | 用于集群内部通信的委托进程的数量。当为多核CPU时可以考虑设置该值 Default: 16 |
||
trace_vhosts | Used internally by the tracer. 通常情况下不需要设置该参数Default: [] | ||
tcp_listen_options | 默认的套接字选项。通常情况下不需要设置该参数 Default:
|
||
hipe_compile | 设置为true,使用HiPE预编译RabbitMQ的部分,这是Erlang的即时编译器。这将增加服务器的吞吐量,以增加启动时间的成本。 您可能会看到,在启动时延迟几分钟,您的性能会提高20-50%。这些数据是高度工作负载和硬件依赖的。HiPE支持可能不会编译到您的Erlang安装中。如果不是这样,启用这个选项只会导致一个警告消息被显示,而启动将照常进行。例如,Debian/Ubuntu用户需要安装erlangbase-base-hipe包。HiPE在某些平台上是不可用的,尤其是Windows。HiPE在17.5之前就已经知道了erlangp/otp版本的问题。HiPE推荐使用最新的erlangp/otp版本Default: false |
||
cluster_partition_handling | 如何处理网络分区。可用模式:
详情请看documentation on partitions |
||
cluster_keepalive_interval | 节点应该多频繁地将keepalive消息发送到其他节点(以毫秒为单位)。请注意,这与netticktime不一样; 错过的keepalive消息不会导致节点被认为挂机。 Default: 10000 |
||
queue_index_embed_msgs_below | 在消息的字节数中,消息将被直接嵌入到队列索引中。详情请看 persister tuning Default: 4096 |
||
msg_store_index_module | 用于队列索引的实现模块。 详情请看 persister tuning Default: rabbit_msg_store_ets_index |
||
backing_queue_module | 队列内容的实现模块。通常情况下不需要设置该参数 Default: rabbit_variable_queue |
||
msg_store_file_size_limit | Tunable value for the persister. 通常情况下不需要设置该参数Default: 16777216 | ||
mnesia_table_loading_retry_limit | 在等待集群中的Mnesia tables可用时,需要重试的次数。 Default: 10 |
||
mnesia_table_loading_retry_timeout | 在集群中等待每个重试的时间,以便可用 Default: 30000 |
||
queue_index_max_ journal_entries | Tunable value for the persister. 通常情况下不需要设置该参数Default: 65536 | ||
queue_master_locator | Queue master定位策略
可用策略:
详情请看 documentation on queue master location |
||
lazy_queue_explicit_gc_run_operation_threshold | 调优: 只有在内存压力下有延迟队列时。
这是触发垃圾收集器和其他内存减少活动的阈值。一个低的值可以降低性能,一个高的值可以提高性能,但是会导致更高的内存消耗。通常情况下不需要设置该参数 Default: 1000 |
||
queue_explicit_gc_run_operation_threshold | 调优: 在内存压力较大时。
这是触发垃圾收集器和其他内存减少活动的阈值。一个低的值可以降低性能,一个高的值可以提高性能,但是会导致更高的内存消耗。通常情况下不需要设置该参数 Default: 1000 |
RabbitMQ 之Java API解析 一(创建连接)
本系列使用的RabbitMQ 版本为 3.7.4 Java Client 版本为:amqp-client 5.2.0 JDK:1.8
创建连接
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
ConnectionFactory factory = new ConnectionFactory(); factory.setHost(config.getProperty("cache.host"));// MQ的IP factory.setPort(Integer.parseInt(config.getProperty("cache.port")));// MQ端口 factory.setUsername(config.getProperty("cache.user"));// MQ用户名 factory.setPassword(config.getProperty("cache.password"));// MQ密码 factory.setVirtualHost(config.getProperty("cache.virtualHost")); ExecutorService service = Executors.newFixedThreadPool(10); factory.setSharedExecutor(service); // 设置自动恢复 factory.setAutomaticRecoveryEnabled(true); factory.setNetworkRecoveryInterval(10000);// 设置 没10s ,重试一次 factory.setTopologyRecoveryEnabled(false);// 设置不重新声明交换器,队列等信息。 Connection connection = factory.newConnection(); |
由于创建连接需要的参数过多,所以是采用构建工厂来创建连接实例。
属性设置
首先看一下创建所需的参数:
基本参数:
1 2 3 4 5 |
factory.setHost("114.215.71.92");// MQ的IP factory.setPort(5672);// MQ端口 factory.setUsername("poxiao");// MQ用户名 factory.setPassword("123456");// MQ密码 factory.setVirtualHost("/"); // MQ虚拟机 |
这些不用解释,看一下ConnectionFactory的源码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
/** Default user name */ public static final String DEFAULT_USER = "guest"; /** Default password */ public static final String DEFAULT_PASS = "guest"; /** Default virtual host */ public static final String DEFAULT_VHOST = "/"; /** The default host */ public static final String DEFAULT_HOST = "localhost"; /** 'Use the default port' port */ public static final int USE_DEFAULT_PORT = -1; private String virtualHost = DEFAULT_VHOST; private String host = DEFAULT_HOST; private int port = USE_DEFAULT_PORT; private CredentialsProvider credentialsProvider = new DefaultCredentialsProvider(DEFAULT_USER, DEFAULT_PASS); |
其中 用户名密码封装成为了 CredentialsProvider
常用属性:
setAutomaticRecoveryEnabled 启用或禁用自动重连。
setNetworkRecoveryInterval 设置多少毫秒重试一次
客户端和RabbitMQ节点之间的网络连接可能失败。RabbitMQ java客户端支持的连接拓扑自动恢复(队列、交流、绑定和消费者)。许多应用程序的自动恢复过程遵循以下步骤:
重新连接
恢复连接侦听器
重开通道
恢复通道侦听器
恢复通道基础、QoS设置、发布服务器确认和事务设置
拓扑恢复包括为每个通道执行的以下操作
重新声明Exchange(除了预定义的)
重新声明队列
恢复所有绑定
恢复所有消费者
作为4.0.0版的java客户端,自动恢复默认是启用的(因此拓扑恢复好)。
使用setNetworkRecoveryInterval 来设置故障后的重连时间 (默认是5秒)
如果创建实例时设置了地址列表:Address[] addrs 则会遍历这个列表
setTopologyRecoveryEnabled 设置是否重新声明交换器,队列等信息。
如果设置为false 则不会重复声明 交换机,队列信息
setChannelRpcTimeout 设置频道RPC通信连接的超时时间
这个默认是10分钟 设置为0的话表示永不超时
setConnectionTimeout 设置TCP连接的超时时间
这个默认是60秒, 设置为0的话表示永不超时
void setSharedExecutor(ExecutorService executor) 设置消费者Channel使用的线程池
消费时获取connection时候,consumer线程默认自动分配一个 ExceutorService thread pool(默认8个线程)
如果Channel过多需要指定线程池,否则Channel再多也没用 Channel的数量最好和线程池容量一致
1 2 |
ExecutorService service = Executors.newFixedThreadPool(500); factory.setSharedExecutor(service); |
但是需要注意的是:当connection shutdown的时候,默认分配的ExecutorService会自动shutdown,用户自己定义的不会shutdown,需要手动shutdown。而且,只用在Consumer回调有严重的性能瓶颈的时候才用这种方式,在平常的使用中,默认的分配更加高效。
setCredentialsProvider(CredentialsProvider credentialsProvider) 直接设置用户名密码即可,本方法可忽略
部分服务器参数属性可以参照RabbitMQ 服务器端配置:
其他属性:
void setChannelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType) 是否校验RPC相应类型
void setClientProperties(Map<String,Object> clientProperties) 设置客户端基本属性,没啥用
void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) 套接字写入IO异常监听器
void setExceptionHandler(ExceptionHandler exceptionHandler) 创建连接异常处理器
void setRecoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler) 延时自动重连处理器
void setHeartbeatExecutor(ScheduledExecutorService executor) 设置心跳发送处理器
void setShutdownExecutor(ExecutorService executor) 服务关闭时的处理器
void setMetricsCollector(MetricsCollector metricsCollector)
void setNioParams(NioParams nioParams)
void setSaslConfig(SaslConfig saslConfig)
void setShutdownTimeout(int shutdownTimeout)
void setSocketConfigurator(SocketConfigurator socketConfigurator) 设置套接字配置器。
void setSocketFactory(SocketFactory factory) 设置套接字工厂实例
void setSslContextFactory(SslContextFactory sslContextFactory)
void setThreadFactory(ThreadFactory threadFactory) 设置多线程工厂
void setUri(String uriString) 设置连接字符串
void setUri(URI uri) 设置连接URI
void setWorkPoolTimeout(int workPoolTimeout)
其他功能
useBlockingIo() 采用阻塞式交互模型
useNio() 采用非阻塞式交互模型
useSslProtocol 使用SSL协议以及协议设置
Connection API
Connection提供的API不多:
close 关闭连接
abort 此方法实现并不优雅,是直接调用的close
isOpen 连接是否打开
createChannel 创建频道
addBlockedListener 连接被阻塞时dejianting
addShutdownListener 节点关闭时的回调
参考资料:
http://www.rabbitmq.com/api-guide.html
https://www.cnblogs.com/zhen-rh/p/6884297.html
https://blog.csdn.net/weinianjie1/article/details/50611379