package socket import ( "context" "encoding/json" "github.com/shopspring/decimal" "matchmaking-system/internal/data" "matchmaking-system/internal/data/socket/shareData" "matchmaking-system/internal/data/tradedeal/share" "matchmaking-system/internal/data/tradedeal/virtual" "matchmaking-system/internal/pkg/flags" "matchmaking-system/internal/pkg/logging/applogger" models "matchmaking-system/internal/pkg/model" "matchmaking-system/internal/pkg/setting" "time" ) // orderSubShareUsSubscribe // // @Description: 用户美股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareUsSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareUsSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareUsSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareUsTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareUsOrderProcessing(setting.ShareUsSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理美股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareUsSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's US stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels subscription to US stock orders.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareUsMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketUsCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } /** 统计用户市场资产数据 1、用户市场总资产: 冻结 + 可用 + 总浮动盈亏 2、用户市场资产: 可用、冻结 3、用户市场累计盈亏(订单表-->平仓状态): 1、买涨:订单量 * (平仓价 - 开仓价) 2、买跌:订单量 * (开仓价 - 平仓价) 4、用户市场总手续费(统计订单表-->【持仓和平仓】状态): 1、交易手续费 2、申购手续费 */ if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareUsMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStock []models.BotUserStock err = data.Msql.Table(flags.BotUserStock).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareUsBasicUnit).Find(&botUserStock) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStock { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketUsCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockTrade []models.BotStockTrade err = data.Msql.Table(flags.BotStockTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockTradeFee models.BotStockTrade totalFee, err := data.Msql.Table(flags.BotStockTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserUsPreStockOrder models.BotUserUsPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserUsPreStockOrder).Where("user_id = ?", userId).Sum(botUserUsPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketUsCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 usPriceSum := GetShareUsByPriceSum(userId, setting.AdminShareUsSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(usPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = usPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market US stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to US stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareMysSubscribe // // @Description: 用户马股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareMysSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareMysSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareMysSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareMysTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareMysOrderProcessing(setting.ShareMysSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理马股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareMysSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels subscription to horse stock orders.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareMysMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketMysCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareMysMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockMys []models.BotUserStockMys err = data.Msql.Table(flags.BotUserStockMys).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareMysBasicUnit).Find(&botUserStockMys) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockMys { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockMysTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketMysCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockMysTrade []models.BotStockMysTrade err = data.Msql.Table(flags.BotStockMysTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockMysTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockMysTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockMysTradeFee models.BotStockMysTrade totalFee, err := data.Msql.Table(flags.BotStockMysTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockMysTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserMysPreStockOrder models.BotUserMysPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserMysPreStockOrder).Where("user_id = ?", userId).Sum(botUserMysPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketMysCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareMysByPriceSum(userId, setting.AdminShareMysSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Mys stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Mys stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareThaSubscribe // // @Description: 用户泰股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareThaSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareThaSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareThaSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareThaTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareThaOrderProcessing(setting.ShareThaSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理泰股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareUsSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User Tai Stock Subscription Cache Order Error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels subscription to Thai stock orders.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareThaMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketThaCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareThaMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockTha []models.BotUserStockTha err = data.Msql.Table(flags.BotUserStockTha).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareThaBasicUnit).Find(&botUserStockTha) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockTha { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockThaTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketThaCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockThaTrade []models.BotStockThaTrade err = data.Msql.Table(flags.BotStockThaTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockThaTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockThaTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockThaTradeFee models.BotStockThaTrade totalFee, err := data.Msql.Table(flags.BotStockThaTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockThaTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserThaPreStockOrder models.BotUserThaPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserThaPreStockOrder).Where("user_id = ?", userId).Sum(botUserThaPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketThaCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareThaByPriceSum(userId, setting.AdminShareThaSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Tha stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Tha stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareIdnSubscribe // // @Description: 用户印尼股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareIdnSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareIdnSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareIdnSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareIdnTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareIdnOrderProcessing(setting.ShareIdnSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理印尼股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareUsSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Indonesian stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Indonesian stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareIdnMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketIdnCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareIdnMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockIdn []models.BotUserStockIdn err = data.Msql.Table(flags.BotUserStockIdn).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareIdnBasicUnit).Find(&botUserStockIdn) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockIdn { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockIdnTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketIdnCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockIdnTrade []models.BotStockIdnTrade err = data.Msql.Table(flags.BotStockIdnTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockIdnTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockIdnTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockIdnTradeFee models.BotStockIdnTrade totalFee, err := data.Msql.Table(flags.BotStockIdnTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockIdnTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserIdnPreStockOrder models.BotUserIdnPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserIdnPreStockOrder).Where("user_id = ?", userId).Sum(botUserIdnPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketIdnCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareIdnByPriceSum(userId, setting.AdminShareIdnSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Idn stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Idn stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareInrSubscribe // // @Description: 用户印度股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareInrSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareInrSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareInrSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareInrTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareInrOrderProcessing(setting.ShareInrSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理印度股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareInrSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Indian stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Indian stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareInrMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketInrCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareInrMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockIn []models.BotUserStockIn err = data.Msql.Table(flags.BotUserStockIn).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareInrBasicUnit).Find(&botUserStockIn) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockIn { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockInTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketInrCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockInTrade []models.BotStockInTrade err = data.Msql.Table(flags.BotStockInTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockInTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockInTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockInTradeFee models.BotStockInTrade totalFee, err := data.Msql.Table(flags.BotStockInTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockInTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserInPreStockOrder models.BotUserInPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserInPreStockOrder).Where("user_id = ?", userId).Sum(botUserInPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketInrCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 inrPriceSum := GetShareInrByPriceSum(userId, setting.AdminShareInrSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(inrPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = inrPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market INR stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to INR stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareSgdSubscribe // // @Description: 用户新加坡股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareSgdSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareSgdSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareSgdSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareSgdTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareSgdOrderProcessing(setting.ShareSgdSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareSgdSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Singapore stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Singapore stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareSgdMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketSgdCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareSgdMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockSgd []models.BotUserStockSgd err = data.Msql.Table(flags.BotUserStockSgd).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareSgdBasicUnit).Find(&botUserStockSgd) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockSgd { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockSgdTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketSgdCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockSgdTrade []models.BotStockSgdTrade err = data.Msql.Table(flags.BotStockSgdTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockSgdTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockSgdTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockSgdTradeFee models.BotStockSgdTrade totalFee, err := data.Msql.Table(flags.BotStockSgdTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockSgdTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserSgdPreStockOrder models.BotUserSgdPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserSgdPreStockOrder).Where("user_id = ?", userId).Sum(botUserSgdPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketSgdCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareSgdByPriceSum(userId, setting.AdminShareSgdSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Sgd stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Sgd stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareHkdSubscribe // // @Description: 用户港股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareHkdSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.SubscribeShareHkd { orderTokenKey := virtual.OrderIdListKey(setting.ShareHkdSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareHkdTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareHkdOrderProcessing(setting.ShareHkdSubscribe, 0, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareHkdSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Hong Kong stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Singapore stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareHkdMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketHkdCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareHkdMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockHkd []models.BotUserStockHkd err = data.Msql.Table(flags.BotUserStockHkd).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareHkdBasicUnit).Find(&botUserStockHkd) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockHkd { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockHkdTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketHkdCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockHkdTrade []models.BotStockHkdTrade err = data.Msql.Table(flags.BotStockHkdTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockHkdTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockHkdTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockHkdTradeFee models.BotStockHkdTrade totalFee, err := data.Msql.Table(flags.BotStockHkdTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockHkdTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserHkdPreStockOrder models.BotUserHkdPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserHkdPreStockOrder).Where("user_id = ?", userId).Sum(botUserHkdPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketHkdCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareHkdByPriceSum(userId, setting.AdminShareHkdSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Hkd stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Hkd stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareHkdSubscribe // // @Description: 用户英股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareGbxSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareGbxSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareGbxSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareGbxTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareGbxOrderProcessing(setting.ShareGbxSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareGbxSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's UK stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Uk stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareGbxMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketGbxCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareGbxMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockGbx []models.BotUserStockGbx err = data.Msql.Table(flags.BotUserStockGbx).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareGbxBasicUnit).Find(&botUserStockGbx) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockGbx { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockGbxTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketGbxCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockGbxTrade []models.BotStockGbxTrade err = data.Msql.Table(flags.BotStockGbxTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockGbxTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockGbxTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } pricePl := sumValue.Mul(orderNum) marketProfitAndLoss = marketProfitAndLoss.Add(pricePl) } // 统计用户市场总手续费 var botStockGbxTradeFee models.BotStockGbxTrade totalFee, err := data.Msql.Table(flags.BotStockGbxTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockGbxTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserGbxPreStockOrder models.BotUserGbxPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserGbxPreStockOrder).Where("user_id = ?", userId).Sum(botUserGbxPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketGbxCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareGbxByPriceSum(userId, setting.AdminShareGbxSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Gbx stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Gbx stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareSgdSubscribe // // @Description: 用户德股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareEurSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareEurSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareEurSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareEurTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareEurOrderProcessing(setting.ShareEurSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("ShareEurSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Eur stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Eur stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareEurMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketEurCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareEurMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockEur []models.BotUserStockEur err = data.Msql.Table(flags.BotUserStockEur).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareEurBasicUnit).Find(&botUserStockEur) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockEur { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockEurTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketEurCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockEurTrade []models.BotStockEurTrade err = data.Msql.Table(flags.BotStockEurTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockEurTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockEurTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockEurTradeFee models.BotStockEurTrade totalFee, err := data.Msql.Table(flags.BotStockEurTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockEurTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserEurPreStockOrder models.BotUserEurPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserEurPreStockOrder).Where("user_id = ?", userId).Sum(botUserEurPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketEurCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareEurByPriceSum(userId, setting.AdminShareEurSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Eur stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Eur stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareFurSubscribe // // @Description: 用户法股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareFurSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareFurSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareFurSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareFurTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareFurOrderProcessing(setting.ShareFurSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("orderSubShareFurSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Fur stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Fur stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareFurMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketFurCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareFurMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockFur []models.BotUserStockFur err = data.Msql.Table(flags.BotUserStockFur).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareFurBasicUnit).Find(&botUserStockFur) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockFur { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockFurTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketFurCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockFurTrade []models.BotStockFurTrade err = data.Msql.Table(flags.BotStockFurTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockFurTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockFurTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockFurTradeFee models.BotStockFurTrade totalFee, err := data.Msql.Table(flags.BotStockFurTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockFurTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserFurPreStockOrder models.BotUserFurPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserFurPreStockOrder).Where("user_id = ?", userId).Sum(botUserFurPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketFurCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareFurByPriceSum(userId, setting.AdminShareFurSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Fur stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Fur stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareBrlSubscribe // // @Description: 用户巴西股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareBrlSubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareBrlSubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareBrlSubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareBrlTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareBrlOrderProcessing(setting.ShareBrlSubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("orderSubShareBrlSubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Brl stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Brl stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareBrlMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketBrlCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareBrlMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockBrl []models.BotUserStockBrl err = data.Msql.Table(flags.BotUserStockBrl).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareBrlBasicUnit).Find(&botUserStockBrl) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockBrl { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockBrlTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketBrlCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockBrlTrade []models.BotStockBrlTrade err = data.Msql.Table(flags.BotStockBrlTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockBrlTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockBrlTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockBrlTradeFee models.BotStockBrlTrade totalFee, err := data.Msql.Table(flags.BotStockBrlTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockBrlTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserBrlPreStockOrder models.BotUserBrlPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserBrlPreStockOrder).Where("user_id = ?", userId).Sum(botUserBrlPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketBrlCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareBrlByPriceSum(userId, setting.AdminShareBrlSubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Brl stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Brl stock orders.") return } } time.Sleep(800 * time.Millisecond) } } // orderSubShareFurSubscribe // // @Description: 用户日股订单订阅 // @receiver u // @param psgMsg func (u *Client) orderSubShareJpySubscribe(psgMsg *SymbolMessage) { for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareJpySubscribe { orderTokenKey := virtual.OrderIdListKey(setting.ShareJpySubscribe, userId) orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result() if err != nil { time.Sleep(5 * time.Second) continue } for _, value := range orderList { var msg share.ShareJpyTallyCache if err = json.Unmarshal([]byte(value), &msg); err != nil { time.Sleep(5 * time.Second) continue } orderModel := shareData.ShareJpyOrderProcessing(setting.ShareJpySubscribe, msg) if orderModel != nil { _, ok := u.symbol.Load(psgMsg.Symbol) if ok { // 清理股票股订阅缓存订单中(撤单|平仓)状态的订单 if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close { err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err() if err != nil { time.Sleep(5 * time.Second) applogger.Error("orderSubShareJpySubscribe.HDel:%v", err) continue } } orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User's Jpy stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户(挂单|持仓)订阅 } else { applogger.Info("User cancels Jpy stock order subscription.") return } } } } time.Sleep(600 * time.Millisecond) } } func (u *Client) orderSubShareJpyMarketSubscribe(psgMsg *SymbolMessage) { var n int var MarketJpyCache decimal.Decimal // 缓存记录 var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏 var marketTotalFee decimal.Decimal // 用户市场总手续费 for { userId, err := GetUserIdByToken(u.token) if err != nil { time.Sleep(5 * time.Second) cleanSubscriptionKey(u) break } if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareJpyMarketSubscribe { var marketTotalAssets decimal.Decimal // 用户市场总资产 var marketAvailable decimal.Decimal // 用户市场可用 var marketFreeze decimal.Decimal // 用户市场冻结 var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏 // 统计用户市场可用余额和冻结余额 var botUserStockJpy []models.BotUserStockJp err = data.Msql.Table(flags.BotUserStockJpy).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareJpyBasicUnit).Find(&botUserStockJpy) if err != nil { marketAvailable = decimal.Zero marketFreeze = decimal.Zero } for _, value := range botUserStockJpy { marketAvailable = decimal.RequireFromString(value.UsableNum) marketFreeze = decimal.RequireFromString(value.FrozenNum) } // 判定是否存在订单 if marketAvailable.IsZero() || marketFreeze.IsZero() { if n == 0 { tradeCount, _ := data.Msql.Table(flags.BotStockJpyTrade).Where("user_id = ?", userId).In("status", 1, 3).Count() if tradeCount > 0 { n = 1 } } } if marketAvailable.Cmp(MarketJpyCache) != 0 || n == 1 { // 统计用户市场累计盈亏 var botStockJpyTrade []models.BotStockJpTrade err = data.Msql.Table(flags.BotStockJpyTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockJpyTrade) if err != nil { marketProfitAndLoss = decimal.Zero } for _, value := range botStockJpyTrade { sumValue := decimal.Zero openPrice := decimal.RequireFromString(value.DealPrice) closePrice := decimal.RequireFromString(value.ClosingPrice) orderNum := decimal.RequireFromString(value.OrderNumber) switch value.TradeType { case 1: // 买张 sumValue = closePrice.Sub(openPrice) default: // 买跌 sumValue = openPrice.Sub(closePrice) } marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum)) } // 统计用户市场总手续费 var botStockJpyTradeFee models.BotStockJpTrade totalFee, err := data.Msql.Table(flags.BotStockJpyTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockJpyTradeFee, "service_cost", "closing_cost") if err != nil || len(totalFee) != 2 { marketTotalFee = decimal.Zero } marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1])) var botUserJpyPreStockOrder models.BotUserJpPreStockOrder totalPreFee, err := data.Msql.Table(flags.BotUserJpyPreStockOrder).Where("user_id = ?", userId).Sum(botUserJpyPreStockOrder, "get_fee") if err != nil { marketTotalFee = decimal.Zero } marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee)) MarketJpyCache = marketAvailable n = 2 } // 用户市场总浮动盈亏 pLPriceSum := GetShareJpyByPriceSum(userId, setting.AdminShareJpySubscribe) // 统计用户市场总资产 marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum) // 统计用户市场总浮动盈亏 marketFloatingPL = pLPriceSum orderModel := &UserMarketStatistics{ UserMarkerSubscribe: psgMsg.Symbol, UserMarketTotalAssets: marketTotalAssets.String(), UserMarketAvailable: marketAvailable.String(), UserMarketFreeze: marketFreeze.String(), UserMarketProfitAndLoss: marketProfitAndLoss.String(), UserMarketTotalFee: marketTotalFee.String(), UserMarketFloatingPL: marketFloatingPL.String(), } _, ok := u.symbol.Load(psgMsg.Symbol) if ok { orderStr, err := json.Marshal(orderModel) if err != nil { applogger.Error("User market Jpy stock subscription cache order error:%v", err) time.Sleep(5 * time.Second) continue } u.msg <- orderStr // 用户市场统计订阅 } else { applogger.Info("User market cancels subscription to Jpy stock orders.") return } } time.Sleep(800 * time.Millisecond) } }