安裝配置epel源:(詳見http://www.cnblogs.com/ernest-zhang/p/5714434.html)
安裝erlang:
yum -y install erlang
注:安裝erlang的時候碰到
Error: Package: erlang-erts-R14B-04.3.el6.i686 (epel)
Requires: libz.so.1(ZLIB_1.2.2)
[root@localhost ~]# yum whatprovides libz.so.1
Loaded plugins: rhnplugin
This system is not registered with RHN.
RHN support will be disabled.
zlib-1.2.3-25.el6.i686 : The zlib compression and decompression library #提供壓縮與解壓縮庫
Repo : local
Matched from:
Other : libz.so.1
檢查發現應該是zlib的版本太老了,從網上下載最新的zlib-1.2.8-10.fc24.i686,然後使用RPM安裝後解決。
下載地址:http://www.zlib.net/ #zlib官網
http://rpmfind.net/linux/rpm2html/search.php?query=zlib #zlib下載網站
安裝rabbitMQ:
yum -y install rabbitmq-server
service rabbitmq-server start/stop 啟動和停止rabbitmq
安裝API,然後可以基於API操作rabbitmq
1
2
3
4
5
6
7
pip install pika
or
easy_install pika
or
源碼
https://pypi.python.org/pypi/pika
Python 操作RabbitMQ
發布端:
1
2
3
4
5
6
7
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74')) #服務器地址
channel=connection.channel()
channel.queue_declare(queue='Hi') #如果有隊列,略過;如果沒有,創建隊列
channel.basic_publish(exchange='',routing_key='cc',body='hello!world!!!')
print("[x] sent 'hello,world!'")
connection.close()
接收端:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16import pika
#創建一個連接對象,綁定rabbitmq的IP
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
#創建一個頻道對象
channel=connection.channel()
#頻道中聲明指定queue,如果MQ中沒有指定queue就創建,如果有,則略過
channel.queue_declare(queue='Hi')
#定義回調函數
def callback(ch,method,properties,body):
print('[x] Recieved %r'%body)
# channel.close()
#no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq,callback:回調函數,queue:指定隊列
channel.basic_consume(callback,queue='Hi',no_ack=True)
# channel.basic_consume(callback,queue='cc')
print('[*] Waiting for msg')
channel.start_consuming()
1、acknowledgment 消息不丟失
no-ack = False,如果消費者遇到情況(its channel is closed, connection is closed, or TCP connection is lost)掛掉了,那麼,RabbitMQ會重新將該任務添加到隊列中。
ch.basic_ack(delivery_tag=method.delivery_tag)no_ack=Falseimport pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
print('[x] Recieved %r' % body)
# channel.close()
ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
no_ack=False)
print('[*] Waiting for msg')
channel.start_consuming()
消息生產者端發送消息時掛掉了,消費者接消息時掛掉了,以下方法會讓RabbitMQ重新將該消息添加到隊列中:
ch.basic_ack(delivery_tag=method.delivery_tag),消費端需要做的no_ack=False,消費端需要做的properties=pika.BasicProperties(delivery_mode=2),生產者端需要做的import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi') # 如果有,略過;如果沒有,創建隊列
channel.basic_publish(exchange='',
routing_key='Hi',
body='hello!world!!!',
properties=pika.BasicProperties(delivery_mode=2)) #消息持久化
print("[x] sent 'hello,world!'")
connection.close()
生產者
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
print('[x] Recieved %r' % body)
# channel.close()
ch.basic_ack(delivery_tag=method.delivery_tag)
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
no_ack=True)
print('[*] Waiting for msg')
channel.start_consuming()
消費者
默認消息隊列裡的數據是按照順序被消費者拿走,例如:消費者1去隊列中獲取 奇數 序列的任務,消費者2去隊列中獲取 偶數 序列的任務。但有大部分情況下,消息隊列後端的消費者服務器的處理能力是不相同的,這就會出現有的服務器閒置時間較長,資源浪費的情況,那麼,我們就需要改變默認的消息隊列獲取順序!
channel.basic_qos(prefetch_count=1) 表示誰來誰取,不再按照奇偶數排列,這是消費者端需要做的
import pika
connection = pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.queue_declare(queue='Hi')
# 定義回調函數
def callback(ch, method, properties, body):
print('[x] Recieved %r' % body)
# channel.close()
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1) #改變默認獲取順序,誰來誰取
# no_ack=Fales:表示消費完以後不主動把狀態通知rabbitmq
channel.basic_consume(callback, queue='Hi',
no_ack=True)
print('[*] Waiting for msg')
channel.start_consuming()
消費者
發布訂閱和簡單的消息隊列區別在於,發布訂閱會將消息發送給所有的訂閱者,而消息隊列中的數據被消費一次便消失。所以,RabbitMQ實現發布和訂閱時,會為每一個訂閱者創建一個隊列,而發布者發布消息時,會將消息放置在所有相關隊列中。
RabbitMQ中,所有生產者提交的消息都由Exchange來接受,然後Exchange按照特定的策略轉發到Queue進行存儲 。RabbitMQ提供了四種Exchange:fanout,direct,topic,header header模式在實際使用中較少,只對前三種模式進行比較。exchange type = fanout
任何發送到Fanout Exchange的消息都會被轉發到與該Exchange綁定(Binding)的所有Queue上。
1.可以理解為路由表的模式
2.這種模式不需要RouteKey
3.這種模式需要提前將Exchange與Queue進行綁定,一個Exchange可以綁定多個Queue,一個Queue可以同多個Exchange進行綁定。
4.如果接受到消息的Exchange沒有與任何Queue綁定,則消息會被拋棄。
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',type='fanout')
msg='456'
channel.basic_publish(exchange='logs_fanout',routing_key='',body=msg)
print('開始發送:%s'%msg)
connection.close()
生產者
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_fanout',type='fanout')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
#綁定相關隊列名稱
channel.queue_bind(exchange='logs_fanout',queue=queue_name)
def callback(ch,method,properties,body):
print('[x] %r'%body)
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者
關鍵字
任何發送到Direct Exchange的消息都會被轉發到RouteKey中指定的Queue。 1.一般情況可以使用rabbitMQ自帶的Exchange:”"(該Exchange的名字為空字符串,下文稱其為default Exchange)。 2.這種模式下不需要將Exchange進行任何綁定(binding)操作 3.消息傳遞時需要一個“RouteKey”,可以簡單的理解為要發送到的隊列名字。 4.如果vhost中不存在RouteKey中指定的隊列名,則該消息會被拋棄。import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
serverity='error'
msg='123'
channel.basic_publish(exchange='logs_direct_test1',routing_key=serverity,body=msg)
print('開始發送:%r:%r'%(serverity,msg))
connection.close()
生產者
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=['error','info','warning',]
for serverity in serverities:
channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity)
print('[***] 開始接受消息!')
def callback(ch,method,properties,body):
print('[x] %r:%r'%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者1
import pika
connection=pika.BlockingConnection(pika.ConnectionParameters(host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='logs_direct_test1',type='direct')
#隨機創建隊列
result=channel.queue_declare(exclusive=True)
queue_name=result.method.queue
serverities=['error',]
for serverity in serverities:
channel.queue_bind(exchange='logs_direct_test1',queue=queue_name,routing_key=serverity)
print('[***] 開始接受消息!')
def callback(ch,method,properties,body):
print('[x] %r:%r'%(method.routing_key,body))
channel.basic_consume(callback,queue=queue_name,no_ack=True)
channel.start_consuming()
消費者2
模糊訂閱
任何發送到Topic Exchange的消息都會被轉發到所有關心RouteKey中指定話題的Queue上 1.這種模式較為復雜,簡單來說,就是每個隊列都有其關心的主題,所有的消息都帶有一個“標題”(RouteKey),Exchange會將消息轉發到所有關注主題能與RouteKey模糊匹配的隊列。 2.這種模式需要RouteKey,也許要提前綁定Exchange與Queue。 3.在進行綁定時,要提供一個該隊列關心的主題,如“#.log.#”表示該隊列關心所有涉及log的消息(一個RouteKey為”MQ.log.error”的消息會被轉發到該隊列)。 4.“#”表示0個或若干個關鍵字,“*”表示一個關鍵字。如“log.*”能與“log.warn”匹配,無法與“log.warn.timeout”匹配;但是“log.#”能與上述兩者匹配。 5.同樣,如果Exchange沒有發現能夠與RouteKey匹配的Queue,則會拋棄此消息。#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
message = ' '.join(sys.argv[2:]) or 'Hello World!'
channel.basic_publish(exchange='topic_logs',
routing_key=routing_key,
body=message)
print(" [x] Sent %r:%r" % (routing_key, message))
connection.close()
生產者
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='192.168.0.74'))
channel = connection.channel()
channel.exchange_declare(exchange='topic_logs',
type='topic')
result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
if not binding_keys:
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
sys.exit(1)
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" [x] %r:%r" % (method.routing_key, body))
channel.basic_consume(callback,
queue=queue_name,
no_ack=True)
channel.start_consuming()
CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入門教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