You can not select more than 25 topics
			Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
		
		
		
		
		
			
		
			
				
					
					
						
							853 lines
						
					
					
						
							27 KiB
						
					
					
				
			
		
		
		
			
			
			
				
					
				
				
					
				
			
		
		
	
	
							853 lines
						
					
					
						
							27 KiB
						
					
					
				| package data | |
| 
 | |
| import ( | |
| 	"context" | |
| 	"encoding/json" | |
| 	"fmt" | |
| 	"github.com/shopspring/decimal" | |
| 	"matchmaking-system/internal/biz/structure" | |
| 	"matchmaking-system/internal/data/memory" | |
| 	"matchmaking-system/internal/data/mq/consumer" | |
| 	"matchmaking-system/internal/data/socket/forexData" | |
| 	"matchmaking-system/internal/data/socket/publicData" | |
| 	"matchmaking-system/internal/data/tradedeal/virtual" | |
| 	"matchmaking-system/internal/pkg/flags" | |
| 	"matchmaking-system/internal/pkg/logging/applogger" | |
| 	"matchmaking-system/internal/pkg/logging/common" | |
| 	"matchmaking-system/internal/pkg/setting" | |
| 	"strconv" | |
| 	"sync" | |
| 	"time" | |
| 
 | |
| 	forexd "matchmaking-system/internal/data/tradedeal/forex" | |
| ) | |
| 
 | |
| /* | |
| 外汇行情订阅 | |
| 1、初始化现货订阅信息 | |
| 2、接收用户现货下单的交易对 | |
| 3、根据交易对订阅现货行情数据 | |
| 4、写入内存中用于并发计算 | |
| 5、记录在热缓存中 | |
| */ | |
| 
 | |
| // QuitesForex | |
| // @Description: | |
| type QuitesForex struct { | |
| 	Ev string  `json:"ev"` // 事件类型 | |
| 	P  string  `json:"p"`  // 交易对 | |
| 	A  float64 `json:"a"`  // 买价 | |
| 	X  int     `json:"x"`  // 交易所标识符 | |
| 	B  float64 `json:"b"`  // 卖价 | |
| 	T  int64   `json:"t"`  // 时间 | |
| } | |
| type ForexJsonData struct { | |
| 	Event     string  `json:"ev"`   // 事件类型(实时数据) | |
| 	Pair      string  `json:"pair"` // 货币对 | |
| 	Open      float64 `json:"o"`    // 开盘价 | |
| 	Close     float64 `json:"c"`    // 收盘价 | |
| 	High      float64 `json:"h"`    // 最高价 | |
| 	Low       float64 `json:"l"`    // 最低价 | |
| 	Volume    int     `json:"v"`    // 交易量 | |
| 	Timestamp int64   `json:"s"`    // 时间戳 | |
| } | |
| 
 | |
| // ForexSymbol | |
| // @Description: | |
| type ForexSymbol struct { | |
| 	ForexMap       chan []byte // 外汇订单交易对 -- 用户外汇下单通知订阅 | |
| 	ForexMapSymbol sync.Map    // 外汇订阅交易对簿 | |
| } | |
| 
 | |
