You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

37 lines
892 B

package kafka
import (
"strings"
"chat-room/pkg/global/log"
"github.com/Shopify/sarama"
)
var producer sarama.AsyncProducer
var topic string = "default_message"
func InitProducer(topicInput, hosts string) {
topic = topicInput
config := sarama.NewConfig()
config.Producer.Compression = sarama.CompressionGZIP
client, err := sarama.NewClient(strings.Split(hosts, ","), config)
if nil != err {
log.Logger.Error("init kafka client error", log.Any("init kafka client error", err.Error()))
}
producer, err = sarama.NewAsyncProducerFromClient(client)
if nil != err {
log.Logger.Error("init kafka async client error", log.Any("init kafka async client error", err.Error()))
}
}
func Send(data []byte) {
be := sarama.ByteEncoder(data)
producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: be}
}
func Close() {
if producer != nil {
producer.Close()
}
}