歡迎來到Linux教程網
Linux教程網
Linux教程網
Linux教程網
您现在的位置: Linux教程網 >> UnixLinux >  >> Linux編程 >> Linux編程

使用RabbitMQ的RPC

筆者不才,根據小弟的經驗覺得使用rabbitMQ進行RPC調研不太妥當,需要他能夠實現跨語言,但是對於整體來說使用消息隊列服務進行RPC調用,通過RabbitMQ的事務來確定消息已經成功處理完畢,然後通過消息隊列服務的reply隊列返回處理結果。總覺得差點什麼,或者你跟我一樣發現了一些問題。第一如何處理分布式事務,這個的確有點費解,這個後面在spring和JPA的時候再去說吧。第二個問題也是我還沒有弄懂的一個問題,就是如何做到多線程並發處理。

為什麼我會提出這個問題,因為根據rabbitMQ的消息隊列處理機制當中,消息隊列在消息沒有ack之前他是不會繼續分發消息的,所以同一時間內你只能處理一條消息,也代表你處理的消息是線性的。再看WEB程序,大家都清楚在Servlet 被調用的時候會新開一條線程,然後在獨立的線程去調用serivce 或 dao 等等的業務,也證明了WEB服務是多線程去處理的,而非線性的,總不能一個人在調用這個鏈接之後,下一個用戶也調用這個鏈接就卡住那裡等著吧?所以對於消息隊列這種線性的處理方式,我們應該怎麼去做到並發呢?方法有能多種,設置成no_ack 然後每次接收到消息之後開啟性的線程去處理。第二種方法也是比較簡單的開多幾個channel,這樣就形成多個消費者了。不過以上兩個想法,只是我想象而已,也沒有實踐過,不過今天在寫這個東西的時候,我打算實踐一下。而且我會在這個筆記當中記錄RabbitMQ 的RPC 調用方式。

本人參考的文檔為官網關於RPC的介紹:

http://www.rabbitmq.com/tutorials/tutorial-six-java.html

CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm

RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm

用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm

RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm

CentOS7環境安裝使用專業的消息隊列產品RabbitMQ http://www.linuxidc.com/Linux/2016-11/13673.htm

在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm

RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm

RabbitMQ入門教程  http://www.linuxidc.com/Linux/2015-02/113983.htm

RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡

本次我開發所使用的語言為JAVA,在動手寫代碼之前先說明幾個重點問題。

在RabbitMQ 實現RPC的原理

在rabbitMQ上實現RPC,與之前的訂閱消息和生產消息非常相似,唯一不一樣的是RPC需要消息處理的放回結果,當然有時你不需要結果,你只在乎是否執行成功而已,但是這裡所涉及的問題是我們怎樣能做到當消息發送到隊列之後,開始等待直到消息處理完成後返回處理結果後再進行執行其他處理。

引用一下官方的圖,真心畫的不錯。

在到這個圖基本上都清晰了。
1. client發起消息,然後帶上一個唯一的correlation_id,同時在reply_to中提供一個獨立的隊列。官方一般建議使用默認獨立。後面會有代碼~
2. 攜帶好correlation_id 和 reply_to 之後將消息發布。
3. 然後server 獲得消息,並處理消息。
4. 得出的處理結果通過 reply_to 中的隊列 返回消息 並攜帶上 消息中攜帶的的correlation_id。
5. client 監聽 reply_to 隊列,當獲得消息後,判斷消息中correlation_id 是否與請求時發出的一致,如果一致就證明該消息是這個業務的處理結果。如果不一致就證明消息不是你的啦~ 別碰人家的東西!

貼出官方的summary:

  1. When the Client starts up, it creates an anonymous exclusive callback queue.【當Client運行時,創建一個匿名的 獨立的 回調隊列】
  2. For an RPC request, the Client sends a message with two properties: replyTo, which is set to the callback queue and correlationId, which is set to a unique value for every request. 【client 發送的消息 會攜帶兩個參數是為了 RPC調用的,replyTo 是設置回調的隊列,correlation 是 唯一的數值,不同的請求就不同的correlationID】
  3. The request is sent to an rpc_queue queue. [發送消息到隊列當中]
  4. The RPC worker (aka: server) is waiting for requests on that queue. When a request appears, it does the job and sends a message with the result back to the Client, using the queue from the replyTo field.【worker 即是服務器 等待隊列中的消息,當請求出現時,開始干活同時通過replyTo字段中的隊列發送處理結果】
  5. The client waits for data on the callback queue. When a message appears, it checks the correlationId property. If it matches the value from the request it returns the response to the application.【client 等待數據返回的隊列,當有消息出現時,檢查corrleationID 參數,如果和之前發出的correlationid吻合,就將結果返回到應用程序】

