MessageQueue是分布式的系統裡經常要用到的組件,一般來說,當需要把消息跨網段、跨集群的分發出去,就可以用這個。一些典型的示例就是:
1、集群A中的消息需要發送給多個機器共享;
2、集群A中消息需要主動推送,但彼此的網絡不是互通的(如集群A只有過HA才能被外界訪問);
當然上面的幾個點,除了用MQ還有其它實現方式,但是MQ無疑是非常適合用來做這些事的。眾多MQ中,ActiveMQ是比較有名氣也很穩定的,它發送消息的成本非常廉價,支持Queue與Topic兩種消息機制。本文主要就是講如何在Spring環境下配置此MQ:
1、場景假設
現有機器兩台Server、Worker需要進行異步通信,另有一台ActiveMQ機器,關於MQ的配置信息存放在Zookeeper中,Zookeeper的節點有:
- /mq/activemq/ip:mq的機器ip
-/mq/activemq/port:這是mq的機器端口
2、Server的Spring XML配置
Server主要的工作就是接受Worker消息,並發送消息給Worker。主要是定義了連接MQ的連接池,接受Worker消息的隊列worker,發送消息給Worker的隊列server:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms" xmlns:p="http://www.springframework.org/schema/p" xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.1.xsd
http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms-4.1.xsd">
<!-- ActiveMQ連接池 -->
<bean id="conFactory" class="org.apache.activemq.pool.PooledConnectionFactory">
<property name="connectionFactory">
<bean class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL">
<bean class="lekko.mq.util.MQPropertiesFactory" factory-method="getUrl" />
</property>
<property name="closeTimeout" value="60000" />
<!-- <property name="userName" value="admin" /> -->
<!-- <property name="password" value="admin" /> -->
<!-- <property name="optimizeAcknowledge" value="true" /> -->
<property name="optimizedAckScheduledAckInterval" value="10000" />
</bean>
</property>
</bean>
<!-- Worker任務消息 -->
<bean id="taskWorkerTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="worker_topic" />
</bean>
<!-- 任務監聽容器 -->
<bean id="taskWorkerContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="conFactory" />
<property name="destination" ref="taskWorkerTopic" />
<property name="messageListener">
<bean class="lekko.mq.task.TaskWorkerListener" />
</property>
<property name="pubSubDomain" value="true" />
</bean>
<!-- Server任務消息 -->
<bean id="taskServerTopic" class="org.apache.activemq.command.ActiveMQTopic">
<constructor-arg value="server_topic" />
</bean>
<!-- 任務消息發送模板 -->
<bean id="taskServerTemplate" class="org.springframework.jms.core.JmsTemplate" p:connectionFactory-ref="conFactory" p:defaultDestination-ref="taskServerTopic" />
</beans>
一段一段地分析,ActiveMQ連接池這裡,定義了連接的bean為“conFactory”,其中broberURL屬性是通過後台Java代碼的靜態方法來設置的,方便線上環境通過Java代碼動態地切換,稍後會介紹這塊代碼,你現在需要知道的是,它實際上返回的就是一個字符串,格式像:tcp://xxx.xxx.xxx.xxx:port,如果不要用後台來管理連接信息,直接改成“<property name="brokerURL" value="tcp://xxx.xxx.xxx.xxx:port">”也是OK的。
接下來,便是Worker消息隊列的定義,這裡定義為“taskWorkerTopic”,類型是org.apache.activemq.command.ActiveMQTopic,(訂閱模式)它表示一個消息可以被多個機器收到並處理,其它的還有org.apache.activemq.command.ActiveMQQueue,(點對點模式)表示一個消息只能被一台機器收到,當收到後消息就出隊列了,其它機器無法處理。它們都有一個構造參數constructor-arg,指定了消息隊列的名稱,一個MQ中一個消息隊列的名字是唯一的。
Worker的消息隊列定義好了之後,就是接受Worker的裡消息了,這裡定義了“taskWorkerContainer”,其屬性分別定義了連接池、目標隊列、消息處理器(我們自己的Java類,後面再講),參數pubSubDomain用於指定是使用訂閱模式還是使用點對點模式,如果是ActiveMQTopic則要設置為true,默認是false。
好了,Server現在已經可以通過自己定義的“lekko.mq.task.TaskWorkerListener”類接受並處理taskWorkerTopic的消息了。
如法炮制,定義一個專門用於往Worker裡發消息的隊列“taskServerTopic”,並定義發送消息的模板“taskServerTemplate”備用。
3、Server端的接收類與發送類
lekko.mq.task.TaskWorkerListener便是一個接收類示例:
package lekko.mq.task;
import javax.jms.Message;
import javax.jms.MessageListener;
import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.log4j.Logger;
import org.springframework.stereotype.Service;
import lekko.mq.model.MessageModel;
/**
* Task消息監聽類
* @author lekko
*/
@Service
public class TaskWorkerListener implements MessageListener {
private Logger _logger = Logger.getLogger(TaskWorkerListener.class);
@Override
public void onMessage(Message message) {
if (message instanceof ActiveMQObjectMessage) {
ActiveMQObjectMessage aMsg = (ActiveMQObjectMessage) message;
try {
onMessage((MessageModel) aMsg.getObject());
} catch (Exception e) {
_logger.warn("Message:${} is not a instance of MessageModel.", e);
}
} else {
_logger.warn("Message:${} is not a instance of ActiveMQObjectMessage.");
}
}
/**
* 處理消息
* @param message 自定義消息實體
*/
public void onMessage(MessageModel message) { ... }
}
這裡給大家演示的並不是最基礎的知識,處理的消息是一個自定義的類“lekko.mq.model.MessageModel”,這個類怎麼寫可以隨便整,反正就是一些你要傳遞的數據字段,但是記得要實現Serializable接口。如果你需要傳遞的僅僅是純字符串,那麼直接在代碼的23行片,把message.toString()即可。這個類通過前面XML配置會處理來自“worker_topic”隊列中的消息。
再就是發送類,實際上就是把前面的taskServiceTemplate拿來用就行了:
package lekko.mq.task;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
import lekko.mq.model.MessageModel;
/**
* 服務器任務消息分發
* @author lekko
*/
@Service
public class TaskServerSender {
@Autowired
@Qualifier("taskServerTemplate")
private JmsTemplate jmsTemplate;
/**
* 發送消息
*/
public void sendMessage(MessageModel msg) {
jmsTemplate.convertAndSend(msg);
}
}
把這個類TaskServerSender注入到任意需要用到的地方,調用sendMessage方法即可。它會往前面定義的“server_topic”中塞消息,等Worker來取。
4、關於Zookeeper配置MQ連接信息
Worker端的配置我這裡不再闡述,因為它跟在Server端的配置太相像,區別就在於Server端是從worker_topic中取消息,往server_topic中寫消息;而Worker端的代碼則是反過來,往worker_topic中寫消息,從server_topic中取消息。
那麼如何使用Java代碼來控制ActiveMQ的配置消息呢:
package lekko.mq.util;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
* 獲取MQ配置
* @author lekkoli
*/
public class MQPropertiesFactory {
private static boolean isLoaded = false;
private static String ZOOKEEPER_CLUST = "xxx.xxx.xxx.xxx:2181";
private static ZooKeeper _zk;
private static String _ip;
private static String _port;
private static String getProperty(String path) throws Exception {
if (_zk == null) {
if (ZOOKEEPER_CLUST == null) {
throw new Exception("Zookeeper, Host \"" + ZOOKEEPER_CLUST + "\" is null!");
}
_zk = new ZooKeeper(ZOOKEEPER_CLUST, 90000, null);
}
Stat s = _zk.exists(path, false);
if (s != null)
return new String(_zk.getData(path, false, s));
throw new Exception("Zookeeper, Path \"" + path + "\" is not exist!");
}
private static void load() throws Exception {
if (!isLoaded) {
_ip = getProperty("/mq/activemq/ip");
_port = getProperty("/mq/activemq/port");
isLoaded = true;
}
}
public static String getUrl() throws Exception {
load();
StringBuilder failover = new StringBuilder();
String[] ips = _ip.split(";"), ports = _port.split(";");
for (int i = 0; i < ips.length; ++i) {
failover.append("tcp://").append(ips[i]).append(":").append(ports[i]).append(",");
}
failover.setLength(failover.length() - 1);
String failovers = failover.toString();
if (ips.length > 1) {
failovers = "failover:(" + failovers + ")";
}
return failovers;
}
}
上面的代碼需要解釋的地方跟MQ相關的不多,主要就是如果是mq集群,則格式是:failover:(tcp://192.168.1.117:1001,tcp://192.168.1.118:1001,tcp://xxx.xxx.xxx.xxx:port)。其它上面代碼沒有對Zookeeper集群都掛了的情況,做應急連接方案。當然,無論如何本節都不是全文的重點,但是多學一技何嘗不可?
推薦閱讀:
Linux系統下ActiveMQ 安裝 http://www.linuxidc.com/Linux/2012-03/55623.htm
Ubuntu下的ACTIVEMQ服務器 http://www.linuxidc.com/Linux/2008-07/14587.htm
CentOS 6.5啟動ActiveMQ報錯解決 http://www.linuxidc.com/Linux/2015-08/120898.htm
Spring+JMS+ActiveMQ+Tomcat實現消息服務 http://www.linuxidc.com/Linux/2011-10/44632.htm
Linux環境下面ActiveMQ端口號設置和WEB端口號設置 http://www.linuxidc.com/Linux/2012-01/51100.htm
ActiveMQ 的詳細介紹:請點這裡
ActiveMQ 的下載地址:請點這裡