安裝配置epel源:(詳見http:
/
/
www.cnblogs.com
/
ernest
-
zhang
/
p
/
5714434.html
)
安裝erlang:
yum
-
y install erlang
注:安裝erlang的時候碰到
Error: Package: erlang
-
erts
-
R14B
-
04.3
.el6.i686 (epel)
Requires: libz.so.
1
(ZLIB_1.
2.2
)
[root@localhost ~]
# yum whatprovides libz.so.1
Loaded plugins: rhnplugin
This system
is
not
registered with RHN.
RHN support will be disabled.
zlib
-
1.2
.
3
-
25.el6
.i686 : The zlib compression
and
decompression library
#提供壓縮與解壓縮庫
Repo : local
Matched
from
:
Other : libz.so.
1
檢查發現應該是zlib的版本太老了,從網上下載最新的zlib
-
1.2
.
8
-
10.fc24
.i686,然後使用RPM安裝後解決。
下載地址:http:
/
/
www.zlib.net
/
#zlib官網
http:
/
/
rpmfind.net
/
linux
/
rpm2html
/
search.php?query
=
zlib
#zlib下載網站
安裝rabbitMQ:
yum
-
y install rabbitmq
-
server
service rabbitmq-server start/stop 啟動和停止rabbitmq
安裝API,然後可以基於API操作rabbitmq
1
2
3
4
5
6
7
pip install pika
or
easy_install pika
or
源碼
https:
/
/
pypi.python.org
/
pypi
/
pika
Python 操作RabbitMQ
發布端:
1
2
3
4
5
6
7
import
pika
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.0.74'
))
#服務器地址
channel
=
connection.channel()
channel.queue_declare(queue
=
'Hi'
)
#如果有隊列,略過;如果沒有,創建隊列
channel.basic_publish(exchange
=
'
',routing_key='
cc
',body='
hello!world!!!')
print
(
"[x] sent 'hello,world!'"
)
connection.close()
接收端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16import
pika
#創建一個連接對象,綁定rabbitmq的IP
connection
=
pika.BlockingConnection(pika.ConnectionParameters(host
=
'192.168.0.74'
))
#創建一個頻道對象
channel
=
connection.channel()
#頻道中聲明指定queue,如果MQ中沒有指定queue就創建,如果有,則略過
channel.queue_declare(queue
=
'Hi'
)
#定義回調函數
def
callback(ch,method,properties,body):
print
(
'[x] Recieved %r'
%
body)
# channel.close()
#no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq,callback:回調函數,queue:指定隊列
channel.basic_consume(callback,queue
=
'Hi'
,no_ack
=
True
)
# channel.basic_consume(callback,queue='cc')
print
(
'[*] Waiting for msg'
)
channel.start_consuming()
1、acknowledgment 消息不丟失
no-ack = False,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會重新將該任務添加到隊列中。
ch.basic_ack(delivery_tag=method.delivery_tag)
no_ack=False
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 定義回調函數 def callback(ch, method, properties, body): print('[x] Recieved %r' % body) # channel.close() ch.basic_ack(delivery_tag=method.delivery_tag) # no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq channel.basic_consume(callback, queue='Hi', no_ack=False) print('[*] Waiting for msg') channel.start_consuming()
消息生產者端發送消息時掛掉了,消費者接消息時掛掉了,以下方法會讓RabbitMQ重新將該消息添加到隊列中:
ch.basic_ack(delivery_tag=method.delivery_tag)
,消費端需要做的no_ack=False
,消費端需要做的properties=pika.BasicProperties(delivery_mode=2)
,生產者端需要做的import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 如果有,略過;如果沒有,創建隊列 channel.basic_publish(exchange='', routing_key='Hi', body='hello!world!!!', properties=pika.BasicProperties(delivery_mode=2)) #消息持久化 print("[x] sent 'hello,world!'") connection.close()生產者
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 定義回調函數 def callback(ch, method, properties, body): print('[x] Recieved %r' % body) # channel.close() ch.basic_ack(delivery_tag=method.delivery_tag) # no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq channel.basic_consume(callback, queue='Hi', no_ack=True) print('[*] Waiting for msg') channel.start_consuming()消費者
默認消息隊列裡的數據是按照順序被消費者拿走,例如:消費者1去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。但有大部分情況下,消息隊列後端的消費者服務器的處理能力是不相同的,這就會出現有的服務器閒置時間較長,資源浪費的情況,那麼,我們就需要改變默認的消息隊列獲取順序!
channel.basic_qos(prefetch_count=1)
表示誰來誰取,不再按照奇偶數排列,這是消費者端需要做的
import pika connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.queue_declare(queue='Hi') # 定義回調函數 def callback(ch, method, properties, body): print('[x] Recieved %r' % body) # channel.close() ch.basic_ack(delivery_tag=method.delivery_tag) channel.basic_qos(prefetch_count=1) #改變默認獲取順序,誰來誰取 # no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq channel.basic_consume(callback, queue='Hi', no_ack=True) print('[*] Waiting for msg') channel.start_consuming()消費者
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
RabbitMQ中,所有生產者提交的消息都由Exchange來接受,然後Exchange按照特定的策略轉發到Queue進行存儲 。RabbitMQ提供了四種Exchange:fanout,direct,topic,header header模式在實際使用中較少,只對前三種模式進行比較。exchange type = fanout
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') msg='456' channel.basic_publish(exchange='logs_fanout',routing_key='',body=msg) print('開始發送:%s'%msg) connection.close()生產者
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_fanout',type='fanout') #隨機創建隊列 result=channel.queue_declare(exclusive=True) queue_name=result.method.queue #綁定相關隊列名稱 channel.queue_bind(exchange='logs_fanout',queue=queue_name) def callback(ch,method,properties,body): print('[x] %r'%body) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()消費者
關鍵字
任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。 1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字符串,下文稱其為default Exchange)。 2.這種模式下不需要將Exchange進行任何綁定(binding)操作 3.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。 4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_direct_test1',type='direct') serverity='error' msg='123' channel.basic_publish(exchange='logs_direct_test1',routing_key=serverity,body=msg) print('開始發送:%r:%r'%(serverity,msg)) connection.close()生產者
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_direct_test1',type='direct') #隨機創建隊列 result=channel.queue_declare(exclusive=True) queue_name=result.method.queue serverities=['error','info','warning',] for serverity in serverities: channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity) print('[***] 開始接受消息!') def callback(ch,method,properties,body): print('[x] %r:%r'%(method.routing_key,body)) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()消費者1
import pika connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='logs_direct_test1',type='direct') #隨機創建隊列 result=channel.queue_declare(exclusive=True) queue_name=result.method.queue serverities=['error',] for serverity in serverities: channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity) print('[***] 開始接受消息!') def callback(ch,method,properties,body): print('[x] %r:%r'%(method.routing_key,body)) channel.basic_consume(callback,queue=queue_name,no_ack=True) channel.start_consuming()消費者2
模糊訂閱
任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上 1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RouteKey),Exchange會將消息轉發到所有關注主題能與RouteKey模糊匹配的隊列。 2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。 3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。 4.“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。 5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息。#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info' message = ' '.join(sys.argv[2:]) or 'Hello World!' channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) print(" [x] Sent %r:%r" % (routing_key, message)) connection.close()生產者
#!/usr/bin/env python import pika import sys connection = pika.BlockingConnection(pika.ConnectionParameters( host='192.168.0.74')) channel = connection.channel() channel.exchange_declare(exchange='topic_logs', type='topic') result = channel.queue_declare(exclusive=True) queue_name = result.method.queue binding_keys = sys.argv[1:] if not binding_keys: sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0]) sys.exit(1) for binding_key in binding_keys: channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key) print(' [*] Waiting for logs. To exit press CTRL+C') def callback(ch, method, properties, body): print(" [x] %r:%r" % (method.routing_key, body)) channel.basic_consume(callback, queue=queue_name, no_ack=True) channel.start_consuming()
CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入門教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