實現代碼如下

Client 如下:

package com.maxfunner.rpc;

import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;
import java.util.UUID;

/**
 * Created by Tony on 2016/11/3.
 */
public class Client {


    public String sayHelloToServer(String username) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交換器名稱
        String queueName = "rpc_queue";     //隊列名稱
        String routingKey = "rpc_key";  //路由鍵

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");     //創建鏈接

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();   

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定義交換器

        channel.queueDeclare(queueName, false, false, false, null); //定義隊列

        channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列

        String callbackQueue = channel.queueDeclare().getQueue();   //獲得匿名的 獨立的 默認隊列

        String correlationId = UUID.randomUUID().toString();    //產生一個 關聯ID correlationID

        QueueingConsumer consumer = new QueueingConsumer(channel);  // 創建一個消費者對象

        channel.basicConsume(callbackQueue,true,consumer);      //消費消息

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()   //創建消息屬性 
                .correlationId(correlationId)   //攜帶唯一的 correlationID
                .replyTo(callbackQueue).build();    //攜帶callback 回調的隊列路由鍵

        channel.basicPublish(exchangeName,routingKey,basicProperties, SerializationUtils.serialize(username));  //發布消息


        String response = null;

        while(true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();   //循環獲得消息

            if(correlationId.equals(delivery.getProperties().getCorrelationId())){  //匹配correlationID是否與發出去的correlation的ID一直

                response = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得處理結果

                break;
            }

        }


        channel.close();

        connection.close();


        //關閉鏈接

        return response;

    }


    public static void main(String[] args) throws IOException, InterruptedException {

        Client client = new Client();

        String response = client.sayHelloToServer("TONY");

        System.out.println("server response : " + response);

    }


}

Server 代碼如下:

package com.maxfunner.rpc;

import com.rabbitmq.client.*;
import org.apache.commons.lang.SerializationUtils;

import java.io.IOException;

/**
 * Created by Tony on 2016/11/3.
 */
public class Server {

    public static void main(String[] args) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交換器名稱
        String queueName = "rpc_queue";     //隊列名稱
        String routingKey = "rpc_key";  //路由鍵

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");

        Connection connection = factory.newConnection();    //創建鏈接

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定義交換器

        channel.queueDeclare(queueName, false, false, false, null); //定義隊列

        channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列

        QueueingConsumer consumer = new QueueingConsumer(channel);     //創建一個消費者

        channel.basicConsume(queueName,true,consumer);  //消費消息

        while (true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  //獲得一條消息

            String correlationID  = delivery.getProperties().getCorrelationId();    //獲得額外攜帶的correlationID

            String replyTo = delivery.getProperties().getReplyTo(); //獲得回調的隊列路由鍵

            String body = (String) SerializationUtils.deserialize(delivery.getBody());  //獲得請求的內容

            String responseMsg = "welcome " + body; //處理返回內容

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationID)   //返回消息時攜帶 請求時傳過來的correlationID
                    .build();

            channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回處理結果

        }



    }



}

運行結果:

server response : welcome TONY

Process finished with exit code 0

最後看一個問題:如果這樣去進行RPC調用,由於消息隊列是線性去處理的,所以關鍵的問題是在幾個點上【 auto_ack | channel&&exclusive 】:

  • auto_ack :上面的程序當中我將auto_ack設置成true,每次消息到達後我都會去確認消息已經收到,當確認後隊列就會刪除該消息,如果在等待消費返回的時候有其他channel發出的請求處理結果,然而這個correlationID和目前channel發出的correlation並不一致,但是這消息被你給確認了,所以正在需要獲得返回結果的channel就與處理結果無緣了。
  • channel && exclusive:雖然exclusive 是獨立的只供一個應用程序使用,但是多個channel之間是可以使用同一個獨立隊列的。所以如果多個線程去使用這個隊列去消費消息的話,rabbitMQ會循環派發的。所以你要在不同的線程中獲得相應請求的處理的結果,那就沒有那麼簡單了。

