You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

2037 lines
74 KiB

2 months ago
package socket
import (
"context"
"encoding/json"
"github.com/shopspring/decimal"
"matchmaking-system/internal/data"
"matchmaking-system/internal/data/socket/shareData"
"matchmaking-system/internal/data/tradedeal/share"
"matchmaking-system/internal/data/tradedeal/virtual"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
models "matchmaking-system/internal/pkg/model"
"matchmaking-system/internal/pkg/setting"
"time"
)
// orderSubShareUsSubscribe
//
// @Description: 用户美股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareUsSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareUsSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareUsSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareUsTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareUsOrderProcessing(setting.ShareUsSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理美股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareUsSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's US stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels subscription to US stock orders.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareUsMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketUsCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
/** 统计用户市场资产数据
1用户市场总资产 冻结 + 可用 + 总浮动盈亏
2用户市场资产: 可用冻结
3用户市场累计盈亏(订单表-->平仓状态)
1买涨订单量 * (平仓价 - 开仓价)
2买跌订单量 * (开仓价 - 平仓价)
4用户市场总手续费(统计订单表-->持仓和平仓状态):
1交易手续费
2申购手续费
*/
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareUsMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStock []models.BotUserStock
err = data.Msql.Table(flags.BotUserStock).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareUsBasicUnit).Find(&botUserStock)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStock {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketUsCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockTrade []models.BotStockTrade
err = data.Msql.Table(flags.BotStockTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockTradeFee models.BotStockTrade
totalFee, err := data.Msql.Table(flags.BotStockTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserUsPreStockOrder models.BotUserUsPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserUsPreStockOrder).Where("user_id = ?", userId).Sum(botUserUsPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketUsCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
usPriceSum := GetShareUsByPriceSum(userId, setting.AdminShareUsSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(usPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = usPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market US stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to US stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareMysSubscribe
//
// @Description: 用户马股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareMysSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareMysSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareMysSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareMysTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareMysOrderProcessing(setting.ShareMysSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理马股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareMysSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels subscription to horse stock orders.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareMysMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketMysCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareMysMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockMys []models.BotUserStockMys
err = data.Msql.Table(flags.BotUserStockMys).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareMysBasicUnit).Find(&botUserStockMys)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockMys {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockMysTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketMysCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockMysTrade []models.BotStockMysTrade
err = data.Msql.Table(flags.BotStockMysTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockMysTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockMysTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockMysTradeFee models.BotStockMysTrade
totalFee, err := data.Msql.Table(flags.BotStockMysTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockMysTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserMysPreStockOrder models.BotUserMysPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserMysPreStockOrder).Where("user_id = ?", userId).Sum(botUserMysPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketMysCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareMysByPriceSum(userId, setting.AdminShareMysSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Mys stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Mys stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareThaSubscribe
//
// @Description: 用户泰股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareThaSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareThaSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareThaSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareThaTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareThaOrderProcessing(setting.ShareThaSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理泰股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareUsSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User Tai Stock Subscription Cache Order Error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels subscription to Thai stock orders.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareThaMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketThaCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareThaMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockTha []models.BotUserStockTha
err = data.Msql.Table(flags.BotUserStockTha).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareThaBasicUnit).Find(&botUserStockTha)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockTha {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockThaTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketThaCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockThaTrade []models.BotStockThaTrade
err = data.Msql.Table(flags.BotStockThaTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockThaTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockThaTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockThaTradeFee models.BotStockThaTrade
totalFee, err := data.Msql.Table(flags.BotStockThaTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockThaTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserThaPreStockOrder models.BotUserThaPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserThaPreStockOrder).Where("user_id = ?", userId).Sum(botUserThaPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketThaCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareThaByPriceSum(userId, setting.AdminShareThaSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Tha stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Tha stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareIdnSubscribe
//
// @Description: 用户印尼股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareIdnSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareIdnSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareIdnSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareIdnTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareIdnOrderProcessing(setting.ShareIdnSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理印尼股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareUsSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Indonesian stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Indonesian stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareIdnMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketIdnCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareIdnMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockIdn []models.BotUserStockIdn
err = data.Msql.Table(flags.BotUserStockIdn).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareIdnBasicUnit).Find(&botUserStockIdn)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockIdn {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockIdnTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketIdnCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockIdnTrade []models.BotStockIdnTrade
err = data.Msql.Table(flags.BotStockIdnTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockIdnTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockIdnTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockIdnTradeFee models.BotStockIdnTrade
totalFee, err := data.Msql.Table(flags.BotStockIdnTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockIdnTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserIdnPreStockOrder models.BotUserIdnPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserIdnPreStockOrder).Where("user_id = ?", userId).Sum(botUserIdnPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketIdnCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareIdnByPriceSum(userId, setting.AdminShareIdnSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Idn stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Idn stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareInrSubscribe
//
// @Description: 用户印度股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareInrSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareInrSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareInrSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareInrTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareInrOrderProcessing(setting.ShareInrSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理印度股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareInrSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Indian stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Indian stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareInrMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketInrCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareInrMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockIn []models.BotUserStockIn
err = data.Msql.Table(flags.BotUserStockIn).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareInrBasicUnit).Find(&botUserStockIn)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockIn {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockInTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketInrCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockInTrade []models.BotStockInTrade
err = data.Msql.Table(flags.BotStockInTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockInTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockInTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockInTradeFee models.BotStockInTrade
totalFee, err := data.Msql.Table(flags.BotStockInTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockInTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserInPreStockOrder models.BotUserInPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserInPreStockOrder).Where("user_id = ?", userId).Sum(botUserInPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketInrCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
inrPriceSum := GetShareInrByPriceSum(userId, setting.AdminShareInrSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(inrPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = inrPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market INR stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to INR stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareSgdSubscribe
//
// @Description: 用户新加坡股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareSgdSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareSgdSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareSgdSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareSgdTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareSgdOrderProcessing(setting.ShareSgdSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareSgdSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Singapore stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Singapore stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareSgdMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketSgdCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareSgdMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockSgd []models.BotUserStockSgd
err = data.Msql.Table(flags.BotUserStockSgd).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareSgdBasicUnit).Find(&botUserStockSgd)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockSgd {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockSgdTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketSgdCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockSgdTrade []models.BotStockSgdTrade
err = data.Msql.Table(flags.BotStockSgdTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockSgdTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockSgdTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockSgdTradeFee models.BotStockSgdTrade
totalFee, err := data.Msql.Table(flags.BotStockSgdTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockSgdTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserSgdPreStockOrder models.BotUserSgdPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserSgdPreStockOrder).Where("user_id = ?", userId).Sum(botUserSgdPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketSgdCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareSgdByPriceSum(userId, setting.AdminShareSgdSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Sgd stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Sgd stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareHkdSubscribe
//
// @Description: 用户港股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareHkdSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.SubscribeShareHkd {
orderTokenKey := virtual.OrderIdListKey(setting.ShareHkdSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareHkdTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareHkdOrderProcessing(setting.ShareHkdSubscribe, 0, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareHkdSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Hong Kong stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Singapore stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareHkdMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketHkdCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareHkdMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockHkd []models.BotUserStockHkd
err = data.Msql.Table(flags.BotUserStockHkd).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareHkdBasicUnit).Find(&botUserStockHkd)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockHkd {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockHkdTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketHkdCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockHkdTrade []models.BotStockHkdTrade
err = data.Msql.Table(flags.BotStockHkdTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockHkdTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockHkdTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockHkdTradeFee models.BotStockHkdTrade
totalFee, err := data.Msql.Table(flags.BotStockHkdTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockHkdTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserHkdPreStockOrder models.BotUserHkdPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserHkdPreStockOrder).Where("user_id = ?", userId).Sum(botUserHkdPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketHkdCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareHkdByPriceSum(userId, setting.AdminShareHkdSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Hkd stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Hkd stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareHkdSubscribe
//
// @Description: 用户英股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareGbxSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareGbxSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareGbxSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareGbxTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareGbxOrderProcessing(setting.ShareGbxSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareGbxSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's UK stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Uk stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareGbxMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketGbxCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareGbxMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockGbx []models.BotUserStockGbx
err = data.Msql.Table(flags.BotUserStockGbx).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareGbxBasicUnit).Find(&botUserStockGbx)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockGbx {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockGbxTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketGbxCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockGbxTrade []models.BotStockGbxTrade
err = data.Msql.Table(flags.BotStockGbxTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockGbxTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockGbxTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
pricePl := sumValue.Mul(orderNum)
marketProfitAndLoss = marketProfitAndLoss.Add(pricePl)
}
// 统计用户市场总手续费
var botStockGbxTradeFee models.BotStockGbxTrade
totalFee, err := data.Msql.Table(flags.BotStockGbxTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockGbxTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserGbxPreStockOrder models.BotUserGbxPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserGbxPreStockOrder).Where("user_id = ?", userId).Sum(botUserGbxPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketGbxCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareGbxByPriceSum(userId, setting.AdminShareGbxSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Gbx stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Gbx stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareSgdSubscribe
//
// @Description: 用户德股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareEurSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareEurSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareEurSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareEurTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareEurOrderProcessing(setting.ShareEurSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ShareEurSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Eur stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Eur stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareEurMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketEurCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareEurMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockEur []models.BotUserStockEur
err = data.Msql.Table(flags.BotUserStockEur).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareEurBasicUnit).Find(&botUserStockEur)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockEur {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockEurTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketEurCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockEurTrade []models.BotStockEurTrade
err = data.Msql.Table(flags.BotStockEurTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockEurTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockEurTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockEurTradeFee models.BotStockEurTrade
totalFee, err := data.Msql.Table(flags.BotStockEurTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockEurTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserEurPreStockOrder models.BotUserEurPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserEurPreStockOrder).Where("user_id = ?", userId).Sum(botUserEurPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketEurCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareEurByPriceSum(userId, setting.AdminShareEurSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Eur stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Eur stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareFurSubscribe
//
// @Description: 用户法股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareFurSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareFurSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareFurSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareFurTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareFurOrderProcessing(setting.ShareFurSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("orderSubShareFurSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Fur stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Fur stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareFurMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketFurCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareFurMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockFur []models.BotUserStockFur
err = data.Msql.Table(flags.BotUserStockFur).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareFurBasicUnit).Find(&botUserStockFur)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockFur {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockFurTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketFurCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockFurTrade []models.BotStockFurTrade
err = data.Msql.Table(flags.BotStockFurTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockFurTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockFurTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockFurTradeFee models.BotStockFurTrade
totalFee, err := data.Msql.Table(flags.BotStockFurTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockFurTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserFurPreStockOrder models.BotUserFurPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserFurPreStockOrder).Where("user_id = ?", userId).Sum(botUserFurPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketFurCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareFurByPriceSum(userId, setting.AdminShareFurSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Fur stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Fur stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareBrlSubscribe
//
// @Description: 用户巴西股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareBrlSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareBrlSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareBrlSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareBrlTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareBrlOrderProcessing(setting.ShareBrlSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("orderSubShareBrlSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Brl stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Brl stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareBrlMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketBrlCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareBrlMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockBrl []models.BotUserStockBrl
err = data.Msql.Table(flags.BotUserStockBrl).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareBrlBasicUnit).Find(&botUserStockBrl)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockBrl {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockBrlTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketBrlCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockBrlTrade []models.BotStockBrlTrade
err = data.Msql.Table(flags.BotStockBrlTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockBrlTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockBrlTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockBrlTradeFee models.BotStockBrlTrade
totalFee, err := data.Msql.Table(flags.BotStockBrlTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockBrlTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserBrlPreStockOrder models.BotUserBrlPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserBrlPreStockOrder).Where("user_id = ?", userId).Sum(botUserBrlPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketBrlCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareBrlByPriceSum(userId, setting.AdminShareBrlSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Brl stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Brl stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubShareFurSubscribe
//
// @Description: 用户日股订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubShareJpySubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareJpySubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ShareJpySubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg share.ShareJpyTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := shareData.ShareJpyOrderProcessing(setting.ShareJpySubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理股票股订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("orderSubShareJpySubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User's Jpy stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels Jpy stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubShareJpyMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketJpyCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ShareJpyMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserStockJpy []models.BotUserStockJp
err = data.Msql.Table(flags.BotUserStockJpy).Where("user_id = ?", userId).Where("stock_id = ?", flags.ShareJpyBasicUnit).Find(&botUserStockJpy)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserStockJpy {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotStockJpyTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketJpyCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botStockJpyTrade []models.BotStockJpTrade
err = data.Msql.Table(flags.BotStockJpyTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botStockJpyTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botStockJpyTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botStockJpyTradeFee models.BotStockJpTrade
totalFee, err := data.Msql.Table(flags.BotStockJpyTrade).Where("user_id = ?", userId).In("status", 1, 3).Sums(botStockJpyTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
var botUserJpyPreStockOrder models.BotUserJpPreStockOrder
totalPreFee, err := data.Msql.Table(flags.BotUserJpyPreStockOrder).Where("user_id = ?", userId).Sum(botUserJpyPreStockOrder, "get_fee")
if err != nil {
marketTotalFee = decimal.Zero
}
marketTotalFee = marketTotalFee.Add(decimal.NewFromFloat(totalPreFee))
MarketJpyCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetShareJpyByPriceSum(userId, setting.AdminShareJpySubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Jpy stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Jpy stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}