You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1250 lines
41 KiB

package data
import (
"context"
"encoding/json"
"fmt"
"github.com/shopspring/decimal"
"matchmaking-system/internal/biz/structure"
"matchmaking-system/internal/data/memory"
"matchmaking-system/internal/data/mq/consumer"
"matchmaking-system/internal/data/socket/publicData"
"matchmaking-system/internal/data/socket/shareData"
"matchmaking-system/internal/data/tradedeal/share"
"matchmaking-system/internal/data/tradedeal/virtual"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
"matchmaking-system/internal/pkg/logging/common"
"matchmaking-system/internal/pkg/setting"
"matchmaking-system/internal/pkg/utils"
"sync"
"time"
)
/*
法国股行情订阅
1、接收用户法国股下单的交易对法国股行情
2、将订阅行情写入内存中用于并发计算
3、挂单(市价|限价)缓存队列处理-开仓
4、持仓缓存队列处理-平仓
*/
// ShareFurMessage
//
//	@Description: JSON shape of one French-share quote message. SubscribeShareFur
//	decodes every pub/sub payload into this type; in the code visible here only
//	Price is consumed afterwards.
type ShareFurMessage struct {
Symbol string `json:"symbol,omitempty"` // exchange:stock code
StockCode string `json:"stock_code,omitempty"` // stock code
StockName string `json:"stock_name,omitempty"` // stock name, may be empty
Price decimal.Decimal `json:"price,omitempty"` // real-time price
UpDownRate decimal.Decimal `json:"up_down_rate,omitempty"` // change in percent
UpDown decimal.Decimal `json:"up_down,omitempty"` // change amount
TradeV decimal.Decimal `json:"trade_v,omitempty"` // technical rating indicator value
TradeK string `json:"trade_k,omitempty"` // technical rating
Vol int64 `json:"vol,omitempty"` // trading volume
TurnoverPriceTotal decimal.Decimal `json:"turnover_price_total,omitempty"` // volume * price
PriceTotal string `json:"price_total,omitempty"` // total market value
PE string `json:"p_e,omitempty"` // P/E
Eps string `json:"eps,omitempty"` // EPS
EmployeesNumber string `json:"employees_number,omitempty"` // number of employees
Plate string `json:"plate,omitempty"` // sector
Desc string `json:"desc,omitempty"` // description
PriceCode string `json:"price_code,omitempty"` // price type
Country string `json:"country,omitempty"` // country
Ts int64 `json:"ts,omitempty"` // time
Token string `json:"token,omitempty"` // token
}
// ShareFurSymbol
//
//	@Description: Subscription state shared by the French-share workers.
type ShareFurSymbol struct {
	// FurMap carries the symbols to subscribe to (fed when a user places an
	// order); FurSetMap is the counterpart for pre-/post-market quote
	// subscriptions.
	FurMap, FurSetMap chan []byte
	// FurMapSymbol and FurSetMapSymbol are the subscription books used to
	// filter out duplicate subscriptions.
	FurMapSymbol, FurSetMapSymbol sync.Map
}
// InitCacheSymbolShareFur
//
//	@Description: Every 10 seconds copies the stock list returned by
//	GetBotStockFurListByStockId into the setting.MarketShareFur Redis hash
//	(field == value == stock code), adding only the codes that are missing.
//	The hash is the hot cache the subscription workers read when the program
//	is pulled up. The loop ends, after logging the cause, when the stock list
//	cannot be loaded.
//	@param ctx context handed to the stock-list lookup
//	@param uo data layer that owns the Redis client
func InitCacheSymbolShareFur(ctx context.Context, uo *Data) {
	for {
		thaList, err := GetBotStockFurListByStockId(ctx, uo)
		if err != nil {
			// The original returned here without a trace, so a stopped
			// refresher was invisible in the logs.
			applogger.Error("%v InitCacheSymbolShareFur.GetBotStockFurListByStockId:%v", common.ErrShareFur, err)
			return
		}
		for _, value := range thaList {
			// Write to the redis hot cache used to initialise subscriptions
			// when the program is pulled up.
			checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketShareFur, value).Result()
			if err != nil {
				applogger.Error("%v InitCacheSymbolShareFur.MarketShareFur.HExists:%v", common.ErrShareFur, err)
				time.Sleep(5 * time.Second)
				continue
			}
			if !checkBool {
				if err = uo.redisDB.HSet(context.Background(), setting.MarketShareFur, value, value).Err(); err != nil {
					applogger.Error("%v InitCacheSymbolShareFur.MarketShareFur.HSet:%v", common.ErrShareFur, err)
					time.Sleep(5 * time.Second)
					continue
				}
			}
			if !flags.CheckSetting {
				applogger.Info("初始化法股票代码:%v", value)
			}
		}
		time.Sleep(10 * time.Second)
	}
}
// InitSubscribeShareFur
//
//	@Description: Every 10 seconds loads the cached symbol list and pushes each
//	symbol that is not yet in shareFur.FurMapSymbol onto shareFur.FurMap.
//	Stops after logging when the list cannot be loaded.
//	@param shareFur subscription state; FurMap receives the symbols
//	@param check true selects setting.MarketShareBlkFur, false setting.MarketShareFur
func InitSubscribeShareFur(shareFur *ShareFurSymbol, check bool) {
	for {
		hashKey := setting.MarketShareFur
		if check {
			hashKey = setting.MarketShareBlkFur
		}

		symbols, loadErr := LoadLRangeList(hashKey)
		if loadErr != nil {
			applogger.Error("%v InitSubscribeShareEur.LoadLRangeList:%v", common.ErrShareFur, loadErr)
			return
		}

		for _, sym := range symbols {
			// Only symbols without an entry in the subscription book are sent.
			if _, subscribed := shareFur.FurMapSymbol.Load(sym); !subscribed {
				shareFur.FurMap <- []byte(sym)
			}
		}

		time.Sleep(10 * time.Second)
	}
}
// InitShareFurCloseCachePrice
//
//	@Description: Every 20 seconds runs SubscribeFurClosePrice for each cached
//	symbol (closing-price handling). Stops after logging when the symbol list
//	cannot be loaded.
//	@param check true selects setting.MarketShareBlkFur, false setting.MarketShareFur
func InitShareFurCloseCachePrice(check bool) {
	for {
		hashKey := setting.MarketShareFur
		if check {
			hashKey = setting.MarketShareBlkFur
		}

		symbols, loadErr := LoadLRangeList(hashKey)
		if loadErr != nil {
			applogger.Error("%v InitShareFurCloseCachePrice.LoadLRangeList:%v", common.ErrShareFur, loadErr)
			return
		}

		for _, sym := range symbols {
			SubscribeFurClosePrice(sym)
		}

		time.Sleep(20 * time.Second)
	}
}
// InitShareFurCloseNewPrice
//
//	@Description: Synchronises real-time quotes: every 500 ms runs
//	SubscribeFurCloseNewPrice for each cached symbol. A failed list load is
//	logged and retried after 20 seconds.
//	@param check true selects setting.MarketShareBlkFur, false setting.MarketShareFur
func InitShareFurCloseNewPrice(check bool) {
	for {
		hashKey := setting.MarketShareFur
		if check {
			hashKey = setting.MarketShareBlkFur
		}

		symbols, loadErr := LoadLRangeList(hashKey)
		if loadErr == nil {
			for _, sym := range symbols {
				SubscribeFurCloseNewPrice(sym)
			}
			time.Sleep(500 * time.Millisecond)
			continue
		}

		applogger.Error("%v InitShareFurCloseNewPrice.LoadLRangeList:%v", common.ErrShareFur, loadErr)
		time.Sleep(20 * time.Second)
	}
}
// InitShareFurPriceSetUp
//
//	@Description: Synchronises the manually configured ("set-up") price: every
//	500 ms, for each cached symbol, reads the configured price through
//	GetBeforeAndAfterSetPrice and mirrors it into memory.ShareFurPriceSetUp.
//	When the lookup fails or returns nothing the in-memory entry is deleted.
//	A failed list load is logged and retried after 20 seconds.
//	@param check true selects setting.MarketShareBlkFur, false setting.MarketShareFur
func InitShareFurPriceSetUp(check bool) {
	for {
		hashKey := setting.MarketShareFur
		if check {
			hashKey = setting.MarketShareBlkFur
		}

		symbols, loadErr := LoadLRangeList(hashKey)
		if loadErr != nil {
			applogger.Error("%v InitShareFurPriceSetUp.LoadLRangeList:%v", common.ErrShareFur, loadErr)
			time.Sleep(20 * time.Second)
			continue
		}

		for _, sym := range symbols {
			memKey := publicData.SymbolCache(flags.Fur, sym, flags.TradeTypePrice)
			sourceKey := fmt.Sprintf("%v%v:%v", flags.StockTradePrices, flags.ShareFurMarketType, sym)

			configured, err := GetBeforeAndAfterSetPrice(sourceKey)
			switch {
			case err != nil || len(configured) == 0:
				// No configured price: drop the stale in-memory value.
				_ = memory.ShareFurPriceSetUp.Delete(memKey)
			default:
				if err = memory.ShareFurPriceSetUp.Set(memKey, []byte(configured)); err != nil {
					applogger.Error("%v InitShareFurPriceSetUp.ShareFurPriceSetUp:%v", common.ErrShareFur, err)
				}
			}
		}

		time.Sleep(500 * time.Millisecond)
	}
}
// SubscribeFurHSetCodeList
//
//	@Description: Adds symbolStr to the cached symbol hash (field and value
//	are both the symbol) unless it is already present. A failed existence
//	check returns silently; a failed write is logged.
//	@param symbolStr stock code to cache
//	@param check true selects setting.MarketShareBlkFur, false setting.MarketShareFur
func SubscribeFurHSetCodeList(symbolStr string, check bool) {
	hashKey := setting.MarketShareFur
	if check {
		hashKey = setting.MarketShareBlkFur
	}

	// Hot cache used to initialise subscriptions when the program is pulled up.
	present, err := Reds.HExists(context.Background(), hashKey, symbolStr).Result()
	if err != nil || present {
		return
	}

	if setErr := Reds.HSet(context.Background(), hashKey, symbolStr, symbolStr).Err(); setErr != nil {
		applogger.Error("%v SubscribeFurHSetCodeList.%v.HSet:%v", common.ErrShareFur, hashKey, setErr)
	}
}
// SubscribeFurCloseNewPrice
//
//	@Description: While the French market is in its trading session (per
//	MarketCheckOpenAndCloseTime and utils.CheckInPlateTime), copies the price
//	stored under flags.ShareFranceClosingNewPriceKey into memory.ShareFurCache.
//	Nothing is written when the lookup fails or the stored price is "0".
//	@param symbolStr stock code
func SubscribeFurCloseNewPrice(symbolStr string) {
	// Real open/close window of the market.
	window, isTradingDay := MarketCheckOpenAndCloseTime(flags.ShareFurMarketType)
	if !utils.CheckInPlateTime(window) || !isTradingDay {
		return
	}

	// In-session real-time price.
	live, err := Reds.HGet(context.Background(), flags.ShareFranceClosingNewPriceKey, symbolStr).Result()
	if err != nil || live == decimal.Zero.String() {
		return
	}

	memKey := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypePrice)
	if err = memory.ShareFurCache.Set(memKey, []byte(live)); err != nil {
		applogger.Error("%v SubscribeFurCloseNewPrice.ShareFurCache:%v", common.ErrShareFur, err)
	}
}
// SubscribeFurClosePrice
//
//	@Description: Refreshes two in-memory values for a symbol:
//	 1. memory.ShareFurClosePrice - the price under
//	    flags.ShareFranceClosingNewPriceKey, else the one under
//	    flags.ShareFranceClosingPriceKey, else the one under
//	    flags.ShareFranceBeforeClose ("0" when that lookup fails too);
//	 2. memory.ShareFurChgMark - (before-close price) minus (closing price)
//	    when both are numeric and non-zero, otherwise 0.
//	@param symbolStr stock code
func SubscribeFurClosePrice(symbolStr string) {
	zero := decimal.Zero.String()
	bg := context.Background()

	// Closing price: newest closing key first, previous closing key second.
	closeQuote, err := Reds.HGet(bg, flags.ShareFranceClosingNewPriceKey, symbolStr).Result()
	if err != nil || closeQuote == zero {
		closeQuote, err = Reds.HGet(bg, flags.ShareFranceClosingPriceKey, symbolStr).Result()
		if err != nil || closeQuote == zero {
			closeQuote = zero
		}
	}

	// Market (real-time) price.
	beforeClose, err := Reds.HGet(bg, flags.ShareFranceBeforeClose, symbolStr).Result()
	if err != nil {
		beforeClose = zero
	}

	// Prefer the closing price; fall back to the before-close price.
	chosen := beforeClose
	if closeQuote != zero {
		chosen = closeQuote
	}
	closeKey := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypeClosePrice)
	if err = memory.ShareFurClosePrice.Set(closeKey, []byte(chosen)); err != nil {
		applogger.Error("%v SubscribeFurClosePrice.Set.price:%v", common.ErrShareFur, err)
	}

	// Change marker, only computed from two usable operands.
	change := decimal.Zero
	closeIsNumber := utils.IsNumber(closeQuote)
	beforeIsNumber := utils.IsNumber(beforeClose)
	if closeQuote != zero && beforeClose != zero && closeIsNumber && beforeIsNumber {
		change = decimal.RequireFromString(beforeClose).Sub(decimal.RequireFromString(closeQuote))
	}
	chgKey := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypeChg)
	if err = memory.ShareFurChgMark.Set(chgKey, []byte(change.String())); err != nil {
		applogger.Error("%v SubscribeFurClosePrice.Set.price:%v", common.ErrShareFur, err)
	}
}
// OrderSubAdminShareFurSubscribeBySum
//
//	@Description: Once per second recomputes the total floating profit/loss of
//	all French-share orders in the position state and stores it in
//	memory.ShareFurFloating under flags.FloatingFur.
//
//	Orders are read from the setting.AdminShareFurSubscribe hash. Pending
//	(flags.Entrust) orders are ignored, flags.Position orders are summed, and
//	entries in any other state (closed / cancelled) are removed from the hash.
//
//	Entries that cannot be decoded, or whose price or quantity is not a valid
//	decimal, are logged/skipped instead of stalling the refresh or panicking
//	the worker.
//	@param uo data layer that owns the Redis client
func OrderSubAdminShareFurSubscribeBySum(uo *Data) {
	for {
		var topIc = setting.AdminShareFurSubscribe
		hashMap, err := uo.redisDB.HGetAll(context.Background(), topIc).Result()
		if err != nil {
			applogger.Error("OrderSubAdminShareFurSubscribeBySum.HGetAll:%v", err)
			time.Sleep(5 * time.Second)
			continue
		}

		var hashList []share.ShareFurTallyCache
		for _, value := range hashMap {
			var msg share.ShareFurTallyCache
			if err = json.Unmarshal([]byte(value), &msg); err != nil {
				// One corrupt entry must not delay the whole statistic
				// (the original slept 5s per bad entry without logging).
				applogger.Error("OrderSubAdminShareFurSubscribeBySum.Unmarshal:%v", err)
				continue
			}
			switch msg.Status {
			case flags.Entrust: // pending order: not part of the floating P/L
			case flags.Position: // open position
				hashList = append(hashList, msg)
			default: // closed | cancelled: drop from the admin hash
				if err = uo.redisDB.HDel(context.Background(), topIc, msg.OrderId).Err(); err != nil {
					applogger.Error("OrderSubAdminShareFurSubscribeBySum.HDel:%v", err)
				}
			}
		}
		applogger.Info("法股数据量统计:%v", len(hashList))

		// Sum the floating profit/loss of every open position.
		priceSum := decimal.Zero
		for _, value := range hashList {
			orderModel := shareData.ShareFurOrderProcessing(topIc, value)
			if orderModel == nil {
				continue
			}
			newPrice, err := decimal.NewFromString(orderModel.Price)
			if err != nil {
				continue
			}
			openPrice, err := decimal.NewFromString(orderModel.OpenPrice)
			if err != nil {
				continue
			}
			// NewFromString instead of RequireFromString: a malformed quantity
			// must skip the order, not panic this long-running worker.
			orderNumber, err := decimal.NewFromString(value.Order.OrderNumber)
			if err != nil {
				applogger.Warn("OrderSubAdminShareFurSubscribeBySum.OrderNumber:%v---%v", value.OrderId, err)
				continue
			}

			var subPrice decimal.Decimal
			switch value.Order.TradeType {
			case flags.TradeTypeBuy: // long = new price - open price
				subPrice = newPrice.Sub(openPrice)
			case flags.TradeTypeSell: // short = open price - new price
				subPrice = openPrice.Sub(newPrice)
			default:
				subPrice = decimal.Zero
			}
			priceSum = priceSum.Add(subPrice.Mul(orderNumber))
		}

		// Publish the total.
		if err = memory.ShareFurFloating.Set(flags.FloatingFur, []byte(priceSum.String())); err != nil {
			applogger.Error("统计法股持仓订单浮动盈亏错误:%v", err)
			time.Sleep(5 * time.Second)
			continue
		}
		applogger.Info("统计的法股持仓订单浮动盈亏:%v", priceSum)
		time.Sleep(1 * time.Second)
	}
}
// SubscribeShareFur
//
//	@Description: Consumes symbols from shareFur.FurMap and, for every symbol
//	not yet in shareFur.FurMapSymbol:
//	 1. makes sure the symbol is present in the cached symbol hash;
//	 2. starts a goroutine that subscribes to the "<symbol>.<flags.CountryFur>"
//	    Redis channel and feeds every quote to SubscribeFurPrice;
//	 3. starts (once per symbol) a goroutine that polls the configured
//	    pre-/post-market price.
//
//	The loop returns when ctx is cancelled or FurMap is closed.
//	@param ctx context used for the Redis subscription and for cancellation
//	@param shareFur subscription state
//	@param check true selects setting.MarketShareBlkFur (block trading), false setting.MarketShareFur
func SubscribeShareFur(ctx context.Context, shareFur *ShareFurSymbol, check bool) {
	for {
		select {
		case <-ctx.Done():
			return
		case symbol, open := <-shareFur.FurMap:
			if !open {
				// A closed channel yields empty symbols forever; stop instead.
				return
			}
			symbolStr := string(symbol)
			topIc := fmt.Sprintf("%v.%v", symbolStr, flags.CountryFur)

			// Handling duplicate subscriptions
			if _, subscribed := shareFur.FurMapSymbol.Load(symbolStr); subscribed {
				time.Sleep(5 * time.Second)
				continue
			}

			// Transaction determination
			var key string
			if check {
				key = setting.MarketShareBlkFur // block trading
			} else {
				key = setting.MarketShareFur // French shares
			}

			// Write to the redis hot cache for initializing subscriptions when the program is pulled up
			checkBool, err := Reds.HExists(context.Background(), key, symbolStr).Result()
			if err != nil {
				applogger.Error("%v SubscribeShareFur.%v.HExists:%v", common.ErrShareFur, key, err)
				time.Sleep(5 * time.Second)
				continue
			}
			if !checkBool {
				if err := Reds.HSet(context.Background(), key, symbolStr, symbolStr).Err(); err != nil {
					applogger.Error("%v SubscribeShareFur.%v.HSet:%v", common.ErrShareFur, key, err)
					time.Sleep(5 * time.Second)
					continue
				}
			}

			// Mark the symbol BEFORE starting the consumer: the consumer
			// removes the mark when it ends, and storing afterwards could
			// overwrite that removal and block re-subscription for good.
			shareFur.FurMapSymbol.Store(symbolStr, symbolStr)
			go consumeShareFurQuotes(ctx, shareFur, symbolStr, topIc)

			// Monitor the configured pre-/post-market price, once per symbol.
			if _, running := shareFur.FurSetMapSymbol.LoadOrStore(symbolStr, symbolStr); !running {
				go pollShareFurSetUpPrice(ctx, symbolStr)
			}
		}
	}
}

