博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
ActiveMQ(七)_集群负载配置
阅读量:4041 次
发布时间:2019-05-24

本文共 6775 字,大约阅读时间需要 22 分钟。

主要注意2.4使用场景的情况

1  概述

简单的说,就是通过把多个不同的broker实例连接在一起,作为一个整体对外提供服务,从而提高整体对外的消息服务能力。

通过这种方式连接在一起的broker实例之间,可以共享队列和消费者列表,从而达到分布式队列的目的。

 

2  自定义安装

2.1  部署结构

Machine1:192.168.1.106

Machine2:192.168.1.107

 

2.2  安装步骤

安装步骤如下:

1、解压;

[html]   
 
  1. tar -zxvf apache-activemq-5.9.0-bin.tar.gz  

 

2、相关配置,编辑/conf/activemq.xml配置文件,

(采用staticBroker-Cluster配置方式实现):

可以参考官网:

 

1)  设置数据回流配置:

<policyEntries>节点下新增:

[html]   
 
  1. <!--属性enableAudit=false,是防止消息在回流后被当做重复消息而不被转发 -->   
  2.                 <policyEntryqueuepolicyEntryqueue=">" producerFlowControl="false"memoryLimit="10mb" enableAudit="false">   
  3.                      <!-- 属性replayWhenNoConsumers=true,保证在该节点断开,并重启后,且consumers已经连接到另外一个节点上的情况下,消息自动回流到原始节点-->   
  4.                     <networkBridgeFilterFactory>   
  5.                   <conditionalNetworkBridgeFilterFactoryreplayWhenNoConsumersconditionalNetworkBridgeFilterFactoryreplayWhenNoConsumers="true"/>   
  6.                </networkBridgeFilterFactory>   
  7.                 </policyEntry>  


 

2)  在Machine1上设置对Machine2的引用

配置多个用逗号隔开host

<networkConnectoruri="static:(tcp://host1:61616,tcp://host2:61616)" duplex="true"/>

or

        <networkConnectors>

           <networkConnectoruri="static:(tcp://192.168.1.107:61617)" duplex="true"  />

       </networkConnectors>

 

       <transportConnectors>

            <!-- DOSprotection, limit concurrent connections to 1000 and frame size to 100MB -->

            <transportConnectorname="openwire"uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

            …

       </transportConnectors>

                                           

3) 在Machine2上设置对Machine1的引用

        <networkConnectors>

           <networkConnectoruri="static:(tcp://192.168.1.106:61616)"  duplex="true"  />

       </networkConnectors>

 

        <transportConnectors>

            <!-- DOS protection, limitconcurrent connections to 1000 and frame size to 100MB -->

            <transportConnectorname="openwire"uri="tcp://0.0.0.0:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>

            …

        </transportConnectors>

 

 

 

2、启动:

执行/bin目录下activemq

分别启动Machine1和Machine2上的activemq

[html]   
 
  1. ./activemq start &  


 

 

3、查看启动后的日志:

 

如下日志显示,集群间Cluster相互连接,表示集群BrokerCluster集群配置成功。

 

 

2.3     属性说明

2.3.1 networkConnector配置参数

[html]   
 
  1. <networkConnectors>  
  2.      <networkConnectorurinetworkConnectoruri="static:(tcp://host1:61616,tcp://host2:61616,tcp://..)"/>  
  3. </networkConnectors>  


 

URI的几个属性:

属性

默认值

描述

initialReconnectDelay

1000

重连之前等待的时间(ms) (如果useExponentialBackOff false)

maxReconnectDelay

30000

重连之前等待的最大时间(ms)

useExponentialBackOff

true

每次重连失败时是否增大等待时间

backOffMultiplier

2

增大等待时间的系数

 

 

networkConnector参数属性

 

属性

默认值

描述

name

bridge

名称

dynamicOnly

false

如果为true, 持久订阅被激活时才创建对应的网路持久订阅。默认是启动时激活。

decreaseNetworkConsumerPriority

false

如果为true,网络的消费者优先级降低为-5。如果为false,则默认跟本地消费者一样为0.

