You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

506 lines
18 KiB

2 months ago
package socket
import (
"context"
"encoding/json"
"github.com/shopspring/decimal"
"matchmaking-system/internal/data"
"matchmaking-system/internal/data/socket/virtualData"
"matchmaking-system/internal/data/tradedeal/virtual"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
models "matchmaking-system/internal/pkg/model"
"matchmaking-system/internal/pkg/setting"
"time"
)
// orderSubSpotsSubscribe
//
// @Description: 用户现货订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubSpotsSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.SpotsSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.SpotsSubscribe, userId)
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg virtual.SpotsTallyCache
err = json.Unmarshal([]byte(value), &msg)
if err != nil {
applogger.Error("SpotsSubscribe.Unmarshal:%v", err)
time.Sleep(5 * time.Second)
continue
}
orderStr, orderModel := virtualData.SpotOrderProcessing(setting.SpotsSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理现货撤单和完成订单状态
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("SpotsSubscribe.HDel:%v", err)
continue
}
}
u.msg <- orderStr // 用户(挂单|持仓)订阅
} else {
applogger.Info("User cancels stock order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubSpotsMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketSpotsCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.SpotsMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserDigital []models.BotUserDigital
err = data.Msql.Table(flags.BotUserDigital).Where("user_id = ?", userId).Where("digital_id = ?", flags.BasicUnit).Find(&botUserDigital)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserDigital {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotDigitalTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketSpotsCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botDigitalTrade []models.BotDigitalTrade
err = data.Msql.Table(flags.BotDigitalTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botDigitalTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botDigitalTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botDigitalTradeTradeFee models.BotDigitalTrade
totalFee, err := data.Msql.Table(flags.BotDigitalTrade).
Where("user_id = ?", userId).
In("status", 1, 3).
Sums(botDigitalTradeTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
MarketSpotsCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetSpotsByPriceSum(userId, setting.SpotsSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Spots stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Spots stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubContractSubscribe
//
// @Description: 用户合约订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubContractSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ContractSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.ContractSubscribe, userId) // 获取订阅Key
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg virtual.ContractTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := virtualData.ContractOrderProcessing(setting.ContractSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理合约订阅缓存订单中(撤单|平仓)状态的订单
if orderModel.Status == flags.Cancel || orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("ContractSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("orderSubContractSubscribe.Marshal:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户合约(挂单|持仓)订阅
} else {
applogger.Info("User cancels contract order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubContractMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketContractCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.ContractMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserContract []models.BotUserContract
err = data.Msql.Table(flags.BotUserContract).Where("user_id = ?", userId).Where("contract_id = ?", flags.BasicUnit).Find(&botUserContract)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserContract {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotContractTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketContractCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botContractTrade []models.BotContractTrade
err = data.Msql.Table(flags.BotContractTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botContractTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botContractTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botContractTradeFee models.BotContractTrade
totalFee, err := data.Msql.Table(flags.BotContractTrade).
Where("user_id = ?", userId).
In("status", 1, 3).
Sums(botContractTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
MarketContractCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetContractByPriceSum(userId, setting.AdminContractSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Contract stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Contract stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}
// orderSubSecondSubscribe
//
// @Description: 用户秒合约订单订阅
// @receiver u
// @param psgMsg
func (u *Client) orderSubSecondSubscribe(psgMsg *SymbolMessage) {
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.SecondSubscribe {
orderTokenKey := virtual.OrderIdListKey(setting.SecondSubscribe, userId) // 获取订阅Key
orderList, err := data.Reds.HGetAll(context.Background(), orderTokenKey).Result()
if err != nil {
time.Sleep(5 * time.Second)
continue
}
for _, value := range orderList {
var msg virtual.ContractTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
orderModel := virtualData.ContractOrderProcessing(setting.SecondSubscribe, msg)
if orderModel != nil {
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
// 清理秒合约订阅缓存订单中(平仓)状态的订单
if orderModel.Status == flags.Close {
err = data.Reds.HDel(context.Background(), orderTokenKey, msg.OrderId).Err()
if err != nil {
time.Sleep(5 * time.Second)
applogger.Error("orderSubSecondSubscribe.HDel:%v", err)
continue
}
}
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("orderSubSecondSubscribe.Marshal:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户合约(挂单|持仓)订阅
} else {
applogger.Info("User cancels contract order subscription.")
return
}
}
}
}
time.Sleep(600 * time.Millisecond)
}
}
func (u *Client) orderSubSecondMarketSubscribe(psgMsg *SymbolMessage) {
var n int
var MarketSecondCache decimal.Decimal // 缓存记录
var marketProfitAndLoss decimal.Decimal // 用户市场累计盈亏
var marketTotalFee decimal.Decimal // 用户市场总手续费
for {
userId, err := GetUserIdByToken(u.token)
if err != nil {
time.Sleep(5 * time.Second)
cleanSubscriptionKey(u)
break
}
if userId != flags.AdministratorsId && psgMsg.Symbol == setting.SecondMarketSubscribe {
var marketTotalAssets decimal.Decimal // 用户市场总资产
var marketAvailable decimal.Decimal // 用户市场可用
var marketFreeze decimal.Decimal // 用户市场冻结
var marketFloatingPL decimal.Decimal // 用户市场总浮动盈亏
// 统计用户市场可用余额和冻结余额
var botUserContractSec []models.BotUserContractSec
err = data.Msql.Table(flags.BotUserContractSec).Where("user_id = ?", userId).Where("contract_id = ?", flags.BasicUnit).Find(&botUserContractSec)
if err != nil {
marketAvailable = decimal.Zero
marketFreeze = decimal.Zero
}
for _, value := range botUserContractSec {
marketAvailable = decimal.RequireFromString(value.UsableNum)
marketFreeze = decimal.RequireFromString(value.FrozenNum)
}
// 判定是否存在订单
if marketAvailable.IsZero() || marketFreeze.IsZero() {
if n == 0 {
tradeCount, _ := data.Msql.Table(flags.BotContractSecTrade).Where("user_id = ?", userId).In("status", 1, 3).Count()
if tradeCount > 0 {
n = 1
}
}
}
if marketAvailable.Cmp(MarketSecondCache) != 0 || n == 1 {
// 统计用户市场累计盈亏
var botContractSecTrade []models.BotContractSecTrade
err = data.Msql.Table(flags.BotContractSecTrade).Where("user_id = ?", userId).Where("status = 3").Find(&botContractSecTrade)
if err != nil {
marketProfitAndLoss = decimal.Zero
}
for _, value := range botContractSecTrade {
sumValue := decimal.Zero
openPrice := decimal.RequireFromString(value.DealPrice)
closePrice := decimal.RequireFromString(value.ClosingPrice)
orderNum := decimal.RequireFromString(value.OrderNumber)
switch value.TradeType {
case 1: // 买张
sumValue = closePrice.Sub(openPrice)
default: // 买跌
sumValue = openPrice.Sub(closePrice)
}
marketProfitAndLoss = marketProfitAndLoss.Add(sumValue.Mul(orderNum))
}
// 统计用户市场总手续费
var botContractSecTradeFee models.BotContractSecTrade
totalFee, err := data.Msql.Table(flags.BotContractSecTrade).
Where("user_id = ?", userId).
In("status", 1, 3).
Sums(botContractSecTradeFee, "service_cost", "closing_cost")
if err != nil || len(totalFee) != 2 {
marketTotalFee = decimal.Zero
}
marketTotalFee = decimal.NewFromFloat(totalFee[0]).Add(decimal.NewFromFloat(totalFee[1]))
MarketSecondCache = marketAvailable
n = 2
}
// 用户市场总浮动盈亏
pLPriceSum := GetSecondByPriceSum(userId, setting.AdminSecondSubscribe)
// 统计用户市场总资产
marketTotalAssets = marketAvailable.Add(marketFreeze).Add(pLPriceSum)
// 统计用户市场总浮动盈亏
marketFloatingPL = pLPriceSum
orderModel := &UserMarketStatistics{
UserMarkerSubscribe: psgMsg.Symbol,
UserMarketTotalAssets: marketTotalAssets.String(),
UserMarketAvailable: marketAvailable.String(),
UserMarketFreeze: marketFreeze.String(),
UserMarketProfitAndLoss: marketProfitAndLoss.String(),
UserMarketTotalFee: marketTotalFee.String(),
UserMarketFloatingPL: marketFloatingPL.String(),
}
_, ok := u.symbol.Load(psgMsg.Symbol)
if ok {
orderStr, err := json.Marshal(orderModel)
if err != nil {
applogger.Error("User market Second stock subscription cache order error:%v", err)
time.Sleep(5 * time.Second)
continue
}
u.msg <- orderStr // 用户市场统计订阅
} else {
applogger.Info("User market cancels subscription to Second stock orders.")
return
}
}
time.Sleep(800 * time.Millisecond)
}
}