环境:
低配也需要3台服务器!!!!3台
3台服务器 centos7 jdk1.6
zookeeper安装包3.14 activemq安装包 5.9
jdk和activemq安装包对比图!!!一定要看!!!!
采用zookeeper去管理activemq 必须是5.9开始的版本!!!只有5.9之后才支持!!!
作者踩过的坑 采用了activemq5.11的安装包 启动不了 查看jdk版本太低
采用5.8 的安装包 无法解析xml levelDB 等错误信息
apache-activemq-5.0.0 1.5.0_12 1.5+
apache-activemq-5.1.0 1.5.0_12 1.5+
apache-activemq-5.2.0 1.5.0_15 1.5+
apache-activemq-5.3.0 1.5.0_17 1.5+
apache-activemq-5.4.0 1.5.0_19 1.5+
apache-activemq-5.5.0 1.6.0_23 1.6+
apache-activemq-5.6.0 1.6.0_26 1.6+
apache-activemq-5.7.0 1.6.0_33 1.6+
apache-activemq-5.8.0 1.6.0_37 1.6+
apache-activemq-5.9.0 1.6.0_51 1.6+
apache-activemq-5.10.0 1.7.0_12-ea 1.7+
apache-activemq-5.11.0 1.7.0_60 1.7+
apache-activemq-5.12.0 1.7.0_80 1.7+
apache-activemq-5.13.0 1.7.0_80 1.7+
apache-activemq-5.14.0 1.7.0_80 1.7+
apache-activemq-5.15.0 1.8.0_112 1.8+
这里先介绍一下acviteMQ的 3种集群模式
1、默认的单机部署(kahadb)
2、基于zookeeper的主从(levelDB Master/Slave)
3、基于共享数据库的主从(Shared JDBC Master/Slave)
作者采用 的是第二种采用zookeeper实现管理activitemq集群
zookeeper集群配置
先安装zookeeper集群
作者习惯的安装目录是local
[root@localhost local]# tar -zxvf zookeeper-**.tar.gz
[root@localhost local]# mv zookeeper-** zookeeper ##取一个简单点的名字
[root@localhost local]# cd zookeeper/conf 进入zookeeper的conf配置
[root@localhost conf]# cp zoo_sample.conf zoo.cfg 默认采用zoo_sample.conf 2个文件只能选择一个
[root@localhost conf]# rm -rf zoo_sample.conf 删除自带的zoo_sample.conf
[root@localhost conf]# vim zoo.conf
添加如下内容 有几个服务就写几个
server.0=master:2888:3888 ##master在这里是因为作者电脑配置了免密所以这样 可以写成ip
server.1=slave:2888:3888 ##slave在这里是因为作者电脑配置了免密所以这样 可以写成ip
server.2=slave:2888:3888
里面默认的clinet的端口是2181 如果有需要也可以进行修改
保存退出
[root@localhost conf]#vim ../data/myid 修改data目录的myid
0
保存退出
开始配置环境变量
export ZOOKEEPER_HME=/usr/local/zookeeper
export PATH=$PATH:$ZOOKEEPER/bin
保存退出
source /etc/profile
进行分发操作把A的zookeeper配置分发到B服务器 需要修改myid的参数值不能重复
./zkServer start 分别启动2台服务器的zookeeper服务
./zkServer status 查看启动是否成功 一个领队 一个跟随者
A
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
B
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
zookeeper集群配置到这里就OK了
activeMQ集群配置
作者的安装包已经上传了
tar -zxvf apache-activemq-5.9.0 #解压
cd activitmq/conf #进入conf目录 修改配置文件
brokerName的名称需要修改 如果是在2台服务器上那么就无所谓了
配置activite集群 最少需要3台服务器 采用zookeeper管理只有brokerName一致才会被认为是集群
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="master" dataDirectory="${activemq.data}">
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb
replicas="3"
bind="tcp://0.0.0.0:51121" 集群当中的通讯端口
zkAddress="127.0.0.1:2181,127.0.0.1:2182" zookeeper集群的地址
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>
这里的重要的一个属性 需要注意
replicas : 集群中的节点数【(replicas/2)+1公式表示集群中至少要正常运行的服务数量】, 3台集群那么允许1台宕机, 另外两台要正常运行 这个参数值设置有误则会出现以下提示 提示数量不足:无法确定master
Not enough cluster members connected to elect a master.
配置完毕 采用scp -r 分发命令 分发到B C服务器
分别启动两台服务器的activitmq服务 打开防火墙的8161端口 默认的clinet端口 提供给客户一个web操作界面
进行测试是否成功
启动3台服务器的mq服务
打开防火墙8161端口 3个ip 只能打开一个 因为只有一个master主机
关闭master主机 剩下2台会再推出一台。继续访问该链接还可以访问那么就配置成功了
关闭master 的服务
再通过该节点访问 如果还能访问该链接 那么配置集群成功
测试代码
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
//发送消息
public class P2PSender {
private static final String QUEUE = "client1-to-client2";
public static void main(String[] args) throws Exception {
//1. 建立一个ConnectionFactory. 默认tcp://0.0.0.0:61616
String userName = ActiveMQConnectionFactory.DEFAULT_USER;
String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
String brokerURL = "failover:(tcp://192.168.99.1:61616,tcp://192.168.99.2:61616,tcp:/192.168.99.3:61616)?Randomize=false";
//作者这里采用的默认配置 也就是有密码 默认activitemq密码是 admin rabbitmq的默认的是guest
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", brokerURL );
//2. 通过ConnectionFactory建立一个Connection连接,并且调用start方法开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 通过Connection创建Session,用于接收消息[第一个参数:是否启用事务;第二个参数:设置签收模式]
// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session session = connection.createSession( false, Session.CLIENT_ACKNOWLEDGE );//手工签收--常用
//4. 通过Session创建Destination对象
Destination destination = session.createQueue( "foodQueue" );
//5. 通过Session创建发送或接受对象
MessageProducer messageProducer = session.createProducer( null );
//7. 使用JMS规范里面消息类型之一 TextMessage 来创建数据,用MessageProducer来发送
for (int i = 1; i < 500; i++) {
TextMessage message = session.createTextMessage();
message.setText( "好了 测试成功了" + i );
//参数:目标,消息,传递数据的模式,优先级,消息的过期时间
messageProducer.send( destination, message, DeliveryMode.NON_PERSISTENT, 0, 1000 * 60 );
//System.out.println("生产者:" + message.getText());
}
//使用事务要手动提交
//session.commit();
//8. 关闭连接
connection.close();
}
}
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class P2PReceiver {
private static final String QUEUE = "client1-to-client2";
public static void main(String[] args) throws Exception {
//1. 建立一个ConnectionFactory. tcp://0.0.0.0:61616
String userName = ActiveMQConnectionFactory.DEFAULT_USER;
String password = ActiveMQConnectionFactory.DEFAULT_PASSWORD;
String brokerURL = "failover:(tcp://192.168.99.1:61616,tcp://192.168.99.2:61616,tcp://192.168.99.3:61616)?Randomize=false";
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory( "admin", "admin", brokerURL );
//2. 通过ConnectionFactory建立一个Connection连接,并且调用start方法开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3. 通过Connection创建Session,用于接收消息[第一个参数:是否启用事务;第二个参数:设置签收模式]
Session session = connection.createSession( false, Session.AUTO_ACKNOWLEDGE );
//4. 通过Session创建Destination对象
Destination destination = session.createQueue( "foodQueue" );
//5. 通过Session创建发送或接受对象
MessageConsumer messageConsumer = session.createConsumer( destination );
//7. 使用JMS规范里面消息类型之一 TextMessage 来创建数据
while (true) {
TextMessage message = (TextMessage) messageConsumer.receive();
if (message == null) break;
System.out.println( "消费者:" + message.getText() );
}
//8. 关闭连接
connection.close();
}
}