package data import ( "context" "encoding/json" "fmt" "github.com/shopspring/decimal" "matchmaking-system/internal/data/memory" "matchmaking-system/internal/data/socket/publicData" "matchmaking-system/internal/data/tradedeal/virtual" "matchmaking-system/internal/pkg/flags" "matchmaking-system/internal/pkg/logging/applogger" "matchmaking-system/internal/pkg/logging/common" "matchmaking-system/internal/pkg/setting" "matchmaking-system/internal/pkg/utils" "strings" "sync" "time" ) /* 现货行情订阅 1、初始化现货订阅信息 2、接收用户现货下单的交易对 3、根据交易对订阅现货行情数据 4、写入内存中用于并发计算 5、记录在热缓存中 */ // QuitesSpots // @Description: type QuitesSpots struct { ServersID string `json:"serversId"` Content struct { Ch string `json:"ch"` Ts int64 `json:"ts"` Tick struct { Open string `json:"open"` High string `json:"high"` Low string `json:"low"` Close string `json:"close"` Amount string `json:"amount"` Count int `json:"count"` Bid string `json:"bid"` BidSize string `json:"bidSize"` Ask string `json:"ask"` AskSize string `json:"askSize"` LastPrice string `json:"lastPrice"` LastSize string `json:"lastSize"` } `json:"Tick"` Data any `json:"Data"` } `json:"content"` Symbol string `json:"symbol"` } // SpotsSymbol // @Description: type SpotsSymbol struct { SpotsMap chan []byte // 现货订单交易对 -- 用户现货下单通知订阅 SpotsMapSymbol sync.Map // 现货订阅交易对簿(防止重复订阅情况) } // InitCacheSymbolSpots // // @Description: 初始化当前数据表中的现货交易对 // @param ctx // @param uo func InitCacheSymbolSpots(ctx context.Context, uo *Data) { digitalList, err := GetBotDigitalList(ctx, uo) if err != nil { return } for _, value := range digitalList { vue := strings.ToLower(value) // Write to the redis list hot cache for initializing subscriptions when the program is pulled up checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketSpots, vue).Result() if err != nil { applogger.Error("%v InitCacheSymbolSpots.MarketSpots.HExists:%v", common.ErrSpots, err) return } applogger.Info("初始化现货交易对:%v", value) if !checkBool { if err = uo.redisDB.HSet(context.Background(), setting.MarketSpots, vue, vue).Err(); err != nil { applogger.Error("%v InitCacheSymbolSpots.HSet:%v", common.ErrSpots, err) return } } } } // InitSubscribeQuotesSpots // // @Description: 状态机加载订阅 // @param ctx // @param uo // @param spots func InitSubscribeQuotesSpots(spots *SpotsSymbol) { for { listSpots, err := LoadLRangeList(setting.MarketSpots) if err != nil { applogger.Error("%v InitSubscribeQuotesSpots.LoadLRangeList:%v", common.ErrSpots, err) return } for _, value := range listSpots { // Prevent duplicate subscription to CtrIp _, ok := spots.SpotsMapSymbol.Load(value) if ok { continue } spots.SpotsMap <- []byte(value) } time.Sleep(10 * time.Second) } } // SubscribeQuotesSpots // // @Description: 现货订阅 // @param ctx // @param uo // @param spots func SubscribeQuotesSpots(ctx context.Context, uo *Data, spots *SpotsSymbol) { for { select { case symbol, _ := <-spots.SpotsMap: symbolStr := string(symbol) // Prevent duplicate subscription to CtrIp _, ok := spots.SpotsMapSymbol.Load(symbolStr) if ok { time.Sleep(5 * time.Second) continue } // Write to the redis list hot cache for initializing subscriptions when the program is pulled up checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketSpots, symbolStr).Result() if err != nil { applogger.Error("%v SubscribeQuotesSpots.MarketSpots.HExists:%v", common.ErrSpots, err) time.Sleep(5 * time.Second) continue } if !checkBool { if err = uo.redisDB.HSet(context.Background(), setting.MarketSpots, symbolStr, symbolStr).Err(); err != nil { applogger.Error("%v SubscribeQuotesSpots.HSet:%v", common.ErrSpots, err) time.Sleep(5 * time.Second) continue } } go func() { topIc := fmt.Sprintf("market.%vusdt.ticker", symbolStr) pubSub := uo.redisDB.Subscribe(ctx, topIc) defer pubSub.Close() if _, err = pubSub.Receive(ctx); err != nil { applogger.Error("%v SubscribeQuotesSpots.Receive:%v", common.ErrSpots, err) return } ch := pubSub.Channel() for msg := range ch { var subMsg QuitesSpots if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { applogger.Error("%v SubscribeQuotesSpots.Unmarshal:%v", common.ErrSpots, err) close(spots.SpotsMap) return } // 交易类型:0最新价,1买入,2卖出 xh-symbol-0 price := publicData.SymbolCache(flags.Xh, symbolStr, flags.TradeTypePrice) if err = memory.SpotsCache.Set(price, []byte(subMsg.Content.Tick.Close)); err != nil { applogger.Error("%v SubscribeQuotesSpots.Set.price:%v", common.ErrSpots, err) return } if !flags.CheckSetting { applogger.Info("当前写入最新价格:%v,%v", price, subMsg.Content.Tick.Close) } } }() // Write in the map to determine whether to repeat subscription spots.SpotsMapSymbol.Store(symbolStr, symbolStr) } } } // SpotsTransaction // // @Description: // @param ctx func SpotsTransaction() { for { SpotsTransactionCalculation() time.Sleep(400 * time.Millisecond) } } // SpotsTransactionCalculation // // @Description: // @param ctx func SpotsTransactionCalculation() { var wg sync.WaitGroup entrustList, err := Reds.HGetAll(context.Background(), setting.MarketSpotsEntrust).Result() if err != nil { applogger.Warn("%v SpotsTransactionCalculation.LRange:%v", common.ErrSpots, err) return } for _, value := range entrustList { var msg = make(chan virtual.SpotsTallyCache, 1) var entrust virtual.SpotsTallyCache if err = json.Unmarshal([]byte(value), &entrust); err != nil { applogger.Error("%v SpotsTransactionCalculation.Unmarshal:%v", common.ErrSpots, err) return } var newByte []byte newByte, err = memory.SpotsCache.Get(publicData.SymbolCache(flags.Xh, entrust.Symbol, flags.TradeTypePrice)) if err != nil { applogger.Warn("%v SpotsTransactionCalculation.Get:%v", common.ErrSpots, err) continue } priceNew := decimal.RequireFromString(string(newByte)) wg.Add(1) go func(value string) { resultMsg := SpotsConcurrentComputing(&entrust, priceNew, &wg) msg <- resultMsg }(value) // 处理并发结果 select { case resultMsg, _ := <-msg: switch resultMsg.Status { case flags.Entrust: if !flags.CheckSetting { applogger.Info("现货从信道接收挂单信息:%v", resultMsg) } case flags.Close: if !flags.CheckSetting { applogger.Info("现货从信道接收平仓信息:%v", resultMsg) } SpotLiquidation(&resultMsg) } } } wg.Wait() applogger.Info("现货并发计算执行完毕......") } // SpotsConcurrentComputing // // @Description: 现货下单判定 // @param order // @param priceNew // @param wg // @return tradedeal.SpotsTallyCache func SpotsConcurrentComputing(order *virtual.SpotsTallyCache, priceNew decimal.Decimal, wg *sync.WaitGroup) virtual.SpotsTallyCache { limitPrice := decimal.RequireFromString(order.Order.LimitPrice) status := order.Status var deleteCache bool var dealPrice string switch order.Order.TradeType { case flags.TradeTypeBuy: // buy if !flags.CheckSetting { applogger.Info("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, priceNew, limitPrice.Cmp(priceNew), order.Status) } // 限价买入逻辑判定 -> 买入金额 >= 最优对手价格(市价) if limitPrice.Cmp(priceNew) >= 0 { deleteCache = true difference := priceNew.Mul(utils.Difference()) // 设置价差 dealPrice = priceNew.Add(difference).String() // 平仓价格 } case flags.TradeTypeSell: // sell if !flags.CheckSetting { applogger.Info("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, priceNew, limitPrice.Cmp(priceNew), order.Status) } // 限价卖入逻辑判定 -> 卖出金额 <= 最优对手价格(市价) if limitPrice.Cmp(priceNew) <= 0 { deleteCache = true difference := priceNew.Mul(utils.Difference()) // 设置价差 dealPrice = priceNew.Sub(difference).String() // 平仓价格 } } // 判定订单状态 if deleteCache { order.Status = flags.Close } // 清理现货挂单缓存列表 if deleteCache { if err := Reds.HDel(context.Background(), setting.MarketSpotsEntrust, order.OrderId).Err(); err != nil { applogger.Error("%v SpotsConcurrentComputing.MarketSpotsEntrust.HDel:%v", common.ErrSpots, err) order.Status = status dealPrice = limitPrice.String() } } wg.Done() return virtual.SpotsTallyCache{ UserId: order.UserId, // 用户Id OrderId: order.OrderId, // 订单Id Symbol: order.Symbol, // 下单交易对 ClosingPrice: dealPrice, // 平仓价格 Status: order.Status, // 订单状态 Order: order.Order, // 下单信息 } } // SpotLiquidation // // @Description: 现货平仓操作 // @param order func SpotLiquidation(order *virtual.SpotsTallyCache) { // 现货挂单平仓 orderId, err := SpotOrdersClosingDB(context.Background(), Msql, order.UserId, order.Order, order.OrderId, order.ClosingPrice) if err != nil || len(orderId) == 0 { applogger.Error("%v SpotLiquidation.SpotOrdersClosingDB:%v", common.ErrSpots, err) return } // 更新用户订单订阅缓存列表 if err = UpdateSpotsSubscribeStatusHashByOrderId(order.UserId, order.OrderId, setting.SpotsSubscribe, flags.Close); err != nil { applogger.Error("%v SpotLiquidation.UpdateSpotsSubscribeStatusByOrderId:%v", common.ErrSpots, err) return } } // LoadLRangeList // // @Description: 加载本地缓存队列数据 // @param ctx // @param uo // @param key // @return map[string]string // @return error func LoadLRangeList(key string) (map[string]string, error) { elements, err := Reds.HGetAll(context.Background(), key).Result() if err != nil { applogger.Error("%v LoadLRangeList.HGetAll:%v", common.ErrSpots, err) return map[string]string{}, err } return elements, err } // SymbolCache // // @Description: 获取现货交易对缓存Key // @param mark // @param symbol // @param types // @return string func SymbolCache(mark, symbol string, types int) string { return fmt.Sprintf("%v-%v-%v", mark, symbol, types) }