| // InitCacheSymbolForex | |
| // | |
| //	@Description: | |
| //	@param ctx | |
| //	@param uo | |
| func InitCacheSymbolForex(ctx context.Context, uo *Data) { | |
| 	contractList, err := GetBotForexList(ctx, uo) | |
| 	if err != nil { | |
| 		return | |
| 	} | |
| 
 | |
| 	for _, value := range contractList { | |
| 		// Write to the redis list hot cache for initializing subscriptions when the program is pulled up | |
| 		checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketForex, value).Result() | |
| 		if err != nil { | |
| 			applogger.Error("%v InitCacheSymbolSpots.MarketForex.HExists:%v", common.ErrForex, err) | |
| 			return | |
| 		} | |
| 		applogger.Info("初始化外汇交易对:%v", value) | |
| 
 | |
| 		if !checkBool { | |
| 			if err = uo.redisDB.HSet(context.Background(), setting.MarketForex, value, value).Err(); err != nil { | |
| 				applogger.Error("%v InitCacheSymbolSpots.HSet:%v", common.ErrForex, err) | |
| 				return | |
| 			} | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| // InitSubscribeQuotesForex | |
| // | |
| //	@Description: | |
| //	@param ctx | |
| //	@param uo | |
| //	@param contract | |
| func InitSubscribeQuotesForex(contract *ForexSymbol) { | |
| 	for { | |
| 		listSpots, err := LoadLRangeList(setting.MarketForex) | |
| 		if err != nil { | |
| 			applogger.Error("%v InitSubscribeQuotesForex.LoadLRangeList:%v", common.ErrForex, err) | |
| 			return | |
| 		} | |
| 
 | |
| 		for _, value := range listSpots { | |
| 			// Prevent duplicate subscription to CtrIp | |
| 			_, ok := contract.ForexMapSymbol.Load(value) | |
| 			if ok { | |
| 				continue | |
| 			} | |
| 
 | |
| 			contract.ForexMap <- []byte(value) | |
| 		} | |
| 
 | |
| 		time.Sleep(10 * time.Second) | |
| 	} | |
| } | |
| 
 | |
| // SubscribeQuotesForex | |
| // | |
| //	@Description: 外汇订阅 | |
| //	@param ctx | |
| //	@param uo | |
| //	@param contract | |
| func SubscribeQuotesForex(ctx context.Context, uo *Data, forex *ForexSymbol) { | |
| 	for { | |
| 		select { | |
| 		case symbol, _ := <-forex.ForexMap: | |
| 			symbolStr := string(symbol) | |
| 
 | |
| 			// Prevent duplicate subscription to CtrIp | |
| 			_, ok := forex.ForexMapSymbol.Load(symbolStr) | |
| 			if ok { | |
| 				time.Sleep(5 * time.Second) | |
| 				continue | |
| 			} | |
| 
 | |
| 			// Write to the redis list hot cache for initializing subscriptions when the program is pulled up | |
| 			checkBool, err := uo.redisDB.HExists(context.Background(), setting.MarketForex, symbolStr).Result() | |
| 			if err != nil { | |
| 				applogger.Error("%v SubscribeQuotesForex.MarketForex.HExists:%v", common.ErrForex, err) | |
| 				time.Sleep(5 * time.Second) | |
| 				continue | |
| 			} | |
| 			if !checkBool { | |
| 				if err = uo.redisDB.HSet(context.Background(), setting.MarketForex, symbolStr, symbolStr).Err(); err != nil { | |
| 					applogger.Error("%v SubscribeQuotesForex.MarketForex.HSet:%v", common.ErrForex, err) | |
| 					time.Sleep(5 * time.Second) | |
| 					continue | |
| 				} | |
| 			} | |
| 
 | |
| 			// 订阅买一卖一报价 | |
| 			go func() { | |
| 				topIc := fmt.Sprintf("%v.LastForex", symbolStr) | |
| 				pubSub := uo.redisDB.Subscribe(ctx, topIc) | |
| 				defer pubSub.Close() | |
| 				if _, err = pubSub.Receive(ctx); err != nil { | |
| 					applogger.Error("%v SubscribeQuotesForex.Receive:%v", common.ErrForex, err) | |
| 					return | |
| 				} | |
| 				chp := pubSub.Channel() | |
| 				for msg := range chp { | |
| 					var subMsg QuitesForex | |
| 					if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { | |
| 						applogger.Error("%v SubscribeQuotesForex.QuitesForex.Unmarshal:%v", common.ErrForex, err) | |
| 						close(forex.ForexMap) | |
| 						return | |
| 					} | |
| 					// 交易类型:0最新价,1买入,2卖出 | |
| 					askPriceKey := publicData.SymbolCache(flags.Wh, symbolStr, flags.TradeTypeBuy) | |
| 					askPrice := strconv.FormatFloat(subMsg.A, 'g', -1, 64) | |
| 					if err = memory.ForexCache.Set(askPriceKey, []byte(askPrice)); err != nil { | |
| 						applogger.Error("%v askPriceKey Set:%v", common.ErrForex, err) | |
| 						continue | |
| 					} | |
| 					bidPriceKey := publicData.SymbolCache(flags.Wh, symbolStr, flags.TradeTypeSell) | |
| 					bidPrice := strconv.FormatFloat(subMsg.B, 'g', -1, 64) | |
| 					if err = memory.ForexCache.Set(bidPriceKey, []byte(bidPrice)); err != nil { | |
| 						applogger.Error("%v bidPriceKey Set:%v", common.ErrForex, err) | |
| 						continue | |
| 					} | |
| 					if !flags.CheckSetting { | |
| 						applogger.Info("当前写入外汇卖一买一最新报价:%v,%v,%v", symbolStr, askPrice, bidPrice) | |
| 					} | |
| 				} | |
| 			}() | |
| 
 | |
| 			// 订阅实时报价 | |
| 			go func() { | |
| 				topIc := fmt.Sprintf("%v.Forex", symbolStr) | |
| 				pubSub := uo.redisDB.Subscribe(ctx, topIc) | |
| 				defer pubSub.Close() | |
| 				if _, err = pubSub.Receive(ctx); err != nil { | |
| 					applogger.Error("%v SubscribeQuotesForex.Receive:%v", common.ErrForex, err) | |
| 					return | |
| 				} | |
| 				chp := pubSub.Channel() | |
| 				for msg := range chp { | |
| 					var subMsg ForexJsonData | |
| 					if err = json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { | |
| 						applogger.Error("%v SubscribeQuotesForex.QuitesForex.Unmarshal:%v", common.ErrForex, err) | |
| 						close(forex.ForexMap) | |
| 						return | |
| 					} | |
| 					// 交易类型:0最新价,1买入,2卖出 | |
| 					priceKey := publicData.SymbolCache(flags.Wh, symbolStr, flags.TradeTypePrice) | |
| 					price := strconv.FormatFloat(subMsg.Close, 'g', -1, 64) | |
| 					if err = memory.ForexImmediateCache.Set(priceKey, []byte(price)); err != nil { | |
| 						applogger.Error("%v priceKey Set:%v", common.ErrForex, err) | |
| 						continue | |
| 					} | |
| 					if !flags.CheckSetting { | |
| 						applogger.Info("当前写入外汇即时最新报价:%v,%v,%v", symbolStr, priceKey, price) | |
| 					} | |
| 				} | |
| 			}() | |
| 
 | |
| 			// Write in the map to determine whether to repeat subscription | |
| 			forex.ForexMapSymbol.Store(symbolStr, symbolStr) | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| // OrderSubAdminForexSubscribeBySum | |
| // | |
| //	@Description: 持仓订单浮动盈亏计算 | |
| //	@param ctx | |
| //	@param uo | |
| func OrderSubAdminForexSubscribeBySum(uo *Data) { | |
| 	for { | |
| 		topIc := setting.AdminForexSubscribe | |
| 		hashMap, err := uo.redisDB.HGetAll(context.Background(), topIc).Result() | |
| 		if err != nil { | |
| 			applogger.Error("orderSubAdminForexSubscribe.HGetAll:%v", err) | |
| 			time.Sleep(5 * time.Second) | |
| 			continue | |
| 		} | |
| 		var hashList []forexd.ForexTallyCache | |
| 		for _, value := range hashMap { | |
| 			var msg forexd.ForexTallyCache | |
| 			if err = json.Unmarshal([]byte(value), &msg); err != nil { | |
| 				time.Sleep(5 * time.Second) | |
| 				continue | |
| 			} | |
| 			switch msg.Status { | |
| 			case flags.Entrust: // 挂单 | |
| 			case flags.Position: // 持仓 | |
| 				hashList = append(hashList, msg) | |
| 			default: // 平仓|撤单 | |
| 				err = uo.redisDB.HDel(context.Background(), topIc, msg.OrderId).Err() | |
| 				if err != nil { | |
| 					applogger.Error("AdminForexSubscribe.HDel:%v", err) | |
| 					continue | |
| 				} | |
| 			} | |
| 		} | |
| 		applogger.Info("外汇数据量统计:%v", len(hashList)) | |
| 		// 统计外汇总持仓订单浮动盈亏 | |
| 		priceSum := decimal.Zero | |
| 		for _, value := range hashList { | |
| 			orderModel := forexData.ForexOrderProcessing(topIc, value) | |
| 			if orderModel != nil { | |
| 				var subPrice decimal.Decimal | |
| 				newPrice, err := decimal.NewFromString(orderModel.Price) | |
| 				if err != nil { | |
| 					continue | |
| 				} | |
| 				openPrice, err := decimal.NewFromString(orderModel.OpenPrice) | |
| 				if err != nil { | |
| 					continue | |
| 				} | |
| 				switch value.Order.TradeType { | |
| 				case flags.TradeTypeBuy: // 买涨 = 新价格 - 开仓价 | |
| 					subPrice = newPrice.Sub(openPrice) | |
| 				case flags.TradeTypeSell: // 卖跌 = 开仓价 - 新价格 | |
| 					subPrice = openPrice.Sub(newPrice) | |
| 				default: | |
| 					subPrice = decimal.Zero | |
| 				} | |
| 				orderNumber := decimal.RequireFromString(value.Order.OrderNumber) | |
| 				pryNum := decimal.RequireFromString(value.Order.PryNum) | |
| 				price := subPrice.Mul(orderNumber).Mul(pryNum) // 外汇浮动盈亏计算 | |
| 				priceSum = priceSum.Add(price)                 // 累加盈亏统计 | |
| 			} | |
| 		} | |
| 		// 写入缓存 | |
| 		if err = memory.ForexFloating.Set(flags.FloatingWh, []byte(priceSum.String())); err != nil { | |
| 			applogger.Error("统计外汇持仓订单浮动盈亏错误:%v", err) | |
| 			time.Sleep(5 * time.Second) | |
| 			continue | |
| 		} | |
| 		applogger.Info("统计外汇持仓订单浮动盈亏:%v", priceSum) | |
| 
 | |
| 		time.Sleep(600 * time.Millisecond) | |
| 	} | |
| } | |
| 
 | |
| // ForexTransactionEntrust | |
| // | |
| //	@Description: 外汇委托订单 | |
| //	@param ctx | |
| func ForexTransactionEntrust(ctx context.Context) { | |
| 	for { | |
| 		ForexTransactionCalculationEntrust(ctx) | |
| 		time.Sleep(400 * time.Millisecond) | |
| 	} | |
| } | |
| 
 | |
| // ForexTransactionCalculationEntrust | |
| // | |
| //	@Description: | |
| //	@param ctx | |
| func ForexTransactionCalculationEntrust(ctx context.Context) { | |
| 	var wg sync.WaitGroup | |
| 
 | |
| 	entrustList, err := Reds.HGetAll(context.Background(), setting.MarketForexEntrust).Result() | |
| 	if err != nil { | |
| 		applogger.Error("%v ForexTransactionCalculationEntrust.HGetAll:%v", common.ErrForex, err) | |
| 		return | |
| 	} | |
| 
 | |
| 	for _, value := range entrustList { | |
| 		var msg = make(chan forexd.ForexTallyCache, 1) | |
| 		var entrust forexd.ForexTallyCache | |
| 		if err = json.Unmarshal([]byte(value), &entrust); err != nil { | |
| 			applogger.Error("%v ForexTransactionCalculationEntrust.Unmarshal:%v", common.ErrForex, err) | |
| 			continue | |
| 		} | |
| 
 | |
| 		var newPrice decimal.Decimal | |
| 		newPrice, err = GetForexPrice(entrust.Symbol, entrust.Order.TradeType) | |
| 		if err != nil { | |
| 			applogger.Error("%v ForexTransactionCalculationPosition.GetForexPrice:%v", common.ErrForex, err) | |
| 			continue | |
| 		} | |
| 
 | |
| 		wg.Add(1) | |
| 		go func(value string) { | |
| 			resultMsg := ForexConcurrentComputingEntrust(&entrust, newPrice, &wg) | |
| 			msg <- resultMsg // 通知管道异步处理订单操作 | |
| 		}(value) | |
| 
 | |
| 		// 处理并发结果(需要改成通过信道通知) | |
| 		select { | |
| 		case resultMsg, _ := <-msg: | |
| 			switch resultMsg.Status { | |
| 			case flags.Entrust: // 挂单中 | |
| 				if !flags.CheckSetting { | |
| 					applogger.Info("从信道接收外汇挂单信息:%v", resultMsg) | |
| 				} | |
| 			case flags.Position: // 持仓中 | |
| 				if !flags.CheckSetting { | |
| 					applogger.Info("从信道接收到外汇持仓信息:%v", resultMsg) | |
| 				} | |
| 
 | |
| 				// 处理开仓逻辑 | |
| 				if err = ForexLiquidationEntrust(ctx, &resultMsg); err != nil { | |
| 					applogger.Error("%v ForexTransactionCalculationEntrust.ForexLiquidationEntrust:%v", common.ErrForex, err) | |
| 					continue | |
| 				} | |
| 				// 写入持仓缓存列表 | |
| 				data, err := json.Marshal(resultMsg) | |
| 				if err != nil { | |
| 					applogger.Error("%v ForexTransactionCalculationEntrust.Marshal:%v", common.ErrForex, err) | |
| 					continue | |
| 				} | |
| 
 | |
| 				// TODO: 写入持仓消息列表 | |
| 				//if err = uo.mqProducer.Entrust.PushNotice(data); err != nil { | |
| 				//	applogger.Error("%v ForexTransactionCalculationEntrust.PushNotice:%v", common.ForexError, err) | |
| 				//	continue | |
| 				//} | |
|  | |
| 				// 写入外汇持仓缓存队列 | |
| 				if err = Reds.HSet(context.Background(), setting.MarketForexPosition, resultMsg.OrderId, string(data)).Err(); err != nil { | |
| 					applogger.Error("%v ForexTransactionCalculationEntrust.MarketForexPosition.HSet:%v", common.ErrForex, err) | |
| 					continue | |
| 				} | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	wg.Wait() | |
| 	applogger.Info("外汇挂单并发计算执行完毕......") | |
| } | |
| 
 | |
| // ForexConcurrentComputingEntrust | |
| // | |
| //	@Description: 外汇挂单缓存列表业务处理 | |
| //	@param order | |
| //	@param orderOld | |
| //	@param newPrice | |
| //	@param bidPrice | |
| //	@param askPrice | |
| //	@param wg | |
| //	@return tradedeal.ForexTallyCache | |
| func ForexConcurrentComputingEntrust(order *forexd.ForexTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) forexd.ForexTallyCache { | |
| 	var deleteCache bool | |
| 	var dealPrice string | |
| 	var limitPrice = decimal.RequireFromString(order.Order.LimitPrice) // 限价-价格 | |
| 	var status = order.Status                                          // 原始价格 | |
|  | |
| 	switch order.Order.TradeType { | |
| 	case flags.TradeTypeBuy: // 买涨 | |
| 		if !flags.CheckSetting { | |
| 			applogger.Info("用户ID:%v,限价买入下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status) | |
| 		} | |
| 		// 限价买入逻辑判定 -> 挂单金额 > 当前价格 | |
| 		if limitPrice.Cmp(newPrice) > 0 { | |
| 			deleteCache = true | |
| 			dealPrice = limitPrice.String() // 开仓价格 | |
| 		} | |
| 	case flags.TradeTypeSell: // 买跌 | |
| 		if !flags.CheckSetting { | |
| 			applogger.Info("用户ID:%v,限价卖出下单价格:%v,最新市价:%v,判定结果:%v,原始状态:%v", order.UserId, limitPrice, newPrice, limitPrice.Cmp(newPrice), order.Status) | |
| 		} | |
| 		// 限价卖入逻辑判定 -> 挂单金额 < 当前价格 | |
| 		if limitPrice.Cmp(newPrice) < 0 { | |
| 			deleteCache = true | |
| 			dealPrice = limitPrice.String() // 开仓价格 | |
| 		} | |
| 	default: | |
| 	} | |
| 
 | |
| 	if deleteCache { | |
| 		order.Status = flags.Position | |
| 		// 删除挂单缓存列表 | |
| 		if err := Reds.HDel(context.Background(), setting.MarketForexEntrust, order.OrderId).Err(); err != nil { | |
| 			applogger.Error("%v ForexConcurrentComputingEntrust.MarketForexEntrust.HDel:%v", common.ErrForex, err) | |
| 			order.Status = status | |
| 			dealPrice = decimal.Zero.String() | |
| 		} | |
| 	} | |
| 
 | |
| 	wg.Done() | |
| 	return forexd.ForexTallyCache{ | |
| 		UserId:    order.UserId,  // 用户Id | |
| 		OrderId:   order.OrderId, // 订单Id | |
| 		Symbol:    order.Symbol,  // 下单交易对 | |
| 		OpenPrice: dealPrice,     // 开仓价格 | |
| 		Status:    order.Status,  // 订单状态 | |
| 		Order:     order.Order,   // 下单信息 | |
| 	} | |
| } | |
| 
 | |
| // ForexMqOpenBusiness | |
| // | |
| //	@Description: TODO: 外汇持仓消息队列 | |
| //	@param ctx | |
| //	@param uo | |
| func ForexMqOpenBusiness(ctx context.Context, uo *Data) { | |
| 	go func() { | |
| 		uo.mqConsumer.Entrust.ConsumerNotice() | |
| 	}() | |
| 	for { | |
| 		select { | |
| 		case message, ok := <-consumer.ConsumerEntrustMq.ForexMq: | |
| 			if !ok { | |
| 				time.Sleep(5 * time.Second) | |
| 				applogger.Error("%v ForexMqOpenBusiness.ForexMq err....", common.ErrForex) | |
| 				continue | |
| 			} | |
| 
 | |
| 			// 持仓订单缓存数据序列化解析 | |
| 			var resultMsg forexd.ForexTallyCache | |
| 			if err := json.Unmarshal(message, &resultMsg); err != nil { | |
| 				applogger.Error("%v ForexMqOpenBusiness.Unmarshal:%v", common.ErrForex, err) | |
| 				time.Sleep(5 * time.Second) | |
| 				continue | |
| 			} | |
| 
 | |
| 			// 持仓平仓 | |
| 			if err := ForexLiquidationEntrust(ctx, &resultMsg); err != nil { | |
| 				applogger.Error("%v ForexMqOpenBusiness.ForexLiquidationEntrust:%v", common.ErrForex, err) | |
| 				continue | |
| 			} | |
| 		default: | |
| 			// TODO: 是否处理空队列缓存的情况 | |
| 			applogger.Error("这里没有监控到持仓mq队列消息.........") | |
| 			time.Sleep(2 * time.Second) | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| // ForexLiquidationEntrust | |
| // | |
| //	@Description: 外汇挂单-->开仓业务处理 | |
| //	@param ctx | |
| //	@param order | |
| //	@return error | |
| func ForexLiquidationEntrust(ctx context.Context, order *forexd.ForexTallyCache) error { | |
| 	// 1、外汇开仓操作 | |
| 	err := ForexOpenPosition(ctx, Msql, order.UserId, order.OrderId, order.OpenPrice, order.Order) | |
| 	if err != nil { | |
| 		applogger.Error("%v ForexLiquidationEntrust.ForexOpenPosition:%v", common.ErrForex, err) | |
| 		return err | |
| 	} | |
| 
 | |
| 	// 2、开仓更新用户外汇订阅缓存订单状态 | |
| 	userSubKey := virtual.OrderIdListKey(setting.ForexSubscribe, order.UserId) | |
| 	if err = UpdateForexSubscribeHashStatusByOrderId(order.OrderId, userSubKey, flags.Position, order.ClosingPrice); err != nil { | |
| 		applogger.Error("%v ForexLiquidationEntrust.ForexSubscribe:%v", common.ErrForex, err) | |
| 		return err | |
| 	} | |
| 
 | |
| 	// 3、开仓更新管理员外汇订阅缓存订单状态 | |
| 	if err = UpdateForexSubscribeHashStatusByOrderId(order.OrderId, setting.AdminForexSubscribe, flags.Position, order.ClosingPrice); err != nil { | |
| 		applogger.Error("%v ForexLiquidationEntrust.AdminForexSubscribe:%v", common.ErrForex, err) | |
| 		return err | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| // ForexTransactionPosition | |
| // | |
| //	@Description: 外汇-持仓业务处理 | |
| //	@param ctx | |
| func ForexTransactionPosition(ctx context.Context) { | |
| 	for { | |
| 		ForexTransactionCalculationPosition(ctx) | |
| 		time.Sleep(400 * time.Millisecond) | |
| 	} | |
| } | |
| 
 | |
| // ForexTransactionCalculationPosition | |
| // | |
| //	@Description: 监控外汇持仓缓存列表(注意:止盈止损|强行平仓) | |
| //	@param ctx | |
| func ForexTransactionCalculationPosition(ctx context.Context) { | |
| 	var wg sync.WaitGroup | |
| 
 | |
| 	entrustList, err := Reds.HGetAll(context.Background(), setting.MarketForexPosition).Result() | |
| 	if err != nil { | |
| 		applogger.Error("%v ForexTransactionCalculationPosition.LRange:%v", common.ErrForex, err) | |
| 		return | |
| 	} | |
| 
 | |
| 	for _, value := range entrustList { | |
| 		var msg = make(chan forexd.ForexTallyCache, 1) | |
| 		var entrust forexd.ForexTallyCache | |
| 		if err = json.Unmarshal([]byte(value), &entrust); err != nil { | |
| 			applogger.Error("%v ForexTransactionCalculationPosition.Unmarshal:%v", common.ErrForex, err) | |
| 			continue | |
| 		} | |
| 
 | |
| 		var newPrice decimal.Decimal | |
| 		newPrice, err = GetForexPriceNew(entrust.Symbol, entrust.Order.TradeType) | |
| 		if err != nil { | |
| 			applogger.Warn("%v ForexTransactionCalculationPosition.GetForexPrice:%v", common.ErrForex, err) | |
| 			continue | |
| 		} | |
| 
 | |
| 		wg.Add(1) | |
| 		go func(value string) { | |
| 			resultMsg := ForexConcurrentComputingPosition(&entrust, newPrice, &wg) | |
| 			msg <- resultMsg | |
| 		}(value) | |
| 
 | |
| 		// 处理并发结果 | |
| 		select { | |
| 		case resultMsg, _ := <-msg: | |
| 			switch resultMsg.Status { | |
| 			case flags.Position: // 持仓 | |
| 				if !flags.CheckSetting { | |
| 					applogger.Info("从信道接收到外汇持仓信息:%v", resultMsg) | |
| 				} | |
| 			case flags.Close: // 平仓 | |
| 				if !flags.CheckSetting { | |
| 					applogger.Info("从信道接收到外汇平仓信息:%v", resultMsg) | |
| 				} | |
| 
 | |
| 				// TODO: 平仓操作(优化写入队列中等待平仓) | |
| 				//var byteStr []byte | |
| 				//byteStr, err = json.Marshal(resultMsg) | |
| 				//if err != nil { | |
| 				//	applogger.Error("%v ForexTransactionCalculationPosition.Marshal:%v", common.ForexError, err) | |
| 				//	continue | |
| 				//} | |
| 				//if err = uo.mqProducer.Position.PushNotice(byteStr); err != nil { | |
| 				//	applogger.Error("%v ForexTransactionCalculationPosition.PushNotice:%v", common.ForexError, err) | |
| 				//	continue | |
| 				//} | |
|  | |
| 				if err = ForexLiquidationPosition(ctx, &resultMsg); err != nil { | |
| 					applogger.Error("%v ForexTransactionCalculationPosition.ForexLiquidationPosition:%v", common.ErrForex, err) | |
| 					continue | |
| 				} | |
| 			} | |
| 		} | |
| 	} | |
| 
 | |
| 	wg.Wait() | |
| 	applogger.Info("外汇持仓并发计算执行完毕......") | |
| } | |
| 
 | |
| // ForexConcurrentComputingPosition | |
| // | |
| //	@Description: 外汇持仓缓存列表业务处理 | |
| //	@param order | |
| //	@param newPrice | |
| //	@param wg | |
| //	@return tradedeal.ForexTallyCache | |
| func ForexConcurrentComputingPosition(order *forexd.ForexTallyCache, newPrice decimal.Decimal, wg *sync.WaitGroup) forexd.ForexTallyCache { | |
| 	var deleteCache, checkBool bool | |
| 	var dealPrice string | |
| 	var sellShort decimal.Decimal | |
| 	var openPrice = decimal.RequireFromString(order.OpenPrice)           // 限价 | |
| 	var orderNumber = decimal.RequireFromString(order.Order.OrderNumber) // 仓位 | |
| 	var status = order.Status                                            // 原始状态 | |
| 	var pryNum = decimal.RequireFromString(order.Order.PryNum)           // 杠杆 | |
|  | |
| 	// 下单判定设置(false无设置|true止盈止损) | |
| 	_, stopWinPrice, stopLossPrice, _ := ForexVoteStopType(order.Order) | |
| 
 | |
| 	switch order.Order.TradeType { | |
| 	case flags.TradeTypeBuy: // 买涨(强平) | |
| 		if !flags.CheckSetting { | |
| 			applogger.Info("用户ID:%v,持仓开仓价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status) | |
| 		} | |
| 		// 买涨挂止损止盈,止盈价必须<当前价,止损价必须>当前价; | |
| 		if !stopWinPrice.IsZero() { | |
| 			if stopWinPrice.Cmp(newPrice) < 0 { | |
| 				checkBool = true | |
| 				dealPrice = newPrice.String() // 平仓价格 | |
| 			} | |
| 		} | |
| 		if !stopLossPrice.IsZero() { | |
| 			if stopLossPrice.Cmp(newPrice) > 0 { | |
| 				checkBool = true | |
| 				dealPrice = newPrice.String() // 平仓价格 | |
| 			} | |
| 		} | |
| 		// 强行平仓(做多) | |
| 		if !checkBool { | |
| 			sellShort = newPrice.Sub(openPrice).Mul(orderNumber).Mul(order.Order.System.FaceValue).Mul(pryNum) | |
| 		} | |
| 	case flags.TradeTypeSell: // 买跌(存在强行平仓) | |
| 		if !flags.CheckSetting { | |
| 			applogger.Info("用户ID:%v,限价持仓买跌下单价格:%v,最新市价:%v,止盈:%v,止损:%v,原始状态:%v", order.UserId, openPrice, newPrice, stopWinPrice, stopLossPrice, order.Status) | |
| 		} | |
| 		// 买跌挂止损止盈,止盈价必须>当前价,止损价必须<当前价; | |
| 		if !stopWinPrice.IsZero() { | |
| 			if stopWinPrice.Cmp(newPrice) > 0 { | |
| 				checkBool = true | |
| 				dealPrice = newPrice.String() // 平仓价格 | |
| 			} | |
| 		} | |
| 		if !stopLossPrice.IsZero() { | |
| 			if stopLossPrice.Cmp(newPrice) < 0 { | |
| 				checkBool = true | |
| 				dealPrice = newPrice.String() // 平仓价格 | |
| 			} | |
| 		} | |
| 		// 强行平仓(做空) | |
| 		if !checkBool { | |
| 			sellShort = openPrice.Sub(newPrice).Mul(orderNumber).Mul(order.Order.System.FaceValue).Mul(pryNum) | |
| 		} | |
| 	default: | |
| 	} | |
| 	/* 强行平仓(做多|做空) | |
| 	1、做多:(最新成交价-开仓成交价)*仓位 | |
| 	1、做空:(开仓成交价-最新成交价)*仓位 | |
| 	2、当浮动盈亏 >= 保证金的70%订单强行平仓 | |
| 	*/ | |
| 	if sellShort.IsNegative() { | |
| 		orderAmount := decimal.RequireFromString(order.Order.EarnestMoney) | |
| 		floatPrice := orderAmount.Mul(order.Order.System.CompelNum) | |
| 		if !flags.CheckSetting { | |
| 			applogger.Info("强平保证金:%v,强平浮动70%:%v,强行平仓判定:%v", orderAmount, floatPrice, sellShort.Abs().Cmp(floatPrice)) | |
| 		} | |
| 		if sellShort.Abs().Cmp(floatPrice) >= 0 { | |
| 			checkBool = true | |
| 			dealPrice = newPrice.String() | |
| 		} | |
| 	} | |
| 	// 判定订单状态 | |
| 	if checkBool { | |
| 		deleteCache = true | |
| 		order.Status = flags.Close | |
| 	} | |
| 
 | |
| 	if deleteCache { | |
| 		// 删除持仓缓存列表数据 | |
| 		if err := Reds.HDel(context.Background(), setting.MarketForexPosition, order.OrderId).Err(); err != nil { | |
| 			applogger.Error("%v ForexConcurrentComputingPosition.MarketForexPosition.HDel:%v", common.ErrForex, err) | |
| 			order.Status = status | |
| 		} | |
| 	} | |
| 
 | |
| 	wg.Done() | |
| 	return forexd.ForexTallyCache{ | |
| 		UserId:       order.UserId,  // 用户Id | |
| 		OrderId:      order.OrderId, // 订单Id | |
| 		Symbol:       order.Symbol,  // 下单交易对 | |
| 		ClosingPrice: dealPrice,     // 平仓价格 | |
| 		Status:       order.Status,  // 订单状态 | |
| 		Order:        order.Order,   // 下单信息 | |
| 	} | |
| } | |
| 
 | |
| // ForexMqClosingBusiness | |
| // | |
| //	@Description: TODO: 处理外汇平仓消息队列 | |
| //	@param ctx | |
| //	@param uo | |
| func ForexMqClosingBusiness(ctx context.Context, uo *Data) { | |
| 	go func() { | |
| 		uo.mqConsumer.Position.ConsumerNotice() | |
| 	}() | |
| 	for { | |
| 		select { | |
| 		case message, ok := <-consumer.ConsumerPositionMq.ForexMq: | |
| 			if !ok { | |
| 				time.Sleep(5 * time.Second) | |
| 				applogger.Error("%v ForexMqClosingBusiness.ForexMq err....", common.ErrForex) | |
| 				continue | |
| 			} | |
| 			applogger.Info("接收到平仓信息:%v", string(message)) | |
| 
 | |
| 			// 持仓订单缓存数据序列化解析 | |
| 			var resultMsg forexd.ForexTallyCache | |
| 			if err := json.Unmarshal(message, &resultMsg); err != nil { | |
| 				applogger.Error("%v ForexMqClosingBusiness.Unmarshal:%v", common.ErrForex, err) | |
| 				time.Sleep(5 * time.Second) | |
| 				continue | |
| 			} | |
| 
 | |
| 			// 持仓平仓 | |
| 			if err := ForexLiquidationPosition(ctx, &resultMsg); err != nil { | |
| 				applogger.Error("%v ForexMqClosingBusiness.ForexLiquidationPosition:%v", common.ErrForex, err) | |
| 				continue | |
| 			} | |
| 		default: | |
| 			// TODO: 是否处理空队列缓存的情况 | |
| 			time.Sleep(2 * time.Second) | |
| 		} | |
| 	} | |
| } | |
| 
 | |
| // ForexLiquidationPosition | |
| // | |
| //	@Description: 外汇平仓操作 | |
| //	@param ctx | |
| //	@param order | |
| //	@return error | |
| func ForexLiquidationPosition(ctx context.Context, order *forexd.ForexTallyCache) error { | |
| 	// 1、平仓操作 | |
| 	err := ForexClosingPosition(ctx, Msql, order.OrderId, order.ClosingPrice, order.Order) | |
| 	if err != nil { | |
| 		applogger.Error("%v ForexLiquidationPosition.ForexClosingPosition:%v", common.ErrForex, err) | |
| 		return err | |
| 	} | |
| 
 | |
| 	// 2、平仓更新用户外汇订阅订单状态 | |
| 	userSubKey := virtual.OrderIdListKey(setting.ForexSubscribe, order.UserId) | |
| 	if err = UpdateForexSubscribeHashStatusByOrderId(order.OrderId, userSubKey, flags.Close, order.ClosingPrice); err != nil { | |
| 		applogger.Error("%v ForexLiquidationPosition.ForexSubscribe:%v", common.ErrForex, err) | |
| 		return err | |
| 	} | |
| 
 | |
| 	// 3、平仓更新管理员外汇订阅订单状态 | |
| 	if err = UpdateForexSubscribeHashStatusByOrderId(order.OrderId, setting.AdminForexSubscribe, flags.Close, order.ClosingPrice); err != nil { | |
| 		applogger.Error("%v ForexLiquidationPosition.AdminForexSubscribe:%v", common.ErrForex, err) | |
| 		return err | |
| 	} | |
| 
 | |
| 	return nil | |
| } | |
| 
 | |
| // ForexVoteStopType | |
| // | |
| //	@Description: 设置外汇止盈止损 | |
| //	@param order | |
| //	@return bool | |
| //	@return decimal.Decimal | |
| //	@return decimal.Decimal | |
| //	@return error | |
| func ForexVoteStopType(order structure.ForexOrder) (bool, decimal.Decimal, decimal.Decimal, error) { | |
| 	var checkBool bool | |
| 	var stopWinPrice, stopLossPrice decimal.Decimal | |
| 	switch order.StopType { | |
| 	case flags.StopTypeNone: // 暂无设置止盈止损 | |
| 		checkBool = false | |
| 	case flags.StopTypeSet: // 设置止盈止损 | |
| 		checkBool = true | |
| 		if len(order.StopWinPrice) != 0 { | |
| 			stopWinPrice = decimal.RequireFromString(order.StopWinPrice) | |
| 		} | |
| 		if len(order.StopLossPrice) != 0 { | |
| 			stopLossPrice = decimal.RequireFromString(order.StopLossPrice) | |
| 		} | |
| 	default: | |
| 		return checkBool, stopWinPrice, stopLossPrice, flags.ErrContractThirteen | |
| 	} | |
| 
 | |
| 	return checkBool, stopWinPrice, stopLossPrice, nil | |
| } | |
| 
 | |
| // GetForexPrice | |
| // | |
| //	@Description: 外汇交易对卖一买一最新报价 | |
| //	@param symbol | |
| //	@return decimal.Decimal | |
| //	@return decimal.Decimal | |
| //	@return decimal.Decimal | |
| //	@return error | |
| func GetForexPrice(symbol string, tradeType int64) (decimal.Decimal, error) { | |
| 	var err error | |
| 	var newPrice []byte | |
| 	var cacheKey string | |
| 
 | |
| 	switch tradeType { | |
| 	case 1: // 外汇交易对买一最新报价 | |
| 		cacheKey = publicData.SymbolCache(flags.Wh, symbol, flags.TradeTypeBuy) | |
| 		newPrice, err = memory.GetForexCache(cacheKey) | |
| 	case 2: // 外汇交易对卖一最新报价 | |
| 		cacheKey = publicData.SymbolCache(flags.Wh, symbol, flags.TradeTypeSell) | |
| 		newPrice, err = memory.GetForexCache(cacheKey) | |
| 	} | |
| 	if err != nil { | |
| 		applogger.Error("%v GetForexPrice.Get:%v", common.ErrForex, err) | |
| 		return decimal.Decimal{}, err | |
| 	} | |
| 
 | |
| 	priceList, err := decimal.NewFromString(string(newPrice)) | |
| 	if err != nil { | |
| 		applogger.Error("%v ForexTransactionCalculationPosition.Get:%v", common.ErrForex, err) | |
| 		return decimal.Decimal{}, err | |
| 	} | |
| 
 | |
| 	applogger.Info("ForexCode:%v,topIc:%v,ForexPrice:%v", symbol, cacheKey, priceList) | |
| 
 | |
| 	return priceList, nil | |
| } | |
| func GetForexPriceNew(symbol string, tradeType int64) (decimal.Decimal, error) { | |
| 	cacheKey := SymbolCache(flags.Wh, symbol, flags.TradeTypePrice) | |
| 	priceNew, err := memory.GetForexImmediateCache(cacheKey) | |
| 	if err != nil { | |
| 		return decimal.Decimal{}, err | |
| 	} | |
| 
 | |
| 	priceList, err := decimal.NewFromString(string(priceNew)) | |
| 	if err != nil { | |
| 		applogger.Error("%v ForexTransactionCalculationPosition.Get:%v", common.ErrForex, err) | |
| 		return decimal.Decimal{}, err | |
| 	} | |
| 
 | |
| 	applogger.Info("ForexCode:%v,topIc:%v,ForexPrice:%v", symbol, cacheKey, priceList) | |
| 
 | |
| 	return priceList, nil | |
| }
 | |
| 
 |