筆者不才,根據小弟的經驗覺得使用rabbitMQ進行RPC調研不太妥當,需要他能夠實現跨語言,但是對於整體來說使用消息隊列服務進行RPC調用,通過RabbitMQ的事務來確定消息已經成功處理完畢,然後通過消息隊列服務的reply隊列返回處理結果。總覺得差點什麼,或者你跟我一樣發現了一些問題。第一如何處理分布式事務,這個的確有點費解,這個後面在spring和JPA的時候再去說吧。第二個問題也是我還沒有弄懂的一個問題,就是如何做到多線程並發處理。
為什麼我會提出這個問題,因為根據rabbitMQ的消息隊列處理機制當中,消息隊列在消息沒有ack之前他是不會繼續分發消息的,所以同一時間內你只能處理一條消息,也代表你處理的消息是線性的。再看WEB程序,大家都清楚在Servlet 被調用的時候會新開一條線程,然後在獨立的線程去調用serivce 或 dao 等等的業務,也證明了WEB服務是多線程去處理的,而非線性的,總不能一個人在調用這個鏈接之後,下一個用戶也調用這個鏈接就卡住那裡等著吧?所以對於消息隊列這種線性的處理方式,我們應該怎麼去做到並發呢?方法有能多種,設置成no_ack 然後每次接收到消息之後開啟性的線程去處理。第二種方法也是比較簡單的開多幾個channel,這樣就形成多個消費者了。不過以上兩個想法,只是我想象而已,也沒有實踐過,不過今天在寫這個東西的時候,我打算實踐一下。而且我會在這個筆記當中記錄RabbitMQ 的RPC 調用方式。
本人參考的文檔為官網關於RPC的介紹:
http://www.rabbitmq.com/tutorials/tutorial-six-java.html
CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
CentOS7環境安裝使用專業的消息隊列產品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm
在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入門教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡
本次我開發所使用的語言為JAVA,在動手寫代碼之前先說明幾個重點問題。
在rabbitMQ上實現RPC,與之前的訂閱消息和生產消息非常相似,唯一不一樣的是RPC需要消息處理的放回結果,當然有時你不需要結果,你只在乎是否執行成功而已,但是這裡所涉及的問題是我們怎樣能做到當消息發送到隊列之後,開始等待直到消息處理完成後返回處理結果後再進行執行其他處理。
引用一下官方的圖,真心畫的不錯。
在到這個圖基本上都清晰了。
1. client發起消息,然後帶上一個唯一的correlation_id,同時在reply_to中提供一個獨立的隊列。官方一般建議使用默認獨立。後面會有代碼~
2. 攜帶好correlation_id 和 reply_to 之後將消息發布。
3. 然後server 獲得消息,並處理消息。
4. 得出的處理結果通過 reply_to 中的隊列 返回消息 並攜帶上 消息中攜帶的的correlation_id。
5. client 監聽 reply_to 隊列,當獲得消息後,判斷消息中correlation_id 是否與請求時發出的一致,如果一致就證明該消息是這個業務的處理結果。如果不一致就證明消息不是你的啦~ 別碰人家的東西!
貼出官方的summary:
Client 如下:
package com.maxfunner.rpc;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
import java.util.UUID;
/**
* Created by Tony on 2016/11/3.
*/
public class Client {
public String sayHelloToServer(String username) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交換器名稱
String queueName = "rpc_queue"; //隊列名稱
String routingKey = "rpc_key"; //路由鍵
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd"); //創建鏈接
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定義交換器
channel.queueDeclare(queueName, false, false, false, null); //定義隊列
channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列
String callbackQueue = channel.queueDeclare().getQueue(); //獲得匿名的 獨立的 默認隊列
String correlationId = UUID.randomUUID().toString(); //產生一個 關聯ID correlationID
QueueingConsumer consumer = new QueueingConsumer(channel); // 創建一個消費者對象
channel.basicConsume(callbackQueue,true,consumer); //消費消息
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() //創建消息屬性
.correlationId(correlationId) //攜帶唯一的 correlationID
.replyTo(callbackQueue).build(); //攜帶callback 回調的隊列路由鍵
channel.basicPublish(exchangeName,routingKey,basicProperties, SerializationUtils.serialize(username)); //發布消息
String response = null;
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //循環獲得消息
if(correlationId.equals(delivery.getProperties().getCorrelationId())){ //匹配correlationID是否與發出去的correlation的ID一直
response = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得處理結果
break;
}
}
channel.close();
connection.close();
//關閉鏈接
return response;
}
public static void main(String[] args) throws IOException, InterruptedException {
Client client = new Client();
String response = client.sayHelloToServer("TONY");
System.out.println("server response : " + response);
}
}
Server 代碼如下:
package com.maxfunner.rpc;
import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;
import java.io.IOException;
/**
* Created by Tony on 2016/11/3.
*/
public class Server {
public static void main(String[] args) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交換器名稱
String queueName = "rpc_queue"; //隊列名稱
String routingKey = "rpc_key"; //路由鍵
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd");
Connection connection = factory.newConnection(); //創建鏈接
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定義交換器
channel.queueDeclare(queueName, false, false, false, null); //定義隊列
channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列
QueueingConsumer consumer = new QueueingConsumer(channel); //創建一個消費者
channel.basicConsume(queueName,true,consumer); //消費消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //獲得一條消息
String correlationID = delivery.getProperties().getCorrelationId(); //獲得額外攜帶的correlationID
String replyTo = delivery.getProperties().getReplyTo(); //獲得回調的隊列路由鍵
String body = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得請求的內容
String responseMsg = "welcome " + body; //處理返回內容
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationID) //返回消息時攜帶 請求時傳過來的correlationID
.build();
channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回處理結果
}
}
}
運行結果:
server response : welcome TONY
Process finished with exit code 0
最後看一個問題:如果這樣去進行RPC調用,由於消息隊列是線性去處理的,所以關鍵的問題是在幾個點上【 auto_ack | channel&&exclusive 】:
事實上我是錯的。看看下面貼出的代碼大家都懂了。auto_ack 可以設置成true 不過重點還是在 channel 那個默認獨立的 匿名隊列,原來每次channel調用都是不一樣的routingkey 嚇死寶寶了~
看代碼:
public class Client {
private static Connection connection = null;
private static ConnectionFactory factory = null;
public Connection getConnection() throws IOException {
if (connection == null) {
factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd"); //創建鏈接
connection = factory.newConnection();
}
return connection;
}
public String sayHelloToServer(String username) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交換器名稱
String queueName = "rpc_queue"; //隊列名稱
String routingKey = "rpc_key"; //路由鍵
Channel channel = getConnection().createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定義交換器
channel.queueDeclare(queueName, false, false, false, null); //定義隊列
channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列
String callbackQueue = channel.queueDeclare().getQueue(); //獲得匿名的 獨立的 默認隊列
String correlationId = UUID.randomUUID().toString(); //產生一個 關聯ID correlationID
QueueingConsumer consumer = new QueueingConsumer(channel); // 創建一個消費者對象
channel.basicConsume(callbackQueue, true, consumer); //消費消息
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder() //創建消息屬性
.correlationId(correlationId) //攜帶唯一的 correlationID
.replyTo(callbackQueue).build(); //攜帶callback 回調的隊列路由鍵
channel.basicPublish(exchangeName, routingKey, basicProperties, SerializationUtils.serialize(username)); //發布消息
String response = null;
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //循環獲得消息
System.out.println("delivery >>>>[user:"+ username +"] >> routingKey : " + callbackQueue);
if (correlationId.equals(delivery.getProperties().getCorrelationId())) { //匹配correlationID是否與發出去的correlation的ID一直
response = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得處理結果
break;
}
}
channel.close();
//關閉鏈接
return response;
}
public static void main(String[] args) throws IOException, InterruptedException {
List<String> usernameList = new ArrayList<String>();
usernameList.add("TONY_A");
usernameList.add("TONY_B");
usernameList.add("TONY_C");
usernameList.add("TONY_D");
usernameList.add("TONY_E");
usernameList.add("TONY_F");
usernameList.add("TONY_G");
for (int i = 0; i < usernameList.size(); i++) {
final String username = usernameList.get(i);
new Thread(new Runnable() {
public void run() {
Client client = new Client();
String response = null;
try {
response = client.sayHelloToServer(username);
} catch (IOException e) {
e.printStackTrace();
response = "ERROR!!!";
} catch (InterruptedException e) {
e.printStackTrace();
response = "ERROR!!!";
}
System.out.println("server response : " + response);
}
}).start();
}
}
}
服務器代碼:
public class Server {
public static void main(String[] args) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交換器名稱
String queueName = "rpc_queue"; //隊列名稱
String routingKey = "rpc_key"; //路由鍵
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd");
Connection connection = factory.newConnection(); //創建鏈接
Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定義交換器
channel.queueDeclare(queueName, false, false, false, null); //定義隊列
channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列
QueueingConsumer consumer = new QueueingConsumer(channel); //創建一個消費者
channel.basicConsume(queueName,true,consumer); //消費消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //獲得一條消息
String correlationID = delivery.getProperties().getCorrelationId(); //獲得額外攜帶的correlationID
String replyTo = delivery.getProperties().getReplyTo(); //獲得回調的隊列路由鍵
String body = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得請求的內容
Thread.sleep(2000);
String responseMsg = "welcome " + body; //處理返回內容
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationID) //返回消息時攜帶 請求時傳過來的correlationID
.build();
channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回處理結果
}
}
}
運行之後嚇死你~
delivery >>>>[user:TONY_A] >> routingKey : amq.gen-8IIKmpbatOaxi4qpk2iq1g
server response : welcome TONY_A
delivery >>>>[user:TONY_F] >> routingKey : amq.gen-DoLdGmIjpyQ58rDM1S-0Bw
server response : welcome TONY_F
delivery >>>>[user:TONY_D] >> routingKey : amq.gen-Xa-fQW2uUXuwcp9PXI8gzg
server response : welcome TONY_D
delivery >>>>[user:TONY_E] >> routingKey : amq.gen-RZcnSdVx4SWCsZRQD6Umcw
server response : welcome TONY_E
delivery >>>>[user:TONY_G] >> routingKey : amq.gen-ab8Wy9fCks5TuyETmZ0jFQ
server response : welcome TONY_G
delivery >>>>[user:TONY_C] >> routingKey : amq.gen-rhKc2IVr3VHpL0vmNxraPA
server response : welcome TONY_C
delivery >>>>[user:TONY_B] >> routingKey : amq.gen-DWYsJva6D7MAFdiVZacr-g
server response : welcome TONY_B
可以看到每一個channel的默認匿名的隊列routingKey都是不一樣的。所以不存在我所想的白癡問題。
我還白癡到畫了一個圖,這是我本以為是這樣的:
這是事實:
通過上面的實驗線性執行的,但是可以將處理的業務開一個線程去處理。就好像這樣。
服務器的代碼改改:
public class Server {
public static void main(String[] args) throws IOException, InterruptedException {
String exchangeName = "rpc_exchange"; //交換器名稱
String queueName = "rpc_queue"; //隊列名稱
String routingKey = "rpc_key"; //路由鍵
ConnectionFactory factory = new ConnectionFactory();
factory.setVirtualHost("test");
factory.setHost("192.168.0.21");
factory.setPort(5673);
factory.setUsername("tony");
factory.setPassword("tonypwd");
final Connection connection = factory.newConnection(); //創建鏈接
final Channel channel = connection.createChannel();
channel.exchangeDeclare(exchangeName, "direct", false, false, null); //定義交換器
channel.queueDeclare(queueName, false, false, false, null); //定義隊列
channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列
QueueingConsumer consumer = new QueueingConsumer(channel); //創建一個消費者
channel.basicConsume(queueName,true,consumer); //消費消息
while (true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery(); //獲得一條消息
final String correlationID = delivery.getProperties().getCorrelationId(); //獲得額外攜帶的correlationID
final String replyTo = delivery.getProperties().getReplyTo(); //獲得回調的隊列路由鍵
final String body = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得請求的內容
new Thread(new Runnable() {
public void run() {
Channel channel = null;
try {
channel = connection.createChannel();
} catch (IOException e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
String responseMsg = "welcome " + body; //處理返回內容
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.correlationId(correlationID) //返回消息時攜帶 請求時傳過來的correlationID
.build();
try {
channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回處理結果
} catch (IOException e) {
e.printStackTrace();
}
}
}).start();
}
}
}
當然我只是單純的新增了一下線程,你可以做成線程池等等去處理性能上的問題。
關於RabbitMQ的RPC就介紹就結束啦~~~~