// consumeShareFurQuotes subscribes to topIc and forwards every decoded quote
// price to SubscribeFurPrice until ctx is cancelled, the subscription cannot
// be confirmed, or the message channel is closed. On exit the symbol is
// removed from FurMapSymbol so that it can be subscribed again.
func consumeShareFurQuotes(ctx context.Context, shareFur *ShareFurSymbol, symbolStr, topIc string) {
	defer shareFur.FurMapSymbol.Delete(symbolStr)

	pubSub := Reds.Subscribe(ctx, topIc)
	defer pubSub.Close()
	if _, err := pubSub.Receive(ctx); err != nil {
		return
	}

	ch := pubSub.Channel()
	// A ticker replaces the former `default:` branch, whose 10s sleep delayed
	// the first quote that arrived after an idle period.
	ping := time.NewTicker(10 * time.Second)
	defer ping.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case msg, open := <-ch:
			if !open {
				// Receiving from a closed channel yields a nil message;
				// dereferencing it would panic.
				return
			}
			var subMsg ShareFurMessage
			if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
				applogger.Error("%v SubscribeShareFur.Unmarshal:%v", common.ErrShareFur, err)
				continue
			}
			SubscribeFurPrice(subMsg.Price.String(), symbolStr, flags.RealTime)
		case <-ping.C:
			Reds.Ping(ctx) // send PING to keep the connection alive
		}
	}
}

// pollShareFurSetUpPrice periodically reads the configured pre-/post-market
// price of symbolStr. A configured price is handed to SubscribeFurPrice
// (next poll after 10s); without one the in-memory set-up price is deleted
// (next poll after 15s). It returns when ctx is cancelled.
func pollShareFurSetUpPrice(ctx context.Context, symbolStr string) {
	for {
		wait := 10 * time.Second
		keyCache := fmt.Sprintf("%v%v:%v", flags.StockTradePrices, flags.ShareFurMarketType, symbolStr)
		vw, err := GetBeforeAndAfterSetPrice(keyCache)
		if err != nil || len(vw) == 0 {
			priceCache := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypePrice)
			_ = memory.ShareFurPriceSetUp.Delete(priceCache) // best effort: the entry may not exist
			wait = 15 * time.Second
		} else {
			SubscribeFurPrice(vw, symbolStr, flags.SetUp)
		}

		select {
		case <-ctx.Done():
			return
		case <-time.After(wait):
		}
	}
}
// SubscribeFurPrice
//
//	@Description: Stores a French-share price in memory together with its
//	change against the reference closing price.
//
//	The reference is the value under flags.ShareFranceClosingPriceKey, else
//	the one under flags.ShareFranceClosingNewPriceKey, else setPrice itself.
//	The change (setPrice - reference) is written to memory.ShareFurChgMark and
//	stays 0 unless both operands are numeric and non-zero. The price is then
//	written to memory.ShareFurCache (check == flags.RealTime) or
//	memory.ShareFurPriceSetUp (check == flags.SetUp).
//	@param setPrice price to store
//	@param symbolStr stock code
//	@param check flags.RealTime or flags.SetUp; any other value stores no price
func SubscribeFurPrice(setPrice, symbolStr string, check int) {
	zero := decimal.Zero.String()
	bg := context.Background()
	change := decimal.Zero

	// Reference closing price for the change computation.
	reference, err := Reds.HGet(bg, flags.ShareFranceClosingPriceKey, symbolStr).Result()
	if err != nil || reference == zero {
		reference, err = Reds.HGet(bg, flags.ShareFranceClosingNewPriceKey, symbolStr).Result()
		if err != nil || reference == zero {
			reference = setPrice
			change = decimal.Zero
		}
	}

	// Write the change marker.
	referenceIsNumber := utils.IsNumber(reference)
	priceIsNumber := utils.IsNumber(setPrice)
	if reference != zero && setPrice != zero && referenceIsNumber && priceIsNumber {
		change = decimal.RequireFromString(setPrice).Sub(decimal.RequireFromString(reference))
	}
	chgKey := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypeChg)
	if err = memory.ShareFurChgMark.Set(chgKey, []byte(change.String())); err != nil {
		applogger.Error("%v SubscribeFurPrice.ShareEurChgMark.Set.price:%v", common.ErrShareFur, err)
	}
	if !flags.CheckSetting {
		applogger.Debug("涨跌幅交易对内存key:%v,涨跌幅数值:%v", symbolStr, change)
	}

	// Write the price itself into the cache selected by check.
	priceKey := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypePrice)
	if check == flags.RealTime {
		// real-time price (higher priority)
		if err = memory.ShareFurCache.Set(priceKey, []byte(setPrice)); err != nil {
			applogger.Error("%v SubscribeFurPrice.ShareEurCache.Set.price:%v", common.ErrShareFur, err)
		}
	} else if check == flags.SetUp {
		// configured pre-/post-market price
		if err = memory.ShareFurPriceSetUp.Set(priceKey, []byte(setPrice)); err != nil {
			applogger.Error("%v SubscribeFurPrice.ShareEurPriceSetUp:%v", common.ErrShareFur, err)
		}
	}
	if !flags.CheckSetting {
		applogger.Debug("股票交易对内存Key:%v,最新价格:%v", priceKey, setPrice)
	}
}
// SubscribeFurSetForcedClosure
//
//	@Description: Reads the forced-liquidation threshold of a symbol through
//	GetCacheForcedClosure and stores its string form in
//	memory.ShareFurForcedClosure. A failed write is logged.
//	@param symbolStr stock code
func SubscribeFurSetForcedClosure(symbolStr string) {
	threshold := GetCacheForcedClosure(flags.StockFURSystemSetUpKey, symbolStr)
	memKey := publicData.SymbolCache(flags.Fur, symbolStr, flags.TradeTypeForcedClosure)

	err := memory.ShareFurForcedClosure.Set(memKey, []byte(threshold.String()))
	if err == nil {
		return
	}
	applogger.Error("%v SubscribeFurSetForcedClosure.ShareFurForcedClosure err:%v", common.ErrShareFur, err)
}
// ShareFurTransactionEntrust
//
//	@Description: Worker for French-share pending orders: runs
//	ShareFurTransactionCalculationEntrust, pauses 600 ms, and repeats forever.
//	@param ctx context handed to every calculation round
func ShareFurTransactionEntrust(ctx context.Context) {
	const pause = 600 * time.Millisecond

	for {
		ShareFurTransactionCalculationEntrust(ctx)
		time.Sleep(pause)
	}
}
// ShareFurTransactionCalculationEntrust
//
//	@Description: One evaluation round over the French-share pending-order
//	cache (setting.MarketShareFurEntrust).
//
//	When CheckGlobalTread(flags.FurMarket) reports true the round is skipped
//	after a 5s pause. Otherwise every cached order is decoded, the current
//	price is looked up, and the order is evaluated by the limit- or
//	market-order routine. An order that comes back in the flags.Position state
//	is opened through ShareFurLiquidationEntrust and then written to the
//	position cache (setting.MarketShareFurPosition).
//
//	The evaluation routines release the WaitGroup themselves; an order with an
//	unknown deal type reaches neither routine, so its slot is released here -
//	otherwise wg.Wait() would block this worker forever.
//	@param ctx context handed to ShareFurLiquidationEntrust
func ShareFurTransactionCalculationEntrust(ctx context.Context) {
	var wg sync.WaitGroup

	// Global switch handling.
	if CheckGlobalTread(flags.FurMarket) {
		time.Sleep(5 * time.Second)
		return
	}

	// Load the pending-order cache.
	orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareFurEntrust).Result()
	if err != nil {
		applogger.Error("%v ShareFurTransactionCalculationEntrust.HGetAll:%v", common.ErrShareFur, err)
		return
	}

	for _, value := range orderMap {
		var msg = make(chan share.ShareFurTallyCache, 1)
		var entrust share.ShareFurTallyCache
		if err = json.Unmarshal([]byte(value), &entrust); err != nil {
			applogger.Error("%v ShareFurTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareFur, err)
			continue
		}

		// Current share price.
		var newPrice decimal.Decimal
		newPrice, err = GetShareFurPrice(entrust.Symbol)
		if err != nil {
			applogger.Warn("%v ShareEurTransactionCalculationEntrust.GetShareFurPrice:%v---%v", common.ErrShareFur, entrust.Symbol, err)
			continue
		}

		wg.Add(1)
		go func() {
			var resultMsg share.ShareFurTallyCache
			switch entrust.Order.DealType {
			case flags.DealTypeLimited: // limit order
				resultMsg = ShareFurConcurrentComputingEntrustLimited(&entrust, newPrice, &wg)
			case flags.DealTypeMarket: // market order (filled as soon as the market is open)
				resultMsg = ShareFurConcurrentComputingEntrustMarket(&entrust, newPrice, &wg)
			default:
				// Nobody else will call Done for this order.
				applogger.Warn("%v ShareFurTransactionCalculationEntrust.DealType:%v---%v", common.ErrShareFur, entrust.OrderId, entrust.Order.DealType)
				wg.Done()
			}
			msg <- resultMsg // hand the result back to the loop
		}()

		// Wait for the result of this order before moving on.
		resultMsg := <-msg
		switch resultMsg.Status {
		case flags.Entrust: // still pending
			if !flags.CheckSetting {
				applogger.Debug("从信道接收法股挂单信息:%v", resultMsg)
			}
		case flags.Position: // filled
			if !flags.CheckSetting {
				applogger.Debug("从信道接收到法股持仓信息:%v", resultMsg)
			}
			// Open the position.
			if err = ShareFurLiquidationEntrust(ctx, &resultMsg); err != nil {
				applogger.Warn("%v ShareFurTransactionCalculationEntrust.ShareFurLiquidationEntrust:%v", common.ErrShareFur, err)
				continue
			}
			// Write the order into the position cache.
			var byteStr []byte
			byteStr, err = json.Marshal(resultMsg)
			if err != nil {
				applogger.Warn("%v ShareFurTransactionCalculationEntrust.Marshal:%v", common.ErrShareFur, err)
				continue
			}
			if err = Reds.HSet(context.Background(), setting.MarketShareFurPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
				// The order is already opened; without this entry it is
				// never evaluated for closing, so the failure must be visible.
				applogger.Error("%v ShareFurTransactionCalculationEntrust.MarketShareFurPosition.HSet:%v---%v", common.ErrShareFur, resultMsg.OrderId, err)
				continue
			}
		}
	}

	wg.Wait()
	applogger.Info("法股挂单并发计算执行完毕......")
}
// ShareFurConcurrentComputingEntrustLimited
//
//	@Description: Evaluates one French-share limit order against the current
//	price.
//	 - buy:  fills when limit price > current price, at current price + spread
//	 - sell: fills when limit price < current price, at current price - spread
//	where spread = current price * utils.Difference().
//	A filled order is switched to flags.Position and removed from the
//	pending-order cache; if that removal fails the original status is restored.
//	An order whose limit price is not a valid decimal is logged and returned
//	unchanged instead of panicking.
//	@param order pending order; its Status is updated in place
//	@param newPrice current share price
//	@param wg released exactly once when the function returns
//	@return share.ShareFurTallyCache copy of the order carrying the open price and resulting status
func ShareFurConcurrentComputingEntrustLimited(order *share.ShareFurTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) share.ShareFurTallyCache {
	// Deferred so the slot is released on every return path.
	defer wg.Done()

	var dealPrice string
	snapshot := func() share.ShareFurTallyCache {
		return share.ShareFurTallyCache{
			UserId:      order.UserId,      // user id
			OrderId:     order.OrderId,     // order id
			Symbol:      order.Symbol,      // traded symbol
			OpenPrice:   dealPrice,         // open price
			Status:      order.Status,      // order status
			Order:       order.Order,       // order details
			ClosingTime: order.ClosingTime, // closing time
		}
	}

	limitPrice, err := decimal.NewFromString(order.Order.LimitPrice)
	if err != nil {
		applogger.Error("%v ShareFurConcurrentComputingEntrustLimited.LimitPrice:%v---%v", common.ErrShareFur, order.OrderId, err)
		return snapshot()
	}

	var deleteCache bool
	switch order.Order.TradeType {
	case flags.TradeTypeBuy: // long
		if !flags.CheckSetting {
			applogger.Debug("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status)
		}
		// Limit buy: order price > current price.
		if limitPrice.Cmp(newPrice) > 0 {
			deleteCache = true
			difference := newPrice.Mul(utils.Difference()) // spread
			dealPrice = newPrice.Add(difference).String()  // open price
		}
	case flags.TradeTypeSell: // short
		if !flags.CheckSetting {
			applogger.Debug("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status)
		}
		// Limit sell: order price < current price.
		if limitPrice.Cmp(newPrice) < 0 {
			deleteCache = true
			difference := newPrice.Mul(utils.Difference()) // spread
			dealPrice = newPrice.Sub(difference).String()  // open price
		}
	}

	if deleteCache {
		status := order.Status // original status, restored when the removal fails
		order.Status = flags.Position
		if err := Reds.HDel(context.Background(), setting.MarketShareFurEntrust, order.OrderId).Err(); err != nil {
			applogger.Error("%v ShareFurConcurrentComputingEntrustLimited.MarketShareFurEntrust.HDel:%v", common.ErrShareFur, err)
			order.Status = status
		}
	}

	return snapshot()
}
// ShareFurConcurrentComputingEntrustMarket
//
//	@Description: Evaluates one French-share market order that was queued as a
//	pending order (placed while the market was closed). Buy and sell orders
//	always fill: buy at current price + spread, sell at current price - spread,
//	where spread = current price * utils.Difference(). The order is switched
//	to flags.Position and removed from the pending-order cache; if that
//	removal fails the original status is restored.
//
//	The order's own MarketPrice is only logged, so it is no longer parsed with
//	RequireFromString, which panicked on a malformed value.
//	@param order pending order; its Status is updated in place
//	@param newPrice current share price
//	@param wg released exactly once when the function returns
//	@return share.ShareFurTallyCache copy of the order carrying the open price and resulting status
func ShareFurConcurrentComputingEntrustMarket(order *share.ShareFurTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) share.ShareFurTallyCache {
	// Deferred so the slot is released on every return path.
	defer wg.Done()

	var deleteCache bool
	var dealPrice string
	marketPrice := order.Order.MarketPrice // logged as-is; takes no part in the decision

	switch order.Order.TradeType {
	case flags.TradeTypeBuy: // long
		if !flags.CheckSetting {
			applogger.Debug("用户ID:%v,限价买入下单价格:%v,最新市价:%v,原始状态:%v", order.UserId, marketPrice, newPrice, order.Status)
		}
		deleteCache = true
		difference := newPrice.Mul(utils.Difference()) // spread
		dealPrice = newPrice.Add(difference).String()  // open price
	case flags.TradeTypeSell: // short
		if !flags.CheckSetting {
			applogger.Debug("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,原始状态:%v", order.UserId, marketPrice, newPrice, order.Status)
		}
		deleteCache = true
		difference := newPrice.Mul(utils.Difference()) // spread
		dealPrice = newPrice.Sub(difference).String()  // open price
	}

	if deleteCache {
		status := order.Status // original status, restored when the removal fails
		order.Status = flags.Position
		if err := Reds.HDel(context.Background(), setting.MarketShareFurEntrust, order.OrderId).Err(); err != nil {
			applogger.Error("%v ShareFurConcurrentComputingEntrustMarket.HDel:%v", common.ErrShareFur, err)
			order.Status = status
		}
	}

	return share.ShareFurTallyCache{
		UserId:      order.UserId,      // user id
		OrderId:     order.OrderId,     // order id
		Symbol:      order.Symbol,      // traded symbol
		OpenPrice:   dealPrice,         // open price
		Status:      order.Status,      // order status
		Order:       order.Order,       // order details
		ClosingTime: order.ClosingTime, // closing time
	}
}
// ShareBlockFurConcurrentComputingEntrustMarket
//
//	@Description: Block-trade (French shares) counterpart of the market-order
//	evaluation: the order is always switched to flags.Position at exactly
//	newPrice and removed from setting.MarketShareBlkEntrust. If the removal
//	fails the original status is restored.
//	@param order pending order; its Status is updated in place
//	@param newPrice price used as the open price
//	@param wg released once before the result is built
//	@return share.ShareFurTallyCache copy of the order carrying the open price and resulting status
func ShareBlockFurConcurrentComputingEntrustMarket(order *share.ShareFurTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) share.ShareFurTallyCache {
	previous := order.Status
	order.Status = flags.Position

	removeErr := Reds.HDel(context.Background(), setting.MarketShareBlkEntrust, order.OrderId).Err()
	if removeErr != nil {
		applogger.Error("%v ShareBlockFurConcurrentComputingEntrustMarket.HDel:%v", common.ErrShareBlk, removeErr)
		order.Status = previous
	}
	wg.Done()

	var out share.ShareFurTallyCache
	out.UserId = order.UserId           // user id
	out.OrderId = order.OrderId         // order id
	out.Symbol = order.Symbol           // traded symbol
	out.OpenPrice = newPrice.String()   // open price
	out.Status = order.Status           // order status
	out.Order = order.Order             // order details
	out.ClosingTime = order.ClosingTime // closing time
	return out
}
// ShareFurMqOpenBusiness
//
//	@Description: Message-queue driven opening of French-share positions.
//	Starts the Entrust consumer in its own goroutine, then polls
//	consumer.ConsumerEntrustMq.ShareFurMq: each message is decoded and handed
//	to ShareFurLiquidationEntrust. With no message pending it logs and waits
//	2s; a closed channel or an undecodable message costs a 5s pause.
//	@param ctx context handed to ShareFurLiquidationEntrust
//	@param uo data layer that owns the MQ consumer
func ShareFurMqOpenBusiness(ctx context.Context, uo *Data) {
	go func() {
		uo.mqConsumer.Entrust.ConsumerNotice()
	}()

	// open decodes one queued order and opens the position.
	open := func(payload []byte) {
		var cached share.ShareFurTallyCache
		if err := json.Unmarshal(payload, &cached); err != nil {
			applogger.Error("%v ShareFurMqOpenBusiness.Unmarshal:%v", common.ErrShareFur, err)
			time.Sleep(5 * time.Second)
			return
		}
		if err := ShareFurLiquidationEntrust(ctx, &cached); err != nil {
			applogger.Error("%v ShareFurMqOpenBusiness.ShareFurLiquidationEntrust:%v", common.ErrShareFur, err)
		}
	}

	for {
		select {
		case payload, alive := <-consumer.ConsumerEntrustMq.ShareFurMq:
			if alive {
				open(payload)
			} else {
				time.Sleep(5 * time.Second)
				applogger.Error("%v ShareFurMqOpenBusiness.ShareFurMq err....", common.ErrShareFur)
			}
		default:
			// TODO: decide whether an empty queue needs handling
			applogger.Info("这里没有监控到持仓mq队列消息.........")
			time.Sleep(2 * time.Second)
		}
	}
}
// ShareFurLiquidationEntrust
//
//	@Description: Turns a filled French-share pending order into an open
//	position:
//	 1. opens the order through StockFurOpenOrder;
//	 2. marks it flags.Position in the user's subscription hash;
//	 3. marks it flags.Position in the admin subscription hash.
//	The block-trade keys are used when CheckTypeStatus(order.Order.Type)
//	reports true, the French-share keys otherwise. The first failing step is
//	logged and returned.
//	@param ctx context handed to StockFurOpenOrder
//	@param order filled order carrying the open price
//	@return error first error encountered, nil on success
func ShareFurLiquidationEntrust(ctx context.Context, order *share.ShareFurTallyCache) error {
	// 1. Open the position.
	if err := StockFurOpenOrder(ctx, Msql, order.UserId, order.OrderId, order.OpenPrice, order.Order); err != nil {
		applogger.Error("%v ShareFurLiquidationEntrust.StockFurOpenOrder:%v", common.ErrShareFur, err)
		return err
	}

	// Pick the key pair: French shares by default, block trading when flagged.
	userKeyPrefix, adminKey := setting.ShareFurSubscribe, setting.AdminShareFurSubscribe
	if CheckTypeStatus(order.Order.Type) {
		userKeyPrefix, adminKey = setting.ShareBlkSubscribe, setting.AdminShareBlkSubscribe
	}

	// 2. Update the order status in the user's subscription cache.
	userKey := virtual.OrderIdListKey(userKeyPrefix, order.UserId)
	if err := UpdateShareFurSubscribeHashStatusByOrderId(order.OrderId, userKey, flags.Position, order.OpenPrice); err != nil {
		applogger.Error("%v ShareFurLiquidationEntrust.Entrust.%v:%v", common.ErrShareFur, userKeyPrefix, err)
		return err
	}

	// 3. Update the order status in the admin subscription cache.
	if err := UpdateShareFurSubscribeHashStatusByOrderId(order.OrderId, adminKey, flags.Position, order.OpenPrice); err != nil {
		applogger.Error("%v ShareFurLiquidationEntrust.Entrust.%v:%v", common.ErrShareFur, adminKey, err)
		return err
	}

	return nil
}
// ShareFurTransactionPosition
//
//	@Description: Worker for French-share open positions: runs
//	ShareFurTransactionCalculationPosition, pauses 600 ms, and repeats forever.
//	@param ctx context handed to every calculation round
func ShareFurTransactionPosition(ctx context.Context) {
	const pause = 600 * time.Millisecond

	for {
		ShareFurTransactionCalculationPosition(ctx)
		time.Sleep(pause)
	}
}
// ShareFurTransactionCalculationPosition
//
//	@Description: One evaluation round over the French-share position cache
//	(setting.MarketShareFurPosition).
//
//	When CheckGlobalTread(flags.FurMarket) reports true the round is skipped
//	after a 5s pause. Otherwise each cached position is decoded and skipped if
//	its user appears in the GetBotUserArrears result, if
//	CheckShareSystemTime(ClosingTime) is false, or if no current price is
//	available. The remaining positions are evaluated by
//	ShareFurConcurrentComputingPosition; one that comes back as flags.Close is
//	closed through ShareFurLiquidationPosition.
//	@param ctx context handed to the arrears lookup and the closing routine
func ShareFurTransactionCalculationPosition(ctx context.Context) {
	var group sync.WaitGroup

	// Is trading enabled at all?
	if CheckGlobalTread(flags.FurMarket) {
		time.Sleep(5 * time.Second)
		return
	}

	// TODO: users with unpaid IPO orders (lookup error is ignored)
	arrears, _ := GetBotUserArrears(ctx)

	// Load the position cache.
	positions, err := Reds.HGetAll(context.Background(), setting.MarketShareFurPosition).Result()
	if err != nil {
		applogger.Error("%v ShareFurTransactionCalculationPosition.HGetAll:%v", common.ErrShareFur, err)
		return
	}

	for _, raw := range positions {
		results := make(chan share.ShareFurTallyCache, 1)

		var position share.ShareFurTallyCache
		if err = json.Unmarshal([]byte(raw), &position); err != nil {
			applogger.Error("%v ShareFurTransactionCalculationPosition.Unmarshal:%v", common.ErrShareFur, err)
			continue
		}

		// TODO: unpaid IPO order - the position must not be closed
		if _, unpaid := arrears[position.UserId]; unpaid {
			continue
		}

		// Closing time not reached yet.
		if !CheckShareSystemTime(position.ClosingTime) {
			applogger.Debug("法股:%v,平仓时间:%v暂未到达...", position.Symbol, position.ClosingTime)
			continue
		}

		// Current share price.
		var latest decimal.Decimal
		latest, err = GetShareFurPrice(position.Symbol)
		if err != nil {
			applogger.Warn("%v ShareFurTransactionCalculationPosition.GetShareFurPrice:%v--%v", common.ErrShareFur, position.Symbol, err)
			continue
		}

		// Quota-based forced-liquidation switch for this user.
		forceAllowed := GetStrongFlatteningThreshold(position.UserId)

		group.Add(1)
		go func() {
			results <- ShareFurConcurrentComputingPosition(&position, latest, &group, forceAllowed)
		}()

		// Wait for this position's verdict before moving on.
		outcome := <-results
		if outcome.Status == flags.Position {
			// still open
			if !flags.CheckSetting {
				applogger.Debug("从信道接收到法股持仓信息:%v", outcome)
			}
		} else if outcome.Status == flags.Close {
			// to be closed
			if !flags.CheckSetting {
				applogger.Debug("从信道接收到法股平仓信息:%v", outcome)
			}
			if err = ShareFurLiquidationPosition(ctx, &outcome); err != nil {
				applogger.Error("%v ShareFurTransactionCalculationPosition.ShareFurLiquidationPosition:%v", common.ErrShareFur, err)
			}
		}
	}

	group.Wait()
	applogger.Info("法股持仓并发计算执行完毕......")
}
// ShareFurConcurrentComputingPosition
//
//	@Description: Decides whether one open French-share position must be
//	closed at the current price.
//
//	Stop orders (prices from StockFurVoteStopType; zero means "not set"):
//	 - long:  take-profit fires when stopWin < price (close at price - spread),
//	          stop-loss fires when stopLoss > price (close at price + spread);
//	 - short: take-profit fires when stopWin > price (close at price + spread),
//	          stop-loss fires when stopLoss < price (close at price - spread).
//	When both fire the stop-loss price wins because it is assigned last.
//
//	Forced liquidation (only when no stop fired and checkFlattening is true):
//	the floating loss (long: (price-open)*qty, short: (open-price)*qty) is
//	compared with price*qty/leverage*StrongFlatRatio; when the loss reaches
//	that amount the position closes at price - spread (long) or
//	price + spread (short).
//
//	spread = price * utils.Difference(). A position to be closed is switched
//	to flags.Close and removed from setting.MarketShareFurPosition (a failed
//	removal is logged only). Malformed numeric fields are logged and leave the
//	position untouched instead of panicking.
//	@param order open position; its Status is updated in place
//	@param newPrice current share price
//	@param wg released exactly once when the function returns
//	@param checkFlattening enables the forced-liquidation check
//	@return share.ShareFurTallyCache copy of the order carrying the closing price and resulting status
func ShareFurConcurrentComputingPosition(order *share.ShareFurTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup, checkFlattening bool) share.ShareFurTallyCache {
	// Deferred so the slot is released on every return path.
	defer wg.Done()

	var checkBool bool
	var dealPrice string
	var sellShort decimal.Decimal

	snapshot := func() share.ShareFurTallyCache {
		return share.ShareFurTallyCache{
			UserId:       order.UserId,      // user id
			OrderId:      order.OrderId,     // order id
			Symbol:       order.Symbol,      // traded symbol
			ClosingPrice: dealPrice,         // closing price
			Status:       order.Status,      // order status
			Order:        order.Order,       // order details
			ClosingTime:  order.ClosingTime, // closing time
		}
	}

	openPrice, err := decimal.NewFromString(order.OpenPrice) // open price
	if err != nil {
		applogger.Error("%v ShareFurConcurrentComputingPosition.OpenPrice:%v---%v", common.ErrShareFur, order.OrderId, err)
		return snapshot()
	}
	orderNumber, err := decimal.NewFromString(order.Order.OrderNumber) // position size
	if err != nil {
		applogger.Error("%v ShareFurConcurrentComputingPosition.OrderNumber:%v---%v", common.ErrShareFur, order.OrderId, err)
		return snapshot()
	}

	// Leverage: 1 unless the order carries a usable integer leverage and the
	// user status is flags.SetThree.
	pryNum := decimal.RequireFromString(flags.SetOne)
	lenPryNum := len(utils.ReplaceStr(order.Order.PryNum)) == 0
	if !(lenPryNum || order.Order.PryNum == flags.SetZero || order.Order.System.UserStatus != flags.SetThree || !utils.IsNumberInt(order.Order.PryNum)) {
		// A value such as "00" passes the checks above but would make the
		// division below panic, so zero is rejected as well.
		if parsed, perr := decimal.NewFromString(order.Order.PryNum); perr == nil && !parsed.IsZero() {
			pryNum = parsed
		}
	}

	// Stop prices of the order. This file's own French-share helper is used
	// (the original called the Eur variant); its error is ignored on purpose,
	// as before: an unknown stop type simply yields no stop prices.
	_, stopWinPrice, stopLossPrice, _ := StockFurVoteStopType(order.Order)

	switch order.Order.TradeType {
	case flags.TradeTypeBuy: // long
		if !flags.CheckSetting {
			applogger.Debug("用户ID:%v,持仓开仓价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status)
		}
		if !stopWinPrice.IsZero() && stopWinPrice.Cmp(newPrice) < 0 {
			checkBool = true
			difference := newPrice.Mul(utils.Difference()) // spread
			dealPrice = newPrice.Sub(difference).String()  // closing price
		}
		if !stopLossPrice.IsZero() && stopLossPrice.Cmp(newPrice) > 0 {
			checkBool = true
			difference := newPrice.Mul(utils.Difference()) // spread
			dealPrice = newPrice.Add(difference).String()  // closing price
		}
		// Floating P/L for the forced-liquidation check (long).
		if !checkBool {
			sellShort = newPrice.Sub(openPrice).Mul(orderNumber)
		}
	case flags.TradeTypeSell: // short
		if !flags.CheckSetting {
			applogger.Debug("用户ID:%v,限价持仓买跌下单价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status)
		}
		if !stopWinPrice.IsZero() && stopWinPrice.Cmp(newPrice) > 0 {
			checkBool = true
			difference := newPrice.Mul(utils.Difference()) // spread
			dealPrice = newPrice.Add(difference).String()  // closing price
		}
		if !stopLossPrice.IsZero() && stopLossPrice.Cmp(newPrice) < 0 {
			checkBool = true
			difference := newPrice.Mul(utils.Difference()) // spread
			dealPrice = newPrice.Sub(difference).String()  // closing price
		}
		// Floating P/L for the forced-liquidation check (short).
		if !checkBool {
			sellShort = openPrice.Sub(newPrice).Mul(orderNumber)
		}
	}

	// Forced liquidation: the floating loss has reached the configured share
	// of the order amount.
	if sellShort.IsNegative() && checkFlattening {
		ratio, rerr := decimal.NewFromString(order.Order.System.StrongFlatRatio)
		if rerr != nil {
			// Without a threshold the check cannot be evaluated.
			applogger.Error("%v ShareFurConcurrentComputingPosition.StrongFlatRatio:%v---%v", common.ErrShareFur, order.OrderId, rerr)
			return snapshot()
		}
		orderAmount := newPrice.Mul(orderNumber).Div(pryNum) // order amount = price * quantity / leverage
		floatPrice := orderAmount.Mul(ratio)
		if !flags.CheckSetting {
			applogger.Debug("强平保证金:%v,强平浮动70:%v,强行平仓判定:%v", orderAmount, floatPrice, sellShort.Abs().Cmp(floatPrice))
		}
		if sellShort.Abs().Cmp(floatPrice) >= 0 {
			checkBool = true
			difference := newPrice.Mul(utils.Difference()) // spread
			switch order.Order.TradeType {
			case flags.TradeTypeBuy: // long
				dealPrice = newPrice.Sub(difference).String()
			case flags.TradeTypeSell: // short
				dealPrice = newPrice.Add(difference).String()
			}
		}
	}

	if checkBool {
		order.Status = flags.Close
		// Remove the order from the position cache by order id.
		if err := Reds.HDel(context.Background(), setting.MarketShareFurPosition, order.OrderId).Err(); err != nil {
			applogger.Error("%v ShareFurConcurrentComputingPosition.Position.HDel:%v", common.ErrShareFur, err)
		}
	}

	return snapshot()
}
// ShareFurMqClosingBusiness
//
//	@Description: Message-queue driven closing of French-share positions.
//	Starts the Position consumer in its own goroutine, then polls
//	consumer.ConsumerPositionMq.ShareFurMq: each message is logged, decoded
//	and handed to ShareFurLiquidationPosition. With no message pending it
//	waits 2s; a closed channel or an undecodable message costs a 5s pause.
//	@param ctx context handed to ShareFurLiquidationPosition
//	@param uo data layer that owns the MQ consumer
func ShareFurMqClosingBusiness(ctx context.Context, uo *Data) {
	go func() {
		uo.mqConsumer.Position.ConsumerNotice()
	}()

	// closeOne decodes one queued order and closes the position.
	closeOne := func(payload []byte) {
		applogger.Info("接收到平仓信息:%v", string(payload))

		var cached share.ShareFurTallyCache
		if err := json.Unmarshal(payload, &cached); err != nil {
			applogger.Error("%v ShareFurMqClosingBusiness.Unmarshal:%v", common.ErrShareFur, err)
			time.Sleep(5 * time.Second)
			return
		}
		if err := ShareFurLiquidationPosition(ctx, &cached); err != nil {
			applogger.Error("%v ShareFurMqClosingBusiness.ShareFurLiquidationPosition:%v", common.ErrShareFur, err)
		}
	}

	for {
		select {
		case payload, alive := <-consumer.ConsumerPositionMq.ShareFurMq:
			if alive {
				closeOne(payload)
			} else {
				time.Sleep(5 * time.Second)
				applogger.Error("%v ShareFurMqClosingBusiness.ShareFurMq err....", common.ErrShareFur)
			}
		default:
			// TODO: decide whether an empty queue needs handling
			time.Sleep(2 * time.Second)
		}
	}
}
// ShareFurLiquidationPosition
//
//	@Description: Closes an open French-share position:
//	 1. closes the order through StockFurClosingOrder;
//	 2. marks it flags.Close in the user's subscription hash;
//	 3. marks it flags.Close in the admin subscription hash.
//	The first failing step is logged and returned.
//	@param ctx context handed to StockFurClosingOrder
//	@param order position carrying the closing price
//	@return error first error encountered, nil on success
func ShareFurLiquidationPosition(ctx context.Context, order *share.ShareFurTallyCache) error {
	// 1. Close the order.
	if err := StockFurClosingOrder(ctx, Msql, order.OrderId, order.ClosingPrice, order.Order); err != nil {
		applogger.Error("%v ShareEurLiquidationPosition.StockFurClosingOrder:%v", common.ErrShareFur, err)
		return err
	}

	// 2. Update the order status in the user's subscription cache.
	userKey := virtual.OrderIdListKey(setting.ShareFurSubscribe, order.UserId)
	if err := UpdateShareFurSubscribeHashStatusByOrderId(order.OrderId, userKey, flags.Close, flags.SetNull); err != nil {
		applogger.Error("%v ShareFurLiquidationPosition.Position.ShareFurSubscribe:%v", common.ErrShareFur, err)
		return err
	}

	// 3. Update the order status in the admin subscription cache.
	if err := UpdateShareFurSubscribeHashStatusByOrderId(order.OrderId, setting.AdminShareFurSubscribe, flags.Close, flags.SetNull); err != nil {
		applogger.Error("%v ShareFurLiquidationPosition.Position.AdminShareFurSubscribe:%v", common.ErrShareFur, err)
		return err
	}

	return nil
}
// StockFurVoteStopType
//
//	@Description: Extracts the take-profit / stop-loss settings of a
//	French-share order.
//	@param order order to inspect
//	@return bool true when the order has stop prices configured (flags.StopTypeSet)
//	@return decimal.Decimal take-profit price; zero when not configured or not parseable
//	@return decimal.Decimal stop-loss price; zero when not configured or not parseable
//	@return error flags.ErrContractThirteen for an unknown stop type, nil otherwise
func StockFurVoteStopType(order structure.ShareOrder) (bool, decimal.Decimal, decimal.Decimal, error) {
	var none decimal.Decimal

	if order.StopType == flags.StopTypeNone {
		// no take-profit / stop-loss configured
		return false, none, none, nil
	}

	if order.StopType == flags.StopTypeSet {
		// Parse errors are deliberately dropped: the price then stays zero.
		win, _ := decimal.NewFromString(order.StopWinPrice)
		loss, _ := decimal.NewFromString(order.StopLossPrice)
		return true, win, loss, nil
	}

	return false, none, none, flags.ErrContractThirteen
}
// GetShareFurPrice
//
//	@Description: Returns the current French-share price of a symbol as read
//	from the in-memory cache through memory.GetShareFurCache, and logs it.
//	@param symbol stock code
//	@return decimal.Decimal cached price; the zero Decimal on error
//	@return error cache lookup or decimal parsing failure
func GetShareFurPrice(symbol string) (decimal.Decimal, error) {
	cacheKey := publicData.SymbolCache(flags.Fur, symbol, flags.TradeTypePrice)

	raw, err := memory.GetShareFurCache(cacheKey)
	if err != nil {
		return decimal.Decimal{}, err
	}

	price, err := decimal.NewFromString(string(raw))
	if err != nil {
		return decimal.Decimal{}, err
	}

	applogger.Info("GetShareFurPrice:%v,topIc:%v,shareFurPrice:%v", symbol, cacheKey, price)
	return price, nil
}