You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
365 lines
10 KiB
365 lines
10 KiB
package data
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/shopspring/decimal"
|
|
"matchmaking-system/internal/data/memory"
|
|
"matchmaking-system/internal/data/socket/publicData"
|
|
"matchmaking-system/internal/data/tradedeal/virtual"
|
|
"matchmaking-system/internal/pkg/flags"
|
|
"matchmaking-system/internal/pkg/logging/applogger"
|
|
"matchmaking-system/internal/pkg/logging/common"
|
|
"matchmaking-system/internal/pkg/setting"
|
|
"matchmaking-system/internal/pkg/utils"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
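/*
Spot market quote subscription

 1. Initialise the spot subscription information.
 2. Receive the trading pairs of users' spot orders.
 3. Subscribe to spot market data for those trading pairs.
 4. Write the data into memory for concurrent computation.
 5. Record it in the hot cache.
*/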
|
|
|
|
// QuitesSpots
|
|
// @Description:
|
|
type QuitesSpots struct {
|
|
ServersID string `json:"serversId"`
|
|
Content struct {
|
|
Ch string `json:"ch"`
|
|
Ts int64 `json:"ts"`
|
|
Tick struct {
|
|
Open string `json:"open"`
|
|
High string `json:"high"`
|
|
Low string `json:"low"`
|
|
Close string `json:"close"`
|
|
Amount string `json:"amount"`
|
|
Count int `json:"count"`
|
|
Bid string `json:"bid"`
|
|
BidSize string `json:"bidSize"`
|
|
Ask string `json:"ask"`
|
|
AskSize string `json:"askSize"`
|
|
LastPrice string `json:"lastPrice"`
|
|
LastSize string `json:"lastSize"`
|
|
} `json:"Tick"`
|
|
Data any `json:"Data"`
|
|
} `json:"content"`
|
|
Symbol string `json:"symbol"`
|
|
}
|
|
|
|
// SpotsSymbol is the shared state between the producer that queues spot
// symbols (InitSubscribeQuotesSpots) and the consumer that subscribes to them
// (SubscribeQuotesSpots).
type SpotsSymbol struct {
	// SpotsMap carries the trading pairs to subscribe to; a symbol is sent
	// here to request a subscription (e.g. when a user places a spot order).
	SpotsMap chan []byte

	// SpotsMapSymbol is the book of already-subscribed trading pairs, keyed
	// by symbol, used to prevent duplicate subscriptions.
	SpotsMapSymbol sync.Map
}
|
|
|
|
// InitCacheSymbolSpots
|
|
//
|
|
// @Description: 初始化当前数据表中的现货交易对
|
|
// @param ctx
|
|
// @param uo
|
|
func InitCacheSymbolSpots(ctx context.Context, uo *Data) {
|
|
digitalList, err := GetBotDigitalList(ctx, uo)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
for _, value := range digitalList {
|
|
vue := strings.ToLower(value)
|
|
// Write to the redis list hot cache for initializing subscriptions when the program is pulled up
|
|
checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketSpots, vue).Result()
|
|
if err != nil {
|
|
applogger.Error("%v InitCacheSymbolSpots.MarketSpots.HExists:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
applogger.Info("初始化现货交易对:%v", value)
|
|
|
|
if !checkBool {
|
|
if err = uo.redisDB.HSet(context.Background(), setting.MarketSpots, vue, vue).Err(); err != nil {
|
|
applogger.Error("%v InitCacheSymbolSpots.HSet:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// InitSubscribeQuotesSpots
|
|
//
|
|
// @Description: 状态机加载订阅
|
|
// @param ctx
|
|
// @param uo
|
|
// @param spots
|
|
func InitSubscribeQuotesSpots(spots *SpotsSymbol) {
|
|
for {
|
|
listSpots, err := LoadLRangeList(setting.MarketSpots)
|
|
if err != nil {
|
|
applogger.Error("%v InitSubscribeQuotesSpots.LoadLRangeList:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
|
|
for _, value := range listSpots {
|
|
// Prevent duplicate subscription to CtrIp
|
|
_, ok := spots.SpotsMapSymbol.Load(value)
|
|
if ok {
|
|
continue
|
|
}
|
|
|
|
spots.SpotsMap <- []byte(value)
|
|
}
|
|
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
}
|
|
|
|
// SubscribeQuotesSpots
|
|
//
|
|
// @Description: 现货订阅
|
|
// @param ctx
|
|
// @param uo
|
|
// @param spots
|
|
func SubscribeQuotesSpots(ctx context.Context, uo *Data, spots *SpotsSymbol) {
|
|
for {
|
|
select {
|
|
case symbol, _ := <-spots.SpotsMap:
|
|
symbolStr := string(symbol)
|
|
|
|
// Prevent duplicate subscription to CtrIp
|
|
_, ok := spots.SpotsMapSymbol.Load(symbolStr)
|
|
if ok {
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
|
|
// Write to the redis list hot cache for initializing subscriptions when the program is pulled up
|
|
checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketSpots, symbolStr).Result()
|
|
if err != nil {
|
|
applogger.Error("%v SubscribeQuotesSpots.MarketSpots.HExists:%v", common.ErrSpots, err)
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
if !checkBool {
|
|
if err = uo.redisDB.HSet(context.Background(), setting.MarketSpots, symbolStr, symbolStr).Err(); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSpots.HSet:%v", common.ErrSpots, err)
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
}
|
|
|
|
go func() {
|
|
topIc := fmt.Sprintf("market.%vusdt.ticker", symbolStr)
|
|
pubSub := uo.redisDB.Subscribe(ctx, topIc)
|
|
defer pubSub.Close()
|
|
if _, err = pubSub.Receive(ctx); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSpots.Receive:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
ch := pubSub.Channel()
|
|
for msg := range ch {
|
|
var subMsg QuitesSpots
|
|
if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSpots.Unmarshal:%v", common.ErrSpots, err)
|
|
close(spots.SpotsMap)
|
|
return
|
|
}
|
|
// 交易类型:0最新价,1买入,2卖出 xh-symbol-0
|
|
price := publicData.SymbolCache(flags.Xh, symbolStr, flags.TradeTypePrice)
|
|
if err = memory.SpotsCache.Set(price, []byte(subMsg.Content.Tick.Close)); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSpots.Set.price:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
if !flags.CheckSetting {
|
|
applogger.Info("当前写入最新价格:%v,%v", price, subMsg.Content.Tick.Close)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Write in the map to determine whether to repeat subscription
|
|
spots.SpotsMapSymbol.Store(symbolStr, symbolStr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// SpotsTransaction
|
|
//
|
|
// @Description:
|
|
// @param ctx
|
|
func SpotsTransaction() {
|
|
for {
|
|
SpotsTransactionCalculation()
|
|
time.Sleep(400 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
// SpotsTransactionCalculation
|
|
//
|
|
// @Description:
|
|
// @param ctx
|
|
func SpotsTransactionCalculation() {
|
|
var wg sync.WaitGroup
|
|
|
|
entrustList, err := Reds.HGetAll(context.Background(), setting.MarketSpotsEntrust).Result()
|
|
if err != nil {
|
|
applogger.Warn("%v SpotsTransactionCalculation.LRange:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
|
|
for _, value := range entrustList {
|
|
var msg = make(chan virtual.SpotsTallyCache, 1)
|
|
var entrust virtual.SpotsTallyCache
|
|
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
|
|
applogger.Error("%v SpotsTransactionCalculation.Unmarshal:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
|
|
var newByte []byte
|
|
newByte, err = memory.SpotsCache.Get(publicData.SymbolCache(flags.Xh, entrust.Symbol, flags.TradeTypePrice))
|
|
if err != nil {
|
|
applogger.Warn("%v SpotsTransactionCalculation.Get:%v", common.ErrSpots, err)
|
|
continue
|
|
}
|
|
|
|
priceNew := decimal.RequireFromString(string(newByte))
|
|
|
|
wg.Add(1)
|
|
go func(value string) {
|
|
resultMsg := SpotsConcurrentComputing(&entrust, priceNew, &wg)
|
|
msg <- resultMsg
|
|
}(value)
|
|
|
|
// 处理并发结果
|
|
select {
|
|
case resultMsg, _ := <-msg:
|
|
switch resultMsg.Status {
|
|
case flags.Entrust:
|
|
if !flags.CheckSetting {
|
|
applogger.Info("现货从信道接收挂单信息:%v", resultMsg)
|
|
}
|
|
case flags.Close:
|
|
if !flags.CheckSetting {
|
|
applogger.Info("现货从信道接收平仓信息:%v", resultMsg)
|
|
}
|
|
SpotLiquidation(&resultMsg)
|
|
}
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
applogger.Info("现货并发计算执行完毕......")
|
|
}
|
|
|
|
// SpotsConcurrentComputing
|
|
//
|
|
// @Description: 现货下单判定
|
|
// @param order
|
|
// @param priceNew
|
|
// @param wg
|
|
// @return tradedeal.SpotsTallyCache
|
|
func SpotsConcurrentComputing(order *virtual.SpotsTallyCache, priceNew decimal.Decimal, wg *sync.WaitGroup) virtual.SpotsTallyCache {
|
|
limitPrice := decimal.RequireFromString(order.Order.LimitPrice)
|
|
status := order.Status
|
|
|
|
var deleteCache bool
|
|
var dealPrice string
|
|
|
|
switch order.Order.TradeType {
|
|
case flags.TradeTypeBuy: // buy
|
|
if !flags.CheckSetting {
|
|
applogger.Info("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, priceNew, limitPrice.Cmp(priceNew), order.Status)
|
|
}
|
|
|
|
// 限价买入逻辑判定 -> 买入金额 >= 最优对手价格(市价)
|
|
if limitPrice.Cmp(priceNew) >= 0 {
|
|
deleteCache = true
|
|
difference := priceNew.Mul(utils.Difference()) // 设置价差
|
|
dealPrice = priceNew.Add(difference).String() // 平仓价格
|
|
}
|
|
case flags.TradeTypeSell: // sell
|
|
if !flags.CheckSetting {
|
|
applogger.Info("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, priceNew, limitPrice.Cmp(priceNew), order.Status)
|
|
}
|
|
|
|
// 限价卖入逻辑判定 -> 卖出金额 <= 最优对手价格(市价)
|
|
if limitPrice.Cmp(priceNew) <= 0 {
|
|
deleteCache = true
|
|
difference := priceNew.Mul(utils.Difference()) // 设置价差
|
|
dealPrice = priceNew.Sub(difference).String() // 平仓价格
|
|
}
|
|
}
|
|
// 判定订单状态
|
|
if deleteCache {
|
|
order.Status = flags.Close
|
|
}
|
|
// 清理现货挂单缓存列表
|
|
if deleteCache {
|
|
if err := Reds.HDel(context.Background(), setting.MarketSpotsEntrust, order.OrderId).Err(); err != nil {
|
|
applogger.Error("%v SpotsConcurrentComputing.MarketSpotsEntrust.HDel:%v", common.ErrSpots, err)
|
|
order.Status = status
|
|
dealPrice = limitPrice.String()
|
|
}
|
|
}
|
|
|
|
wg.Done()
|
|
return virtual.SpotsTallyCache{
|
|
UserId: order.UserId, // 用户Id
|
|
OrderId: order.OrderId, // 订单Id
|
|
Symbol: order.Symbol, // 下单交易对
|
|
ClosingPrice: dealPrice, // 平仓价格
|
|
Status: order.Status, // 订单状态
|
|
Order: order.Order, // 下单信息
|
|
}
|
|
}
|
|
|
|
// SpotLiquidation
|
|
//
|
|
// @Description: 现货平仓操作
|
|
// @param order
|
|
func SpotLiquidation(order *virtual.SpotsTallyCache) {
|
|
// 现货挂单平仓
|
|
orderId, err := SpotOrdersClosingDB(context.Background(), Msql, order.UserId, order.Order, order.OrderId, order.ClosingPrice)
|
|
if err != nil || len(orderId) == 0 {
|
|
applogger.Error("%v SpotLiquidation.SpotOrdersClosingDB:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
|
|
// 更新用户订单订阅缓存列表
|
|
if err = UpdateSpotsSubscribeStatusHashByOrderId(order.UserId, order.OrderId, setting.SpotsSubscribe, flags.Close); err != nil {
|
|
applogger.Error("%v SpotLiquidation.UpdateSpotsSubscribeStatusByOrderId:%v", common.ErrSpots, err)
|
|
return
|
|
}
|
|
}
|
|
|
|
// LoadLRangeList
|
|
//
|
|
// @Description: 加载本地缓存队列数据
|
|
// @param ctx
|
|
// @param uo
|
|
// @param key
|
|
// @return map[string]string
|
|
// @return error
|
|
func LoadLRangeList(key string) (map[string]string, error) {
|
|
elements, err := Reds.HGetAll(context.Background(), key).Result()
|
|
if err != nil {
|
|
applogger.Error("%v LoadLRangeList.HGetAll:%v", common.ErrSpots, err)
|
|
return map[string]string{}, err
|
|
}
|
|
|
|
return elements, err
|
|
}
|
|
|
|
// SymbolCache builds the cache key of a spot trading pair in the form
// "<mark>-<symbol>-<types>".
//
//	@param mark   key prefix
//	@param symbol trading pair
//	@param types  numeric suffix of the key
//	@return string the assembled cache key
func SymbolCache(mark, symbol string, types int) string {
	const sep = "-"
	suffix := fmt.Sprint(types)
	return mark + sep + symbol + sep + suffix
}
|
|
|