You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1439 lines
48 KiB

package data
import (
"context"
"encoding/json"
"github.com/shopspring/decimal"
"matchmaking-system/internal/data/memory"
"matchmaking-system/internal/data/socket/shareData"
"matchmaking-system/internal/data/tradedeal/share"
"matchmaking-system/internal/pkg/flags"
"matchmaking-system/internal/pkg/logging/applogger"
"matchmaking-system/internal/pkg/logging/common"
"matchmaking-system/internal/pkg/setting"
"sync"
"time"
)
/*
大宗(美股|泰股|马股|印尼股|印度股|新加坡股|港股|英股)交易股票行情订阅
1、接收用户大宗交易股下单的交易对大宗交易股行情
2、将订阅行情写入内存中用于并发计算
3、挂单(市价|限价)缓存队列处理-开仓
4、持仓缓存队列处理-平仓
*/
// InitCacheSymbolShareBlk
//
// @Description: 初始化大宗交易代码列表
// @param ctx
// @param uo
func InitCacheSymbolShareBlk(ctx context.Context, uo *Data) {
for {
idnList, err := GetBotStockBlockListByStockId(ctx, uo)
if err != nil {
return
}
for key, value := range idnList {
var cacheKey string
switch key {
case flags.ShareUsMarketType: // 美股
cacheKey = setting.MarketShareBlkUs
case flags.ShareThaMarketType: // 泰股
cacheKey = setting.MarketShareBlkTha
case flags.ShareMysMarketType: // 马股
cacheKey = setting.MarketShareBlkMys
case flags.ShareIdnMarketType: // 印尼股
cacheKey = setting.MarketShareBlkIdn
case flags.ShareInrMarketType: // 印度股
cacheKey = setting.MarketShareBlkInr
case flags.ShareSgdMarketType: // 新加坡股
cacheKey = setting.MarketShareBlkSgd
case flags.ShareHkdMarketType: // 港股
cacheKey = setting.MarketShareBlkHkd
case flags.ShareGbxMarketType: // 英股
cacheKey = setting.MarketShareBlkGbx
case flags.ShareEurMarketType: // 德股
cacheKey = setting.MarketShareBlkEur
case flags.ShareFurMarketType: // 法股
cacheKey = setting.MarketShareBlkFur
case flags.ShareBrlMarketType: // 巴西股
cacheKey = setting.MarketShareBlkBrl
case flags.ShareJpyMarketType: // 日股
cacheKey = setting.MarketShareBlkJpy
default:
continue
}
for _, vue := range value {
if err = InitRedisBlk(uo, vue, cacheKey); err != nil {
continue
}
}
}
time.Sleep(10 * time.Second)
}
}
// InitRedisBlk
//
// @Description:
// @param uo
// @param vue
// @param key
// @return error
func InitRedisBlk(uo *Data, vue, key string) error {
checkBool, err := Reds.HExists(context.Background(), key, vue).Result()
if err != nil {
applogger.Error("%v InitCacheSymbolShareBlk.%v.HExists:%v", common.ErrShareBlk, key, err)
return err
}
if !checkBool {
if err = uo.redisDB.HSet(context.Background(), key, vue, vue).Err(); err != nil {
applogger.Error("%v InitCacheSymbolShareBlk.%v.HSet:%v", common.ErrShareBlk, key, err)
return err
}
}
if !flags.CheckSetting {
applogger.Info("初始化大宗交易股票代码:%v", vue)
}
return nil
}
// InitSubscribeShareBlk
//
// @Description: 初始化大宗交易代码缓存
// @param shareBlk
func InitSubscribeShareBlk(
shareUs *ShareUsSymbol, shareTha *ShareThaSymbol,
shareMys *ShareMysSymbol, shareIdn *ShareIdnSymbol,
shareInr *ShareInrSymbol, shareSgd *ShareSgdSymbol,
shareHkd *ShareHkdSymbol, shareGbx *ShareGbxSymbol,
shareEur *ShareEurSymbol, shareFur *ShareFurSymbol,
shareBrl *ShareBrlSymbol, shareJpy *ShareJpySymbol) {
go InitSubscribeShareUs(shareUs, true) // 初始化美股交易代码缓存
go InitSubscribeShareTha(shareTha, true) // 初始化泰股交易代码缓存
go InitSubscribeShareMys(shareMys, true) // 初始化马股交易代码缓存
go InitSubscribeShareIdn(shareIdn, true) // 初始化印尼股交易代码缓存
go InitSubscribeShareInr(shareInr, true) // 初始化印度股交易代码缓存
go InitSubscribeShareSgd(shareSgd, true) // 初始化新加坡股交易代码缓存
go InitSubscribeShareHkd(shareHkd, true) // 初始化港股交易代码缓存
go InitSubscribeShareGbx(shareGbx, true) // 初始化英股交易代码缓存
go InitSubscribeShareEur(shareEur, true) // 初始化德股交易代码缓存
go InitSubscribeShareFur(shareFur, true) // 初始化法股交易代码缓存
go InitSubscribeShareBrl(shareBrl, true) // 初始化巴西股交易代码缓存
go InitSubscribeShareJpy(shareJpy, true) // 初始化日本股交易代码缓存
}
// InitShareBlkCloseCachePrice
//
// @Description:
// @param shareUs
// @param shareTha
// @param shareMys
// @param shareIdn
// @param shareInr
// @param shareSgd
// @param shareHkd
func InitShareBlkCloseCachePrice() {
go InitShareUsCloseCachePrice(true) // 初始化美股交易代码闭盘价缓存
go InitShareThaCloseCachePrice(true) // 初始化泰股交易代码闭盘价缓存
go InitShareMysCloseCachePrice(true) // 初始化马股交易代码闭盘价缓存
go InitShareIdnCloseCachePrice(true) // 初始化印尼股交易代码闭盘价缓存
go InitShareInrCloseCachePrice(true) // 初始化印度股交易代码闭盘价缓存
go InitShareSgdCloseCachePrice(true) // 初始化新加坡股交易代码闭盘价缓存
go InitShareHkdCloseCachePrice(true) // 初始化港股交易代码闭盘价缓存
go InitShareGbxCloseCachePrice(true) // 初始化英股交易代码闭盘价缓存
go InitShareEurCloseCachePrice(true) // 初始化德股交易代码闭盘价缓存
go InitShareFurCloseCachePrice(true) // 初始化法股交易代码闭盘价缓存
go InitShareBrlCloseCachePrice(true) // 初始化巴西股交易代码闭盘价缓存
go InitShareJpyCloseCachePrice(true) // 初始化日本股交易代码闭盘价缓存
}
// InitShareBlkSetUpPrice
//
// @Description:
func InitShareBlkSetUpPrice() {
go InitShareUsPriceSetUp(true) // 初始化美股交易代码插针价缓存
go InitShareThaPriceSetUp(true) // 初始化泰股交易代码插针价缓存
go InitShareMysPriceSetUp(true) // 初始化马股交易代码插针价缓存
go InitShareIdnPriceSetUp(true) // 初始化印尼股交易代码插针价缓存
go InitShareInrPriceSetUp(true) // 初始化印度股交易代码插针价缓存
go InitShareSgdPriceSetUp(true) // 初始化新加坡股交易代码插针价缓存
go InitShareHkdPriceSetUp(true) // 初始化港股交易代码插针价缓存
go InitShareGbxPriceSetUp(true) // 初始化英股交易代码插针价缓存
go InitShareEurPriceSetUp(true) // 初始化德股交易代码插针价缓存
go InitShareFurPriceSetUp(true) // 初始化法股交易代码插针价缓存
go InitShareBrlPriceSetUp(true) // 初始化巴西股交易代码插针价缓存
go InitShareJpyPriceSetUp(true) // 初始化日本股交易代码插针价缓存
}
// SubscribeShareBlk
//
// @Description: 大宗交易下单交易对行情订阅
// @param ctx
// @param data
// @param shareUs 美股
// @param shareTha 泰股
// @param shareMys 马股
// @param shareIdn 印尼股
// @param shareInr 印度股
// @param shareSgd 新加坡股
// @param shareHkd 港股
// @param shareGbx 英股
func SubscribeShareBlk() {
go InitShareUsCloseNewPrice(true)
go InitShareThaCloseNewPrice(true)
go InitShareMysCloseNewPrice(true)
go InitShareIdnCloseNewPrice(true)
go InitShareInrCloseNewPrice(true)
go InitShareSgdCloseNewPrice(true)
go InitShareHkdCloseNewPrice(true)
go InitShareGbxCloseNewPrice(true)
go InitShareFurCloseNewPrice(true)
go InitShareEurCloseNewPrice(true)
go InitShareBrlCloseNewPrice(true)
go InitShareJpyCloseNewPrice(true)
}
// OrderSubAdminShareBlkSubscribeBySum
//
// @Description: 大宗交易持仓订单浮动盈亏
// @param uo
func OrderSubAdminShareBlkSubscribeBySum() {
for {
var topIc = setting.AdminShareBlkSubscribe
hashMap, err := Reds.HGetAll(context.Background(), topIc).Result()
if err != nil {
applogger.Error("OrderSubAdminShareBlkSubscribeBySum.HGetAll:%v", err)
time.Sleep(5 * time.Second)
continue
}
var hashList []share.ShareBlkTallyCache
for _, value := range hashMap {
var msg share.ShareBlkTallyCache
if err = json.Unmarshal([]byte(value), &msg); err != nil {
time.Sleep(5 * time.Second)
continue
}
switch msg.Status {
case flags.Entrust: // 挂单
case flags.Position: // 持仓
hashList = append(hashList, msg)
default: // 平仓|撤单
err = Reds.HDel(context.Background(), topIc, msg.OrderId).Err()
if err != nil {
applogger.Error("OrderSubAdminShareBlkSubscribeBySum.HDel:%v", err)
continue
}
}
}
applogger.Info("大宗交易股数据量统计:%v", len(hashList))
// 统计泰股总持仓订单浮动盈亏
priceSum := decimal.Zero
for _, value := range hashList {
orderModel := shareData.ShareBlkOrderProcessing(topIc, 1, value)
if orderModel != nil {
if len(orderModel.Price) == 0 || len(orderModel.OpenPrice) == 0 {
continue
}
var subPrice decimal.Decimal
newPrice := decimal.RequireFromString(orderModel.Price)
openPrice := decimal.RequireFromString(orderModel.OpenPrice)
switch value.Order.TradeType {
case flags.TradeTypeBuy: // 买涨 = 新价格 - 开仓价
subPrice = newPrice.Sub(openPrice)
case flags.TradeTypeSell: // 卖跌 = 开仓价 - 新价格
subPrice = openPrice.Sub(newPrice)
default:
subPrice = decimal.Zero
}
price := subPrice.Mul(decimal.RequireFromString(value.Order.OrderNumber))
priceSum = priceSum.Add(price) // 累加盈亏统计
}
}
// 写入缓存
if err = memory.ShareBlkFloating.Set(flags.FloatingBlk, []byte(priceSum.String())); err != nil {
applogger.Error("统计大宗交易股持仓订单浮动盈亏错误:%v", err)
time.Sleep(5 * time.Second)
continue
}
applogger.Info("统计大宗交易股持仓订单浮动盈亏:%v", priceSum)
time.Sleep(1 * time.Second)
}
}
// ShareBlkTransactionEntrust
//
// @Description: 大宗(美股|泰股|马股|印尼股|印度股|新加坡股|港股|英股|巴西股|日本股)交易挂单信息缓存队列
// @param ctx
func ShareBlkTransactionEntrust(ctx context.Context) {
go ShareBlkUsTransactionEntrust(ctx)
go ShareBlkThaTransactionEntrust(ctx)
go ShareBlkMysTransactionEntrust(ctx)
go ShareBlkIdnTransactionEntrust(ctx)
go ShareBlkInrTransactionEntrust(ctx)
go ShareBlkSgdTransactionEntrust(ctx)
go ShareBlkHkdTransactionEntrust(ctx)
go ShareBlkGbxTransactionEntrust(ctx)
go ShareBlkEurTransactionEntrust(ctx)
go ShareBlkFurTransactionEntrust(ctx)
go ShareBlkBrlTransactionEntrust(ctx)
go ShareBlkJpyTransactionEntrust(ctx)
}
// ShareBlkUsTransactionEntrust
//
// @Description: 大宗(美股)交易
// @param ctx
func ShareBlkUsTransactionEntrust(ctx context.Context) {
for {
ShareBlkUsTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkThaTransactionEntrust
//
// @Description: 大宗(泰股)交易
// @param ctx
func ShareBlkThaTransactionEntrust(ctx context.Context) {
for {
ShareBlkThaTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkMysTransactionEntrust
//
// @Description: 大宗(马股)交易
// @param ctx
func ShareBlkMysTransactionEntrust(ctx context.Context) {
for {
ShareBlkMysTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkIdnTransactionEntrust
//
// @Description: 大宗(印尼股)交易
// @param ctx
func ShareBlkIdnTransactionEntrust(ctx context.Context) {
for {
ShareBlkIdnTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkInrTransactionEntrust
//
// @Description: 大宗(印度股)交易
// @param ctx
func ShareBlkInrTransactionEntrust(ctx context.Context) {
for {
ShareBlkInrTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkSgdTransactionEntrust
//
// @Description: 大宗(新加坡股)交易
// @param ctx
func ShareBlkSgdTransactionEntrust(ctx context.Context) {
for {
ShareBlkSgdTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkHkdTransactionEntrust
//
// @Description: 大宗(港股)交易
// @param ctx
func ShareBlkHkdTransactionEntrust(ctx context.Context) {
for {
ShareBlkHkdTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkGbxTransactionEntrust
//
// @Description: 大宗(英股)交易
// @param ctx
func ShareBlkGbxTransactionEntrust(ctx context.Context) {
for {
ShareBlkGbxTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkEurTransactionEntrust
//
// @Description: 大宗(德股)交易
// @param ctx
func ShareBlkEurTransactionEntrust(ctx context.Context) {
for {
ShareBlkEurTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkFurTransactionEntrust
//
// @Description: 大宗(德股)交易
// @param ctx
func ShareBlkFurTransactionEntrust(ctx context.Context) {
for {
ShareBlkFurTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkBrlTransactionEntrust
//
// @Description: 大宗(巴西股)交易
// @param ctx
func ShareBlkBrlTransactionEntrust(ctx context.Context) {
for {
ShareBlkBrlTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkJpyTransactionEntrust
//
// @Description: 大宗(日本股)交易
// @param ctx
func ShareBlkJpyTransactionEntrust(ctx context.Context) {
for {
ShareBlkJpyTransactionCalculationEntrust(ctx)
time.Sleep(600 * time.Millisecond)
}
}
// ShareBlkUsTransactionCalculationEntrust
//
// @Description: 大宗(美股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkUsTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustUs share.ShareUsTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareUsTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareUsMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustUs = share.ShareUsTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockUsConcurrentComputingEntrustMarket(&entrustUs, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(美股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(美股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareUsLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareUsLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(美股)交易股挂单并发计算执行完毕......")
}
// ShareBlkThaTransactionCalculationEntrust
//
// @Description: 大宗(泰股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkThaTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustTha share.ShareThaTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareThaTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareThaMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustTha = share.ShareThaTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockThaConcurrentComputingEntrustMarket(&entrustTha, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(泰股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(泰股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareThaLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareThaLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(泰股)交易股挂单并发计算执行完毕......")
}
// ShareBlkMysTransactionCalculationEntrust
//
// @Description: 大宗(马股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkMysTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustMys share.ShareMysTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareMysTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareMysMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustMys = share.ShareMysTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockMysConcurrentComputingEntrustMarket(&entrustMys, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(马股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(马股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareMysLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareMysLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(马股)交易股挂单并发计算执行完毕......")
}
// ShareBlkIdnTransactionCalculationEntrust
//
// @Description: 大宗(印尼股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkIdnTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustIdn share.ShareIdnTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareIdnTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareIdnMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustIdn = share.ShareIdnTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockIdnConcurrentComputingEntrustMarket(&entrustIdn, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(印尼股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(印尼股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareIdnLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareIdnLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(印尼股)交易股挂单并发计算执行完毕......")
}
// ShareBlkInrTransactionCalculationEntrust
//
// @Description: 大宗(印度股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkInrTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustInr share.ShareInrTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareInrTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareInrMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustInr = share.ShareInrTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockInrConcurrentComputingEntrustMarket(&entrustInr, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(印度股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(印度股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareInrLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareInrLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(印度股)交易股挂单并发计算执行完毕......")
}
// ShareBlkSgdTransactionCalculationEntrust
//
// @Description: 大宗(新加坡股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkSgdTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustSgd share.ShareSgdTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareSgdTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareSgdMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustSgd = share.ShareSgdTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockSgdConcurrentComputingEntrustMarket(&entrustSgd, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(新加坡股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(新加坡股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareSgdLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareSgdLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(新加坡股)交易股挂单并发计算执行完毕......")
}
// ShareBlkHkdTransactionCalculationEntrust
//
// @Description: 大宗(港股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkHkdTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrust share.ShareBlkTallyCache
var entrustHkd share.ShareHkdTallyCache
var msg = make(chan share.ShareHkdTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareHkdMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustHkd = share.ShareHkdTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockHkdConcurrentComputingEntrustMarket(&entrustHkd, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(港股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(港股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareHkdLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareHkdLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(港股)交易股挂单并发计算执行完毕......")
}
// ShareBlkGbxTransactionCalculationEntrust
//
// @Description: 大宗(英股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkGbxTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkGbxTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustSgd share.ShareGbxTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareGbxTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareGbxMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustSgd = share.ShareGbxTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockGbxConcurrentComputingEntrustMarket(&entrustSgd, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(英股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(英股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareGbxLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.ShareGbxLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(英股)交易股挂单并发计算执行完毕......")
}
// ShareBlkEurTransactionCalculationEntrust
//
// @Description: 大宗(德股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkEurTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkEurTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustSgd share.ShareEurTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareEurTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkEurTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareEurMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustSgd = share.ShareEurTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockEurConcurrentComputingEntrustMarket(&entrustSgd, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(德股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(德股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareEurLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkEurTransactionCalculationEntrust.ShareEurLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkEurTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(德股)交易股挂单并发计算执行完毕......")
}
// ShareBlkFurTransactionCalculationEntrust
//
// @Description: 大宗(法股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkFurTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkFurTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustSgd share.ShareFurTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareFurTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkFurTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareFurMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustSgd = share.ShareFurTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockFurConcurrentComputingEntrustMarket(&entrustSgd, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(法股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(法股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareFurLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkFurTransactionCalculationEntrust.ShareFurLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkFurTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(法股)交易股挂单并发计算执行完毕......")
}
// ShareBlkBrlTransactionCalculationEntrust
//
// @Description: 大宗(巴西股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkBrlTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkBrlTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustSgd share.ShareBrlTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareBrlTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkBrlTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareBrlMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustSgd = share.ShareBrlTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockBrlConcurrentComputingEntrustMarket(&entrustSgd, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(巴西股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(巴西股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareBrlLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkBrlTransactionCalculationEntrust.ShareBrlLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkBrlTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(巴西股)交易股挂单并发计算执行完毕......")
}
// ShareBlkJpyTransactionCalculationEntrust
//
// @Description: 大宗(日股)交易挂单业务逻辑处理
// @param ctx
func ShareBlkJpyTransactionCalculationEntrust(ctx context.Context) {
var wg sync.WaitGroup
// 获取挂单缓存列表
orderMap, err := Reds.HGetAll(context.Background(), setting.MarketShareBlkEntrust).Result()
if err != nil {
applogger.Error("%v ShareBlkJpyTransactionCalculationEntrust.HGetAll:%v", common.ErrShareBlk, err)
return
}
for _, value := range orderMap {
var newPrice decimal.Decimal
var entrustJpy share.ShareJpyTallyCache
var entrust share.ShareBlkTallyCache
var msg = make(chan share.ShareJpyTallyCache, 1)
if err = json.Unmarshal([]byte(value), &entrust); err != nil {
applogger.Error("%v ShareBlkJpyTransactionCalculationEntrust.Unmarshal:%v", common.ErrShareBlk, err)
continue
}
if entrust.Order.Type != flags.ShareJpyMarketType {
continue
}
newPrice = decimal.RequireFromString(entrust.OpenPrice) // 固定开仓设置价格
entrustJpy = share.ShareJpyTallyCache{
UserId: entrust.UserId,
OrderId: entrust.OrderId,
Symbol: entrust.Symbol,
Status: entrust.Status,
OpenPrice: entrust.OpenPrice,
ClosingPrice: entrust.ClosingPrice,
ClosingTime: entrust.ClosingTime,
Order: entrust.Order,
}
wg.Add(1)
go func(value string) {
resultMsg := ShareBlockJpyConcurrentComputingEntrustMarket(&entrustJpy, newPrice, &wg)
msg <- resultMsg
}(value)
// 处理并发结果(需要改成通过信道通知)
select {
case resultMsg, _ := <-msg:
switch resultMsg.Status {
case flags.Entrust: // 挂单中
if !flags.CheckSetting {
applogger.Debug("从信道接收大宗(日股)交易股挂单信息:%v", resultMsg)
}
case flags.Position: // 持仓中
if !flags.CheckSetting {
applogger.Debug("从信道接收到大宗(日股)交易股持仓信息:%v", resultMsg)
}
// 开盘逻辑
if err = ShareJpyLiquidationEntrust(ctx, &resultMsg); err != nil {
applogger.Error("%v ShareBlkJpyTransactionCalculationEntrust.ShareJpyLiquidationEntrust:%v", common.ErrShareBlk, err)
resultMsg.Status = flags.Entrust
resultMsg.OpenPrice = decimal.Zero.String()
byteStr, _ := json.Marshal(resultMsg)
_ = Reds.HSet(context.Background(), setting.MarketShareBlkEntrust, resultMsg.OrderId, string(byteStr)).Err()
continue
}
// 写入持仓缓存列表
var byteStr []byte
byteStr, err = json.Marshal(resultMsg)
if err != nil {
applogger.Error("%v ShareBlkJpyTransactionCalculationEntrust.Marshal:%v", common.ErrShareBlk, err)
continue
}
// 写入持仓队列缓存
if err = Reds.HSet(context.Background(), setting.MarketShareBlkPosition, resultMsg.OrderId, string(byteStr)).Err(); err != nil {
continue
}
}
}
}
wg.Wait()
applogger.Info("大宗(日股)交易股挂单并发计算执行完毕......")
}