Golang里读写kafka消息队列的简单用法
本文主要记录利用kafka来作为订阅模式消息队列的简单使用方法,kafka环境是在本地使用docker起的两个容器,启动方法如下:
# 创建网络
docker network create app-tier --driver bridge

# 启动zookeeper
docker run -d --name zookeeper-server \
    --network app-tier \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest

# 启动第一个kafka实例
docker run -d --name kafka-server1 \
    --network app-tier \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
    --hostname kafka-server1 \
    -p 9092:9092 \
    bitnami/kafka:latest

# 启动第二个kafka实例
docker run -d --name kafka-server2 \
    --network app-tier \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
    --hostname kafka-server2 \
    -p 9093:9092 \
    bitnami/kafka:latest
使用上面的命令即可启动两个kafka实例。我们这里使用的kafka客户端包为github.com/Shopify/sarama,下面是demo代码:
package main
import (
"fmt"
"log"
"os"
"os/signal"
"time"
"github.com/Shopify/sarama"
)
// Shared settings used by both the producer and the consumers of this demo.
var (
	// brokerList holds the addresses of the two Kafka brokers the demo
	// connects to.
	brokerList = []string{"127.0.0.1:9092", "127.0.0.1:9093"}

	// logger writes to standard output with a fixed prefix and the
	// standard date/time flags.
	logger = log.New(os.Stdout, "[OTelInterceptor] ", log.LstdFlags)

	// topic is the Kafka topic the demo produces to and consumes from.
	topic = "www.5bug.wang"
)
// recveMessage consumes partition 0 of topic from a single broker and logs
// every message it receives until a receive on signals succeeds.
//
// tag is a number used only to label this consumer's log lines. broker is the
// address of the one broker to connect to. signals is the shutdown channel:
// the function returns as soon as a value (or a channel close) is observed.
//
// Consumption starts at sarama.OffsetNewest. The function panics if either
// the consumer or the partition consumer cannot be created.
func recveMessage(tag int, broker string, signals chan os.Signal) {
	config := sarama.NewConfig()
	consumer, err := sarama.NewConsumer([]string{broker}, config)
	if err != nil {
		panic(err)
	}
	// Close the consumer on exit so it is not leaked. Deferred calls run
	// last-in first-out, so the partition consumer created below is closed
	// before the consumer that owns it.
	defer func() {
		if err := consumer.Close(); err != nil {
			logger.Printf("closing consumer for %s: %v", broker, err)
		}
	}()

	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer func() {
		if err := partitionConsumer.Close(); err != nil {
			logger.Printf("closing partition consumer for %s: %v", broker, err)
		}
	}()

	for {
		select {
		case msg := <-partitionConsumer.Messages():
			logger.Printf("-----> [%d/%s]%s/%d/%d\t%s\t%s\n", tag, broker, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
		case err := <-partitionConsumer.Errors():
			// Report through the shared logger so all output of the demo
			// carries the same prefix.
			logger.Printf("consumer error [%d/%s]: %v", tag, broker, err)
		case <-signals:
			return
		}
	}
}
func main() {
c := sarama.NewConfig()
producer, err := sarama.NewAsyncProducer(brokerList, c)
if err != nil {
panic(err)
}
defer producer.Close()
// kill -2, trap SIGINT to trigger a shutdown
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
go recveMessage(1, "127.0.0.1:9092", signals)
go recveMessage(2, "127.0.0.1:9093", signals)
bulkSize := 1
duration := 3 * time.Second
ticker := time.NewTicker(duration)
logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
for {
select {
case t := <-ticker.C:
now := t.Format(time.RFC3339)
logger.Printf("producing %v messages to topic: %s at %s", bulkSize, topic, now)
for i := 0; i < bulkSize; i++ {
producer.Input() <- &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("5bug"),
Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
}
}
case <-signals:
logger.Println("terminating the program")
logger.Println("Bye :)")
return
}
}
}将上述代码启动起来,然而并没有那么顺利,会出现如下错误:
dial tcp: lookup kafka-server: no such host
在本地出现这种错误是因为本机无法解析kafka-server这个主机名,这里需要配置下hosts文件,增加:127.0.0.1 kafka-server即可(主机名需要与启动容器时--hostname指定的名字一致,例如按上面的命令启动时应为kafka-server1和kafka-server2)!下面是运行成功的效果:
