You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

862 lines
28 KiB

package data
import (
"context"
"encoding/json"
"fmt"
"github.com/shopspring/decimal"
"matchmaking-system/internal/biz/structure"
"matchmaking-system/internal/data/memory"
"matchmaking-system/internal/data/mq/consumer"
"matchmaking-system/internal/data/socket/publicData"
"matchmaking-system/internal/data/socket/virtualData"
"matchmaking-system/internal/data/tradedeal/virtual"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
"matchmaking-system/internal/pkg/logging/common"
"matchmaking-system/internal/pkg/setting"
"matchmaking-system/internal/pkg/utils"
"sync"
"time"
)
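/*
Contract market-quote subscription
1. Initialize the contract subscription list
2. Receive the trading pairs of users' contract orders
3. Subscribe to contract market data for those trading pairs
4. Write the quotes into memory for concurrent computation
5. Record the subscribed pairs in the hot cache
*/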
// QuitesContract
// @Description: Contract market-detail message as decoded from the JSON payload
// received on a Redis "market.<symbol>.detail" channel (see SubscribeQuotesContract).
// Within this file only Content.Tick.Close is read; the meaning of the remaining
// fields is inferred from their names only — TODO confirm against the quote publisher.
type QuitesContract struct {
ServersID string `json:"serversId"`
Content struct {
Ch string `json:"ch"` // presumably the channel/topic name — TODO confirm
Ts int64 `json:"ts"` // presumably the message timestamp — TODO confirm
Tick struct {
ID int `json:"id"`
Mrid int64 `json:"mrid"`
Open string `json:"open"`
Close string `json:"close"` // stored as the latest contract price by SubscribeQuotesContract
High string `json:"high"`
Low string `json:"low"`
Amount string `json:"amount"`
Vol string `json:"vol"`
TradeTurnover string `json:"trade_turnover"`
Count int `json:"count"`
Asks any `json:"asks"` // left untyped; not read in this file
Bids any `json:"bids"` // left untyped; not read in this file
} `json:"Tick"`
} `json:"content"`
Symbol string `json:"symbol"`
}
// ContractSymbol
// @Description: Shared state between the producer of contract symbols
// (InitSubscribeQuotesContract) and the subscriber loop (SubscribeQuotesContract).
type ContractSymbol struct {
ContractMap chan []byte // contract order trading pairs: carries the symbols (as bytes) that need a quote subscription
ContractMapSymbol sync.Map // book of already-subscribed contract symbols; key and value are both the symbol string
}
// ContractSetUp
// @Description: One entry of the JSON list stored under the Redis key
// flags.ContractSystemPriceSetUp. SubscribeQuotesContract applies Price as an
// override for SelfContractCode when utils.CheckTimeUTC(BeginTime, EndTime) reports true.
type ContractSetUp struct {
SelfContractCode string `json:"selfContractCode"` // contract trading pair the override applies to
BeginTime string `json:"BeginTime"` // start time of the override window
Step int `json:"Step"` // configured time interval (not read in this file)
EndTime string `json:"EndTime"` // end time of the override window
Price string `json:"Price"` // price to set while the window is active
}
// InitCacheSymbolContract
//
// @Description: Seeds the Redis hash setting.MarketContract with every contract
// symbol returned by GetBotContractList, so the subscriptions can be rebuilt
// when the program is started. Symbols already present in the hash are left
// untouched. Processing stops at the first Redis error.
// @param ctx context forwarded to GetBotContractList and to the Redis calls
// @param uo data layer handle providing the Redis client
func InitCacheSymbolContract(ctx context.Context, uo *Data) {
	contractList, err := GetBotContractList(ctx, uo)
	if err != nil {
		// Previously swallowed silently, which made a failed start-up seed invisible.
		applogger.Error("%v InitCacheSymbolContract.GetBotContractList:%v", common.ErrContract, err)
		return
	}
	for _, symbol := range contractList {
		exists, err := uo.redisDB.HExists(ctx, setting.MarketContract, symbol).Result()
		if err != nil {
			applogger.Error("%v InitCacheSymbolContract.MarketContract.HExists:%v", common.ErrContract, err)
			return
		}
		applogger.Info("初始化合约交易对:%v", symbol)
		if exists {
			continue
		}
		// Field and value are both the symbol: the hash is only used as a set.
		if err = uo.redisDB.HSet(ctx, setting.MarketContract, symbol, symbol).Err(); err != nil {
			applogger.Error("%v InitCacheSymbolContract.MarketContract.HSet:%v", common.ErrContract, err)
			return
		}
	}
}
// InitSubscribeQuotesContract
//
// @Description: Every 10 seconds reloads the symbol list stored under
// setting.MarketContract (via LoadLRangeList) and pushes each symbol that is
// not yet recorded in contract.ContractMapSymbol onto contract.ContractMap.
// The loop ends only when LoadLRangeList fails.
// @param contract shared subscription state (channel + subscribed-symbol book)
func InitSubscribeQuotesContract(contract *ContractSymbol) {
	const resyncInterval = 10 * time.Second
	for {
		symbols, loadErr := LoadLRangeList(setting.MarketContract)
		if loadErr != nil {
			applogger.Error("%v InitSubscribeQuotesContract.LoadLRangeList:%v", common.ErrContract, loadErr)
			return
		}
		for _, sym := range symbols {
			// Only symbols that have no subscription yet are forwarded.
			if _, subscribed := contract.ContractMapSymbol.Load(sym); !subscribed {
				contract.ContractMap <- []byte(sym)
			}
		}
		time.Sleep(resyncInterval)
	}
}
// SubscribeQuotesContract
//
// @Description: Contract quote subscription. Consumes symbols from
// contract.ContractMap and, for every symbol not yet subscribed, records it in
// the Redis hash setting.MarketContract and starts two goroutines: one that
// applies configured price overrides and one that mirrors the live quote into
// the in-memory cache. Returns when ctx is cancelled or the channel is closed.
//
// Fixes over the previous version:
//   - the shared channel is no longer closed by a subscriber on a bad message
//     (senders would panic and this loop would spin on zero values);
//   - a closed channel is detected instead of being read as an empty symbol;
//   - the goroutines no longer write a shared err variable (data race);
//   - ctx cancellation stops the loop and the goroutines;
//   - when the quote subscription ends, the symbol is un-marked so that the
//     periodic resync can subscribe it again.
//
// @param ctx lifetime of the loop and of all goroutines it starts
// @param uo data layer handle providing the Redis client
// @param contract shared subscription state
func SubscribeQuotesContract(ctx context.Context, uo *Data, contract *ContractSymbol) {
	for {
		var symbolStr string
		select {
		case <-ctx.Done():
			return
		case symbol, ok := <-contract.ContractMap:
			if !ok {
				applogger.Error("%v SubscribeQuotesContract.ContractMap closed", common.ErrContract)
				return
			}
			symbolStr = string(symbol)
		}

		// Prevent duplicate subscriptions.
		if _, subscribed := contract.ContractMapSymbol.Load(symbolStr); subscribed {
			if !contractWait(ctx, 5*time.Second) {
				return
			}
			continue
		}

		// Write to the Redis hot cache used to re-initialise subscriptions at start-up.
		exists, err := uo.redisDB.HExists(ctx, setting.MarketContract, symbolStr).Result()
		if err != nil {
			applogger.Error("%v SubscribeQuotesContract.MarketContract.HExists:%v", common.ErrContract, err)
			if !contractWait(ctx, 5*time.Second) {
				return
			}
			continue
		}
		if !exists {
			if err = uo.redisDB.HSet(ctx, setting.MarketContract, symbolStr, symbolStr).Err(); err != nil {
				applogger.Error("%v SubscribeQuotesContract.MarketContract.HSet:%v", common.ErrContract, err)
				if !contractWait(ctx, 5*time.Second) {
					return
				}
				continue
			}
		}

		// Both goroutines share one per-symbol context so they stop together.
		symbolCtx, cancel := context.WithCancel(ctx)
		// Record the symbol so it is not subscribed twice.
		contract.ContractMapSymbol.Store(symbolStr, symbolStr)

		// Price-override ("spike price") watcher.
		go watchContractPriceSetUp(symbolCtx, uo, symbolStr)

		// Live quote subscription.
		go func() {
			defer cancel()
			defer contract.ContractMapSymbol.Delete(symbolStr)
			subscribeContractTicker(symbolCtx, uo, symbolStr)
		}()
	}
}

// contractWait pauses for d and reports false when ctx is cancelled first.
func contractWait(ctx context.Context, d time.Duration) bool {
	select {
	case <-ctx.Done():
		return false
	case <-time.After(d):
		return true
	}
}

// loadContractPriceSetUp reads and decodes the override list stored under
// flags.ContractSystemPriceSetUp; ok is false when it is missing, "[]" or malformed.
func loadContractPriceSetUp(ctx context.Context, uo *Data) (list []ContractSetUp, ok bool) {
	data, err := uo.redisDB.Get(ctx, flags.ContractSystemPriceSetUp).Result()
	if err != nil || data == "[]" {
		return nil, false
	}
	if err = json.Unmarshal([]byte(data), &list); err != nil {
		return nil, false
	}
	return list, true
}

// watchContractPriceSetUp polls the override list and writes the configured
// price for symbolStr into memory.ContractPriceSetUp while its time window is
// active. It polls every 2s, or every 5s while no usable list is available.
func watchContractPriceSetUp(ctx context.Context, uo *Data, symbolStr string) {
	priceKey := publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypePrice)
	for {
		delay := 5 * time.Second
		if dataList, ok := loadContractPriceSetUp(ctx, uo); ok {
			delay = 2 * time.Second
			for _, value := range dataList {
				if value.SelfContractCode != symbolStr {
					continue
				}
				// Entries without both window bounds are ignored.
				if len(utils.StrReplace(value.BeginTime)) == 0 || len(utils.StrReplace(value.EndTime)) == 0 {
					continue
				}
				if !utils.CheckTimeUTC(value.BeginTime, value.EndTime) {
					continue
				}
				if err := memory.ContractPriceSetUp.Set(priceKey, []byte(utils.DecimalsPrice(value.Price))); err != nil {
					applogger.Error("%v SubscribeQuotesContract.ContractPriceSetUp.set:%v", common.ErrContract, err)
					continue
				}
				if !flags.CheckSetting {
					applogger.Info("当前写入插针最新价格:%v,%v", priceKey, value.Price)
				}
			}
		}
		if !contractWait(ctx, delay) {
			return
		}
	}
}

// subscribeContractTicker subscribes to the Redis channel
// "market.<symbol>.detail" and stores each message's Tick.Close as the latest
// price of symbolStr in memory.ContractCache. It returns when the initial
// Receive fails, the message channel is closed, or ctx is cancelled.
func subscribeContractTicker(ctx context.Context, uo *Data, symbolStr string) {
	topIc := fmt.Sprintf("market.%v.detail", utils.ReplaceStrByValue(symbolStr, "/"))
	pubSub := uo.redisDB.Subscribe(ctx, topIc)
	defer pubSub.Close()
	if _, err := pubSub.Receive(ctx); err != nil {
		applogger.Error("%v SubscribeQuotesContract.Receive:%v", common.ErrContract, err)
		return
	}
	// Trade type: 0 latest price, 1 buy, 2 sell.
	priceKey := publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypePrice)
	chp := pubSub.Channel()
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-chp:
			if !ok {
				return
			}
			var subMsg QuitesContract
			if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
				// A single malformed message must not tear down the shared symbol channel.
				applogger.Error("%v SubscribeQuotesContract.QuitesContract.Unmarshal:%v", common.ErrContract, err)
				continue
			}
			if err := memory.ContractCache.Set(priceKey, []byte(subMsg.Content.Tick.Close)); err != nil {
				applogger.Error("%v SubscribeQuotesContract.Set:%v", common.ErrContract, err)
				continue
			}
			if !flags.CheckSetting {
				applogger.Info("当前写入最新价格:%v,%v", priceKey, subMsg.Content.Tick.Close)
			}
		}
	}
}
// OrderSubAdminContractSubscribeBySum
//
// @Description: Floating profit/loss statistics for open positions. In an
// endless loop it reads every order cached in the Redis hash
// setting.AdminContractSubscribe, removes entries that are neither pending nor
// open, sums the floating P/L of the open positions and stores the total in
// memory.ContractFloating under flags.FloatingHy.
//
// Malformed entries are now logged and skipped: previously an undecodable
// entry silently stalled the whole pass for 5 seconds, and an unparsable
// order quantity or leverage panicked through decimal.RequireFromString.
// @param uo data layer handle providing the Redis client
func OrderSubAdminContractSubscribeBySum(uo *Data) {
	for {
		topIc := setting.AdminContractSubscribe
		hashMap, err := uo.redisDB.HGetAll(context.Background(), topIc).Result()
		if err != nil {
			applogger.Error("orderSubAdminContractSubscribe.HGetAll:%v", err)
			time.Sleep(5 * time.Second)
			continue
		}

		var hashList []virtual.ContractTallyCache
		for field, value := range hashMap {
			var msg virtual.ContractTallyCache
			if err = json.Unmarshal([]byte(value), &msg); err != nil {
				applogger.Error("orderSubAdminContractSubscribe.Unmarshal:%v,%v", field, err)
				continue
			}
			switch msg.Status {
			case flags.Entrust: // pending order: nothing to account for yet
			case flags.Position: // open position
				hashList = append(hashList, msg)
			default: // closed or cancelled: drop it from the cache
				if err = uo.redisDB.HDel(context.Background(), topIc, msg.OrderId).Err(); err != nil {
					applogger.Error("AdminContractSubscribe.HDel:%v", err)
				}
			}
		}
		applogger.Info("合约数据量统计:%v", len(hashList))

		// Sum the floating P/L over all open positions.
		priceSum := decimal.Zero
		for _, value := range hashList {
			orderModel := virtualData.ContractOrderProcessing(topIc, value)
			if orderModel == nil {
				continue
			}
			newPrice, err := decimal.NewFromString(orderModel.Price)
			if err != nil {
				continue
			}
			openPrice, err := decimal.NewFromString(orderModel.OpenPrice)
			if err != nil {
				continue
			}
			orderNumber, err := decimal.NewFromString(value.Order.OrderNumber)
			if err != nil {
				applogger.Error("orderSubAdminContractSubscribe.OrderNumber:%v,%v", value.OrderId, err)
				continue
			}
			pryNum, err := decimal.NewFromString(value.Order.PryNum)
			if err != nil {
				applogger.Error("orderSubAdminContractSubscribe.PryNum:%v,%v", value.OrderId, err)
				continue
			}

			var subPrice decimal.Decimal
			switch value.Order.TradeType {
			case flags.TradeTypeBuy: // long = new price - open price
				subPrice = newPrice.Sub(openPrice)
			case flags.TradeTypeSell: // short = open price - new price
				subPrice = openPrice.Sub(newPrice)
			default:
				subPrice = decimal.Zero
			}
			// Floating P/L of this order = price difference * quantity * leverage.
			priceSum = priceSum.Add(subPrice.Mul(orderNumber).Mul(pryNum))
		}

		// Write the total to the cache.
		if err = memory.ContractFloating.Set(flags.FloatingHy, []byte(priceSum.String())); err != nil {
			applogger.Error("统计合约持仓订单浮动盈亏错误:%v", err)
			time.Sleep(5 * time.Second)
			continue
		}
		applogger.Info("统计合约持仓订单浮动盈亏:%v", priceSum)
		time.Sleep(600 * time.Millisecond)
	}
}
// ContractTransactionEntrust
//
// @Description: Contract pending (limit) orders. Runs
// ContractTransactionCalculationEntrust repeatedly with a 400ms pause after
// each pass. The loop now stops once ctx is cancelled; before, it could never
// be shut down.
// @param ctx lifetime of the loop; also forwarded to each pass
func ContractTransactionEntrust(ctx context.Context) {
	const pause = 400 * time.Millisecond
	for ctx.Err() == nil {
		ContractTransactionCalculationEntrust(ctx)
		select {
		case <-ctx.Done():
		case <-time.After(pause):
		}
	}
}
// ContractTransactionCalculationEntrust
//
// @Description: One pass over the pending-order cache (Redis hash
// setting.MarketContractEntrust). Each order is evaluated against the latest
// contract price; orders whose limit is hit are opened through
// ContractLiquidationEntrust and then written to the position cache
// (setting.MarketContractPosition).
//
// The previous version started a goroutine and a channel per order and then
// blocked on that channel immediately, so orders were already processed one
// at a time; the evaluation is now called directly. A log tag that named the
// Position function was corrected.
//
// NOTE(review): ContractConcurrentComputingEntrust removes the order from the
// pending cache before the position is opened; if opening fails the order is
// in neither cache — confirm whether a restore is wanted.
// @param ctx forwarded to ContractLiquidationEntrust
func ContractTransactionCalculationEntrust(ctx context.Context) {
	var wg sync.WaitGroup
	entrustList, err := Reds.HGetAll(context.Background(), setting.MarketContractEntrust).Result()
	if err != nil {
		applogger.Error("%v ContractTransactionCalculationEntrust.HGetAll:%v", common.ErrContract, err)
		return
	}
	for _, value := range entrustList {
		var entrust virtual.ContractTallyCache
		if err = json.Unmarshal([]byte(value), &entrust); err != nil {
			applogger.Error("%v ContractTransactionCalculationEntrust.Unmarshal:%v", common.ErrContract, err)
			continue
		}
		newPrice, priceErr := GetContractPrice(entrust.Symbol)
		if priceErr != nil {
			applogger.Error("%v ContractTransactionCalculationEntrust.GetContractPrice:%v", common.ErrContract, priceErr)
			continue
		}

		// The callee signals completion through wg, so it must be registered first.
		wg.Add(1)
		resultMsg := ContractConcurrentComputingEntrust(&entrust, newPrice, &wg)

		switch resultMsg.Status {
		case flags.Entrust: // still pending
			if !flags.CheckSetting {
				applogger.Info("从信道接收合约挂单信息:%v", resultMsg)
			}
		case flags.Position: // limit hit: open the position
			if !flags.CheckSetting {
				applogger.Info("从信道接收到合约持仓信息:%v", resultMsg)
			}
			if err = ContractLiquidationEntrust(ctx, &resultMsg); err != nil {
				applogger.Error("%v ContractTransactionCalculationEntrust.ContractLiquidationEntrust:%v", common.ErrContract, err)
				continue
			}
			data, marshalErr := json.Marshal(resultMsg)
			if marshalErr != nil {
				applogger.Error("%v ContractTransactionCalculationEntrust.Marshal:%v", common.ErrContract, marshalErr)
				continue
			}
			// TODO: publish to the position message queue instead of writing the cache directly.
			// Write the opened order to the contract position cache.
			if err = Reds.HSet(context.Background(), setting.MarketContractPosition, resultMsg.OrderId, string(data)).Err(); err != nil {
				applogger.Error("%v ContractTransactionCalculationEntrust.MarketContractPosition.HSet:%v", common.ErrContract, err)
				continue
			}
		}
	}
	wg.Wait()
	applogger.Info("合约挂单并发计算执行完毕......")
}
// ContractConcurrentComputingEntrust
//
// @Description: Evaluates one pending contract order against the latest price.
// A buy order triggers when its limit price is above the latest price, a sell
// order when it is below. A triggered order is switched to flags.Position and
// removed from the pending cache (setting.MarketContractEntrust); if that
// removal fails the original status is restored and the open price is "0".
//
// An unparsable limit price is now logged and the order returned unchanged
// instead of panicking, and wg.Done is deferred so it runs on every path.
// @param order pending order; its Status field is updated in place
// @param newPrice latest contract price
// @param wg wait group on which Done is called exactly once
// @return virtual.ContractTallyCache copy of the order with OpenPrice and Status set
func ContractConcurrentComputingEntrust(order *virtual.ContractTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) virtual.ContractTallyCache {
	defer wg.Done()

	var dealPrice string
	result := func() virtual.ContractTallyCache {
		return virtual.ContractTallyCache{
			UserId:    order.UserId,  // user id
			OrderId:   order.OrderId, // order id
			Symbol:    order.Symbol,  // trading pair
			OpenPrice: dealPrice,     // open price
			Status:    order.Status,  // order status
			Order:     order.Order,   // order details
		}
	}

	limitPrice, err := decimal.NewFromString(order.Order.LimitPrice)
	if err != nil {
		applogger.Error("%v ContractConcurrentComputingEntrust.LimitPrice:%v", common.ErrContract, err)
		return result()
	}
	originalStatus := order.Status

	var triggered bool
	switch order.Order.TradeType {
	case flags.TradeTypeBuy: // long
		if !flags.CheckSetting {
			applogger.Info("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status)
		}
		// Limit buy triggers when the limit price is above the latest price.
		triggered = limitPrice.Cmp(newPrice) > 0
	case flags.TradeTypeSell: // short
		if !flags.CheckSetting {
			applogger.Info("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status)
		}
		// Limit sell triggers when the limit price is below the latest price.
		triggered = limitPrice.Cmp(newPrice) < 0
	}

	if triggered {
		dealPrice = limitPrice.String() // the position opens at the limit price
		order.Status = flags.Position
		// Remove the order from the pending cache.
		if err = Reds.HDel(context.Background(), setting.MarketContractEntrust, order.OrderId).Err(); err != nil {
			applogger.Error("%v ContractConcurrentComputingEntrust.MarketContractEntrust.HDel:%v", common.ErrContract, err)
			order.Status = originalStatus
			dealPrice = decimal.Zero.String()
		}
	}
	return result()
}
// ContractMqOpenBusiness
//
// @Description: TODO: contract position message queue. Starts the Entrust
// consumer in the background, then polls consumer.ConsumerEntrustMq.ContractMq:
// each payload is decoded into a virtual.ContractTallyCache and handed to
// ContractLiquidationEntrust. When no message is ready it logs and waits 2s.
// @param ctx forwarded to ContractLiquidationEntrust
// @param uo data layer handle providing the MQ consumer
func ContractMqOpenBusiness(ctx context.Context, uo *Data) {
	go func() {
		uo.mqConsumer.Entrust.ConsumerNotice()
	}()

	for {
		select {
		default:
			// TODO: decide whether an empty queue needs dedicated handling.
			applogger.Error("这里没有监控到持仓mq队列消息.........")
			time.Sleep(2 * time.Second)
		case payload, open := <-consumer.ConsumerEntrustMq.ContractMq:
			if !open {
				time.Sleep(5 * time.Second)
				applogger.Error("%v ContractMqOpenBusiness.ContractMq err....", common.ErrContract)
				break
			}
			// Decode the cached order carried by the message.
			var tally virtual.ContractTallyCache
			if decodeErr := json.Unmarshal(payload, &tally); decodeErr != nil {
				applogger.Error("%v ContractMqOpenBusiness.Unmarshal:%v", common.ErrContract, decodeErr)
				time.Sleep(5 * time.Second)
				break
			}
			// Open the position for the decoded order.
			if openErr := ContractLiquidationEntrust(ctx, &tally); openErr != nil {
				applogger.Error("%v ContractMqOpenBusiness.ContractLiquidationEntrust:%v", common.ErrContract, openErr)
			}
		}
	}
}
// ContractLiquidationEntrust
//
// @Description: Pending order -> open position. Calls ContractOpenPosition and
// then marks the order as flags.Position in the user's and in the admin's
// contract subscription caches. The first failing step is logged and returned.
// @param ctx forwarded to ContractOpenPosition
// @param order order to open
// @return error first error encountered, nil on success
func ContractLiquidationEntrust(ctx context.Context, order *virtual.ContractTallyCache) error {
	// Step 1: open the contract position.
	if openErr := ContractOpenPosition(ctx, Msql, order.UserId, order.OrderId, order.OpenPrice, order.Order); openErr != nil {
		applogger.Error("%v ContractLiquidationEntrust.ContractOpenPosition:%v", common.ErrContract, openErr)
		return openErr
	}

	// Step 2: update the order status in the user's subscription cache.
	userKey := virtual.OrderIdListKey(setting.ContractSubscribe, order.UserId)
	if userErr := UpdateContractSubscribeHashStatusByOrderId(order.OrderId, userKey, flags.Position, order.ClosingPrice); userErr != nil {
		applogger.Error("%v ContractLiquidationEntrust.ContractSubscribe:%v", common.ErrContract, userErr)
		return userErr
	}

	// Step 3: update the order status in the admin subscription cache.
	if adminErr := UpdateContractSubscribeHashStatusByOrderId(order.OrderId, setting.AdminContractSubscribe, flags.Position, order.ClosingPrice); adminErr != nil {
		applogger.Error("%v ContractLiquidationEntrust.AdminContractSubscribe:%v", common.ErrContract, adminErr)
		return adminErr
	}

	return nil
}
// ContractTransactionPosition
//
// @Description: Contract open positions. Runs
// ContractTransactionCalculationPosition repeatedly with a 400ms pause after
// each pass. The loop now stops once ctx is cancelled; before, it could never
// be shut down.
// @param ctx lifetime of the loop; also forwarded to each pass
func ContractTransactionPosition(ctx context.Context) {
	const pause = 400 * time.Millisecond
	for ctx.Err() == nil {
		ContractTransactionCalculationPosition(ctx)
		select {
		case <-ctx.Done():
		case <-time.After(pause):
		}
	}
}
// ContractTransactionCalculationPosition
//
// @Description: One pass over the position cache (Redis hash
// setting.MarketContractPosition); covers take-profit, stop-loss and forced
// liquidation. Each position is evaluated against the latest contract price,
// and positions that must be closed are closed through
// ContractLiquidationPosition.
//
// The previous version started a goroutine and a channel per position and
// then blocked on that channel immediately, so positions were already
// processed one at a time; the evaluation is now called directly. The log tag
// of the HGetAll failure no longer claims to be an LRange call.
//
// NOTE(review): ContractConcurrentComputingPosition removes the position from
// the cache before it is closed; if closing fails the position is no longer
// monitored — confirm whether a restore is wanted.
// @param ctx forwarded to ContractLiquidationPosition
func ContractTransactionCalculationPosition(ctx context.Context) {
	var wg sync.WaitGroup
	entrustList, err := Reds.HGetAll(context.Background(), setting.MarketContractPosition).Result()
	if err != nil {
		applogger.Error("%v ContractTransactionCalculationPosition.HGetAll:%v", common.ErrContract, err)
		return
	}
	for _, value := range entrustList {
		var entrust virtual.ContractTallyCache
		if err = json.Unmarshal([]byte(value), &entrust); err != nil {
			applogger.Error("%v ContractTransactionCalculationPosition.Unmarshal:%v", common.ErrContract, err)
			continue
		}
		newPrice, priceErr := GetContractPrice(entrust.Symbol)
		if priceErr != nil {
			applogger.Warn("%v ContractTransactionCalculationPosition.GetContractPrice:%v", common.ErrContract, priceErr)
			continue
		}

		// The callee signals completion through wg, so it must be registered first.
		wg.Add(1)
		resultMsg := ContractConcurrentComputingPosition(&entrust, newPrice, &wg)

		switch resultMsg.Status {
		case flags.Position: // still open
			if !flags.CheckSetting {
				applogger.Info("从信道接收到合约持仓信息:%v", resultMsg)
			}
		case flags.Close: // must be closed
			if !flags.CheckSetting {
				applogger.Info("从信道接收到合约平仓信息:%v", resultMsg)
			}
			// TODO: publish to a queue and close asynchronously instead of closing inline.
			if err = ContractLiquidationPosition(ctx, &resultMsg); err != nil {
				applogger.Error("%v ContractTransactionCalculationPosition.ContractLiquidationPosition:%v", common.ErrContract, err)
				continue
			}
		}
	}
	wg.Wait()
	applogger.Info("合约持仓并发计算执行完毕......")
}
// ContractConcurrentComputingPosition
//
// @Description: Evaluates one open contract position against the latest price.
// The position is closed at the latest price when
//   - a configured take-profit or stop-loss price is crossed, or
//   - the floating loss (price difference * quantity * face value * leverage)
//     reaches EarnestMoney * System.CompelNum (forced liquidation).
//
// A closed position is switched to flags.Close and removed from the position
// cache (setting.MarketContractPosition); if that removal fails the original
// status is restored.
//
// Unparsable numeric fields are now logged and the position returned
// unchanged instead of panicking, wg.Done is deferred so it runs on every
// path, and the forced-liquidation log line escapes its literal percent sign
// (the old "70%:" was parsed as a format verb and shifted the arguments).
// @param order open position; its Status field is updated in place
// @param newPrice latest contract price
// @param wg wait group on which Done is called exactly once
// @return virtual.ContractTallyCache copy of the order with ClosingPrice and Status set
func ContractConcurrentComputingPosition(order *virtual.ContractTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) virtual.ContractTallyCache {
	defer wg.Done()

	var dealPrice string
	result := func() virtual.ContractTallyCache {
		return virtual.ContractTallyCache{
			UserId:       order.UserId,  // user id
			OrderId:      order.OrderId, // order id
			Symbol:       order.Symbol,  // trading pair
			ClosingPrice: dealPrice,     // closing price
			Status:       order.Status,  // order status
			Order:        order.Order,   // order details
		}
	}

	openPrice, err := decimal.NewFromString(order.OpenPrice) // open price
	if err != nil {
		applogger.Error("%v ContractConcurrentComputingPosition.OpenPrice:%v", common.ErrContract, err)
		return result()
	}
	orderNumber, err := decimal.NewFromString(order.Order.OrderNumber) // position size
	if err != nil {
		applogger.Error("%v ContractConcurrentComputingPosition.OrderNumber:%v", common.ErrContract, err)
		return result()
	}
	pryNum, err := decimal.NewFromString(order.Order.PryNum) // leverage
	if err != nil {
		applogger.Error("%v ContractConcurrentComputingPosition.PryNum:%v", common.ErrContract, err)
		return result()
	}
	originalStatus := order.Status

	// The error is ignored on purpose (as before): without usable stop prices
	// both values are zero, the stop checks are skipped and forced liquidation
	// is still evaluated.
	_, stopWinPrice, stopLossPrice, _ := ContractVoteStopType(order.Order)

	var closeNow bool
	var floating decimal.Decimal // floating P/L, computed only when no stop price was crossed
	switch order.Order.TradeType {
	case flags.TradeTypeBuy: // long
		if !flags.CheckSetting {
			applogger.Info("用户ID:%v,持仓开仓价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status)
		}
		// Long: take-profit fires once the price is above it, stop-loss once the price is below it.
		takeProfit := !stopWinPrice.IsZero() && stopWinPrice.Cmp(newPrice) < 0
		stopLoss := !stopLossPrice.IsZero() && stopLossPrice.Cmp(newPrice) > 0
		if takeProfit || stopLoss {
			closeNow = true
		} else {
			floating = newPrice.Sub(openPrice).Mul(orderNumber).Mul(order.Order.System.FaceValue).Mul(pryNum)
		}
	case flags.TradeTypeSell: // short
		if !flags.CheckSetting {
			applogger.Info("用户ID:%v,限价持仓买跌下单价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status)
		}
		// Short: take-profit fires once the price is below it, stop-loss once the price is above it.
		takeProfit := !stopWinPrice.IsZero() && stopWinPrice.Cmp(newPrice) > 0
		stopLoss := !stopLossPrice.IsZero() && stopLossPrice.Cmp(newPrice) < 0
		if takeProfit || stopLoss {
			closeNow = true
		} else {
			floating = openPrice.Sub(newPrice).Mul(orderNumber).Mul(order.Order.System.FaceValue).Mul(pryNum)
		}
	}

	// Forced liquidation: a floating loss that reaches the configured share of the margin.
	if floating.IsNegative() {
		orderAmount, err := decimal.NewFromString(order.Order.EarnestMoney)
		if err != nil {
			applogger.Error("%v ContractConcurrentComputingPosition.EarnestMoney:%v", common.ErrContract, err)
			return result()
		}
		floatPrice := orderAmount.Mul(order.Order.System.CompelNum)
		if !flags.CheckSetting {
			applogger.Info("强平保证金:%v,强平浮动70%%:%v,强行平仓判定:%v", orderAmount, floatPrice, floating.Abs().Cmp(floatPrice))
		}
		if floating.Abs().Cmp(floatPrice) >= 0 {
			closeNow = true
		}
	}

	if closeNow {
		dealPrice = newPrice.String() // the position closes at the latest price
		order.Status = flags.Close
		// Remove the position from the position cache.
		if err := Reds.HDel(context.Background(), setting.MarketContractPosition, order.OrderId).Err(); err != nil {
			applogger.Error("%v ContractConcurrentComputingPosition.MarketContractPosition.HDel:%v", common.ErrContract, err)
			order.Status = originalStatus
		}
	}
	return result()
}
// ContractMqClosingBusiness
//
// @Description: TODO: contract closing message queue. Starts the Position
// consumer in the background, then polls consumer.ConsumerPositionMq.ContractMq:
// each payload is decoded into a virtual.ContractTallyCache and handed to
// ContractLiquidationPosition. When no message is ready it waits 2s.
// @param ctx forwarded to ContractLiquidationPosition
// @param uo data layer handle providing the MQ consumer
func ContractMqClosingBusiness(ctx context.Context, uo *Data) {
	go func() {
		uo.mqConsumer.Position.ConsumerNotice()
	}()

	for {
		select {
		default:
			// TODO: decide whether an empty queue needs dedicated handling.
			time.Sleep(2 * time.Second)
		case payload, open := <-consumer.ConsumerPositionMq.ContractMq:
			if !open {
				time.Sleep(5 * time.Second)
				applogger.Error("%v ContractMqClosingBusiness.ContractMq err....", common.ErrContract)
				break
			}
			applogger.Info("接收到平仓信息:%v", string(payload))
			// Decode the cached position carried by the message.
			var tally virtual.ContractTallyCache
			if decodeErr := json.Unmarshal(payload, &tally); decodeErr != nil {
				applogger.Error("%v ContractMqClosingBusiness.Unmarshal:%v", common.ErrContract, decodeErr)
				time.Sleep(5 * time.Second)
				break
			}
			// Close the decoded position.
			if closeErr := ContractLiquidationPosition(ctx, &tally); closeErr != nil {
				applogger.Error("%v ContractMqClosingBusiness.ContractLiquidationPosition:%v", common.ErrContract, closeErr)
			}
		}
	}
}
// ContractLiquidationPosition
//
// @Description: Closes a contract position. Calls ContractClosingPosition and
// then marks the order as flags.Close in the user's and in the admin's
// contract subscription caches. The first failing step is logged and returned.
// @param ctx forwarded to ContractClosingPosition
// @param order position to close
// @return error first error encountered, nil on success
func ContractLiquidationPosition(ctx context.Context, order *virtual.ContractTallyCache) error {
	// Step 1: close the position.
	if closeErr := ContractClosingPosition(ctx, Msql, order.OrderId, order.ClosingPrice, order.Order); closeErr != nil {
		applogger.Error("%v ContractLiquidationPosition.ContractClosingPosition:%v", common.ErrContract, closeErr)
		return closeErr
	}

	// Step 2: update the order status in the user's subscription cache.
	userKey := virtual.OrderIdListKey(setting.ContractSubscribe, order.UserId)
	if userErr := UpdateContractSubscribeHashStatusByOrderId(order.OrderId, userKey, flags.Close, order.ClosingPrice); userErr != nil {
		applogger.Error("%v ContractLiquidationPosition.ContractSubscribe:%v", common.ErrContract, userErr)
		return userErr
	}

	// Step 3: update the order status in the admin subscription cache.
	if adminErr := UpdateContractSubscribeHashStatusByOrderId(order.OrderId, setting.AdminContractSubscribe, flags.Close, order.ClosingPrice); adminErr != nil {
		applogger.Error("%v ContractLiquidationPosition.AdminContractSubscribe:%v", common.ErrContract, adminErr)
		return adminErr
	}

	return nil
}
// ContractVoteStopType
//
// @Description: Resolves the take-profit / stop-loss settings of a contract
// order. Malformed prices are now reported through the error return instead
// of panicking via decimal.RequireFromString.
// @param order contract order to inspect
// @return bool true when the order has take-profit / stop-loss configured
// @return decimal.Decimal take-profit price (zero when not set)
// @return decimal.Decimal stop-loss price (zero when not set)
// @return error flags.ErrContractThirteen for an unknown StopType, or a wrapped parse error
func ContractVoteStopType(order structure.ContractOrder) (bool, decimal.Decimal, decimal.Decimal, error) {
	var stopWinPrice, stopLossPrice decimal.Decimal
	switch order.StopType {
	case flags.StopTypeNone: // no take-profit / stop-loss configured
		return false, stopWinPrice, stopLossPrice, nil
	case flags.StopTypeSet: // take-profit / stop-loss configured
		var err error
		if len(order.StopWinPrice) != 0 {
			if stopWinPrice, err = decimal.NewFromString(order.StopWinPrice); err != nil {
				return false, decimal.Decimal{}, decimal.Decimal{}, fmt.Errorf("parse stop-win price %q: %w", order.StopWinPrice, err)
			}
		}
		if len(order.StopLossPrice) != 0 {
			if stopLossPrice, err = decimal.NewFromString(order.StopLossPrice); err != nil {
				return false, decimal.Decimal{}, decimal.Decimal{}, fmt.Errorf("parse stop-loss price %q: %w", order.StopLossPrice, err)
			}
		}
		return true, stopWinPrice, stopLossPrice, nil
	default:
		return false, stopWinPrice, stopLossPrice, flags.ErrContractThirteen
	}
}
// GetContractPrice
//
// @Description: Returns the latest contract price of symbol as stored in the
// in-memory contract cache (memory.GetContractCache) under the key built by
// publicData.SymbolCache(flags.Hy, symbol, flags.TradeTypePrice).
// @param symbol contract trading pair
// @return decimal.Decimal latest price; zero value when an error is returned
// @return error cache lookup error (returned as-is) or a wrapped parse error
func GetContractPrice(symbol string) (decimal.Decimal, error) {
	key := publicData.SymbolCache(flags.Hy, symbol, flags.TradeTypePrice)
	priceByte, err := memory.GetContractCache(key)
	if err != nil {
		// The log tag used to name ContractTransactionCalculationPosition, which hid the real origin.
		applogger.Error("%v GetContractPrice.GetContractCache:%v", common.ErrContract, err)
		return decimal.Decimal{}, err
	}
	newPrice, err := decimal.NewFromString(string(priceByte))
	if err != nil {
		return decimal.Decimal{}, fmt.Errorf("parse contract price %q for %v: %w", string(priceByte), symbol, err)
	}
	applogger.Info("contractCode:%v,topIc:%v,contractPrice:%v", symbol, key, newPrice)
	return newPrice, nil
}