測試環境:Ubuntu 15.10 64位
cpu:inter core i7-4790 3.60GHZ * 8
內存:16GB
硬盤:ssd 120GB
軟件環境:rabbmitmq 3.6.0 kafka0.8.1 (均為單機本機運行)
PS: 測試結果均為單操作測試,即生產的時候沒有消費操作
測試結果:
kafka :消費速度: 37,586 /s 生產速度: 448,753 /s
rabbitmq: 消費速度: 20,807 /s 生產速度 16.413 /s
出現問題:
rabbitmq 生產4分鐘左右出現隊列阻塞,無法繼續添加數據,1分鐘後恢復,再過大約1分鐘又出現此現象並以約1分鐘為間隔出現此問題。
rabbitmq 生產對象時有不小的幾率(約 1/20)添加隊列失敗,報出的錯誤是“tcp鏈接重置”
其他並無任何問題
結論:
很明顯的看出kafka的性能遠超rabbitmq。不過這也是理所當然的,畢竟2個消息隊列實現的協議是不一樣的,處理消息的場景也大有不同。rabbitmq適合處理一些數據嚴謹的消息,比如說支付消息,社交消息等不能丟失的數據。kafka是批量操作切不報證數據是否能完整的到達消費者端,所以適合一些大量的營銷消息的場景。
代碼:
kafka:
package main
import (
"github.com/Shopify/sarama"
"os"
"os/signal"
"sync"
"log"
"time"
)
func main() {
go producer()
// go consumer()
time.Sleep(10*time.Minute)
}
func producer() {
config :=sarama.NewConfig()
config.Producer.Return.Successes = true
proder,err := sarama.NewAsyncProducer([]string{"localhost:9092"},config)
if err != nil {
panic(err)
}
signals :=make(chan os.Signal,1)
signal.Notify(signals,os.Interrupt)
var (
wg sync.WaitGroup
enqueued, successes, errors int
)
wg.Add(1)
go func() {
defer wg.Done()
for _=range proder.Successes(){
successes++
}
}()
wg.Add(1)
go func() {
defer wg.Done()
for err := range proder.Errors(){
log.Println(err)
errors++
}
}()
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(enqueued)
}
}()
ProducerLoop:
for{
message :=&sarama.ProducerMessage{Topic:"test",Value:sarama.StringEncoder("testing 123")}
select {
case proder.Input() <- message:
enqueued++
case <- signals:
proder.AsyncClose()
break ProducerLoop
}
}
wg.Wait()
log.Println("Successfully produced:%d;errors:%d\n",successes,errors)
}
func consumer() {
coner,err := sarama.NewConsumer([]string{"localhost:9092"},nil)
if err != nil {
panic(err)
}
defer func() {
if err :=coner.Close(); err !=nil{
log.Fatalln(err)
}
}()
partitionConsumer ,err := coner.ConsumePartition("test",0,sarama.OffsetNewest)
if err != nil {
panic(err)
}
defer func() {
if err := partitionConsumer.Close();err!=nil{
log.Fatalln(err)
}
}()
signals := make(chan os.Signal,1)
signal.Notify(signals,os.Interrupt)
consumed:=0
go func() {
t1 := time.NewTicker(time.Second)
for{
<- t1.C
log.Println(consumed)
}
}()
ConsumerLoop:
for{
select {
case _ = <-partitionConsumer.Messages():
consumed++
// log.Println( string(msg.Value)," => ",consumed)
case <-signals:
break ConsumerLoop
}
}
log.Printf("Consumed: %d\n", consumed)
}
rabbitmq:
package main
import (
"github.com/streadway/amqp"
"time"
"fmt"
"log"
)
const (
queueName = "push.msg.q"
exchange = "t.msg.ex"
mqurl ="amqp://shimeng:[email protected]:5672/push"
)
var conn *amqp.Connection
var channel *amqp.Channel
func main() {
fmt.Println(1)
// push()
receive()
// fmt.Println("end")
// close()
}
func failOnErr(err error, msg string) {
if err != nil {
log.Fatalf("%s:%s", msg, err)
panic(fmt.Sprintf("%s:%s", msg, err))
}
}
func mqConnect() {
var err error
conn, err = amqp.Dial(mqurl)
if err != nil {
log.Println(1)
log.Fatalln(err)
}
fmt.Println(5)
channel, err = conn.Channel()
if err != nil {
fmt.Println(2)
log.Fatalln(err)
}else {
fmt.Println("a")
}
}
func push() {
count := 0
if channel == nil {
fmt.Println(2)
mqConnect()
}else {
fmt.Println(3)
}
msgContent := "hello world!"
t1 := time.NewTicker(time.Second)
go func() {
for{
<- t1.C
log.Println(count)
}
}()
for{
err := channel.Publish(exchange, "test", false, false, amqp.Publishing{
ContentType: "text/plain",
Body: []byte(msgContent),
})
if err != nil {
}else {
count ++
}
}
}
func receive() {
if channel == nil {
mqConnect()
}
count :=0
msgs, err := channel.Consume(queueName, "", true, false, false, false, nil)
failOnErr(err, "")
forever := make(chan bool)
t1 := time.NewTicker(time.Second)
go func() {
for{
<- t1.C
log.Println(count)
}
}()
go func() {
//fmt.Println(*msgs)
for _= range msgs {
count ++
// s := BytesToString(&(d.Body))
// count++
// fmt.Printf("receve msg is :%s -- %d\n", *s, count)
}
}()
fmt.Printf(" [*] Waiting for messages. To exit press CTRL+C\n")
<-forever
}
CentOS 5.6 安裝RabbitMQ http://www.linuxidc.com/Linux/2013-02/79508.htm
RabbitMQ客戶端C++安裝詳細記錄 http://www.linuxidc.com/Linux/2012-02/53521.htm
用Python嘗試RabbitMQ http://www.linuxidc.com/Linux/2011-12/50653.htm
RabbitMQ集群環境生產實例部署 http://www.linuxidc.com/Linux/2012-10/72720.htm
Ubuntu下PHP + RabbitMQ使用 http://www.linuxidc.com/Linux/2010-07/27309.htm
在CentOS上安裝RabbitMQ流程 http://www.linuxidc.com/Linux/2011-12/49610.htm
RabbitMQ概念及環境搭建 http://www.linuxidc.com/Linux/2014-12/110449.htm
RabbitMQ入門教程 http://www.linuxidc.com/Linux/2015-02/113983.htm
RabbitMQ 的詳細介紹:請點這裡
RabbitMQ 的下載地址:請點這裡
本文永久更新��接地址:http://www.linuxidc.com/Linux/2016-03/129093.htm