networkTTL

1

消息和订阅在网络上通过的broker数量

conduitSubscriptions

true

多个网络消费者是否被当做一个消费者来对待。

excludedDestinations

empty

不通过网络转发的destination

dynamicallyIncludedDestinations

empty

通过网络转发的destinations,注意空列表代表所有的都转发。

staticallyIncludedDestinations

empty

匹配的都将通过网络转发-即使没有对应的消费者

duplex

false

如果为true,则既可消费又可生产消息到网络broker

prefetchSize

1000

设置网络消费者的参数。必须大于0,因为网络消费者不能自己轮询消息。

suppressDuplicateQueueSubscriptions

false

(5.3版本开始如果为true, 重复的订阅关系一产生即被阻止。

bridgeTempDestinations

true

是否广播advisory messages来创建临时destination

alwaysSyncSend

false

( 5.6版本开始如果为true,非持久化消息也将使用request/reply方式代替oneway方式发送到远程broker

staticBridge

false

(5.6版本开始如果为true,只有staticallyIncludedDestinations中配置的destination可以被处理。

 

 

2.4     场景说明

一个很有意思的场景是,broker1和broker2通过networkConnector连接。一些个consumers连接到broker1,消费broker2上的消息。消息先被broker1从broker2上消费掉,然后转发给这些consumers。不幸的是转发部分消息的时候broker1重启了,这些consumers发现broker1连接失败,通过failover连接到broker2上去了,但是有一部分他们还没有消费的消息被broker2已经分发到了broker1上去了。这些消息,就好像是消失了,除非有消费者重新连接到broker1上来消费。怎么办呢?

办法就是从5.6版本destinationPolicy上新增的选项replayWhenNoConsumers。这个选项使得broker1上有需要转发的消息但是没有消费者时,把消息回流到它原始的broker。同时把enableAudit设置为false,为了防止消息回流后被当做重复消息而不被分发。

 

3       Cluster原理

networkConnector的实现原理是基于ActiveMQ的公告消息(AdvisoryMessage)机制的(参见)。当broker2通过networkConnectorduplex方式指向broker1时,发生了什么呢?

假定broker1已经启动,这时候broker2开始启动。

1.         broker2先启动自己的connector

2.         然后使用一个vmconnector,创建一个connection,把自己作为一个client,连接到broker1

3.         通过订阅AdvisoryMessage,拿到相互的Consumer与相应的Queue列表。

至此,双方建立关系。

 

4       集群场景测试

4.1  集群测试

1、使用客户端发送队列数据,

 

1)在生产者上连接61616发送数据;

2)在消费者上连接61617进行接收数据;

 

 

Product代码:

[html]   
 
  1. public class Producer {  
  2.    
  3.     private static final String BROKER_URL = "tcp://192.168.1.106:61616";  
  4.    
  5.     private static final Boolean NON_TRANSACTED = false;  
  6.     private static final int NUM_MESSAGES_TO_SEND = 100;  
  7.     private static final long DELAY = 100;  
  8.    
  9.     public static void main(String[] args) {  
  10.         String url = BROKER_URL;  
  11.         if (args.length > 0) {  
  12.             url = args[0].trim();  
  13.         }  
  14.         ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("admin", "admin", url);  
  15.         Connection connection = null;  
  16.         try {  
  17.             connection = connectionFactory.createConnection();  
  18.             connection.start();  
  19.             Session session =connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);  
  20.             Destination destination =session.createQueue("test-queue");  
  21.             MessageProducer producer =session.createProducer(destination);  
  22.             for (int i = 0; i < NUM_MESSAGES_TO_SEND; i++) {  
  23.                 TextMessage message =session.createTextMessage("Message#" + i);  
  24.                 System.out.println("Sending message #" + i);  
  25.                 producer.send(message);  
  26.                 Thread.sleep(DELAY);  
  27.             }  
  28.             producer.close();  
  29.             session.close();  
  30.    
  31.         } catch (Exception e) {  
  32.             System.out.println("Caught exception!");  
  33.         }  
  34.         finally {  
  35.             if (connection != null) {  
  36.                 try {  
  37.                     connection.close();  
  38.                 } catch (JMSException e) {  
  39.                     System.out.println("Could not close an open connection...");  
  40.                 }  
  41.             }  
  42.         }  
  43.     }  
  44. }  


 

 

