You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

94 lines
1.7 KiB

2 months ago
package producers
import (
"github.com/streadway/amqp"
"matchmaking-system/internal/conf"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
)
// ProducerPosition bundles the AMQP handles used to publish messages to the
// position queue.
//
//	@Description: producer-side state; populated by NewPosition.
type ProducerPosition struct {
	// Conn is the AMQP connection supplied by the caller of NewPosition.
	Conn *amqp.Connection
	// Channel is the AMQP channel opened on Conn and used for publishing.
	Channel *amqp.Channel
	// Queue is the queue declared on Channel; its Name is used as the
	// routing key when publishing.
	Queue *amqp.Queue
}
// NewPosition builds a ProducerPosition on top of an existing AMQP connection.
//
// It opens a new channel on con and declares the queue whose name is taken
// from f.Mq.Position.
//
//	@Description: initialize the producer for the position message queue
//	@param f      configuration; f.Mq.Position is used as the queue name
//	@param con    established AMQP connection to open the channel on
//	@return *ProducerPosition  the producer; on error it is only partially
//	        populated and must not be used for publishing
//	@return error non-nil if opening the channel or declaring the queue fails
func NewPosition(f *conf.Data, con *amqp.Connection) (*ProducerPosition, error) {
	p := &ProducerPosition{Conn: con}

	// Create the channel the producer publishes on.
	channel, err := p.Conn.Channel()
	if err != nil {
		applogger.Error("channel err:%v", err)
		return p, err
	}
	p.Channel = channel

	// Declare the queue (arguments follow amqp.Channel.QueueDeclare's order).
	queue, err := p.Channel.QueueDeclare(
		f.Mq.Position, // name
		true,          // durable
		false,         // autoDelete
		false,         // exclusive
		false,         // noWait
		nil,           // args
	)
	if err != nil {
		// Previously this error was dropped and a zero-value queue was
		// returned as if declaration had succeeded.
		applogger.Error("queue declare err:%v", err)
		return p, err
	}
	p.Queue = &queue

	return p, nil
}
// PushNotice publishes msgCache to the producer's queue.
//
//	@Description: producer pushes a message
//	@receiver p
//	@param msgCache  raw message body to publish
//	@return error    the publish error, or nil on success
func (p *ProducerPosition) PushNotice(msgCache []byte) error {
	err := p.dealPublish(msgCache)
	if err == nil {
		return nil
	}

	applogger.Info("send message exception err:%v", err)
	// TODO: handle the publish failure (should attempt to reconnect).
	return err
}
// dealPublish sends body as a "text/plain" message on p.Channel, using
// flags.SetNull as the exchange and the declared queue's name as the
// routing key.
//
//	@Description: perform the actual publish
//	@receiver p
//	@param body   raw message body
//	@return error the error returned by Channel.Publish, or nil on success
func (p *ProducerPosition) dealPublish(body []byte) error {
	msg := amqp.Publishing{
		ContentType: "text/plain",
		Body:        body,
	}

	// NOTE(review): flags.SetNull is presumably the empty string, i.e. the
	// default exchange — confirm against the flags package.
	// The two false arguments are Publish's mandatory and immediate flags.
	if err := p.Channel.Publish(flags.SetNull, p.Queue.Name, false, false, msg); err != nil {
		applogger.Error("message sending exception:%v", err)
		return err
	}
	return nil
}