package socket

import (
	"context"
	"encoding/json"
	"time"

	"github.com/shopspring/decimal"

	"matchmaking-system/internal/data"
	"matchmaking-system/internal/data/socket/virtualData"
	"matchmaking-system/internal/data/tradedeal/virtual"
	"matchmaking-system/internal/pkg/flags"
	"matchmaking-system/internal/pkg/logging/applogger"
	models "matchmaking-system/internal/pkg/model"
	"matchmaking-system/internal/pkg/setting"
)

// orderSubBalance parses the usable and frozen balance strings of one account
// row. A value that cannot be parsed is logged and reported as zero instead of
// panicking, so one malformed row cannot crash the subscription goroutine.
func orderSubBalance(usable, frozen string) (decimal.Decimal, decimal.Decimal) {
	available, err := decimal.NewFromString(usable)
	if err != nil {
		applogger.Error("orderSubBalance.usable:%v", err)
		available = decimal.Zero
	}
	freeze, err := decimal.NewFromString(frozen)
	if err != nil {
		applogger.Error("orderSubBalance.frozen:%v", err)
		freeze = decimal.Zero
	}
	return available, freeze
}

// orderSubTradePnL returns the realized profit or loss of a single trade:
// (closing price - deal price) * order number for a buy-up trade, and
// (deal price - closing price) * order number otherwise. An error is returned
// when any of the three numeric strings cannot be parsed.
func orderSubTradePnL(dealPrice, closingPrice, orderNumber string, buyUp bool) (decimal.Decimal, error) {
	openPrice, err := decimal.NewFromString(dealPrice)
	if err != nil {
		return decimal.Zero, err
	}
	closePrice, err := decimal.NewFromString(closingPrice)
	if err != nil {
		return decimal.Zero, err
	}
	orderNum, err := decimal.NewFromString(orderNumber)
	if err != nil {
		return decimal.Zero, err
	}
	diff := openPrice.Sub(closePrice)
	if buyUp {
		diff = closePrice.Sub(openPrice)
	}
	return diff.Mul(orderNum), nil
}

// orderSubFeeTotal adds up the two fee columns returned by a Sums query.
// It returns zero when the query failed or did not yield exactly two values,
// which also guards the index accesses below.
func orderSubFeeTotal(fees []float64, err error) decimal.Decimal {
	if err != nil {
		applogger.Error("orderSubFeeTotal.Sums:%v", err)
		return decimal.Zero
	}
	if len(fees) != 2 {
		return decimal.Zero
	}
	return decimal.NewFromFloat(fees[0]).Add(decimal.NewFromFloat(fees[1]))
}

// orderSubSpotsSubscribe
//
//	@Description: Streams the user's spot orders to the client. The loop reads
//	the user's order hash from Redis, processes every entry and pushes the
//	result to u.msg. It stops when the token can no longer be resolved or when
//	the subscription key is no longer present in u.symbol.
//	@receiver u
//	@param psgMsg
//
// NOTE(review): the subscription check only runs while an order is being
// pushed, so with an empty order list the loop keeps polling until the token
// becomes invalid — confirm this is intended.
func (u *Client) orderSubSpotsSubscribe(psgMsg *SymbolMessage) {
	for {
		userID, err := GetUserIdByToken(u.token)
		if err != nil {
			time.Sleep(5 * time.Second)
			cleanSubscriptionKey(u)
			break
		}

		if userID != flags.AdministratorsId && psgMsg.Symbol == setting.SpotsSubscribe {
			orderTokenKey := virtual.OrderIdListKey(setting.SpotsSubscribe, userID)
			orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}

			for _, value := range orderList {
				var msg virtual.SpotsTallyCache
				if err = json.Unmarshal([]byte(value), &msg); err != nil {
					applogger.Error("SpotsSubscribe.Unmarshal:%v", err)
					time.Sleep(5 * time.Second)
					continue
				}

				orderStr, orderModel := virtualData.SpotOrderProcessing(setting.SpotsSubscribe, msg)
				if orderModel == nil {
					continue
				}
				if _, ok := u.symbol.Load(psgMsg.Symbol); !ok {
					applogger.Info("User cancels stock order subscription.")
					return
				}

				// Drop cancelled and completed spot orders from the cache.
				if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
					if err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err(); err != nil {
						time.Sleep(5 * time.Second)
						applogger.Error("SpotsSubscribe.HDel:%v", err)
						continue
					}
				}

				u.msg <- orderStr // user (pending order | position) subscription
			}
		}

		time.Sleep(600 * time.Millisecond)
	}
}

