activeMQ

安装部署:

wget http://archive.apache.org/dist/activemq/apache-activemq/5.9.0/apache-activemq-5.9.0-bin.tar.gz
解压
运行:activemq start
(1)普通启动./activemq start
(2)启动并指定日志文件:./activemq start > /tmp/smlog
(3)后台启动方式:nohup ./activemq start > /tmp/smlog
管理后台为:
http://ip:8161/admin/
连接默认端口61616

安全配置

jetty.xml中
<property name="authenticate" value="true" /> //开启认证
<property name="port" value="8191" /> //修改端口
jetty-realm.properties中修改密码

username: password [,rolename …]
用户名 : 密码 ,角色名
如:admin: admin, admin

依赖

1
2
3
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId> </dependency>

springMVC

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-core</artifactId>
<version>5.7.0</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${spring.version}</version>
</dependency>

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
#==================activemq Config Start==================
# Connection settings for the ActiveMQ broker used by the application.
# NOTE(review): broker-url points at an external broker while in-memory=true is also set — confirm which one is intended.
spring.activemq.broker-url=tcp://127.0.0.1:61616?jms.prefetchPolicy.all=2
spring.activemq.in-memory=true
spring.activemq.password=admin
spring.activemq.user=admin
# true selects Topic (publish/subscribe); false or unset selects Queue
spring.jms.pub-sub-domain=false
spring.activemq.packages.trust-all=false
spring.activemq.packages.trusted=
# NOTE(review): the ".*" key below looks like a documentation placeholder rather than a concrete setting — confirm.
spring.activemq.pool.configuration.*=
spring.activemq.pool.enabled=false
spring.activemq.pool.expiry-timeout=0
spring.activemq.pool.idle-timeout=30000
spring.activemq.pool.max-connections=1
#==================activemq Config End ==================

springMVC配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
<context:annotation-config />

<!-- ActiveMQ connection factory for the broker at tcp://localhost:61616 -->
<amq:connectionFactory id="amqConnectionFactory"
    brokerURL="tcp://localhost:61616"
    userName="admin"
    password="admin" />

<!-- JMS connection factory referenced by the template and the listener container below;
     wraps the ActiveMQ factory with a session cache size of 100 -->
<bean id="connectionFactory"
    class="org.springframework.jms.connection.CachingConnectionFactory">
    <constructor-arg ref="amqConnectionFactory" />
    <property name="sessionCacheSize" value="100" />
</bean>

<!-- Alternative connection factory provided by spring-jms. It carries its own id because two
     beans cannot share the id "connectionFactory", and it targets the ActiveMQ factory defined
     above (no bean named "targetConnectionFactory" is defined in this snippet) -->
<bean id="singleConnectionFactory" class="org.springframework.jms.connection.SingleConnectionFactory">
    <property name="targetConnectionFactory" ref="amqConnectionFactory" />
</bean>

<!-- Queue destination (point-to-point) -->
<bean id="queueDestionation" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="queue" />
</bean>

<!-- Topic destination (publish/subscribe) -->
<bean id="topicDestionation" class="org.apache.activemq.command.ActiveMQTopic">
    <constructor-arg value="topic" />
</bean>

<!-- JMS template (queue): Spring's JMS helper used to send and receive messages -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
    <property name="connectionFactory" ref="connectionFactory" />
    <!-- Points at the queue bean defined above; "demoQueueDestination" is not defined in this snippet -->
    <property name="defaultDestination" ref="queueDestionation" />
    <property name="receiveTimeout" value="10000" />
    <!-- true = topic, false = queue; false is the default and is written out explicitly here -->
    <property name="pubSubDomain" value="false" />
</bean>

<bean class="com.imooc.jms.producer.ProducerServiceImpl"></bean>

<!-- Message listener -->
<bean id="queueMessageListener" class="com.qqw.active.QueueMessageListener" />

<!-- Explicit listener container (queue): uses the connection factory above, listens on
     queueDestionation and dispatches to queueMessageListener -->
<bean id="queueListenerContainer"
    class="org.springframework.jms.listener.DefaultMessageListenerContainer">
    <property name="connectionFactory" ref="connectionFactory" />
    <property name="destination" ref="queueDestionation" />
    <property name="messageListener" ref="queueMessageListener" />
</bean>

监听

1
2
3
4
   /**
    * Listener for the {@code SPEED_DOWN} destination; prints each received text payload
    * to standard output.
    *
    * @param msg the message body as text
    */
   @JmsListener(destination = "SPEED_DOWN")
   public void receive(String msg) {
       System.out.println(msg);
   }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class QueueMessageListener implements MessageListener {  


//当收到消息后,自动调用该方法
@Override
public void onMessage(Message message) {

TextMessage tm = (TextMessage) message;
try {
System.out.println("QueueMessageListener监听到了文本消息:\t"
+ tm.getText());

Person fromJson = JSON.parseObject(tm.getText(), Person.class);
System.out.println(fromJson.toString());

//do something ...
} catch (JMSException e){ e.printStackTrace();
}
}
}

}

发送

1
2
3
// jmsTemplate must be injected before this code runs.
// NOTE(review): `string` and `price` are not declared in this snippet — presumably defined by the surrounding code.
Destination destination = new ActiveMQQueue("SPEED_DOWN"); // "SPEED_DOWN" is the key (name) of the queue
jmsTemplate.convertAndSend(destination, string + price);

springMVC

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
实现类
/**
 * {@link ProducerService} implementation that sends text messages through the
 * injected {@link JmsTemplate}.
 */
public class ProducerServiceImpl implements ProducerService {

    /** Template used to send messages; injected via {@code @Autowired}. */
    @Autowired
    JmsTemplate jmsTemplate;

    /** Destination for outgoing messages; injected from the resource named {@code queueDestionation}. */
    @Resource(name = "queueDestionation")
    Destination destination;

    /**
     * Sends {@code message} as a JMS text message to {@link #destination} and prints
     * the payload to standard output when the message is created.
     *
     * @param message the text payload to send
     */
    @Override
    public void sendMessage(final String message) {
        // Build the creator first so the send call itself stays a one-liner.
        final MessageCreator textCreator = new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                final TextMessage outgoing = session.createTextMessage(message);
                System.out.println("发送消息:" + outgoing.getText());
                return outgoing;
            }
        };
        jmsTemplate.send(destination, textCreator);
    }
}

其他

队列模式:生产者先发送消息,消费者后消费消息,消息被平均消费掉
主题模式:消费者先订阅消息,生产者产出的消息才可以被消费者接收到,而且是全部接收到的

lightquant wechat
欢迎您订阅灯塔量化公众号!