package data import ( "context" "encoding/json" "fmt" "github.com/shopspring/decimal" "matchmaking-system/internal/biz/structure" "matchmaking-system/internal/data/memory" "matchmaking-system/internal/data/socket/moneyData" "matchmaking-system/internal/data/socket/publicData" "matchmaking-system/internal/data/tradedeal/money" "matchmaking-system/internal/data/tradedeal/virtual" "matchmaking-system/internal/pkg/flags" "matchmaking-system/internal/pkg/logging/applogger" "matchmaking-system/internal/pkg/logging/common" "matchmaking-system/internal/pkg/setting" "matchmaking-system/internal/pkg/utils" "regexp" "strconv" "strings" "sync" "time" ) /* 综合(现货|合约|外汇)行情订阅 1、初始化订阅信息 2、接收用户下单的交易对 3、根据交易对订阅行情数据 4、写入内存中用于并发计算 5、记录在热缓存中 */ // QuitesMoneyJsonStruct // @Description: 现货|合约|外汇 type QuitesMoneySpots struct { ServersID string `json:"serversId"` Content struct { Ch string `json:"ch"` Ts int64 `json:"ts"` Tick struct { Open string `json:"open"` High string `json:"high"` Low string `json:"low"` Close string `json:"close"` Amount string `json:"amount"` Count int `json:"count"` Bid string `json:"bid"` BidSize string `json:"bidSize"` Ask string `json:"ask"` AskSize string `json:"askSize"` LastPrice string `json:"lastPrice"` LastSize string `json:"lastSize"` } `json:"Tick"` Data any `json:"Data"` } `json:"content"` Symbol string `json:"symbol"` } type QuitesMoneyContract struct { ServersID string `json:"serversId"` Content struct { Ch string `json:"ch"` Ts int64 `json:"ts"` Tick struct { ID int `json:"id"` Mrid int64 `json:"mrid"` Open string `json:"open"` Close string `json:"close"` High string `json:"high"` Low string `json:"low"` Amount string `json:"amount"` Vol string `json:"vol"` TradeTurnover string `json:"trade_turnover"` Count int `json:"count"` Asks any `json:"asks"` Bids any `json:"bids"` } `json:"Tick"` } `json:"content"` Symbol string `json:"symbol"` } type MoneyContractSetUp struct { SelfContractCode string `json:"selfContractCode"` // 合约交易对 BeginTime string `json:"BeginTime"` // 开始时间 Step int `json:"Step"` // 设置时间间隔 EndTime string `json:"EndTime"` // 结束时间 Price string `json:"Price"` // 设置价格 } type QuitesMoneyForex struct { Ev string `json:"ev"` // 事件类型 P string `json:"p"` // 交易对 A float64 `json:"a"` // 买价 X int `json:"x"` // 交易所标识符 B float64 `json:"b"` // 卖价 T int64 `json:"t"` // 时间 } type MoneyForexJsonData struct { Event string `json:"ev"` // 事件类型(实时数据) Pair string `json:"pair"` // 货币对 Open float64 `json:"o"` // 开盘价 Close float64 `json:"c"` // 收盘价 High float64 `json:"h"` // 最高价 Low float64 `json:"l"` // 最低价 Volume int `json:"v"` // 交易量 Timestamp int64 `json:"s"` // 时间戳 } // MoneySymbol // @Description: 综合-(现货|合约|外汇)订阅交易簿 type MoneySymbol struct { MoneySpotsMap chan []byte // 现货订单交易对 -- 用户现货下单通知订阅 MoneySpotsMapSymbol sync.Map // 现货订阅交易对簿 MoneyContractMap chan []byte // 合约订单交易对 -- 用户合约下单通知订阅 MoneyContractMapSymbol sync.Map // 合约订阅交易对簿 MoneyForexMap chan []byte // 外汇订单交易对 -- 用户外汇下单通知订阅 MoneyForexMapSymbol sync.Map // 外汇订阅交易对簿 } // JsonData // @Description: 综合-(现货|合约|外汇)用户总资产 type JsonData struct { UserId int64 `json:"user_id"` TotalAmount string `json:"total_amount"` } // 综合-(现货|合约|外汇)下单交易对订阅 func InitSubscribeQuotesMoneySpots(sp *MoneySymbol) { for { listSpots, err := LoadLRangeList(setting.MarketSpots) if err != nil { applogger.Error("%v InitSubscribeQuotesMoneySpots.LoadLRangeList:%v", common.ErrMoney, err) return } for _, value := range listSpots { _, ok := sp.MoneySpotsMapSymbol.Load(value) if ok { continue } sp.MoneySpotsMap <- []byte(value) } time.Sleep(10 * time.Second) } } func InitSubscribeQuotesMoneyContract(ct *MoneySymbol) { for { listContract, err := LoadLRangeList(setting.MarketContract) if err != nil { applogger.Error("%v InitSubscribeQuotesMoneyContract.LoadLRangeList:%v", common.ErrMoney, err) return } for _, value := range listContract { usdtToUsd := strings.Replace(value, "USDT", "USD", -1) _, ok := ct.MoneyContractMapSymbol.Load(usdtToUsd) if ok { continue } ct.MoneyContractMap <- []byte(usdtToUsd) } time.Sleep(10 * time.Second) } } func InitSubscribeQuotesMoneyForex(mf *MoneySymbol) { for { listForex, err := LoadLRangeList(setting.MarketForex) if err != nil { applogger.Error("%v InitSubscribeQuotesMoneyForex.LoadLRangeList:%v", common.ErrMoney, err) return } re := regexp.MustCompile(`USD$`) for _, value := range listForex { if !re.MatchString(value) { continue } _, ok := mf.MoneyForexMapSymbol.Load(value) if ok { continue } mf.MoneyForexMap <- []byte(value) } time.Sleep(10 * time.Second) } } // 综合-(现货|合约|外汇)交易对行情订阅 func SubscribeQuotesMoneySpots(ctx context.Context, uo *Data, ms *MoneySymbol) { for { select { case symbol, _ := <-ms.MoneySpotsMap: symbolStr := string(symbol) // Prevent duplicate subscription to CtrIp _, ok := ms.MoneySpotsMapSymbol.Load(symbolStr) if ok { time.Sleep(5 * time.Second) continue } // Write to the redis list hot cache for initializing subscriptions when the program is pulled up checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketSpots, symbolStr).Result() if err != nil { applogger.Error("%v SubscribeQuotesMoneySpots.MarketSpots.HExists:%v", common.ErrMoney, err) time.Sleep(5 * time.Second) continue } if !checkBool { if err = uo.redisDB.HSet(context.Background(), setting.MarketSpots, symbolStr, symbolStr).Err(); err != nil { applogger.Error("%v SubscribeQuotesMoneySpots.HSet:%v", common.ErrMoney, err) time.Sleep(5 * time.Second) continue } } go func() { topIc := fmt.Sprintf("market.%vusdt.ticker", symbolStr) pubSub := uo.redisDB.Subscribe(ctx, topIc) defer pubSub.Close() if _, err = pubSub.Receive(ctx); err != nil { applogger.Error("%v SubscribeQuotesMoneySpots.Receive:%v", common.ErrMoney, err) return } ch := pubSub.Channel() for msg := range ch { var subMsg QuitesMoneySpots if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { applogger.Error("%v SubscribeQuotesMoneySpots.Unmarshal:%v", common.ErrMoney, err) close(ms.MoneySpotsMap) return } // 交易类型:0最新价,1买入,2卖出 xh-symbol-0 var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = publicData.SymbolCache(flags.Xh, symbolStr, flags.TradeTypeAdminPrice) } else { keyPrice = publicData.SymbolCache(flags.Xh, symbolStr, flags.TradeTypePrice) } if err = memory.MoneySpotsCache.Set(keyPrice, []byte(subMsg.Content.Tick.Close)); err != nil { applogger.Error("%v SubscribeQuotesMoneySpots.Set.price:%v", common.ErrMoney, err) return } if !flags.CheckSetting { applogger.Info("当前写入最新价格:%v,%v", keyPrice, subMsg.Content.Tick.Close) } } }() // Write in the map to determine whether to repeat subscription ms.MoneySpotsMapSymbol.Store(symbolStr, symbolStr) } } } func SubscribeQuotesMoneyContract(ctx context.Context, uo *Data, ms *MoneySymbol) { for { select { case symbol, _ := <-ms.MoneyContractMap: symbolStr := string(symbol) usdToUsdt := strings.Replace(symbolStr, "USD", "USDT", -1) // Prevent duplicate subscription to CtrIp _, ok := ms.MoneyContractMapSymbol.Load(symbolStr) if ok { time.Sleep(5 * time.Second) continue } // Write to the redis list hot cache for initializing subscriptions when the program is pulled up checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketContract, symbolStr).Result() if err != nil { applogger.Error("%v SubscribeQuotesMoneyContract.MarketContract.HExists:%v", common.ErrMoney, err) time.Sleep(5 * time.Second) continue } if !checkBool { if err = uo.redisDB.HSet(context.Background(), setting.MarketContract, symbolStr, symbolStr).Err(); err != nil { applogger.Error("%v SubscribeQuotesMoneyContract.MarketContract.HSet:%v", common.ErrMoney, err) time.Sleep(5 * time.Second) continue } } // 订阅插针价格 go func() { for { var data string data, err = uo.redisDB.Get(context.Background(), flags.ContractSystemPriceSetUp).Result() if err != nil || data == "[]" { time.Sleep(5 * time.Second) continue } var dataList []MoneyContractSetUp if err = json.Unmarshal([]byte(data), &dataList); err != nil { time.Sleep(5 * time.Second) continue } for _, value := range dataList { checkStart := len(utils.StrReplace(value.BeginTime)) checkEnd := len(utils.StrReplace(value.EndTime)) if checkStart == 0 || checkEnd == 0 { continue } if value.SelfContractCode == symbolStr { // Determine if it is the current subscription type var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypeAdminPrice) } else { keyPrice = publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypePrice) } if !utils.CheckTimeUTC(value.BeginTime, value.EndTime) { continue } if err = memory.MoneyContractPriceSetUp.Set(keyPrice, []byte(utils.DecimalsPrice(value.Price))); err != nil { applogger.Error("%v SubscribeQuotesMoneyContract.MoneyContractPriceSetUp.set:%v", common.ErrMoney, err) continue } if !flags.CheckSetting { applogger.Info("当前写入插针最新价格:%v,%v", keyPrice, value.Price) } } } time.Sleep(2 * time.Second) } }() // 订阅实时价格 go func() { topIc := fmt.Sprintf("market.%v.detail", usdToUsdt) pubSub := uo.redisDB.Subscribe(ctx, topIc) defer pubSub.Close() if _, err = pubSub.Receive(ctx); err != nil { applogger.Error("%v SubscribeQuotesMoneyContract.Receive:%v", common.ErrMoney, err) return } chp := pubSub.Channel() for msg := range chp { var subMsg QuitesMoneyContract if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { applogger.Error("%v SubscribeQuotesMoneyContract.QuitesMoneyContract.Unmarshal:%v", common.ErrMoney, err) close(ms.MoneyContractMap) return } // 交易类型:0最新价,1买入,2卖出 var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypeAdminPrice) } else { keyPrice = publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypePrice) } if err = memory.MoneyContractCache.Set(keyPrice, []byte(subMsg.Content.Tick.Close)); err != nil { applogger.Error("%v SubscribeQuotesMoneyContract.Set:%v", common.ErrMoney, err) continue } if !flags.CheckSetting { applogger.Info("当前写入最新价格:%v,%v", keyPrice, subMsg.Content.Tick.Close) } } }() // Write in the map to determine whether to repeat subscription ms.MoneyContractMapSymbol.Store(symbolStr, symbolStr) } } } func SubscribeQuotesMoneyForex(ctx context.Context, uo *Data, ms *MoneySymbol) { for { select { case symbol, _ := <-ms.MoneyForexMap: symbolStr := string(symbol) // Prevent duplicate subscription to CtrIp _, ok := ms.MoneyForexMapSymbol.Load(symbolStr) if ok { time.Sleep(5 * time.Second) continue } // Write to the redis list hot cache for initializing subscriptions when the program is pulled up checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketForex, symbolStr).Result() if err != nil { applogger.Error("%v SubscribeQuotesMoney.MarketMoney.HExists:%v", common.ErrMoney, err) time.Sleep(5 * time.Second) continue } if !checkBool { if err = uo.redisDB.HSet(context.Background(), setting.MarketForex, symbolStr, symbolStr).Err(); err != nil { applogger.Error("%v SubscribeQuotesMoney.MarketMoney.HSet:%v", common.ErrMoney, err) time.Sleep(5 * time.Second) continue } } // 订阅实时报价 go func() { topIc := fmt.Sprintf("%v.Forex", symbolStr) pubSub := uo.redisDB.Subscribe(ctx, topIc) defer pubSub.Close() if _, err = pubSub.Receive(ctx); err != nil { applogger.Error("%v SubscribeQuotesMoney.Receive:%v", common.ErrMoney, err) return } chp := pubSub.Channel() for msg := range chp { var subMsg MoneyForexJsonData if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { applogger.Error("%v SubscribeQuotesMoney.QuitesMoney.Unmarshal:%v", common.ErrMoney, err) close(ms.MoneyForexMap) return } // 交易类型:0最新价,1买入,2卖出 var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = publicData.SymbolCache(flags.Wh, symbolStr, flags.TradeTypeAdminPrice) } else { keyPrice = publicData.SymbolCache(flags.Wh, symbolStr, flags.TradeTypePrice) } price := strconv.FormatFloat(subMsg.Close, 'g', -1, 64) if err = memory.MoneyForexImmediateCache.Set(keyPrice, []byte(price)); err != nil { applogger.Error("%v priceKey Set:%v", common.ErrMoney, err) continue } if !flags.CheckSetting { applogger.Info("当前写入外汇即时最新报价:%v,%v,%v", symbolStr, keyPrice, price) } } }() // Write in the map to determine whether to repeat subscription ms.MoneyForexMapSymbol.Store(symbolStr, symbolStr) } } } // OrderSubAdminMoneySubscribeBySum // // @Description: 综合-(现货|合约|外汇)持仓订单浮动盈亏计算 // @param ctx // @param uo func OrderSubAdminMoneySubscribeBySum(uo *Data) { for { topIc := setting.AdminMoneySubscribe hashMap, err := uo.redisDB.HGetAll(context.Background(), topIc).Result() if err != nil { applogger.Error("orderSubAdminMoneySubscribe.HGetAll:%v", err) time.Sleep(5 * time.Second) continue } var hashList []money.MoneyTallyCache for _, value := range hashMap { var msg money.MoneyTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } switch msg.Status { case flags.Entrust: // 挂单 case flags.Position: // 持仓 hashList = append(hashList, msg) default: // 平仓|撤单 err = uo.redisDB.HDel(context.Background(), topIc, msg.OrderId).Err() if err != nil { applogger.Error("AdminMoneySubscribe.HDel:%v", err) continue } } } applogger.Info("综合(现货|合约|外汇)数据量统计:%v", len(hashList)) // 统计外汇总持仓订单浮动盈亏 priceSum := decimal.Zero for _, value := range hashList { orderModel := moneyData.MoneyOrderProcessing(topIc, value) if orderModel != nil { var subPrice decimal.Decimal newPrice, err := decimal.NewFromString(orderModel.Price) if err != nil { continue } openPrice, err := decimal.NewFromString(orderModel.OpenPrice) if err != nil { continue } switch value.Order.TradeType { case flags.TradeTypeBuy: // 买涨 = 新价格 - 开仓价 subPrice = newPrice.Sub(openPrice) case flags.TradeTypeSell: // 卖跌 = 开仓价 - 新价格 subPrice = openPrice.Sub(newPrice) default: subPrice = decimal.Zero } orderNumber := decimal.RequireFromString(value.Order.OrderNumber) pryNum := decimal.RequireFromString(value.Order.PryNum) price := subPrice.Mul(orderNumber).Mul(pryNum) // 外汇浮动盈亏计算 priceSum = priceSum.Add(price) // 累加盈亏统计 } } // 写入缓存 if err = memory.MoneyTotalFloating.Set(flags.FloatingZh, []byte(priceSum.String())); err != nil { applogger.Error("统计(现货|合约|外汇)持仓订单浮动盈亏错误:%v", err) time.Sleep(5 * time.Second) continue } applogger.Info("统计(现货|合约|外汇)持仓订单浮动盈亏:%v", priceSum) time.Sleep(600 * time.Millisecond) } } // MoneyTransactionEntrust // // @Description: 综合-(现货|合约|外汇)委托订单 // @param ctx func MoneyTransactionEntrust(ctx context.Context) { for { MoneyTransactionCalculationEntrust(ctx) time.Sleep(400 * time.Millisecond) } } // MoneyTransactionCalculationEntrust // // @Description: // @param ctx func MoneyTransactionCalculationEntrust(ctx context.Context) { var wg sync.WaitGroup entrustList, err := Reds.HGetAll(context.Background(), setting.MarketMoneyEntrust).Result() if err != nil { applogger.Error("%v MoneyTransactionCalculationEntrust.HGetAll:%v", common.ErrMoney, err) return } for _, value := range entrustList { var msg = make(chan money.MoneyTallyCache, 1) var entrust money.MoneyTallyCache if err = json.Unmarshal([]byte(value), &entrust); err != nil { applogger.Error("%v MoneyTransactionCalculationEntrust.Unmarshal:%v", common.ErrMoney, err) continue } // 不同市场的即时价格 var newPrice decimal.Decimal switch entrust.Order.Type { case flags.SpotsMarketType: // 现货 newPrice, err = GetMoneySpotsPrice(entrust.Symbol) if err != nil { applogger.Error("%v MoneyTransactionCalculationPosition.GetMoneySpotsPrice:%v", common.ErrMoney, err) continue } case flags.ContractMarketType: // 合约 newPrice, err = GetMoneyContractPrice(entrust.Symbol) if err != nil { applogger.Error("%v MoneyTransactionCalculationPosition.GetMoneyContractPrice:%v", common.ErrMoney, err) continue } case flags.ForexMarketType: // 外汇 newPrice, err = GetMoneyForexPriceNew(entrust.Symbol) if err != nil { applogger.Error("%v MoneyTransactionCalculationPosition.GetMoneyForexPriceNew:%v", common.ErrMoney, err) continue } default: continue } wg.Add(1) go func(value string) { resultMsg := MoneyConcurrentComputingEntrust(&entrust, newPrice, &wg) msg <- resultMsg }(value) // 处理并发结果(需要改成通过信道通知) select { case resultMsg, _ := <-msg: switch resultMsg.Status { case flags.Entrust: // 挂单中 if !flags.CheckSetting { applogger.Info("从信道接收综合(现货|合约|外汇)挂单信息:%v", resultMsg) } case flags.Position: // 持仓中 if !flags.CheckSetting { applogger.Info("从信道接收到综合(现货|合约|外汇)持仓信息:%v", resultMsg) } // 处理开仓逻辑 if err = MoneyLiquidationEntrust(ctx, &resultMsg); err != nil { applogger.Error("%v MoneyTransactionCalculationEntrust.MoneyLiquidationEntrust:%v", common.ErrMoney, err) continue } // 写入持仓缓存列表 data, err := json.Marshal(resultMsg) if err != nil { applogger.Error("%v MoneyTransactionCalculationEntrust.Marshal:%v", common.ErrMoney, err) continue } // 写入外汇持仓缓存队列 if err = Reds.HSet(context.Background(), setting.MarketMoneyPosition, resultMsg.OrderId, string(data)).Err(); err != nil { applogger.Error("%v MoneyTransactionCalculationEntrust.MarketMoneyPosition.HSet:%v", common.ErrMoney, err) continue } case flags.Close: // 平仓(现货) if !flags.CheckSetting { applogger.Info("从信道接收到综合(现货|合约|外汇)平仓信息:%v", resultMsg) } _, err = MoneyOrdersClosingDB(ctx, Msql, resultMsg.UserId, resultMsg.Order, resultMsg.OrderId, resultMsg.ClosingPrice) if err != nil { applogger.Error("%v MoneyTransactionCalculationEntrust.MoneyClosingPosition:%v", common.ErrMoney, err) } } } } wg.Wait() applogger.Info("综合(现货|合约|外汇)挂单并发计算执行完毕......") } // MoneyConcurrentComputingEntrust // // @Description: 综合-(现货|合约|外汇)监控挂单缓存列表业务处理 // @param order // @param orderOld // @param newPrice // @param bidPrice // @param askPrice // @param wg // @return money.MoneyTallyCache func MoneyConcurrentComputingEntrust(order *money.MoneyTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) money.MoneyTallyCache { var deleteCache bool var dealPrice, closePrice string var limitPrice = decimal.RequireFromString(order.Order.LimitPrice) // 限价-价格 var status = order.Status // 原始状态 switch order.Order.TradeType { case flags.TradeTypeBuy: // 买涨 if !flags.CheckSetting { applogger.Info("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status) } // 限价买入逻辑判定 -> 挂单金额 > 当前价格 if limitPrice.Cmp(newPrice) > 0 { deleteCache = true dealPrice = limitPrice.String() // 开仓价格 } case flags.TradeTypeSell: // 买跌 if !flags.CheckSetting { applogger.Info("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status) } // 限价卖入逻辑判定 -> 挂单金额 < 当前价格 if limitPrice.Cmp(newPrice) < 0 { deleteCache = true dealPrice = limitPrice.String() // 开仓价格 } default: } if deleteCache { switch order.Order.Type { case flags.SpotsMarketType: // 现货 order.Status = flags.Close closePrice = dealPrice default: // 合约|外汇 order.Status = flags.Position closePrice = decimal.Zero.String() } // 删除挂单缓存列表 if err := Reds.HDel(context.Background(), setting.MarketMoneyEntrust, order.OrderId).Err(); err != nil { applogger.Error("%v MoneyConcurrentComputingEntrust.MarketMoneyEntrust.HDel:%v", common.ErrMoney, err) order.Status = status dealPrice = decimal.Zero.String() } } wg.Done() return money.MoneyTallyCache{ UserId: order.UserId, // 用户Id OrderId: order.OrderId, // 订单Id Symbol: order.Symbol, // 下单交易对 OpenPrice: dealPrice, // 开仓价格 ClosingPrice: closePrice, // 平仓价格 Status: order.Status, // 订单状态 Order: order.Order, // 下单信息 } } // MoneyLiquidationEntrust // // @Description: 综合-(现货|合约|外汇)挂单-->开仓业务处理 // @param ctx // @param order // @return error func MoneyLiquidationEntrust(ctx context.Context, order *money.MoneyTallyCache) error { // 1、外汇开仓操作 err := MoneyOpenPosition(ctx, Msql, order.UserId, order.OrderId, order.OpenPrice, order.Order) if err != nil { applogger.Error("%v MoneyLiquidationEntrust.MoneyOpenPosition:%v", common.ErrMoney, err) return err } // 2、开仓更新用户外汇订阅缓存订单状态 userSubKey := virtual.OrderIdListKey(setting.MoneySubscribe, order.UserId) if err = UpdateMoneySubscribeHashStatusByOrderId(order.OrderId, userSubKey, flags.Position, order.ClosingPrice); err != nil { applogger.Error("%v MoneyLiquidationEntrust.MoneySubscribe:%v", common.ErrMoney, err) return err } // 3、开仓更新管理员外汇订阅缓存订单状态 if err = UpdateMoneySubscribeHashStatusByOrderId(order.OrderId, setting.AdminMoneySubscribe, flags.Position, order.ClosingPrice); err != nil { applogger.Error("%v MoneyLiquidationEntrust.AdminMoneySubscribe:%v", common.ErrMoney, err) return err } return nil } // MoneyTransactionPosition // // @Description: 综合-(现货|合约|外汇)持仓业务处理 // @param ctx func MoneyTransactionPosition(ctx context.Context) { for { MoneyTransactionCalculationPosition(ctx) time.Sleep(400 * time.Millisecond) } } // GetUserAmountByUserId // // @Description: 查询:用户-资产-订单 func GetUserAmountByUserId() (map[int64]decimal.Decimal, map[int64][]money.MoneyTallyCache, error) { var userAmountMap = make(map[int64]decimal.Decimal) var userOrderMap = make(map[int64][]money.MoneyTallyCache) // 分组查询用户总资产 var userMoneyList []JsonData if err := Msql.Table(flags.BotUserMoney). Select("user_id,sum(usable_num+frozen_num) as total_amount"). Where("stock_id = ?", flags.MoneyUnit). GroupBy("user_id"). Find(&userMoneyList); err != nil { applogger.Error("GetUserAmountByUserId err:%v", common.ErrMoney, err) return userAmountMap, userOrderMap, err } for _, value := range userMoneyList { userAmountMap[value.UserId] = decimal.RequireFromString(value.TotalAmount) } // 查询缓存订单 entrustList, err := Reds.HGetAll(context.Background(), setting.MarketMoneyPosition).Result() if err != nil { applogger.Error("%v MoneyTransactionCalculationPosition.LRange:%v", common.ErrMoney, err) return userAmountMap, userOrderMap, err } for _, value := range entrustList { var entrust money.MoneyTallyCache if err = json.Unmarshal([]byte(value), &entrust); err != nil { continue } userOrderMap[entrust.UserId] = append(userOrderMap[entrust.UserId], entrust) } return userAmountMap, userOrderMap, err } // MoneyTransactionCalculationPosition // // @Description: 综合-(现货|合约|外汇)监控持仓缓存列表(注意:止盈止损|强行平仓) // @param ctx func MoneyTransactionCalculationPosition(ctx context.Context) { userByAmount, userByOrder, err := GetUserAmountByUserId() if err != nil { return } //applogger.Debug("用户资产:%v,用户持仓订单:%v", userByAmount, userByOrder) var wg sync.WaitGroup for key, value := range userByAmount { var msg = make(chan map[bool][]money.MoneyTallyCache, 1) orderList, ok := userByOrder[key] if ok { wg.Add(1) go func(value decimal.Decimal) { var check bool // 判定是否爆仓 var floatingPLSum decimal.Decimal // 浮动盈亏 var resultListMsg []money.MoneyTallyCache // 订单信息 var resultMapMsg = make(map[bool][]money.MoneyTallyCache, 1) // 推送订单信息-状态判定(持仓|平仓) for _, vOrder := range orderList { var newPrice decimal.Decimal switch vOrder.Order.Type { case flags.ContractMarketType: // 合约 newPrice, err = GetMoneyContractPrice(vOrder.Symbol) if err != nil { applogger.Warn("%v MoneyTransactionCalculationPosition.GetMoneyContractPrice:%v", common.ErrMoney, err) continue } case flags.ForexMarketType: // 外汇 newPrice, err = GetMoneyForexPriceNew(vOrder.Symbol) if err != nil { applogger.Warn("%v MoneyTransactionCalculationPosition.GetMoneyForexPriceNew:%v", common.ErrMoney, err) continue } default: continue } result, floatingPL := MoneyConcurrentComputingPosition(&vOrder, newPrice, value) floatingPLSum = floatingPLSum.Add(floatingPL) resultListMsg = append(resultListMsg, result) } // TODO: 判定当前用户订单总浮动盈亏是否触发爆仓 if value.Sub(floatingPLSum.Abs()).Cmp(decimal.Zero) <= 0 { check = true } else { check = false } resultMapMsg[check] = resultListMsg wg.Done() // 返回处理消息 msg <- resultMapMsg }(value) // 处理并发结果 select { case resultMsg, _ := <-msg: // check为true全部平仓,反之循环判定订单是否要平仓 for check, message := range resultMsg { if check { for _, moneyStr := range message { moneyStr.Status = flags.StrongParity moneyStr.ClosingPrice = moneyStr.StrongParity // 清理缓存订单 if err = Reds.HDel(context.Background(), setting.MarketMoneyPosition, moneyStr.OrderId).Err(); err != nil { applogger.Error("%v MoneyConcurrentComputingPosition.MarketMoneyPosition.HDel:%v", common.ErrMoney, err) continue } // 用户订单平仓 if err = MoneyLiquidationPosition(ctx, &moneyStr); err != nil { applogger.Error("%v MoneyTransactionCalculationPosition.MoneyLiquidationPosition:%v", common.ErrMoney, err) continue } } } else { for _, moneyStr := range message { switch moneyStr.Status { case flags.Position: // 持仓 if !flags.CheckSetting { applogger.Info("从信道接收到综合(现货|合约|外汇)持仓信息:%v", resultMsg) } case flags.Close: // 平仓 if !flags.CheckSetting { applogger.Info("从信道接收到综合(现货|合约|外汇)平仓信息:%v", resultMsg) } // 清理缓存订单 if err = Reds.HDel(context.Background(), setting.MarketMoneyPosition, moneyStr.OrderId).Err(); err != nil { applogger.Error("%v MoneyConcurrentComputingPosition.MarketMoneyPosition.HDel:%v", common.ErrMoney, err) continue } // 用户订单平仓 if err = MoneyLiquidationPosition(ctx, &moneyStr); err != nil { applogger.Error("%v MoneyTransactionCalculationPosition.MoneyLiquidationPosition:%v", common.ErrMoney, err) continue } } } } } } } } wg.Wait() applogger.Info("综合(现货|合约|外汇)持仓并发计算执行完毕......") } // MoneyConcurrentComputingPosition // // @Description: 综合-(现货|合约|外汇)持仓缓存列表业务处理 // @param order // @param newPrice // @param totalAmount // @return money.MoneyTallyCache // @return decimal.Decimal func MoneyConcurrentComputingPosition(order *money.MoneyTallyCache, newPrice, totalAmount decimal.Decimal) (money.MoneyTallyCache, decimal.Decimal) { var checkBool bool // 标识是否平仓 var dealPrice string // 平仓价 var floatingPL decimal.Decimal // 浮动盈亏 var openPrice = decimal.RequireFromString(order.OpenPrice) // 限价 var orderNumber = decimal.RequireFromString(order.Order.OrderNumber) // 仓位 var pryNum = decimal.RequireFromString(order.Order.PryNum) // 杠杆 // 下单判定设置(false无设置|true止盈止损) _, stopWinPrice, stopLossPrice, _ := MoneyVoteStopType(order.Order) switch order.Order.TradeType { case flags.TradeTypeBuy: // 买涨(强平) if !flags.CheckSetting { applogger.Info("用户ID:%v,持仓开仓价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status) } // 买涨挂止损止盈,止盈价必须<当前价,止损价必须>当前价; if !stopWinPrice.IsZero() { if stopWinPrice.Cmp(newPrice) < 0 { checkBool = true dealPrice = newPrice.String() // 平仓价格 } } if !stopLossPrice.IsZero() { if stopLossPrice.Cmp(newPrice) > 0 { checkBool = true dealPrice = newPrice.String() // 平仓价格 } } // 强行平仓(做多) if !checkBool { floatingPL = newPrice.Sub(openPrice).Mul(orderNumber).Mul(order.Order.System.FaceValue).Mul(pryNum) } case flags.TradeTypeSell: // 买跌(存在强行平仓) if !flags.CheckSetting { applogger.Info("用户ID:%v,限价持仓买跌下单价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status) } // 买跌挂止损止盈,止盈价必须>当前价,止损价必须<当前价; if !stopWinPrice.IsZero() { if stopWinPrice.Cmp(newPrice) > 0 { checkBool = true dealPrice = newPrice.String() // 平仓价格 } } if !stopLossPrice.IsZero() { if stopLossPrice.Cmp(newPrice) < 0 { checkBool = true dealPrice = newPrice.String() // 平仓价格 } } // 强行平仓(做空) if !checkBool { floatingPL = openPrice.Sub(newPrice).Mul(orderNumber).Mul(order.Order.System.FaceValue).Mul(pryNum) } default: } /* 强行平仓(做多|做空) 1、做多:(最新成交价-开仓成交价)*仓位 1、做空:(开仓成交价-最新成交价)*仓位 2、当浮动盈亏 >= 保证金的70%订单强行平仓 */ if floatingPL.IsNegative() { explosiveAssets := totalAmount.Mul(order.Order.System.CompelNum) // 保证金(order.Order.EarnestMoney) = 总资产 if !flags.CheckSetting { applogger.Info("总资产:%v,强平资产:%v,浮动盈亏:%v,强行平仓判定:%v", totalAmount, explosiveAssets, floatingPL, floatingPL.Abs().Cmp(explosiveAssets)) } if floatingPL.Abs().Cmp(explosiveAssets) >= 0 { checkBool = true dealPrice = newPrice.String() } } // 判定订单状态 if checkBool { order.Status = flags.Close } return money.MoneyTallyCache{ UserId: order.UserId, // 用户Id OrderId: order.OrderId, // 订单Id Symbol: order.Symbol, // 下单交易对 ClosingPrice: dealPrice, // 平仓价格 Status: order.Status, // 订单状态 Order: order.Order, // 下单信息 StrongParity: newPrice.String(), // 强平价格 }, floatingPL } // MoneyLiquidationPosition // // @Description: 综合-(现货|合约|外汇)平仓操作 // @param ctx // @param order // @return error func MoneyLiquidationPosition(ctx context.Context, order *money.MoneyTallyCache) error { // 1、平仓操作 status, err := strconv.Atoi(order.Status) if err != nil { status = 3 } err = MoneyClosingPosition(ctx, Msql, order.OrderId, order.ClosingPrice, order.Order, status) if err != nil { applogger.Error("%v MoneyLiquidationPosition.MoneyClosingPosition:%v", common.ErrMoney, err) return err } // 2、平仓更新用户外汇订阅订单状态 userSubKey := virtual.OrderIdListKey(setting.MoneySubscribe, order.UserId) if err = UpdateMoneySubscribeHashStatusByOrderId(order.OrderId, userSubKey, flags.Close, order.ClosingPrice); err != nil { applogger.Error("%v MoneyLiquidationPosition.MoneySubscribe:%v", common.ErrMoney, err) return err } // 3、平仓更新管理员外汇订阅订单状态 if err = UpdateMoneySubscribeHashStatusByOrderId(order.OrderId, setting.AdminMoneySubscribe, flags.Close, order.ClosingPrice); err != nil { applogger.Error("%v MoneyLiquidationPosition.AdminMoneySubscribe:%v", common.ErrMoney, err) return err } return nil } // MoneyVoteStopType // // @Description: 设置综合-(现货|合约|外汇)止盈止损 // @param order // @return bool // @return decimal.Decimal // @return decimal.Decimal // @return error func MoneyVoteStopType(order structure.MoneyOrder) (bool, decimal.Decimal, decimal.Decimal, error) { var checkBool bool var stopWinPrice, stopLossPrice decimal.Decimal switch order.StopType { case flags.StopTypeNone: // 暂无设置止盈止损 checkBool = false case flags.StopTypeSet: // 设置止盈止损 checkBool = true if len(order.StopWinPrice) != 0 { stopWinPrice = decimal.RequireFromString(order.StopWinPrice) } if len(order.StopLossPrice) != 0 { stopLossPrice = decimal.RequireFromString(order.StopLossPrice) } default: return checkBool, stopWinPrice, stopLossPrice, flags.ErrContractThirteen } return checkBool, stopWinPrice, stopLossPrice, nil } // GetMoneyForexPrice // // @Description: TODO: 综合-外汇交易对卖一买一最新报价 // @param symbol // @param tradeType // @return decimal.Decimal // @return error func GetMoneyForexPrice(symbol string, tradeType int64) (decimal.Decimal, error) { var err error var newPrice []byte var cacheKey string switch tradeType { case 1: // 外汇交易对买一最新报价 cacheKey = publicData.SymbolCache(flags.Wh, symbol, flags.TradeTypeBuy) newPrice, err = memory.GetMoneyForexCache(cacheKey) case 2: // 外汇交易对卖一最新报价 cacheKey = publicData.SymbolCache(flags.Wh, symbol, flags.TradeTypeSell) newPrice, err = memory.GetMoneyForexCache(cacheKey) } if err != nil { applogger.Error("%v GetMoneyForexPrice.Get:%v", common.ErrMoney, err) return decimal.Decimal{}, err } priceList, err := decimal.NewFromString(string(newPrice)) if err != nil { applogger.Error("%v GetMoneyForexPrice.Get:%v", common.ErrMoney, err) return decimal.Decimal{}, err } applogger.Info("MoneyCode:%v,topIc:%v,MoneyPrice:%v", symbol, cacheKey, priceList) return priceList, nil } // GetMoneyForexPriceNew // // @Description: 综合-外汇交易对即时报价 // @param symbol // @return decimal.Decimal // @return error func GetMoneyForexPriceNew(symbol string) (decimal.Decimal, error) { var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = SymbolCache(flags.Wh, symbol, flags.TradeTypeAdminPrice) } else { keyPrice = SymbolCache(flags.Wh, symbol, flags.TradeTypePrice) } priceNew, err := memory.GetMoneyForexImmediateCache(keyPrice) if err != nil { return decimal.Decimal{}, err } priceList, err := decimal.NewFromString(string(priceNew)) if err != nil { applogger.Error("%v GetMoneyForexPriceNew.Get:%v", common.ErrMoney, err) return decimal.Decimal{}, err } applogger.Info("MoneyCode:%v,topIc:%v,MoneyPrice:%v", symbol, keyPrice, priceList) return priceList, nil } // GetMoneySpotsPrice // // @Description: 综合-现货即时报价 // @param symbol // @return decimal.Decimal // @return error func GetMoneySpotsPrice(symbol string) (decimal.Decimal, error) { var err error var newByte []byte var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = publicData.SymbolCache(flags.Xh, symbol, flags.TradeTypeAdminPrice) } else { keyPrice = publicData.SymbolCache(flags.Xh, symbol, flags.TradeTypePrice) } newByte, err = memory.MoneySpotsCache.Get(keyPrice) if err != nil { applogger.Warn("%v GetMoneySpotsPrice.Get:%v", common.ErrMoney, err) return decimal.Decimal{}, err } priceNew := decimal.RequireFromString(string(newByte)) return priceNew, nil } // GetMoneyContractPrice // // @Description: 综合-合约即时报价 // @param symbol // @return decimal.Decimal // @return error func GetMoneyContractPrice(symbol string) (decimal.Decimal, error) { var newPrice decimal.Decimal var keyPrice string if flags.CheckEnvironment == flags.CheckAdmin { keyPrice = publicData.SymbolCache(flags.Hy, symbol, flags.TradeTypeAdminPrice) } else { keyPrice = publicData.SymbolCache(flags.Hy, symbol, flags.TradeTypePrice) } priceByte, err := memory.GetMoneyContractCache(keyPrice) if err != nil { applogger.Error("%v GetMoneyContractPrice.Get:%v", common.ErrMoney, err) return decimal.Decimal{}, err } newPrice, err = decimal.NewFromString(string(priceByte)) if err != nil { return decimal.Decimal{}, err } applogger.Info("contractCode:%v,topIc:%v,contractPrice:%v", symbol, keyPrice, newPrice) return newPrice, nil }