// orderSubSpotsMarketSubscribe
//
//	@Description: Streams the user's spot market statistics (total assets,
//	available and frozen balance, cumulative profit and loss, total fee and
//	floating profit and loss) to the client. Cumulative profit and loss and the
//	total fee are recomputed only when the available balance changed since the
//	last computation, or once after trades were detected for a zero balance.
//	@receiver u
//	@param psgMsg
func (u *Client) orderSubSpotsMarketSubscribe(psgMsg *SymbolMessage) {
	var (
		n                   int             // 0: trades not probed yet, 1: recompute pending, 2: computed
		lastAvailable       decimal.Decimal // available balance seen at the last recomputation
		marketProfitAndLoss decimal.Decimal // cumulative profit and loss of the user
		marketTotalFee      decimal.Decimal // total fee of the user
	)

	for {
		userID, err := GetUserIdByToken(u.token)
		if err != nil {
			time.Sleep(5 * time.Second)
			cleanSubscriptionKey(u)
			break
		}

		if userID != flags.AdministratorsId && psgMsg.Symbol == setting.SpotsMarketSubscribe {
			var marketAvailable, marketFreeze decimal.Decimal // default to zero when the query fails

			// Available and frozen balance of the user.
			var botUserDigital []models.BotUserDigital
			err = data.Msql.Table(flags.BotUserDigital).
				Where("user_id = ?", userID).
				Where("digital_id = ?", flags.BasicUnit).
				Find(&botUserDigital)
			if err != nil {
				applogger.Error("orderSubSpotsMarketSubscribe.Find:%v", err)
			}
			for _, value := range botUserDigital {
				marketAvailable, marketFreeze = orderSubBalance(value.UsableNum, value.FrozenNum)
			}

			// Probe once for existing trades while a balance is zero.
			if (marketAvailable.IsZero() || marketFreeze.IsZero()) && n == 0 {
				// The count error is ignored on purpose: a failed probe leaves
				// n at 0 and is simply retried on the next iteration.
				tradeCount, _ := data.Msql.Table(flags.BotDigitalTrade).
					Where("user_id = ?", userID).
					In("status", 1, 3).
					Count()
				if tradeCount > 0 {
					n = 1
				}
			}

			if marketAvailable.Cmp(lastAvailable) != 0 || n == 1 {
				// Cumulative profit and loss over trades with status 3.
				var botDigitalTrade []models.BotDigitalTrade
				err = data.Msql.Table(flags.BotDigitalTrade).
					Where("user_id = ?", userID).
					Where("status = 3").
					Find(&botDigitalTrade)
				if err != nil {
					applogger.Error("orderSubSpotsMarketSubscribe.Find:%v", err)
				}
				// Start from zero on every recomputation; accumulating onto the
				// previous total would count every trade again.
				profit := decimal.Zero
				for _, value := range botDigitalTrade {
					// TradeType 1 is buy-up; everything else is buy-down.
					pnl, perr := orderSubTradePnL(value.DealPrice, value.ClosingPrice, value.OrderNumber, value.TradeType == 1)
					if perr != nil {
						applogger.Error("orderSubSpotsMarketSubscribe.TradePnL:%v", perr)
						continue
					}
					profit = profit.Add(pnl)
				}
				marketProfitAndLoss = profit

				// Total fee of the user.
				var feeBean models.BotDigitalTrade
				totalFee, sumErr := data.Msql.Table(flags.BotDigitalTrade).
					Where("user_id = ?", userID).
					In("status", 1, 3).
					Sums(feeBean, "service_cost", "closing_cost")
				marketTotalFee = orderSubFeeTotal(totalFee, sumErr)

				lastAvailable = marketAvailable
				n = 2
			}

			// Floating profit and loss, also part of the total assets.
			pLPriceSum := GetSpotsByPriceSum(userID, setting.SpotsSubscribe)
			marketTotalAssets := marketAvailable.Add(marketFreeze).Add(pLPriceSum)
			marketFloatingPL := pLPriceSum

			orderModel := &UserMarketStatistics{
				UserMarkerSubscribe:     psgMsg.Symbol,
				UserMarketTotalAssets:   marketTotalAssets.String(),
				UserMarketAvailable:     marketAvailable.String(),
				UserMarketFreeze:        marketFreeze.String(),
				UserMarketProfitAndLoss: marketProfitAndLoss.String(),
				UserMarketTotalFee:      marketTotalFee.String(),
				UserMarketFloatingPL:    marketFloatingPL.String(),
			}

			if _, ok := u.symbol.Load(psgMsg.Symbol); !ok {
				applogger.Info("User market cancels subscription to Spots stock orders.")
				return
			}
			orderStr, err := json.Marshal(orderModel)
			if err != nil {
				applogger.Error("User market Spots stock subscription cache order error:%v", err)
				time.Sleep(5 * time.Second)
				continue
			}
			u.msg <- orderStr // user market statistics subscription
		}

		time.Sleep(800 * time.Millisecond)
	}
}

// orderSubContractSubscribe
//
//	@Description: Streams the user's contract orders to the client. The loop
//	reads the user's order hash from Redis, processes every entry and pushes
//	the JSON-encoded result to u.msg. It stops when the token can no longer be
//	resolved or when the subscription key is no longer present in u.symbol.
//	@receiver u
//	@param psgMsg
func (u *Client) orderSubContractSubscribe(psgMsg *SymbolMessage) {
	for {
		userID, err := GetUserIdByToken(u.token)
		if err != nil {
			time.Sleep(5 * time.Second)
			cleanSubscriptionKey(u)
			break
		}

		if userID != flags.AdministratorsId && psgMsg.Symbol == setting.ContractSubscribe {
			orderTokenKey := virtual.OrderIdListKey(setting.ContractSubscribe, userID) // subscription key
			orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}

			for _, value := range orderList {
				var msg virtual.ContractTallyCache
				if err = json.Unmarshal([]byte(value), &msg); err != nil {
					applogger.Error("ContractSubscribe.Unmarshal:%v", err)
					time.Sleep(5 * time.Second)
					continue
				}

				orderModel := virtualData.ContractOrderProcessing(setting.ContractSubscribe, msg)
				if orderModel == nil {
					continue
				}
				if _, ok := u.symbol.Load(psgMsg.Symbol); !ok {
					applogger.Info("User cancels contract order subscription.")
					return
				}

				// Drop cancelled and closed contract orders from the cache.
				if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
					if err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err(); err != nil {
						time.Sleep(5 * time.Second)
						applogger.Error("ContractSubscribe.HDel:%v", err)
						continue
					}
				}

				orderStr, err := json.Marshal(orderModel)
				if err != nil {
					applogger.Error("orderSubContractSubscribe.Marshal:%v", err)
					time.Sleep(5 * time.Second)
					continue
				}
				u.msg <- orderStr // user contract (pending order | position) subscription
			}
		}

		time.Sleep(600 * time.Millisecond)
	}
}

// orderSubContractMarketSubscribe
//
//	@Description: Streams the user's contract market statistics (total assets,
//	available and frozen balance, cumulative profit and loss, total fee and
//	floating profit and loss) to the client. Cumulative profit and loss and the
//	total fee are recomputed only when the available balance changed since the
//	last computation, or once after trades were detected for a zero balance.
//	@receiver u
//	@param psgMsg
func (u *Client) orderSubContractMarketSubscribe(psgMsg *SymbolMessage) {
	var (
		n                   int             // 0: trades not probed yet, 1: recompute pending, 2: computed
		lastAvailable       decimal.Decimal // available balance seen at the last recomputation
		marketProfitAndLoss decimal.Decimal // cumulative profit and loss of the user
		marketTotalFee      decimal.Decimal // total fee of the user
	)

	for {
		userID, err := GetUserIdByToken(u.token)
		if err != nil {
			time.Sleep(5 * time.Second)
			cleanSubscriptionKey(u)
			break
		}

		if userID != flags.AdministratorsId && psgMsg.Symbol == setting.ContractMarketSubscribe {
			var marketAvailable, marketFreeze decimal.Decimal // default to zero when the query fails

			// Available and frozen balance of the user.
			var botUserContract []models.BotUserContract
			err = data.Msql.Table(flags.BotUserContract).
				Where("user_id = ?", userID).
				Where("contract_id = ?", flags.BasicUnit).
				Find(&botUserContract)
			if err != nil {
				applogger.Error("orderSubContractMarketSubscribe.Find:%v", err)
			}
			for _, value := range botUserContract {
				marketAvailable, marketFreeze = orderSubBalance(value.UsableNum, value.FrozenNum)
			}

			// Probe once for existing trades while a balance is zero.
			if (marketAvailable.IsZero() || marketFreeze.IsZero()) && n == 0 {
				// The count error is ignored on purpose: a failed probe leaves
				// n at 0 and is simply retried on the next iteration.
				tradeCount, _ := data.Msql.Table(flags.BotContractTrade).
					Where("user_id = ?", userID).
					In("status", 1, 3).
					Count()
				if tradeCount > 0 {
					n = 1
				}
			}

			if marketAvailable.Cmp(lastAvailable) != 0 || n == 1 {
				// Cumulative profit and loss over trades with status 3.
				var botContractTrade []models.BotContractTrade
				err = data.Msql.Table(flags.BotContractTrade).
					Where("user_id = ?", userID).
					Where("status = 3").
					Find(&botContractTrade)
				if err != nil {
					applogger.Error("orderSubContractMarketSubscribe.Find:%v", err)
				}
				// Start from zero on every recomputation; accumulating onto the
				// previous total would count every trade again.
				profit := decimal.Zero
				for _, value := range botContractTrade {
					// TradeType 1 is buy-up; everything else is buy-down.
					pnl, perr := orderSubTradePnL(value.DealPrice, value.ClosingPrice, value.OrderNumber, value.TradeType == 1)
					if perr != nil {
						applogger.Error("orderSubContractMarketSubscribe.TradePnL:%v", perr)
						continue
					}
					profit = profit.Add(pnl)
				}
				marketProfitAndLoss = profit

				// Total fee of the user.
				var feeBean models.BotContractTrade
				totalFee, sumErr := data.Msql.Table(flags.BotContractTrade).
					Where("user_id = ?", userID).
					In("status", 1, 3).
					Sums(feeBean, "service_cost", "closing_cost")
				marketTotalFee = orderSubFeeTotal(totalFee, sumErr)

				lastAvailable = marketAvailable
				n = 2
			}

			// Floating profit and loss, also part of the total assets.
			pLPriceSum := GetContractByPriceSum(userID, setting.AdminContractSubscribe)
			marketTotalAssets := marketAvailable.Add(marketFreeze).Add(pLPriceSum)
			marketFloatingPL := pLPriceSum

			orderModel := &UserMarketStatistics{
				UserMarkerSubscribe:     psgMsg.Symbol,
				UserMarketTotalAssets:   marketTotalAssets.String(),
				UserMarketAvailable:     marketAvailable.String(),
				UserMarketFreeze:        marketFreeze.String(),
				UserMarketProfitAndLoss: marketProfitAndLoss.String(),
				UserMarketTotalFee:      marketTotalFee.String(),
				UserMarketFloatingPL:    marketFloatingPL.String(),
			}

			if _, ok := u.symbol.Load(psgMsg.Symbol); !ok {
				applogger.Info("User market cancels subscription to Contract stock orders.")
				return
			}
			orderStr, err := json.Marshal(orderModel)
			if err != nil {
				applogger.Error("User market Contract stock subscription cache order error:%v", err)
				time.Sleep(5 * time.Second)
				continue
			}
			u.msg <- orderStr // user market statistics subscription
		}

		time.Sleep(800 * time.Millisecond)
	}
}

// orderSubSecondSubscribe
//
//	@Description: Streams the user's second-contract orders to the client. The
//	loop reads the user's order hash from Redis, processes every entry and
//	pushes the JSON-encoded result to u.msg. It stops when the token can no
//	longer be resolved or when the subscription key is no longer present in
//	u.symbol.
//	@receiver u
//	@param psgMsg
func (u *Client) orderSubSecondSubscribe(psgMsg *SymbolMessage) {
	for {
		userID, err := GetUserIdByToken(u.token)
		if err != nil {
			time.Sleep(5 * time.Second)
			cleanSubscriptionKey(u)
			break
		}

		if userID != flags.AdministratorsId && psgMsg.Symbol == setting.SecondSubscribe {
			orderTokenKey := virtual.OrderIdListKey(setting.SecondSubscribe, userID) // subscription key
			orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
			if err != nil {
				time.Sleep(5 * time.Second)
				continue
			}

			for _, value := range orderList {
				var msg virtual.ContractTallyCache
				if err = json.Unmarshal([]byte(value), &msg); err != nil {
					applogger.Error("orderSubSecondSubscribe.Unmarshal:%v", err)
					time.Sleep(5 * time.Second)
					continue
				}

				orderModel := virtualData.ContractOrderProcessing(setting.SecondSubscribe, msg)
				if orderModel == nil {
					continue
				}
				if _, ok := u.symbol.Load(psgMsg.Symbol); !ok {
					applogger.Info("User cancels contract order subscription.")
					return
				}

				// Drop closed second-contract orders from the cache.
				if orderModel.Status == flags.Close {
					if err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err(); err != nil {
						time.Sleep(5 * time.Second)
						applogger.Error("orderSubSecondSubscribe.HDel:%v", err)
						continue
					}
				}

				orderStr, err := json.Marshal(orderModel)
				if err != nil {
					applogger.Error("orderSubSecondSubscribe.Marshal:%v", err)
					time.Sleep(5 * time.Second)
					continue
				}
				u.msg <- orderStr // user contract (pending order | position) subscription
			}
		}

		time.Sleep(600 * time.Millisecond)
	}
}

// orderSubSecondMarketSubscribe
//
//	@Description: Streams the user's second-contract market statistics (total
//	assets, available and frozen balance, cumulative profit and loss, total fee
//	and floating profit and loss) to the client. Cumulative profit and loss and
//	the total fee are recomputed only when the available balance changed since
//	the last computation, or once after trades were detected for a zero
//	balance.
//	@receiver u
//	@param psgMsg
func (u *Client) orderSubSecondMarketSubscribe(psgMsg *SymbolMessage) {
	var (
		n                   int             // 0: trades not probed yet, 1: recompute pending, 2: computed
		lastAvailable       decimal.Decimal // available balance seen at the last recomputation
		marketProfitAndLoss decimal.Decimal // cumulative profit and loss of the user
		marketTotalFee      decimal.Decimal // total fee of the user
	)

	for {
		userID, err := GetUserIdByToken(u.token)
		if err != nil {
			time.Sleep(5 * time.Second)
			cleanSubscriptionKey(u)
			break
		}

		if userID != flags.AdministratorsId && psgMsg.Symbol == setting.SecondMarketSubscribe {
			var marketAvailable, marketFreeze decimal.Decimal // default to zero when the query fails

			// Available and frozen balance of the user.
			var botUserContractSec []models.BotUserContractSec
			err = data.Msql.Table(flags.BotUserContractSec).
				Where("user_id = ?", userID).
				Where("contract_id = ?", flags.BasicUnit).
				Find(&botUserContractSec)
			if err != nil {
				applogger.Error("orderSubSecondMarketSubscribe.Find:%v", err)
			}
			for _, value := range botUserContractSec {
				marketAvailable, marketFreeze = orderSubBalance(value.UsableNum, value.FrozenNum)
			}

			// Probe once for existing trades while a balance is zero.
			if (marketAvailable.IsZero() || marketFreeze.IsZero()) && n == 0 {
				// The count error is ignored on purpose: a failed probe leaves
				// n at 0 and is simply retried on the next iteration.
				tradeCount, _ := data.Msql.Table(flags.BotContractSecTrade).
					Where("user_id = ?", userID).
					In("status", 1, 3).
					Count()
				if tradeCount > 0 {
					n = 1
				}
			}

			if marketAvailable.Cmp(lastAvailable) != 0 || n == 1 {
				// Cumulative profit and loss over trades with status 3.
				var botContractSecTrade []models.BotContractSecTrade
				err = data.Msql.Table(flags.BotContractSecTrade).
					Where("user_id = ?", userID).
					Where("status = 3").
					Find(&botContractSecTrade)
				if err != nil {
					applogger.Error("orderSubSecondMarketSubscribe.Find:%v", err)
				}
				// Start from zero on every recomputation; accumulating onto the
				// previous total would count every trade again.
				profit := decimal.Zero
				for _, value := range botContractSecTrade {
					// TradeType 1 is buy-up; everything else is buy-down.
					pnl, perr := orderSubTradePnL(value.DealPrice, value.ClosingPrice, value.OrderNumber, value.TradeType == 1)
					if perr != nil {
						applogger.Error("orderSubSecondMarketSubscribe.TradePnL:%v", perr)
						continue
					}
					profit = profit.Add(pnl)
				}
				marketProfitAndLoss = profit

				// Total fee of the user.
				var feeBean models.BotContractSecTrade
				totalFee, sumErr := data.Msql.Table(flags.BotContractSecTrade).
					Where("user_id = ?", userID).
					In("status", 1, 3).
					Sums(feeBean, "service_cost", "closing_cost")
				marketTotalFee = orderSubFeeTotal(totalFee, sumErr)

				lastAvailable = marketAvailable
				n = 2
			}

			// Floating profit and loss, also part of the total assets.
			pLPriceSum := GetSecondByPriceSum(userID, setting.AdminSecondSubscribe)
			marketTotalAssets := marketAvailable.Add(marketFreeze).Add(pLPriceSum)
			marketFloatingPL := pLPriceSum

			orderModel := &UserMarketStatistics{
				UserMarkerSubscribe:     psgMsg.Symbol,
				UserMarketTotalAssets:   marketTotalAssets.String(),
				UserMarketAvailable:     marketAvailable.String(),
				UserMarketFreeze:        marketFreeze.String(),
				UserMarketProfitAndLoss: marketProfitAndLoss.String(),
				UserMarketTotalFee:      marketTotalFee.String(),
				UserMarketFloatingPL:    marketFloatingPL.String(),
			}

			if _, ok := u.symbol.Load(psgMsg.Symbol); !ok {
				applogger.Info("User market cancels subscription to Second stock orders.")
				return
			}
			orderStr, err := json.Marshal(orderModel)
			if err != nil {
				applogger.Error("User market Second stock subscription cache order error:%v", err)
				time.Sleep(5 * time.Second)
				continue
			}
			u.msg <- orderStr // user market statistics subscription
		}

		time.Sleep(800 * time.Millisecond)
	}
}