You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

698 lines
20 KiB

package selfMarketSpot
import (
"encoding/json"
"fmt"
"github.com/shopspring/decimal"
"math/rand"
"strings"
"sync"
"time"
"wss-pool/cmd/common"
"wss-pool/cmd/websocketservice"
"wss-pool/dictionary"
"wss-pool/internal/data/business"
"wss-pool/internal/model"
red "wss-pool/internal/redis"
"wss-pool/logging/applogger"
"wss-pool/pkg/model/market"
)
const (
proportion int32 = 10000 //调价原价比例
step int32 = 20 //调价指数
digits int32 = 4 //保留小数位数
numPrices int = 20 //生成随机价格数量
defaultStep float64 = 0.001 // 默认波动率
PushFrequency int = 3 //S
TimeRemaining int64 = 20 //s
)
var (
contractChan = make(chan string)
SelfSymbol string //现货
InitialPrice decimal.Decimal // 初始价格
KLineMap = make(map[string]KlineLowHigh)
//ClosePrices decimal.Decimal //当前价格 //做调价终止值使用
OldFiveMin int64 //记录K线时间搓
OldFifteenMin int64
OldThirtyMin int64
OldOneHour int64
OldFourHour int64
OldDay int64
OldWeek int64
OldMon int64
IsRun bool
MarketSpotMap = make(map[string]bool) //保存领取的任务
lock sync.Mutex
EndChan = make(chan string)
)
type KlineLowHigh struct {
Low decimal.Decimal
High decimal.Decimal
ID int64
Vol decimal.Decimal
Amount decimal.Decimal
Count decimal.Decimal
Open decimal.Decimal
}
type ConstructorContract struct {
SelfSymbol string `json:"selfSymbol"` //现货
BeginTime string `json:"beginTime"` //开始时间
EndTime string `json:"endTime"` //结束时间
MaxPrice decimal.Decimal `json:"maxPrices"`
MinPrice decimal.Decimal `json:"minPrice"`
}
func NewSelfMarketSpot() {
//首次启动
go func() {
this := new(ConstructorContract)
this.MinPrice = InitialPrice
this.SelfSymbol = SelfSymbol
this.defaultContract()
}()
for {
t := time.NewTimer(1 * time.Minute)
<-t.C
go func() {
if IsRun {
applogger.Info("已有调价在运行, 该次调价不能运行")
return
}
result := getData()
fmt.Println(result)
for _, v := range result {
v.TradeName = common.ToLower(v.TradeName)
if v.TradeName != SelfSymbol {
applogger.Info("parametric inequality")
continue
}
end, _ := common.TimeStrToTimes(v.EndTime)
if end.Unix() <= common.TimeToNow() {
applogger.Info("该调价已过期")
continue
}
if _, ok := MarketSpotMap[fmt.Sprintf("%s-%d", v.TradeName, end.Unix())]; ok {
applogger.Info("该任务正在运行", fmt.Sprintf("%s-%d", v.TradeName, end.Unix()))
continue
}
MarketSpotMap[fmt.Sprintf("%s-%d", v.TradeName, end.Unix())] = true //保存任务
begin, _ := common.TimeStrToTimes(v.BeginTime)
if begin.Unix() < common.TimeToNow() {
applogger.Info("该调价已过期")
continue
} else if end.Unix() <= begin.Unix() {
applogger.Info("begin end 有误")
continue
}
maxPrice, _ := decimal.NewFromString(v.MaxPrice)
//if maxPrice.LessThan(InitialPrice) {
// applogger.Info("调价有误",v.MaxPrice)
// continue
//}
//等待到开始时间
applogger.Info("等待到begin time ", v.BeginTime, v.EndTime)
IsRun = true // 调价任务已开启
time.Sleep(begin.Sub(common.TimeToNows()))
// startd := time.Now()
contractChan <- "constructorStart"
this := new(ConstructorContract)
this.SelfSymbol = SelfSymbol
this.EndTime = v.EndTime
this.MinPrice = InitialPrice
this.MaxPrice = maxPrice
this.BeginTime = v.BeginTime
// fmt.Println("断开 默认波动 时间 Run time: ", time.Since(startd))
this.constructor(begin, end)
//更改状态
fmt.Println("调价结束")
contract := model.NewContractMarket()
contract.ID = v.ID
contract.UpdateIsGetOne()
}
if IsRun && len(result) > 0 {
IsRun = false
applogger.Debug("开启新的默认携程")
this := new(ConstructorContract)
this.MinPrice = InitialPrice
this.SelfSymbol = SelfSymbol
this.defaultContract()
}
}()
}
}
func getData() []model.ContractMarket {
contract := model.NewContractMarket()
contract.IsType = model.Market
symbol := strings.Split(SelfSymbol, "usdt")
contract.TradeName = strings.ToUpper(symbol[0])
result := contract.List()
return result
}
// 规定启动
func (this *ConstructorContract) constructor(begin, end time.Time) {
applogger.Info("开始调价。。。", "结束时间:", this.EndTime, "当前价格:", this.MinPrice, "最终价格", this.MaxPrice)
totalDuration := end.Sub(common.TimeToNows())
closePrice := this.MinPrice
highPrice := this.MinPrice
lowPrice := this.MinPrice
oldPrice := this.MinPrice
var openPrice decimal.Decimal
//temporalFrequency := time.Duration(PushFrequency) * time.Second
timeInterval := float64(totalDuration) / float64(time.Minute)
delta := this.MaxPrice.Sub(closePrice).Div(decimal.NewFromFloat(timeInterval))
fluctuation := closePrice.Mul(decimal.NewFromInt32(step).Div(decimal.NewFromInt32(proportion))).Round(digits)
applogger.Info("timeInterval", timeInterval, "delta", delta, "fluctuation", fluctuation)
// 开始进行调价
for !this.MaxPrice.Equal(InitialPrice) || end.Unix() > common.TimeToNows().Unix() {
fmt.Println("当前价格", InitialPrice)
prices := this.generateRandomPrices(InitialPrice, fluctuation, delta)
openPrice = InitialPrice
this.GetAllLowHigh(highPrice, lowPrice)
nonVanishing(lowPrice)
numFrequency := int(60) / PushFrequency
ClearTotal()
for i := 1; i <= numFrequency; i++ {
//go func(prices []decimal.Decimal, closePrice, openPrice, highPrice, lowPrice, oldPrice decimal.Decimal, this *ConstructorContract) {
start := time.Now() // 获取当前时间
rand.New(rand.NewSource(time.Now().UnixNano()))
key := rand.Intn(len(prices))
closePrice = prices[key]
// applogger.Info("实际落盘价:", closePrice)
//if (closePrice.Sub(this.MaxPrice)).Mul(delta).GreaterThan(decimal.NewFromInt32(0)) {
// closePrice = this.MaxPrice
//}
//当只剩 20 秒
if (end.Unix() - common.TimeToNows().Unix()) < TimeRemaining {
closePrice = this.MaxPrice
}
////生成深度、Trade Detail 数据
CreateDepth(oldPrice, closePrice)
////k 线 详情
this.pullStorage(closePrice, openPrice, highPrice, lowPrice, oldPrice, prices)
applogger.Debug("目标价格", this.MaxPrice, "当前价格", closePrice, "调价结束时间", this.EndTime, "i", i, time.Now().Format("2006-01-02 15:04:05"))
oldPrice = closePrice
//更新市价
lock.Lock()
InitialPrice = closePrice
lock.Unlock()
if end.Unix() <= common.TimeToNows().Unix() {
break
}
fmt.Println("Run time: ", time.Since(start))
s := float64(PushFrequency) - time.Since(start).Seconds()
if s > float64(0) {
applogger.Debug("停留 秒", s)
time.Sleep(time.Duration(s) * time.Second)
}
}
}
}
func nonVanishing(low decimal.Decimal) {
for k, v := range KLineMap {
if v.Low.IsZero() {
v.Low = low
}
KLineMap[k] = v
}
}
func (this *ConstructorContract) defaultContract() {
closePrice := this.MinPrice
highPrice := this.MinPrice
lowPrice := this.MinPrice
oldPrice := this.MinPrice
var openPrice decimal.Decimal
//var temporalFrequency = time.Duration(PushFrequency) * time.Second
for {
select {
case _, ok := <-contractChan: // 从管道接收值
if ok {
applogger.Info("calculateContractPrice start,defaultContract")
return
}
default:
//fmt.Println(time.Now().Format("2006-01-02 15:04:05"))
rand.New(rand.NewSource(time.Now().UnixNano()))
openPrice = InitialPrice
Loop:
prices := calculateContractPrice(InitialPrice)
fmt.Println("开盘价", InitialPrice)
// 更新落盘价、开盘价、最高价和最低价
key := rand.Intn(numPrices)
highPrices := this.getMaxPrices(openPrice, prices)
lowPrices := this.getMinPrices(openPrice, prices)
//被2整除 涨
if key%2 == 0 {
highPrice = this.getMaxPrice(openPrice, highPrices)
lowPrice = openPrice
prices = highPrices
} else {
highPrice = openPrice
lowPrice = this.getMinPrice(openPrice, lowPrices)
prices = lowPrices
}
if len(prices) <= 0 {
goto Loop
}
this.GetAllLowHigh(highPrice, lowPrice)
nonVanishing(lowPrice)
numFrequency := int(60) / PushFrequency
ClearTotal()
for i := 1; i <= numFrequency; i++ {
// start := time.Now() // 获取当前时间
// fmt.Println("Run time: ", time.Since(start))
go func(prices []decimal.Decimal, closePrice, openPrice, highPrice, lowPrice, oldPrice decimal.Decimal, this *ConstructorContract) {
////更新市价
//fmt.Println(closePrice)
rand.New(rand.NewSource(time.Now().UnixNano()))
key := rand.Intn(len(prices))
closePrice = prices[key]
lock.Lock()
InitialPrice = closePrice
lock.Unlock()
//// 生成深度、Trade Detail 数据
CreateDepth(oldPrice, closePrice)
//fmt.Println("Run time: ", time.Since(start))
this.pullStorage(closePrice, openPrice, highPrice, lowPrice, oldPrice, prices)
//fmt.Println("123123123 ", closePrice,openPrice, highPrice, lowPrice, oldPrice)
oldPrice = closePrice
}(prices, closePrice, openPrice, highPrice, lowPrice, oldPrice, this)
time.Sleep(time.Duration(int64(PushFrequency)) * time.Second)
}
}
}
}
func generateRandomStep(min, max decimal.Decimal) decimal.Decimal {
rand.New(rand.NewSource(time.Now().UnixNano()))
return min.Add(decimal.NewFromFloat(rand.Float64()).Mul(max.Sub(min)))
}
// 计算现货价格
func calculateContractPrice(basePrice decimal.Decimal) []decimal.Decimal {
prices := make([]decimal.Decimal, 0)
max := basePrice.Mul(decimal.NewFromFloat(defaultStep)).Round(digits)
min := max.Neg()
for i := 0; i < numPrices; i++ {
price := basePrice.Add(generateRandomStep(max, min)).Round(digits)
prices = append(prices, price)
}
return prices
}
func getOpen(timestamp int64, period string) decimal.Decimal {
tick := GetNewPriceAll(SelfSymbol, period)
open := InitialPrice
if len(tick) > 0 {
switch tick[0].Code {
case timestamp:
open, _ = decimal.NewFromString(tick[0].Open)
default:
open, _ = decimal.NewFromString(tick[0].Close)
}
}
return open
}
func (this *ConstructorContract) GetAllLowHigh(high, low decimal.Decimal) {
//初始化
if len(KLineMap) == 0 {
KLineMap["1min"] = KlineLowHigh{
ID: common.GenerateSingaporeMinuteTimestamp(),
Low: low,
High: high,
Vol: decimal.NewFromInt(0),
Amount: decimal.NewFromInt(0),
Count: decimal.NewFromInt(0),
Open: InitialPrice,
}
to := common.GenerateSingaporeFiveMinTimestamp()
from := to - int64(5*60)
low, high, vol, amount, count := GetTimeNewPrice(SelfSymbol, from, to, "1min")
KLineMap["5min"] = KlineLowHigh{
ID: to,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(to, "5min"),
}
OldFiveMin = to
to = common.GenerateSingaporeFifteenMinTimestamp()
from = to - int64(15*60)
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "5min")
KLineMap["15min"] = KlineLowHigh{
ID: to,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(to, "15min"),
}
OldFifteenMin = to
to = common.GenerateSingaporeThirtyMinTimestamp()
from = to - int64(30*60)
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "15min")
KLineMap["30min"] = KlineLowHigh{
ID: to,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(to, "30min"),
}
OldThirtyMin = to
from = common.GenerateSingaporeHourTimestamp()
to = from + int64(60*59)
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "30min")
KLineMap["60min"] = KlineLowHigh{
ID: from,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(from, "60min"),
}
OldOneHour = from
to = common.GenerateSingaporeFourHourTimestamp()
from = to - (4 * 60 * 60)
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "60min")
KLineMap["4hour"] = KlineLowHigh{
ID: from,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(from, "4hour"),
}
OldFourHour = from
from = common.GenerateSingaporeDayTimestamp("")
to = from + int64(60*60*24-1)
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "4hour")
KLineMap["1day"] = KlineLowHigh{
ID: from,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(from, "1day"),
}
OldDay = from
from = common.GenerateSingaporeMonTimestamp()
to = common.TimeToNow()
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "1week")
KLineMap["1mon"] = KlineLowHigh{
ID: from,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(from, "1mon"),
}
OldMon = from
from = common.GetWeekTimestamp()
to = common.TimeToNow()
low, high, vol, amount, count = GetTimeNewPrice(SelfSymbol, from, to, "1day")
KLineMap["1week"] = KlineLowHigh{
ID: from,
Low: low,
High: high,
Vol: vol,
Amount: amount,
Count: count,
Open: getOpen(from, "1week"),
}
OldWeek = from
}
for k, v := range KLineMap {
switch k {
case "1min":
v.ID = common.GenerateSingaporeMinuteTimestamp()
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.High = high
v.Low = low
v.Open = InitialPrice
case "5min":
v.ID = common.GenerateSingaporeFiveMinTimestamp()
if v.ID > OldFiveMin {
//新时间节点更新
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldFiveMin = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "15min":
v.ID = common.GenerateSingaporeFifteenMinTimestamp()
if v.ID > OldFifteenMin {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldFifteenMin = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "30min":
v.ID = common.GenerateSingaporeThirtyMinTimestamp()
if v.ID > OldThirtyMin {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldThirtyMin = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "60min":
v.ID = common.GenerateSingaporeHourTimestamp()
if v.ID > OldOneHour {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldOneHour = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "4hour":
v.ID = common.GenerateSingaporeFourHourTimestamp() - (4 * 60 * 60)
if v.ID > OldFourHour {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldFourHour = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "1day":
v.ID = common.GenerateSingaporeDayTimestamp("")
if v.ID > OldDay {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldDay = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "1week":
v.ID = common.GetWeekTimestamp()
if v.ID > OldWeek {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldWeek = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
case "1mon":
v.ID = common.GenerateSingaporeMonTimestamp()
if v.ID > OldMon {
v.Vol = decimal.NewFromFloat(0)
v.Amount = decimal.NewFromFloat(0)
v.Count = decimal.NewFromFloat(0)
v.Low = low
v.High = high
v.Open = getOpen(v.ID, k)
OldMon = v.ID
}
if v.Low.GreaterThan(low) {
v.Low = low
}
if v.High.LessThan(high) {
v.High = high
}
}
KLineMap[k] = v
}
}
func (this *ConstructorContract) pullStorage(close, open, high, low, oldPrice decimal.Decimal, prices []decimal.Decimal) {
for _, v := range dictionary.ContractPriceTime {
resp := market.SubscribeCandlestickResponse{
Channel: fmt.Sprintf("market.%s.kline.%s", this.SelfSymbol, v),
Timestamp: KLineMap[v].ID,
Tick: &market.Tick{
Open: KLineMap[v].Open,
High: KLineMap[v].High,
Low: KLineMap[v].Low,
Close: close,
Id: KLineMap[v].ID,
Count: int(TotalCount.Add(KLineMap[v].Count).IntPart()),
Amount: TotalAmount.Add(KLineMap[v].Amount),
},
}
if v == "1day" {
go OneDayDetailMerged(resp)
}
jsonMessage, _ := json.Marshal(websocketservice.Message{
ServersId: resp.Channel,
Content: resp,
Symbol: resp.Channel})
//applogger.Info("SubscribeKline %s:", string(jsonMessage), v)
red.RedisClient.Publish(resp.Channel, string(jsonMessage))
go business.UpdateWsMgo(resp)
}
//详情
detail := market.SubscribeCtDetailResponse{
Channel: fmt.Sprintf("market.%s.detail", this.SelfSymbol),
Timestamp: time.Now().Unix(),
Tick: &market.CtDetailTick{
Open: open,
High: high,
Low: low,
Close: close,
Id: time.Now().Unix(),
Count: TotalCount.IntPart(),
Amount: TotalAmount,
},
}
jsonMessages, _ := json.Marshal(websocketservice.Message{
ServersId: detail.Channel,
Content: detail,
Symbol: detail.Channel})
//applogger.Info("detail :", string(jsonMessages))
red.RedisClient.Publish(detail.Channel, string(jsonMessages))
}
// 生成随机价格波动
func (this *ConstructorContract) generateRandomPrices(currentPrice, fluctuation, delta decimal.Decimal) []decimal.Decimal {
prices := make([]decimal.Decimal, 0)
rand.New(rand.NewSource(time.Now().UnixNano()))
for i := 0; i < numPrices; i++ {
price := currentPrice.Add(delta.Add(fluctuation.Mul(decimal.NewFromFloat(2*rand.Float64() - 1)))).Round(digits)
prices = append(prices, price)
}
return prices
}
// 获取最高价
func (this *ConstructorContract) getMaxPrices(highPrice decimal.Decimal, prices []decimal.Decimal) []decimal.Decimal {
res := make([]decimal.Decimal, 0)
for _, price := range prices {
if price.GreaterThan(highPrice) {
res = append(res, price)
}
}
return res
}
func (this *ConstructorContract) getMaxPrice(highPrice decimal.Decimal, prices []decimal.Decimal) decimal.Decimal {
for _, price := range prices {
if price.GreaterThan(highPrice) {
highPrice = price
}
}
return highPrice
}
// 获取最低价
func (this *ConstructorContract) getMinPrices(lowPrice decimal.Decimal, prices []decimal.Decimal) []decimal.Decimal {
res := make([]decimal.Decimal, 0)
for _, price := range prices {
if price.LessThan(lowPrice) {
res = append(res, price)
}
}
return res
}
func (this *ConstructorContract) getMinPrice(lowPrice decimal.Decimal, prices []decimal.Decimal) decimal.Decimal {
for _, price := range prices {
if price.LessThan(lowPrice) {
lowPrice = price
}
}
return lowPrice
}