事實上我是錯的。看看下面貼出的代碼大家都懂了。auto_ack 可以設置成true 不過重點還是在 channel 那個默認獨立的 匿名隊列,原來每次channel調用都是不一樣的routingkey 嚇死寶寶了~

看代碼:

public class Client {

    private static Connection connection = null;
    private static ConnectionFactory factory = null;

    public Connection getConnection() throws IOException {

        if (connection == null) {
            factory = new ConnectionFactory();
            factory.setVirtualHost("test");
            factory.setHost("192.168.0.21");
            factory.setPort(5673);
            factory.setUsername("tony");
            factory.setPassword("tonypwd");     //創建鏈接
            connection = factory.newConnection();
        }
        return connection;
    }


    public String sayHelloToServer(String username) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交換器名稱
        String queueName = "rpc_queue";     //隊列名稱
        String routingKey = "rpc_key";  //路由鍵


        Channel channel = getConnection().createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定義交換器

        channel.queueDeclare(queueName, false, false, false, null); //定義隊列

        channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列

        String callbackQueue = channel.queueDeclare().getQueue();   //獲得匿名的 獨立的 默認隊列

        String correlationId = UUID.randomUUID().toString();    //產生一個 關聯ID correlationID

        QueueingConsumer consumer = new QueueingConsumer(channel);  // 創建一個消費者對象

        channel.basicConsume(callbackQueue, true, consumer);      //消費消息

        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties.Builder()   //創建消息屬性
                .correlationId(correlationId)   //攜帶唯一的 correlationID
                .replyTo(callbackQueue).build();    //攜帶callback 回調的隊列路由鍵

        channel.basicPublish(exchangeName, routingKey, basicProperties, SerializationUtils.serialize(username));  //發布消息


        String response = null;

        while (true) {

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();   //循環獲得消息

            System.out.println("delivery >>>>[user:"+ username +"] >> routingKey : " + callbackQueue);

            if (correlationId.equals(delivery.getProperties().getCorrelationId())) {  //匹配correlationID是否與發出去的correlation的ID一直

                response = (String) SerializationUtils.deserialize(delivery.getBody()); //獲得處理結果

                break;
            }

        }


        channel.close();

        //關閉鏈接

        return response;

    }


    public static void main(String[] args) throws IOException, InterruptedException {


        List<String> usernameList = new ArrayList<String>();

        usernameList.add("TONY_A");
        usernameList.add("TONY_B");
        usernameList.add("TONY_C");
        usernameList.add("TONY_D");
        usernameList.add("TONY_E");
        usernameList.add("TONY_F");
        usernameList.add("TONY_G");


        for (int i = 0; i < usernameList.size(); i++) {

            final String username = usernameList.get(i);

            new Thread(new Runnable() {
                public void run() {

                    Client client = new Client();

                    String response = null;

                    try {

                        response = client.sayHelloToServer(username);

                    } catch (IOException e) {

                        e.printStackTrace();
                        response = "ERROR!!!";

                    } catch (InterruptedException e) {

                        e.printStackTrace();
                        response = "ERROR!!!";
                    }

                    System.out.println("server response : " + response);
                }
            }).start();


        }

    }


}

服務器代碼:

public class Server {

    public static void main(String[] args) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交換器名稱
        String queueName = "rpc_queue";     //隊列名稱
        String routingKey = "rpc_key";  //路由鍵

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");

        Connection connection = factory.newConnection();    //創建鏈接

        Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定義交換器

        channel.queueDeclare(queueName, false, false, false, null); //定義隊列

        channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列

        QueueingConsumer consumer = new QueueingConsumer(channel);     //創建一個消費者

        channel.basicConsume(queueName,true,consumer);  //消費消息

