吾八哥博客

您现在的位置是:首页 > 码农手记 > Golang > 正文

Golang

Golang里读写kafka消息队列的简单用法

吾八哥2022-05-29Golang2319

本文主要记录利用kafka来作为订阅模式消息队列的简单使用方法,kafka环境是在本地使用docker起的两个容器,启动方法如下:

// 创建网络
docker network create app-tier --driver bridge

// 启动zookeeper
docker run -d --name zookeeper-server \
    --network app-tier \
    -e ALLOW_ANONYMOUS_LOGIN=yes \
    bitnami/zookeeper:latest
    

// 启动第一个kafka实例
docker run -d --name kafka-server1 \
    --network app-tier \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
    --hostname kafka-server1 \
    -p 9092:9092 \
    bitnami/kafka:latest

// 启动第二个kafka实例
docker run -d --name kafka-server2 \
    --network app-tier \
    -e ALLOW_PLAINTEXT_LISTENER=yes \
    -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 \
    --hostname kafka-server2 \
    -p 9093:9092 \
    bitnami/kafka:latest


使用上面的命令即可启动两个kafka实例,我们这里kafka包为github.com/Shopify/sarama,下面是demo代码:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"time"

	"github.com/Shopify/sarama"
)

var (
	topic      = "www.5bug.wang"
	logger     = log.New(os.Stdout, "[OTelInterceptor] ", log.LstdFlags)
	brokerList = []string{"127.0.0.1:9092", "127.0.0.1:9093"}
)

func recveMessage(tag int, broker string, signals chan os.Signal) {
	config := sarama.NewConfig()
	consumer, err := sarama.NewConsumer([]string{broker}, config)
	if err != nil {
		panic(err)
	}
	partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
	if err != nil {
		panic(err)
	}
	defer partitionConsumer.Close()

	for {
		select {
		case msg := <-partitionConsumer.Messages():
			logger.Printf("-----> [%d/%s]%s/%d/%d\t%s\t%s\n", tag, broker, msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
		case err := <-partitionConsumer.Errors():
			log.Println(err)
		case <-signals:
			return
		}
	}
}

func main() {
	c := sarama.NewConfig()
	producer, err := sarama.NewAsyncProducer(brokerList, c)
	if err != nil {
		panic(err)
	}

	defer producer.Close()

	// kill -2, trap SIGINT to trigger a shutdown
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	go recveMessage(1, "127.0.0.1:9092", signals)
	go recveMessage(2, "127.0.0.1:9093", signals)

	bulkSize := 1
	duration := 3 * time.Second
	ticker := time.NewTicker(duration)
	logger.Printf("Starting to produce %v messages every %v", bulkSize, duration)
	for {
		select {
		case t := <-ticker.C:
			now := t.Format(time.RFC3339)
			logger.Printf("producing %v messages to topic: %s at %s", bulkSize, topic, now)
			for i := 0; i < bulkSize; i++ {
				producer.Input() <- &sarama.ProducerMessage{
					Topic: topic,
					Key:   sarama.StringEncoder("5bug"),
					Value: sarama.StringEncoder(fmt.Sprintf("test message %v/%v from kafka-client-go-test at %s", i+1, bulkSize, now)),
				}
			}
		case <-signals:
			logger.Println("terminating the program")
			logger.Println("Bye :)")
			return
		}
	}
}

将上述代码启动起来,然而并没有那么顺利,会出现如下错误:

dial tcp: lookup kafka-server: no such host

在本地出现这种错误是因为网络里无法识别kafka-server这个服务器名字,这里需要配置下hosts文件,增加:127.0.0.1 kafka-server即可!下面是运行成功的效果:

202205292242_6270.png