Consumer代码:

[html]   
 
  1. public class Consumer {  
  2.    
  3. // private static final String BROKER_URL ="tcp://192.168.1.106:61616";  
  4.     private static final String BROKER_URL = "tcp://192.168.1.107:61617";  
  5.    
  6.     private static final Boolean NON_TRANSACTED = false;  
  7.     private static final long TIMEOUT = 20000;  
  8.    
  9.     public static void main(String[] args) {  
  10.         String url = BROKER_URL;  
  11.         if (args.length > 0) {  
  12.             url = args[0].trim();  
  13.         }  
  14.         System.out.println("\nWaiting to receive messages... will timeout after " + TIMEOUT / 1000 +"s");  
  15.         ActiveMQConnectionFactoryconnectionFactory = new ActiveMQConnectionFactory("admin", "admin", url);  
  16.         Connection connection = null;  
  17.         try {  
  18.             connection = connectionFactory.createConnection();  
  19.             connection.start();  
  20.             Session session =connection.createSession(NON_TRANSACTED, Session.AUTO_ACKNOWLEDGE);  
  21.             Destination destination =session.createQueue("test-queue");  
  22.             MessageConsumer consumer =session.createConsumer(destination);  
  23.             int i = 0;  
  24.             while (true) {  
  25.                 Message message =consumer.receive(TIMEOUT);  
  26.    
  27.                 if (message != null) {  
  28.                     if (message instanceof TextMessage) {  
  29.                         String text =((TextMessage) message).getText();  
  30.                         System.out.println("Got " + i++ + ". message: " + text);  
  31.                     }  
  32.                 } else {  
  33.                     break;  
  34.                 }  
  35.             }  
  36.             consumer.close();  
  37.             session.close();  
  38.         } catch (Exception e) {  
  39.             System.out.println("Caught exception!");  
  40.         }  
  41.         finally {  
  42.             if (connection != null) {  
  43.                 try {  
  44.                     connection.close();  
  45.                 } catch (JMSException e) {  
  46.                     System.out.println("Could not close an open connection...");  
  47.                 }  
  48.             }  
  49.         }  
  50.     }  
  51. }  


 

 

2、发送队列消息后,查看activeMQ网页信息:

 

可以通过网页查看到已经存储了队列信息:

 

3、通过消费者取集群中另外一台机上的队列数据,可以取到

至此,ActiveMQ集群配置完成

转载地址:http://blog.csdn.net/vtopqx/article/details/51787780

 

你可能感兴趣的文章
媒体广告业如何将内容资产进行高效地综合管理与利用
查看>>
能源化工要怎么管控核心数据
查看>>
制药医疗使用云盘能带来什么样的好处
查看>>
媒体广告业如何运用云盘提升效率
查看>>
企业如何运用企业云盘进行数字化转型-实现新发展
查看>>
司法如何运用电子智能化加快现代化建设
查看>>
设计行业运用企业云盘能带来什么样的变化
查看>>
如何运用企业云盘助力企业数字化新发展
查看>>
企业云盘可以在哪些行业发光发热
查看>>
为什么汽车制造业需要企业云盘
查看>>
企业云盘和旅游行业碰撞在一起会产生怎样的火花
查看>>
医疗制药企业要怎么进一步进行系统的管理
查看>>
企业云盘如何让能源电力行业乘上数字化发展列车
查看>>
企业云盘为什么说是互联网软件公司的好帮手
查看>>
企业云盘为媒体广告业打造一站式文件管理协作平台
查看>>
教育行业推动校园信息化建设的重中之重
查看>>
咨询服务行业如何利用专用工具提升自我价值
查看>>
智慧与安全共济共同服务公共事业
查看>>
是谁在背后默默支撑教育行业加速进程教育信息化2.0
查看>>
企业云盘为司法行业注入电子数据化新动力
查看>>