You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
334 lines
10 KiB
334 lines
10 KiB
package data
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/shopspring/decimal"
|
|
"matchmaking-system/internal/data/memory"
|
|
"matchmaking-system/internal/data/socket/publicData"
|
|
"matchmaking-system/internal/data/tradedeal/virtual"
|
|
"matchmaking-system/internal/pkg/flags"
|
|
"matchmaking-system/internal/pkg/logging/applogger"
|
|
"matchmaking-system/internal/pkg/logging/common"
|
|
"matchmaking-system/internal/pkg/setting"
|
|
"matchmaking-system/internal/pkg/utils"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// SecondSymbol
|
|
// @Description: 秒合约Map设置
|
|
type SecondSymbol struct {
|
|
SecondMap chan []byte // 秒合约订单交易对 -- 用户秒合约下单通知订阅
|
|
SecondMapSymbol sync.Map // 秒合约订阅交易对簿
|
|
}
|
|
|
|
// InitSubscribeQuotesSecond
|
|
//
|
|
// @Description: 秒合约初始化
|
|
// @param ctx
|
|
// @param uo
|
|
// @param contract
|
|
func InitSubscribeQuotesSecond() {
|
|
for {
|
|
listSpots, err := LoadLRangeList(setting.MarketContract)
|
|
if err != nil {
|
|
applogger.Error("%v InitSubscribeQuotesSecond.LoadLRangeList:%v", common.ErrSecond, err)
|
|
return
|
|
}
|
|
|
|
for _, value := range listSpots {
|
|
// Prevent duplicate subscription to CtrIp
|
|
_, ok := second.SecondMapSymbol.Load(value)
|
|
if ok {
|
|
continue
|
|
}
|
|
|
|
second.SecondMap <- []byte(value)
|
|
}
|
|
|
|
time.Sleep(10 * time.Second)
|
|
}
|
|
}
|
|
|
|
// SubscribeQuotesSecond
|
|
//
|
|
// @Description: 秒合约订阅
|
|
// @param ctx
|
|
// @param uo
|
|
// @param contract
|
|
func SubscribeQuotesSecond(ctx context.Context, uo *Data, second *SecondSymbol) {
|
|
for {
|
|
select {
|
|
case symbol, _ := <-second.SecondMap:
|
|
symbolStr := string(symbol)
|
|
|
|
// Prevent duplicate subscription to CtrIp
|
|
_, ok := second.SecondMapSymbol.Load(symbolStr)
|
|
if ok {
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
|
|
// Write to the redis list hot cache for initializing subscriptions when the program is pulled up
|
|
checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketContract, symbolStr).Result()
|
|
if err != nil {
|
|
applogger.Error("%v SubscribeQuotesSecond.MarketContract.HExists:%v", common.ErrSecond, err)
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
if !checkBool {
|
|
if err = uo.redisDB.HSet(context.Background(), setting.MarketContract, symbolStr, symbolStr).Err(); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSecond.MarketContract.HSet:%v", common.ErrSecond, err)
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
}
|
|
|
|
// 订阅插针价格
|
|
go func() {
|
|
for {
|
|
var data string
|
|
data, err = uo.redisDB.Get(context.Background(), flags.ContractSystemPriceSetUp).Result()
|
|
if err != nil || data == "[]" {
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
var dataList []ContractSetUp
|
|
if err = json.Unmarshal([]byte(data), &dataList); err != nil {
|
|
time.Sleep(5 * time.Second)
|
|
continue
|
|
}
|
|
for _, value := range dataList {
|
|
checkStart := len(utils.StrReplace(value.BeginTime))
|
|
checkEnd := len(utils.StrReplace(value.EndTime))
|
|
if checkStart == 0 || checkEnd == 0 {
|
|
continue
|
|
}
|
|
if value.SelfContractCode == symbolStr {
|
|
// Determine if it is the current subscription type
|
|
priceKey := publicData.SymbolCache(flags.Hy, symbolStr, flags.TradeTypePrice)
|
|
if !utils.CheckTimeUTC(value.BeginTime, value.EndTime) {
|
|
continue
|
|
}
|
|
if err = memory.SecondPriceSetUp.Set(priceKey, []byte(utils.DecimalsPrice(value.Price))); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSecond.SecondPriceSetUp.set:%v", common.ErrSecond, err)
|
|
continue
|
|
}
|
|
if !flags.CheckSetting {
|
|
applogger.Info("当前写入插针最新价格:%v,%v", priceKey, value.Price)
|
|
}
|
|
}
|
|
}
|
|
time.Sleep(2 * time.Second)
|
|
}
|
|
}()
|
|
|
|
// 订阅实时价格
|
|
go func() {
|
|
topIc := fmt.Sprintf("market.%v.detail", utils.ReplaceStrByValue(symbolStr, "/"))
|
|
pubSub := uo.redisDB.Subscribe(ctx, topIc)
|
|
defer pubSub.Close()
|
|
if _, err = pubSub.Receive(ctx); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSecond.Receive:%v", common.ErrSecond, err)
|
|
return
|
|
}
|
|
chp := pubSub.Channel()
|
|
for msg := range chp {
|
|
var subMsg QuitesContract
|
|
if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSecond.QuitesContract.Unmarshal:%v", common.ErrSecond, err)
|
|
close(second.SecondMap)
|
|
return
|
|
}
|
|
// 交易类型:0最新价,1买入,2卖出
|
|
price := publicData.SymbolCache(flags.Sd, symbolStr, flags.TradeTypePrice)
|
|
if err = memory.SecondCache.Set(price, []byte(subMsg.Content.Tick.Close)); err != nil {
|
|
applogger.Error("%v SubscribeQuotesSecond.Set:%v", common.ErrSecond, err)
|
|
continue
|
|
}
|
|
if !flags.CheckSetting {
|
|
applogger.Info("当前写入最新价格:%v,%v", price, subMsg.Content.Tick.Close)
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Write in the map to determine whether to repeat subscription
|
|
second.SecondMapSymbol.Store(symbolStr, symbolStr)
|
|
}
|
|
}
|
|
}
|
|
|
|
// SecondTransactionPosition
|
|
//
|
|
// @Description: 秒合约-持仓业务处理
|
|
// @param ctx
|
|
func SecondTransactionPosition(ctx context.Context) {
|
|
for {
|
|
SecondTransactionCalculationPosition(ctx)
|
|
time.Sleep(400 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
// SecondTransactionCalculationPosition
|
|
//
|
|
// @Description: 监控秒合约持仓缓存列表(注意:止盈止损|强行平仓)
|
|
// @param ctx
|
|
func SecondTransactionCalculationPosition(ctx context.Context) {
|
|
var wg sync.WaitGroup
|
|
|
|
entrustList, err := Reds.HGetAll(context.Background(), setting.MarketSecondPosition).Result()
|
|
if err != nil {
|
|
applogger.Error("%v SecondTransactionCalculationPosition.LRange:%v", common.ErrSecond, err)
|
|
return
|
|
}
|
|
|
|
for _, value := range entrustList {
|
|
var msg = make(chan virtual.ContractTallyCache, 1)
|
|
var entrust virtual.ContractTallyCache
|
|
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
|
|
applogger.Error("%v SecondTransactionCalculationPosition.Unmarshal:%v", common.ErrSecond, err)
|
|
continue
|
|
}
|
|
|
|
// 查询时间
|
|
var newPrice decimal.Decimal
|
|
newPrice, err = GetSecondPrice(entrust.Symbol)
|
|
if err != nil {
|
|
applogger.Error("%v SecondTransactionCalculationPosition.GetSecondPrice:%v", common.ErrSecond, err)
|
|
continue
|
|
}
|
|
|
|
wg.Add(1)
|
|
go func(value string) {
|
|
resultMsg := SecondConcurrentComputingPosition(&entrust, newPrice, &wg)
|
|
msg <- resultMsg
|
|
}(value)
|
|
|
|
// 处理并发结果
|
|
select {
|
|
case resultMsg, _ := <-msg:
|
|
switch resultMsg.Status {
|
|
case flags.Position: // 持仓
|
|
if !flags.CheckSetting {
|
|
applogger.Info("从信道接收到秒合约持仓信息:%v", resultMsg)
|
|
}
|
|
case flags.Close: // 平仓
|
|
if !flags.CheckSetting {
|
|
applogger.Info("从信道接收到秒合约平仓信息:%v", resultMsg)
|
|
}
|
|
|
|
if err = SecondLiquidationPosition(ctx, &resultMsg); err != nil {
|
|
applogger.Error("%v SecondTransactionCalculationPosition.ContractLiquidationPosition:%v", common.ErrSecond, err)
|
|
continue
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
wg.Wait()
|
|
applogger.Info("秒合约持仓并发计算执行完毕......")
|
|
}
|
|
|
|
// SecondConcurrentComputingPosition
|
|
//
|
|
// @Description: 秒合约持仓缓存列表业务处理
|
|
// @param order
|
|
// @param newPrice
|
|
// @param bidPrice
|
|
// @param askPrice
|
|
// @param wg
|
|
// @return tradedeal.ContractTallyCache
|
|
func SecondConcurrentComputingPosition(order *virtual.ContractTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) virtual.ContractTallyCache {
|
|
var dealPrice string // 平仓价格
|
|
var checkBool bool // 删除缓存状态
|
|
var status = order.Status // 原始价格
|
|
|
|
time1 := time.Now() // 通过时间判定是否平仓
|
|
time2 := order.Order.StopTime
|
|
|
|
if time1.After(time2) {
|
|
dealPrice = newPrice.String() // 时间1在时间2之后
|
|
checkBool = true
|
|
} else if time1.Equal(time2) {
|
|
dealPrice = newPrice.String() // 时间1等于时间2
|
|
checkBool = true
|
|
}
|
|
|
|
if checkBool {
|
|
order.Status = flags.Close // 删除合约持仓缓存列表数据
|
|
if err := Reds.HDel(context.Background(), setting.MarketSecondPosition, order.OrderId).Err(); err != nil {
|
|
applogger.Error("%v ContractConcurrentComputingPosition.MarketContractPosition.HDel:%v", common.ErrSecond, err)
|
|
order.Status = status
|
|
}
|
|
}
|
|
|
|
wg.Done()
|
|
return virtual.ContractTallyCache{
|
|
UserId: order.UserId, // 用户Id
|
|
OrderId: order.OrderId, // 订单Id
|
|
Symbol: order.Symbol, // 下单交易对
|
|
ClosingPrice: dealPrice, // 平仓价格
|
|
Status: order.Status, // 订单状态
|
|
Order: order.Order, // 下单信息
|
|
}
|
|
}
|
|
|
|
// SecondLiquidationPosition
|
|
//
|
|
// @Description: 合约平仓操作
|
|
// @param ctx
|
|
// @param order
|
|
// @return error
|
|
func SecondLiquidationPosition(ctx context.Context, order *virtual.ContractTallyCache) error {
|
|
// 1、平仓操作
|
|
err := SecondClosingPosition(ctx, Msql, order.OrderId, order.ClosingPrice, order.Order)
|
|
if err != nil {
|
|
applogger.Error("%v SecondLiquidationPosition.ContractClosingPosition:%v", common.ErrSecond, err)
|
|
return err
|
|
}
|
|
|
|
// 2、平仓更新用户合约订阅订单状态
|
|
userSubKey := virtual.OrderIdListKey(setting.SecondSubscribe, order.UserId)
|
|
if err = UpdateContractSubscribeHashStatusByOrderId(order.OrderId, userSubKey, flags.Close, order.ClosingPrice); err != nil {
|
|
applogger.Error("%v SecondLiquidationPosition.SecondSubscribe:%v", common.ErrSecond, err)
|
|
return err
|
|
}
|
|
|
|
// 3、平仓更新管理员合约订阅订单状态
|
|
if err = UpdateContractSubscribeHashStatusByOrderId(order.OrderId, setting.AdminSecondSubscribe, flags.Close, order.ClosingPrice); err != nil {
|
|
applogger.Error("%v SecondLiquidationPosition.AdminSecondSubscribe:%v", common.ErrSecond, err)
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetSecondPrice
|
|
//
|
|
// @Description: 合约价格
|
|
// @param symbol
|
|
// @return decimal.Decimal
|
|
// @return decimal.Decimal
|
|
// @return decimal.Decimal
|
|
// @return error
|
|
func GetSecondPrice(symbol string) (decimal.Decimal, error) {
|
|
var newPrice decimal.Decimal
|
|
|
|
key := publicData.SymbolCache(flags.Sd, symbol, flags.TradeTypePrice)
|
|
priceByte, err := memory.GetSecondCache(key)
|
|
if err != nil {
|
|
applogger.Error("%v GetSecondPrice.Get:%v", common.ErrSecond, err)
|
|
return decimal.Decimal{}, err
|
|
}
|
|
|
|
newPrice, err = decimal.NewFromString(string(priceByte))
|
|
if err != nil {
|
|
return decimal.Decimal{}, err
|
|
}
|
|
|
|
applogger.Info("secondCode:%v,topIc:%v,secondPrice:%v", symbol, key, newPrice)
|
|
|
|
return newPrice, nil
|
|
}
|
|
|