ActiveMQ内存配置
ActiveMQ的内核是Java编写的,也就是说如果服务端没有Java运行环境ActiveMQ是无法运行的。ActiveMQ启动时,启动脚本使用wrapper包装器来启动JVM。JVM相关的配置信息在启动目录的“wrapper.conf”配置文件中。可以通过改变其中的配置项,设置JVM的初始内存大小和最大内存大小。
ActiveMQ每一个服务节点都是一个独立的进程。在ActiveMQ主配置文件中,可以找到一个systemUsage标记,定义如下:
systemUsage:该标记用于设置整个ActiveMQ节点在进程级别的内存容量大小设置。其中可设置的属性包括:
- sendFailIfNoSpaceAfterTimeout:当ActiveMQ收到一条消息时,如果ActiveMQ这时已经没有多余容量了,那么就会等待一段时间(这里设置的毫秒数),如果超过这个等待时间ActiveMQ仍然没有可用的容量,那么就拒绝接收这条消息并在消息的发送端抛出javax.jms.ResourceAllocationException异常;
- sendFailIfNoSpace,当ActiveMQ收到一条消息时,如果ActiveMQ这时已经没有多余容量了,就直接拒绝这条消息(不用等待一段时间),并在消息的发送端抛出javax.jms.ResourceAllocationException异常。
memoryUsage:该子标记设置整个ActiveMQ节点的“可用内存限制”。这个值不能超过上文中设置的JVM maxmemory的值。其中的percentOfJvmHeap属性表示使用“百分数值”进行设置,除了这个属性以外,您还可以使用limit属性进行固定容量授权,例如:limit=”1000 mb”。这些内存容量将供所有队列使用。
storeUsage:该标记设置整个ActiveMQ节点,用于存储“持久化消息”的“可用磁盘空间”。该子标记的limit属性必须要进行设置。在使用KahaDB方案或者LevelDB方案进行PERSISTENT Message持久化存储时,这个storeUsage属性都会起作用;但是如果使用数据库存储方案,这个属性就不会起作用了。
tempUsage:在ActiveMQ 5.X+ 版本中,一旦ActiveMQ服务节点存储的消息达到了memoryUsage的限制,NON_PERSISTENT Message就会被转储到 temp store区域。虽然我们说过NON_PERSISTENT Message不进行持久化存储,但是ActiveMQ为了防止“数据洪峰”出现时NON_PERSISTENT Message大量堆积致使内存耗尽的情况出现,还是会将NON_PERSISTENT Message写入到磁盘的临时区域——temp store。这个子标记就是为了设置这个temp store区域的“可用磁盘空间限制”。注意storeUsage和tempUsage并不是“最大可用空间”,而是一个阀值。
ActiveMQ消息持久化
ActiveMQ的持久化机制包含JDBC,KahaDB、LevelDB,ActiveMQ默认持久化是采用KahaDB 机制 ,如图:
使用JDBC持久化
<persistenceAdapter>
<!-- <kahaDB directory="${activemq.data}/kahadb"/> -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="false" />
</persistenceAdapter>
createTablesOnStartup默认值是true,即每次ActiveMQ启动的时候都重新创建数据表,一般是首次启动设置为true,之后设置为false。
同时,在broker标签外设置bean
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://127.0.0.1:3306/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
将mysql的jdbc jar包放置到activemq的lib目录下。同时,将commons-pool.jar、commons-dbcp.jar和commons-collections.jar放置到activemq的lib目录下。
重启ActiveMQ,进入data目录查看启动日志
通过日志可以发现,创建activemq_acks表的时候抛异常了
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Specified key was too long; max key length is 767 bytes
异常原因
数据库表采用utf8编码,其中varchar(255)的column进行了唯一键索引
而mysql默认情况下单个列的索引不能超过767位(不同版本可能存在差异)
于是utf8字符编码下,255*3 byte 超过限制了
解决方法:
数据库需要字符集设置为latin1 。
新建了一个数据库,字符集设为Latin1 ,然后启动ActiveMq,查看数据库,发现多了三张表:
消息持久化测试
ActiveMQ管理台地址 http://127.0.0.1:8161/admin/index.jsp 默认账号密码:admin/admin
执行Producer生产消息
package com.dsy.producer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author sanya deng
* @version 1.0.0
* @Title: Producer
* @ProjectName mq
* @Description: TODO
* @email forestsancy@163.com
* @date 2019-04-15 17:37
*/
public class Producer {
public static void main(String[] args) throws Exception{
//1.创建ConnectionFactory对象
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:61616");
//2.创建一个Connection并开启
Connection connection = connectionFactory.createConnection();
connection.start();
//3.创建Session会话,用来接收消息,通过参数可以设置:是否启用事务、消息签收模式
//参数设置生产者使用事务、客户端(消费者)签收方式
Session session = connection.createSession(Boolean.TRUE, Session.CLIENT_ACKNOWLEDGE);
//4.创建Destination对象。在点对点模式中,该对象被称为Queue;在发布订阅模式中,该对象被称为Topic
Queue destination = session.createQueue("test_queue");
//5.创建消息的生产者
MessageProducer messageProducer=session.createProducer(null);
//6.设置生产者的消息持久化与非持久化特性
//messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
//7.选择需要的JMS消息格式,创建并发送消息,此处选择的是TextMessage字符串对象
TextMessage textMessage=session.createTextMessage();
textMessage.setText("生产者"+"activemq消息测试");
//messageProducer.send(textMessage);
//第3个参数:是否持久化;第4个参数:优先级(0~4普通 5~9加急);第5个参数:消息在ActiveMQ中间件中存放的有效期
messageProducer.send(destination, textMessage,DeliveryMode.PERSISTENT, 4, 1000*60*10);
//使用事务,必须有commit操作
session.commit();
//8.释放Connection
if(null!=connection){
connection.close();
}
}
}
查看控制台:
因为使用了JDBC持久化方式,数据库会创建3个表:activemq_msgs,activemq_acks和activemq_lock。activemq_msgs用于存储消息,Queue和Topic都存储在这个表中。
执行Consumer消费者
package com.dsy.consumer;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
/**
* @author sanya deng
* @version 1.0.0
* @Title: Consumer
* @ProjectName mq
* @Description: TODO
* @email forestsancy@163.com
* @date 2019-04-16 10:30
*/
public class Consumer {
public static void main(String[] args) throws Exception{
//1.创建ConnectionFactory对象
ConnectionFactory connectionFactory=new ActiveMQConnectionFactory(
"admin",
"admin",
"tcp://localhost:61616");
//2.创建一个Connection并开启
Connection connection=connectionFactory.createConnection();
connection.start();
//3.创建Session会话,用来接收消息,通过参数可以设置:是否启用事务、消息签收模式
Session session=connection.createSession(Boolean.FALSE, Session.CLIENT_ACKNOWLEDGE);
//4.创建Destination对象。在点对点模式中,该对象被称为Queue;在发布订阅模式中,该对象被称为Topic
Destination destination=session.createQueue("test_queue");
//5.创建消息的消费者
MessageConsumer messageConsumer=session.createConsumer(destination);
//6.消费者从消息中间件的Queue获取消息
while(true){
TextMessage textMessage=(TextMessage) messageConsumer.receive();
if(null==textMessage){
break;
}
System.out.println("消费者接收到的内容:"+textMessage.getText());
//手动确认签收
textMessage.acknowledge();
}
//7.释放Connection
if(null!=connection){
connection.close();
}
}
}
控制台输出:
因为消息已经被消费掉,再次查看mysql数据库中的activemq_msgs表,发现消息已经不存在了。