You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

365 lines
10 KiB

package data
import (
"context"
"encoding/json"
"fmt"
"github.com/shopspring/decimal"
"matchmaking-system/internal/data/memory"
"matchmaking-system/internal/data/socket/publicData"
"matchmaking-system/internal/data/tradedeal/virtual"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
"matchmaking-system/internal/pkg/logging/common"
"matchmaking-system/internal/pkg/setting"
"matchmaking-system/internal/pkg/utils"
"strings"
"sync"
"time"
)
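/*
Spot market quote subscription
1. Initialize the spot subscription information
2. Receive the trading pairs of users' spot orders
3. Subscribe to spot market data for those trading pairs
4. Write the data into memory for concurrent computation
5. Record it in the hot cache
*/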
// QuitesSpots
//
// @Description: JSON shape that SubscribeQuotesSpots decodes each spot ticker
// pub/sub payload into. In the code visible here only Content.Tick.Close is
// read (it is cached as the latest price for the symbol).
// NOTE(review): "Quites" looks like a typo for "Quotes"; the type is exported,
// so confirm all callers before renaming.
type QuitesSpots struct {
ServersID string `json:"serversId"`
Content struct {
Ch string `json:"ch"`
Ts int64 `json:"ts"`
// Every Tick field except Count is declared as a string, so numeric values
// are kept as text and must be parsed by the consumer.
Tick struct {
Open string `json:"open"`
High string `json:"high"`
Low string `json:"low"`
// Close is the value SubscribeQuotesSpots stores as the latest price.
Close string `json:"close"`
Amount string `json:"amount"`
Count int `json:"count"`
Bid string `json:"bid"`
BidSize string `json:"bidSize"`
Ask string `json:"ask"`
AskSize string `json:"askSize"`
LastPrice string `json:"lastPrice"`
LastSize string `json:"lastSize"`
// The "Tick" and "Data" tags are capitalized, unlike every other tag here.
} `json:"Tick"`
Data any `json:"Data"`
} `json:"content"`
Symbol string `json:"symbol"`
}
// SpotsSymbol
//
// @Description: Shared state between the code that requests spot subscriptions
// and SubscribeQuotesSpots, which consumes those requests.
type SpotsSymbol struct {
SpotsMap chan []byte // Trading pairs to subscribe to, one symbol per message (sent when a user places a spot order and by InitSubscribeQuotesSpots)
SpotsMapSymbol sync.Map // Book of trading pairs that are already subscribed, keyed by symbol (prevents duplicate subscriptions)
}
// InitCacheSymbolSpots
//
//	@Description: Seeds the Redis hash setting.MarketSpots with the spot trading
//	pairs returned by GetBotDigitalList. Each pair is lower-cased and stored as
//	field == value, and only written when the field is not present yet. The
//	function stops at the first Redis error; every failure is logged.
//	@param ctx  context used for the lookup and for every Redis call
//	@param uo   data layer handle providing the Redis client
func InitCacheSymbolSpots(ctx context.Context, uo *Data) {
	digitalList, err := GetBotDigitalList(ctx, uo)
	if err != nil {
		// Previously swallowed silently, which made an empty subscription list impossible to diagnose.
		applogger.Error("%v InitCacheSymbolSpots.GetBotDigitalList:%v", common.ErrSpots, err)
		return
	}

	for _, value := range digitalList {
		symbol := strings.ToLower(value)

		// Write to the redis hot cache so subscriptions can be restored when the program is started.
		// The caller's ctx is used (instead of context.Background()) so cancellation and deadlines apply.
		exists, err := uo.redisDB.HExists(ctx, setting.MarketSpots, symbol).Result()
		if err != nil {
			applogger.Error("%v InitCacheSymbolSpots.MarketSpots.HExists:%v", common.ErrSpots, err)
			return
		}

		applogger.Info("初始化现货交易对:%v", value)

		if exists {
			continue
		}
		if err := uo.redisDB.HSet(ctx, setting.MarketSpots, symbol, symbol).Err(); err != nil {
			applogger.Error("%v InitCacheSymbolSpots.HSet:%v", common.ErrSpots, err)
			return
		}
	}
}
// InitSubscribeQuotesSpots
//
//	@Description: Periodically reloads the setting.MarketSpots hash and pushes
//	every symbol that is not yet recorded in spots.SpotsMapSymbol onto
//	spots.SpotsMap. Runs until loading the hash fails, then logs and returns.
//	@param spots  shared subscription state (request channel + subscribed book)
func InitSubscribeQuotesSpots(spots *SpotsSymbol) {
	const rescanInterval = 10 * time.Second

	// The post statement pauses between two consecutive scans.
	for ; ; time.Sleep(rescanInterval) {
		symbols, loadErr := LoadLRangeList(setting.MarketSpots)
		if loadErr != nil {
			applogger.Error("%v InitSubscribeQuotesSpots.LoadLRangeList:%v", common.ErrSpots, loadErr)
			return
		}

		for _, symbol := range symbols {
			// Only request a subscription for symbols that are not in the book yet.
			if _, subscribed := spots.SpotsMapSymbol.Load(symbol); !subscribed {
				spots.SpotsMap <- []byte(symbol)
			}
		}
	}
}
// SubscribeQuotesSpots
//
//	@Description: Consumes symbols from spots.SpotsMap and, for each symbol that
//	is not subscribed yet, records it in the setting.MarketSpots hash and starts
//	a goroutine that subscribes to the Redis channel "market.<symbol>usdt.ticker"
//	and writes every tick's close price into memory.SpotsCache.
//	Returns only when spots.SpotsMap is closed.
//	@param ctx    context used for the Redis calls and the pub/sub subscription
//	@param uo     data layer handle providing the Redis client
//	@param spots  shared subscription state (request channel + subscribed book)
func SubscribeQuotesSpots(ctx context.Context, uo *Data, spots *SpotsSymbol) {
	for {
		symbol, ok := <-spots.SpotsMap
		if !ok {
			// A closed channel would otherwise deliver empty symbols forever.
			applogger.Error("%v SubscribeQuotesSpots.SpotsMap:%v", common.ErrSpots, "channel closed")
			return
		}
		symbolStr := string(symbol)

		// Prevent duplicate subscription of the same symbol.
		if _, subscribed := spots.SpotsMapSymbol.Load(symbolStr); subscribed {
			time.Sleep(5 * time.Second)
			continue
		}

		// Write to the redis hot cache so subscriptions can be restored when the program is started.
		exists, err := uo.redisDB.HExists(ctx, setting.MarketSpots, symbolStr).Result()
		if err != nil {
			applogger.Error("%v SubscribeQuotesSpots.MarketSpots.HExists:%v", common.ErrSpots, err)
			time.Sleep(5 * time.Second)
			continue
		}
		if !exists {
			if err := uo.redisDB.HSet(ctx, setting.MarketSpots, symbolStr, symbolStr).Err(); err != nil {
				applogger.Error("%v SubscribeQuotesSpots.HSet:%v", common.ErrSpots, err)
				time.Sleep(5 * time.Second)
				continue
			}
		}

		// Record the symbol before the worker starts so the worker's deferred
		// Delete can never run ahead of this Store.
		spots.SpotsMapSymbol.Store(symbolStr, symbolStr)

		go func(symbol string) {
			// Un-mark the symbol when the worker stops, so a later request can
			// subscribe again instead of the symbol being stuck as "subscribed".
			defer spots.SpotsMapSymbol.Delete(symbol)

			topIc := fmt.Sprintf("market.%vusdt.ticker", symbol)
			pubSub := uo.redisDB.Subscribe(ctx, topIc)
			defer pubSub.Close()

			if _, err := pubSub.Receive(ctx); err != nil {
				applogger.Error("%v SubscribeQuotesSpots.Receive:%v", common.ErrSpots, err)
				return
			}

			for msg := range pubSub.Channel() {
				var subMsg QuitesSpots
				if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
					// Skip the malformed payload. The request channel must not be
					// closed here: this goroutine is a receiver, and closing would
					// make every later send on spots.SpotsMap panic.
					applogger.Error("%v SubscribeQuotesSpots.Unmarshal:%v", common.ErrSpots, err)
					continue
				}

				// Trade type: 0 latest price, 1 buy, 2 sell; key looks like xh-symbol-0.
				price := publicData.SymbolCache(flags.Xh, symbol, flags.TradeTypePrice)
				if err := memory.SpotsCache.Set(price, []byte(subMsg.Content.Tick.Close)); err != nil {
					applogger.Error("%v SubscribeQuotesSpots.Set.price:%v", common.ErrSpots, err)
					return
				}
				if !flags.CheckSetting {
					applogger.Info("当前写入最新价格:%v,%v", price, subMsg.Content.Tick.Close)
				}
			}
		}(symbolStr)
	}
}
// SpotsTransaction
//
//	@Description: Runs SpotsTransactionCalculation in an endless loop, pausing
//	400ms after each pass. Never returns.
func SpotsTransaction() {
	const pause = 400 * time.Millisecond

	for ; ; time.Sleep(pause) {
		SpotsTransactionCalculation()
	}
}
// SpotsTransactionCalculation
//
//	@Description: Performs one matching pass over the pending spot orders stored
//	in the setting.MarketSpotsEntrust hash. For every order it reads the latest
//	cached price of the order's symbol, lets SpotsConcurrentComputing decide
//	whether the order fills, and calls SpotLiquidation for orders that come back
//	with status flags.Close.
//	An order that cannot be decoded, has no cached price, or has an unparsable
//	cached price is logged and skipped; the remaining orders are still processed.
func SpotsTransactionCalculation() {
	// SpotsConcurrentComputing signals completion on a WaitGroup, so one is
	// still provided even though orders are evaluated one after another.
	var wg sync.WaitGroup

	entrustList, err := Reds.HGetAll(context.Background(), setting.MarketSpotsEntrust).Result()
	if err != nil {
		applogger.Warn("%v SpotsTransactionCalculation.LRange:%v", common.ErrSpots, err)
		return
	}

	for _, value := range entrustList {
		var entrust virtual.SpotsTallyCache
		if err := json.Unmarshal([]byte(value), &entrust); err != nil {
			// One corrupt entry must not block every other pending order.
			applogger.Error("%v SpotsTransactionCalculation.Unmarshal:%v", common.ErrSpots, err)
			continue
		}

		newByte, err := memory.SpotsCache.Get(publicData.SymbolCache(flags.Xh, entrust.Symbol, flags.TradeTypePrice))
		if err != nil {
			applogger.Warn("%v SpotsTransactionCalculation.Get:%v", common.ErrSpots, err)
			continue
		}

		// NewFromString instead of RequireFromString: an empty or malformed
		// cached price must be skipped, not panic the whole process.
		priceNew, err := decimal.NewFromString(string(newByte))
		if err != nil {
			applogger.Warn("%v SpotsTransactionCalculation.NewFromString:%v", common.ErrSpots, err)
			continue
		}

		// The previous goroutine + immediate blocking receive was serial in
		// effect; calling directly keeps the same ordering without the channel.
		wg.Add(1)
		resultMsg := SpotsConcurrentComputing(&entrust, priceNew, &wg)

		// Handle the computation result.
		switch resultMsg.Status {
		case flags.Entrust:
			if !flags.CheckSetting {
				applogger.Info("现货从信道接收挂单信息:%v", resultMsg)
			}
		case flags.Close:
			if !flags.CheckSetting {
				applogger.Info("现货从信道接收平仓信息:%v", resultMsg)
			}
			SpotLiquidation(&resultMsg)
		}
	}

	wg.Wait()
	applogger.Info("现货并发计算执行完毕......")
}
// SpotsConcurrentComputing
//
//	@Description: Decides whether a pending spot limit order fills at the given
//	latest price.
//	  - buy:  fills when limit price >= latest price; deal price = latest + latest*utils.Difference()
//	  - sell: fills when limit price <= latest price; deal price = latest - latest*utils.Difference()
//	A filled order is set to flags.Close (on the passed-in order as well) and
//	removed from the setting.MarketSpotsEntrust hash. If that removal fails the
//	original status is restored and the limit price is reported as closing price.
//	An order whose limit price cannot be parsed is logged and returned unchanged
//	with an empty closing price.
//	@param order     pending order; its Status field may be modified
//	@param priceNew  latest market price of the order's symbol
//	@param wg        WaitGroup on which Done is called exactly once before returning
//	@return virtual.SpotsTallyCache  order snapshot carrying the resulting status and closing price
func SpotsConcurrentComputing(order *virtual.SpotsTallyCache, priceNew decimal.Decimal, wg *sync.WaitGroup) virtual.SpotsTallyCache {
	// Deferred so the WaitGroup is released on every exit path.
	defer wg.Done()

	buildResult := func(closingPrice string) virtual.SpotsTallyCache {
		return virtual.SpotsTallyCache{
			UserId:       order.UserId,  // user ID
			OrderId:      order.OrderId, // order ID
			Symbol:       order.Symbol,  // trading pair of the order
			ClosingPrice: closingPrice,  // closing (deal) price
			Status:       order.Status,  // order status
			Order:        order.Order,   // original order details
		}
	}

	// NewFromString instead of RequireFromString: a malformed limit price must
	// not panic (this function may run in its own goroutine, where an
	// unrecovered panic terminates the process).
	limitPrice, err := decimal.NewFromString(order.Order.LimitPrice)
	if err != nil {
		applogger.Error("%v SpotsConcurrentComputing.LimitPrice:%v", common.ErrSpots, err)
		return buildResult("")
	}

	status := order.Status
	var deleteCache bool
	var dealPrice string

	switch order.Order.TradeType {
	case flags.TradeTypeBuy: // buy
		if !flags.CheckSetting {
			applogger.Info("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, priceNew, limitPrice.Cmp(priceNew), order.Status)
		}
		// Limit buy fills when the bid is at or above the market price.
		if limitPrice.Cmp(priceNew) >= 0 {
			deleteCache = true
			difference := priceNew.Mul(utils.Difference()) // price spread
			dealPrice = priceNew.Add(difference).String()  // closing price
		}
	case flags.TradeTypeSell: // sell
		if !flags.CheckSetting {
			applogger.Info("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, priceNew, limitPrice.Cmp(priceNew), order.Status)
		}
		// Limit sell fills when the ask is at or below the market price.
		if limitPrice.Cmp(priceNew) <= 0 {
			deleteCache = true
			difference := priceNew.Mul(utils.Difference()) // price spread
			dealPrice = priceNew.Sub(difference).String()  // closing price
		}
	}

	if deleteCache {
		order.Status = flags.Close

		// Remove the order from the pending spot order cache.
		if err := Reds.HDel(context.Background(), setting.MarketSpotsEntrust, order.OrderId).Err(); err != nil {
			applogger.Error("%v SpotsConcurrentComputing.MarketSpotsEntrust.HDel:%v", common.ErrSpots, err)
			// Roll back so the order stays pending and is evaluated again later.
			order.Status = status
			dealPrice = limitPrice.String()
		}
	}

	return buildResult(dealPrice)
}
// SpotLiquidation
//
//	@Description: Closes a filled spot order: first persists the close through
//	SpotOrdersClosingDB, then marks the order as flags.Close in the user's
//	setting.SpotsSubscribe cache. Failures are logged; nothing is returned.
//	@param order  filled order carrying the closing price
func SpotLiquidation(order *virtual.SpotsTallyCache) {
	background := context.Background()

	// Close the pending spot order in the database.
	closedID, err := SpotOrdersClosingDB(background, Msql, order.UserId, order.Order, order.OrderId, order.ClosingPrice)
	if persisted := err == nil && len(closedID) != 0; !persisted {
		applogger.Error("%v SpotLiquidation.SpotOrdersClosingDB:%v", common.ErrSpots, err)
		return
	}

	// Update the user's order subscription cache.
	err = UpdateSpotsSubscribeStatusHashByOrderId(order.UserId, order.OrderId, setting.SpotsSubscribe, flags.Close)
	if err != nil {
		applogger.Error("%v SpotLiquidation.UpdateSpotsSubscribeStatusByOrderId:%v", common.ErrSpots, err)
	}
}
// LoadLRangeList
//
//	@Description: Loads every field/value pair of the Redis hash stored at key
//	(despite the name, this is an HGETALL, not an LRANGE).
//	@param key  Redis hash key
//	@return map[string]string  hash contents; an empty, non-nil map on failure
//	@return error              the Redis error, already logged here
func LoadLRangeList(key string) (map[string]string, error) {
	fields, err := Reds.HGetAll(context.Background(), key).Result()
	if err == nil {
		return fields, nil
	}

	applogger.Error("%v LoadLRangeList.HGetAll:%v", common.ErrSpots, err)
	return map[string]string{}, err
}
// SymbolCache
//
//	@Description: Builds the cache key of a spot trading pair by joining the
//	three parts with "-", e.g. ("xh", "btc", 0) -> "xh-btc-0".
//	@param mark    key prefix
//	@param symbol  trading pair symbol
//	@param types   numeric type suffix
//	@return string
func SymbolCache(mark, symbol string, types int) string {
	parts := []string{mark, symbol, fmt.Sprint(types)}
	return strings.Join(parts, "-")
}