You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

130 lines
3.1 KiB

package consumer
import (
"github.com/streadway/amqp"
"matchmaking-system/internal/conf"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
)
// ChanPositionStatus
// @Description: Groups the channels that carry raw message bodies ([]byte),
// one channel per market / product type.
// NOTE(review): presumably this is the type of the package-level
// ConsumerPositionMq that ConsumerNotice sends into — confirm at its declaration.
type ChanPositionStatus struct {
ShareUsMq chan []byte // US stocks
ShareMysMq chan []byte // Malaysian stocks
ShareIdnMq chan []byte // Indonesian stocks
ShareThaMq chan []byte // Thai stocks
ShareInrMq chan []byte // Indian stocks
ShareGbxMq chan []byte // UK stocks
ShareSgdMq chan []byte // Singapore stocks
ShareHkdMq chan []byte // Hong Kong stocks
ShareEurMq chan []byte // German stocks
ShareFurMq chan []byte // French stocks
ShareBrlMq chan []byte // Brazilian stocks
ShareJpyMq chan []byte // Japanese stocks
OptionInrMq chan []byte // Options (India)
ContractMq chan []byte // Contracts
ForexMq chan []byte // Forex
}
// ConsumerPosition
// @Description: Consumer for the close-position message queue (open position -> close position).
// It bundles the AMQP handles that NewPosition sets up and ConsumerNotice reads from.
type ConsumerPosition struct {
Conn *amqp.Connection // connection supplied by the caller of NewPosition
Channel *amqp.Channel // channel opened on Conn by NewPosition
Queue *amqp.Queue // queue declared on Channel by NewPosition
}
// NewPosition
//
//	@Description: Initialize the close-position message queue consumer. Opens a
//	channel on the supplied connection and declares the queue named by
//	f.Mq.Position. If either step fails the error is logged and returned; the
//	returned consumer is then only partially initialized and must not be used.
//	@param f   configuration; f.Mq.Position supplies the queue name
//	@param con AMQP connection on which the consumer channel is opened
//	@return *ConsumerPosition
//	@return error
func NewPosition(f *conf.Data, con *amqp.Connection) (*ConsumerPosition, error) {
	cp := &ConsumerPosition{Conn: con}

	// Create a consumer channel.
	var err error
	cp.Channel, err = cp.Conn.Channel()
	if err != nil {
		applogger.Error("channel err:%v", err)
		return cp, err
	}

	// Declare the queue.
	queue, err := cp.Channel.QueueDeclare(
		f.Mq.Position, // name
		true,          // durable
		false,         // autoDelete
		false,         // exclusive
		false,         // noWait
		nil,           // args
	)
	if err != nil {
		// Previously this error was dropped, so callers received a nil error
		// together with a zero-value queue (empty name).
		applogger.Error("queue declare err:%v", err)
		return cp, err
	}
	cp.Queue = &queue

	return cp, nil
}
// ConsumerNotice
//
//	@Description: Handle received messages. Starts consuming from c.Queue and
//	forwards each delivery body to the ConsumerPositionMq channel selected by
//	flags.CheckEnvironment. Returns early (after logging) if the consumer cannot
//	be started; otherwise it loops until the delivery channel is closed.
//	@receiver c
func (c *ConsumerPosition) ConsumerNotice() {
	deliveries, err := c.Channel.Consume(
		c.Queue.Name,  // queue
		flags.SetNull, // consumer tag
		true,          // autoAck
		false,         // exclusive
		false,         // noLocal
		false,         // noWait
		nil,           // args
	)
	if err != nil {
		applogger.Error("ConsumerNotice err:%v", err)
		return
	}

	applogger.Info("消费者消息接受通道........")

	for delivery := range deliveries {
		body := delivery.Body

		// TODO: process the received message and perform the close-position operation.
		// NOTE(review): ChanPositionStatus declares ShareJpyMq, but there is no
		// matching case below — confirm whether a Japanese-stock flag is missing.
		switch flags.CheckEnvironment {
		case flags.CheckContract:
			ConsumerPositionMq.ContractMq <- body
		case flags.CheckForex:
			ConsumerPositionMq.ForexMq <- body
		case flags.CheckShareUs:
			ConsumerPositionMq.ShareUsMq <- body
		case flags.CheckShareMys:
			ConsumerPositionMq.ShareMysMq <- body
		case flags.CheckShareTha:
			ConsumerPositionMq.ShareThaMq <- body
		case flags.CheckShareIdn:
			ConsumerPositionMq.ShareIdnMq <- body
		case flags.CheckShareInr:
			ConsumerPositionMq.ShareInrMq <- body
		case flags.CheckShareSgd:
			ConsumerPositionMq.ShareSgdMq <- body
		case flags.CheckShareHkd:
			ConsumerPositionMq.ShareHkdMq <- body
		case flags.CheckShareGbx:
			ConsumerPositionMq.ShareGbxMq <- body
		case flags.CheckShareEur:
			ConsumerPositionMq.ShareEurMq <- body
		case flags.CheckShareFur:
			ConsumerPositionMq.ShareFurMq <- body
		case flags.CheckShareBrl:
			ConsumerPositionMq.ShareBrlMq <- body
		case flags.CheckOptionInr:
			ConsumerPositionMq.OptionInrMq <- body
		default:
			// Unknown service start-up flag: the message is logged and dropped.
			applogger.Error("服务启动标识错误.")
		}
	}
}