總結一下幾種ExchangeTypes。
之前寫發布/訂閱模式時第一次提到了exchange type(見 http://www.linuxidc.com/Linux/2014-08/105107.htm)。
即producer不是將消息直接放到隊列中,而是先到exchange中,exchange主要用於控制消息到隊列的路由,根據具體的exchange type將消息傳給需要的隊列或者直接廢棄。
在這一篇中總結一下那些用到的exchange type。
一.Direct Exchange
direct exchange算是最基本的了。
direct exchange用於將帶上routing key的消息傳值擁有相同routing key的隊列中。
當我們想用一個簡單的標識符區別所有傳入同一個exchange中的消息時direct exchange就非常合適。
代碼如下:
private static String DIRECT_EXCHANGE = "DIRECT_EXCHAGNE";
static class FanoutProducer {
public static void main(String[] args) throws IOException {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();;
String content = "I miss the conversation";
channel.exchangeDeclare(DIRECT_EXCHANGE, ExchangeTypes.DIRECT);
channel.basicPublish(DIRECT_EXCHANGE, "alvez", null, content.getBytes());
}
}
static class FanoutConsumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory connectionFactory = new ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, DIRECT_EXCHANGE, "alvez");
QueueingConsumer consumer = new QueueingConsumer(channel);
String s = channel.basicConsume(queueName, true, consumer);
System.out.println(s);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println("From:" + routingKey + "':'" + message + "'");
}
}
CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
二.Fanout Exchange
fanout和routing key無關,它將消息無差別地(indiscriminately)傳送給所有隊列。
fanout exchange通常用於發布/訂閱模式。
將消息傳送給不同的隊列,不同的隊列對同一種消息采取不同的行為。
比如,現在有一個客戶訂單消息被三個隊列接收,隊列1完成該訂單,隊列2將訂單寫入日志,隊列3將訂單發給別的部門什麼的。
比如下面的代碼,消費者可以獲得routing key並輸出,但能否獲取與routing key無關:
private
static
String FANOUT_EXCHANGE =
"FANOUT_EXCHANGE"
;
static
class
DirectProducer {
public
static
void
main(String[] args)
throws
IOException {
ConnectionFactory connectionFactory =
new
ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();;
String content =
"I miss the conversation"
;
channel.exchangeDeclare(FANOUT_EXCHANGE, ExchangeTypes.FANOUT);
channel.basicPublish(FANOUT_EXCHANGE,
"alvez"
,
null
, content.getBytes());
}
}
static
class
DirectConsumer {
public
static
void
main(String[] args)
throws
IOException, InterruptedException {
ConnectionFactory connectionFactory =
new
ConnectionFactory();
Connection connection = connectionFactory.newConnection();
Channel channel= connection.createChannel();
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, FANOUT_EXCHANGE,
""
);
QueueingConsumer consumer =
new
QueueingConsumer(channel);
String s = channel.basicConsume(queueName,
true
, consumer);
System.out.println(s);
while
(
true
) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message =
new
String(delivery.getBody());
String routingKey = delivery.getEnvelope().getRoutingKey();
System.out.println(
"From:"
+ routingKey +
"':'"
+ message +
"'"
);
}
}
}
更多詳情見請繼續閱讀下一頁的精彩內容: http://www.linuxidc.com/Linux/2014-08/105107p2.htm