You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

48 lines
1.1 KiB

package kafka
import (
"chat-room/pkg/global/log"
"github.com/Shopify/sarama"
"strings"
)
var consumer sarama.Consumer
type ConsumerCallback func(data []byte)
// 初始化消费者
func InitConsumer(hosts string) {
config := sarama.NewConfig()
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if nil != err {
log.Logger.Error("init kafka consumer client error", log.Any("init kafka consumer client error", err.Error()))
}
consumer, err = sarama.NewConsumerFromClient(client)
if nil != err {
log.Logger.Error("init kafka consumer error", log.Any("init kafka consumer error", err.Error()))
}
}
// 消费消息,通过回调函数进行
func ConsumerMsg(callBack ConsumerCallback) {
partitionConsumer, err := consumer.ConsumePartition(topic, 0, sarama.OffsetNewest)
if nil != err {
log.Logger.Error("iConsumePartition error", log.Any("ConsumePartition error", err.Error()))
return
}
defer partitionConsumer.Close()
for {
msg := <-partitionConsumer.Messages()
if nil != callBack {
callBack(msg.Value)
}
}
}
func CloseConsumer() {
if nil != consumer {
consumer.Close()
}
}