        while (true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  //獲得一條消息

            String correlationID  = delivery.getProperties().getCorrelationId();    //獲得額外攜帶的correlationID

            String replyTo = delivery.getProperties().getReplyTo(); //獲得回調的隊列路由鍵

            String body = (String) SerializationUtils.deserialize(delivery.getBody());  //獲得請求的內容

            Thread.sleep(2000);

            String responseMsg = "welcome " + body; //處理返回內容

            AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                    .correlationId(correlationID)   //返回消息時攜帶 請求時傳過來的correlationID
                    .build();

            channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回處理結果

        }



    }



}

運行之後嚇死你~

delivery >>>>[user:TONY_A] >> routingKey : amq.gen-8IIKmpbatOaxi4qpk2iq1g
server response : welcome TONY_A
delivery >>>>[user:TONY_F] >> routingKey : amq.gen-DoLdGmIjpyQ58rDM1S-0Bw
server response : welcome TONY_F
delivery >>>>[user:TONY_D] >> routingKey : amq.gen-Xa-fQW2uUXuwcp9PXI8gzg
server response : welcome TONY_D
delivery >>>>[user:TONY_E] >> routingKey : amq.gen-RZcnSdVx4SWCsZRQD6Umcw
server response : welcome TONY_E
delivery >>>>[user:TONY_G] >> routingKey : amq.gen-ab8Wy9fCks5TuyETmZ0jFQ
server response : welcome TONY_G
delivery >>>>[user:TONY_C] >> routingKey : amq.gen-rhKc2IVr3VHpL0vmNxraPA
server response : welcome TONY_C
delivery >>>>[user:TONY_B] >> routingKey : amq.gen-DWYsJva6D7MAFdiVZacr-g
server response : welcome TONY_B

可以看到每一個channel的默認匿名的隊列routingKey都是不一樣的。所以不存在我所想的白癡問題。

我還白癡到畫了一個圖,這是我本以為是這樣的:

這是事實:

通過上面的實驗線性執行的,但是可以將處理的業務開一個線程去處理。就好像這樣。

服務器的代碼改改:

public class Server {

    public static void main(String[] args) throws IOException, InterruptedException {

        String exchangeName = "rpc_exchange";   //交換器名稱
        String queueName = "rpc_queue";     //隊列名稱
        String routingKey = "rpc_key";  //路由鍵

        ConnectionFactory factory = new ConnectionFactory();
        factory.setVirtualHost("test");
        factory.setHost("192.168.0.21");
        factory.setPort(5673);
        factory.setUsername("tony");
        factory.setPassword("tonypwd");

        final Connection connection = factory.newConnection();    //創建鏈接

        final Channel channel = connection.createChannel();

        channel.exchangeDeclare(exchangeName, "direct", false, false, null);    //定義交換器

        channel.queueDeclare(queueName, false, false, false, null); //定義隊列

        channel.queueBind(queueName, exchangeName, routingKey, null); //綁定隊列

        QueueingConsumer consumer = new QueueingConsumer(channel);     //創建一個消費者

        channel.basicConsume(queueName,true,consumer);  //消費消息

        while (true){

            QueueingConsumer.Delivery delivery = consumer.nextDelivery();  //獲得一條消息

            final String correlationID  = delivery.getProperties().getCorrelationId();    //獲得額外攜帶的correlationID

            final String replyTo = delivery.getProperties().getReplyTo(); //獲得回調的隊列路由鍵

            final String body = (String) SerializationUtils.deserialize(delivery.getBody());  //獲得請求的內容

            new Thread(new Runnable() {

                public void run() {

                    Channel channel = null;
                    try {
                        channel = connection.createChannel();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }

                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                    String responseMsg = "welcome " + body; //處理返回內容

                    AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
                            .correlationId(correlationID)   //返回消息時攜帶 請求時傳過來的correlationID
                            .build();

                    try {
                        channel.basicPublish("",replyTo,properties,SerializationUtils.serialize(responseMsg)); //返回處理結果
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }

            }).start();




        }



    }

}

當然我只是單純的新增了一下線程,你可以做成線程池等等去處理性能上的問題。

關於RabbitMQ的RPC就介紹就結束啦~~~~

 

Copyright © Linux教程網 All Rights Reserved