package business

import (
	"encoding/json"
	"errors"
	"fmt"
	"math"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/shopspring/decimal"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/mongo"

	"wss-pool/cmd/common"
	"wss-pool/config"
	"wss-pool/dictionary"
	"wss-pool/internal"
	"wss-pool/internal/data"
	red "wss-pool/internal/redis"
	"wss-pool/logging/applogger"
	"wss-pool/pkg/model"
	"wss-pool/pkg/model/stock"
)

/*
TODO: stock list update rules (note: update according to each market's opening time)
1. Stock list data collection - scheduled task (insert and update)
2. US stock list data - scheduled task (update data)
3. Malaysian stock list data - scheduled task (update data)
*/
const (
	ContracttTime  int   = 1   // number of years of digital-currency data
	StockTime      int   = 5   // number of years of US stock data
	PageSize       int64 = 120 // number of stocks aggregated per page
	StockStatusOn  int   = 1
	StockStatusOff int   = 2
)

var (
	// OldNextUrl remembers the last pagination URL followed by InitStockList so
	// that the same page is not requested twice in a row.
	OldNextUrl string

	// StockClosedDataList maps a market name to the numeric id used in the
	// "STOCK_PRICES:<id>" and "STOCK_MARKET:LIST:<id>" Redis keys.
	StockClosedDataList = map[string]int{
		"US":        3,  // US stocks
		"Malaysia":  5,  // Malaysian stocks
		"Thailand":  6,  // Thai stocks
		"Indonesia": 4,  // Indonesian stocks
		"India":     7,  // Indian stocks
		"Singapore": 9,  // Singapore stocks
		"HongKong":  12, // Hong Kong stocks
		"UK":        14, // UK stocks
		"France":    15, // French stocks
		"Germany":   16, // German stocks
		"Brazil":    17, // Brazilian stocks
		"Japan":     18, // Japanese stocks
	}
)

// CountryStartTime is a per-market offset that GetTimeNewPriceAll adds to the
// window start when it stamps day/week/month bars.
// NOTE(review): the values look like milliseconds since midnight — confirm.
var CountryStartTime = map[string]int64{
	"India":     42300000,
	"Thailand":  39000000,
	"Indonesia": 36000000,
	"Malaysia":  32400000,
	"Singapore": 32400000,
	"UK":        54000000,
	"France":    54000000,
	"Germany":   54000000,
	"Brazil":    75600000,
	"Japan":     42300000,
}

// pinStock holds, per market and per Redis DB name, the set of stock codes that
// currently have an active pin; it is guarded by pinStockMutex.
var pinStock = map[string]map[string]map[string]bool{}
var pinStockMutex = sync.RWMutex{}

// InitStockList collects the stock list from the Polygon reference API, page by
// page, and upserts every ticker into the data.StockList collection.
//
// It stops on the first HTTP, JSON or bulk-write error. When the API answers
// with Status "ERROR" the same URL is retried after 10 seconds.
// NOTE(review): that retry is unbounded, and the debug log below prints the
// full URL including the API key — confirm both are acceptable.
func InitStockList() {
	url := fmt.Sprintf("https://%s/v3/reference/tickers?market=stocks&active=true&limit=1000&sort=ticker&apiKey=%s", config.Config.ShareGather.PolygonHost, config.Config.ShareGather.PolygonKey)
	for url != "" {
		applogger.Debug("url info:%v", url)
		bodyStr, err := internal.HttpGet(url)
		if err != nil {
			applogger.Error("Failed to query data:%v", err)
			return
		}
		var shareModel stock.StockPolygonParam
		if err := json.Unmarshal([]byte(bodyStr), &shareModel); err != nil {
			applogger.Error("Failed to parse stock list information:%v", err)
			return
		}
		// The call failed: wait and retry the same page.
		if shareModel.Status == "ERROR" {
			fmt.Printf("%+v", shareModel)
			time.Sleep(10 * time.Second)
			continue
		}

		// Follow the next page only if it differs from the one already followed.
		url = ""
		if shareModel.NextUrl != "" && shareModel.NextUrl != OldNextUrl {
			OldNextUrl = shareModel.NextUrl
			url = fmt.Sprintf("%s&apiKey=%s&market=stocks", shareModel.NextUrl, config.Config.ShareGather.PolygonKey)
		}

		dataList := make([]mongo.WriteModel, 0, len(shareModel.Results))
		for _, value := range shareModel.Results {
			// TODO: look up and update the local stock list.
			value.YesterdayClose = ""
			value.BeforeClose = ""
			filter := bson.D{
				{"Code", bson.M{"$eq": value.Code}},
				{"Country", bson.M{"$eq": "US"}},
			}
			update := bson.D{{"$set", bson.D{
				{"Code", value.Code},
				{"Name", value.Name},
				{"Country", strings.ToUpper(value.Locale)},
				{"Exchange", value.PrimaryExchange},
				{"Currency", value.Currency},
				{"Type", value.Type},
				{"Cik", value.CIK},
				{"CompositeFigi", value.CompositeFigi},
				{"ShareClassFigi", value.ShareClassFigi},
				{"YesterdayClose", value.YesterdayClose},
				{"BeforeClose", value.BeforeClose}}}}
			models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true)
			dataList = append(dataList, models)
		}
		if err := data.MgoBulkWrite(data.StockList, dataList); err != nil {
			applogger.Error("MgoInsertMany err:%v", err)
			return
		}
	}
}

// sleepUntilTomorrowSixAM blocks until 06:00 (in the local time zone) of the
// calendar day that follows the current moment.
func sleepUntilTomorrowSixAM() {
	now := time.Now()
	tomorrow := now.Add(time.Hour * 24)
	next := time.Date(tomorrow.Year(), tomorrow.Month(), tomorrow.Day(), 6, 0, 0, 0, tomorrow.Location())
	time.Sleep(next.Sub(now))
}

// TickUpdateStockUS is the scheduled task for the US stock list: it runs
// UpdateStockUS every day at 06:00, starting with tomorrow. It never returns.
func TickUpdateStockUS() {
	for {
		sleepUntilTomorrowSixAM()
		UpdateStockUS()
	}
}

// TickUpdateStockKLSE is the scheduled task for the KLSE equities list: it runs
// UpdateStockKLSE every day at 06:00, starting with tomorrow. It never returns.
func TickUpdateStockKLSE() {
	for {
		sleepUntilTomorrowSixAM()
		UpdateStockKLSE()
	}
}

// TickUpdateSpotKline refreshes the spot K-lines once for every period in
// dictionary.TimeCycle and then returns.
//
// Huobi spot only allows pulling the latest 2000 records.
// The hourly refresh loop that used to follow was already disabled by an
// unconditional return in front of it; the unreachable loop has been removed.
func TickUpdateSpotKline() {
	for _, value := range dictionary.TimeCycle {
		UpdateSpotKline(value)
	}
}

// Huobi's K-line price endpoint only allows pulling the latest 2000 records.
func TickUpdateContractPriceKline() { for _, value := range dictionary.ContractPriceTime { UpdatePriceKline(value) } for { t := time.NewTimer(1 * time.Hour) <-t.C for _, value := range dictionary.ContractPriceTime { UpdatePriceKline(value) } } } // 合约 func TickUpdateContractKline(isAll bool) { if isAll { applogger.Info("start TickUpdateContractKline") //endTime := time.Now().Unix() //startTime := time.Now().AddDate(-0, -6, 0).Unix() for _, value := range dictionary.ContractTime { UpdateContractKline(value) } return } //测试专用 //for _, value := range dictionary.ContractTime { // start := time.Now().Add(-2 * time.Hour).Unix() // end := time.Now().Unix() // UpdateContractKline(value, start, end) //} //for { // t := time.NewTimer(2 * time.Hour) // <-t.C // for _, value := range dictionary.ContractTime { // start := time.Now().Add(-2 * time.Hour).Unix() // end := time.Now().Unix() // UpdateContractKline(value, start, end) // } // //} } // 美股 func TickUpdateStockUs(isAll bool) { if isAll { start := common.TimeToNows().AddDate(0, 0, -20).Format("2006-01-02") end := common.TimeToNows().AddDate(0, 0, 0).Format("2006-01-02") for _, value := range dictionary.StockUsListDayTime { UpdateStockUs(start, end, value) } applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), "run us stock -----------------------------end") return } //nextFiveMinute := common.GenerateSingaporeFifteenMinTimestampOrigins() //waitDuration := nextFiveMinute.Sub(common.TimeToNows()) //time.Sleep(waitDuration) // 60 min runTime := time.Now() // 获取当前时间 if !common.IsOpeningUS() { applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), "us it's not opening time -----------------------------end") return } end := common.GenerateSingaporeFifteenMinTimestampOrigin() * 1000 start := end - int64(60*60*1000) for _, value := range dictionary.StockUsListTime { UpdateStockUs(start, end, value) } fmt.Println("Run time: ", time.Since(runTime)) } func GetTimeNewPrice(symbol string, from, to int64, 
country, period string) error { //fmt.Println("country", country, "period", period, "from", common.ConvertToTimeStr(from/1000), from, "to", common.ConvertToTimeStr(to/1000), to) filter := bson.M{"symbol": symbol, "timestamp": bson.M{"$gte": from, "$lte": to}} tableName := data.GetStockTableName(common.CapitalizeFirstLetter(country)) highRes := make([]model.StockMogoParam, 0) projection := bson.M{"price": 1} sort := bson.M{"price": -1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &highRes, 1) if len(highRes) <= 0 { applogger.Error(symbol+" no data", period) return errors.New(symbol + " no data") } high := highRes[0].Price lowRes := make([]model.StockMogoParam, 0) sort = bson.M{"price": 1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &lowRes, 1) low := lowRes[0].Price openRes := make([]model.StockMogoParam, 0) sort = bson.M{"timestamp": 1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &openRes, 1) open := openRes[0].Price closeRes := make([]model.StockMogoParam, 0) projection = bson.M{} sort = bson.M{"timestamp": -1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &closeRes, 1) var dataList []mongo.WriteModel filter = bson.M{"timestamp": bson.M{"$eq": from}, "symbol": bson.M{"$eq": symbol}} update := bson.D{{"$set", bson.D{ {"symbol", closeRes[0].Symbol}, {"stock_code", closeRes[0].StockCode}, {"stock_name", closeRes[0].StockName}, {"open_price", fmt.Sprintf("%f", open)}, {"high_price", fmt.Sprintf("%f", high)}, {"low_price", fmt.Sprintf("%f", low)}, {"close_price", fmt.Sprintf("%f", closeRes[0].Price)}, {"up_down_rate", closeRes[0].UpDownRate}, {"up_down", closeRes[0].UpDown}, {"trade_v", closeRes[0].TradeV}, {"trade_k", closeRes[0].TradeK}, {"vol", closeRes[0].Vol}, {"turnover_price_total", closeRes[0].TurnoverPriceTotal}, {"price_total", closeRes[0].PriceTotal}, //{"p_e", res[l].PE}, //{"eps", res[l].Eps}, //{"employees_number", res[l].EmployeesNumber}, //{"plate", res[l].Plate}, //{"desc", 
res[l].Desc}, {"price_code", closeRes[0].PriceCode}, {"country", closeRes[0].Country}, {"timestamp", from}, }}} //applogger.Info("GetTimeNewPrice info: %v", update) models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) if err := data.MgoBulkWrite(data.GetStockSouthAsiaTableName(country, period), dataList); err != nil { applogger.Error("stock MgoInsertMany err:%v", err) return err } return nil } func GetTimeNewPriceAll(symbol string, from, to int64, country, period, periodPre string) { //fmt.Println("country", country, "period", period, "from", common.ConvertToTimeStr(from/1000), "to", common.ConvertToTimeStr(to/1000)) filter := bson.M{"symbol": symbol, "timestamp": bson.M{"$gte": from, "$lte": to}} tableName := data.GetStockSouthAsiaTableName(country, periodPre) projection := bson.M{"stringVal": 1} sort := bson.M{"stringVal": -1} highRes := data.MgoFindProjectionAggregate(tableName, "high_price", filter, projection, sort, 1) if len(highRes) <= 0 { applogger.Error(symbol+" no data", period) return } if highRes[0].StringVal <= 0 { applogger.Error(symbol+" no data", period) return } high := highRes[0].StringVal sort = bson.M{"stringVal": 1} lowRes := data.MgoFindProjectionAggregate(tableName, "low_price", filter, projection, sort, 1) low := lowRes[0].StringVal sort = bson.M{"timestamp": 1} openRes := data.MgoFindProjectionAggregate(tableName, "open_price", filter, projection, sort, 1) open := openRes[0].StringVal sort = bson.M{"timestamp": -1} closeRes := data.MgoFindProjectionAggregate(tableName, "close_price", filter, projection, sort, 1) var dataList []mongo.WriteModel ts := from if period == "1day" || period == "1week" || period == "1mon" { ts = from + CountryStartTime[country] if country == "Brazil" && (period == "1day" || period == "1mon") { ts = from } } filter = bson.M{"timestamp": bson.M{"$eq": ts}, "symbol": bson.M{"$eq": symbol}} update := bson.D{{"$set", bson.D{ {"symbol", 
closeRes[0].Symbol}, {"stock_code", closeRes[0].StockCode}, {"stock_name", closeRes[0].StockName}, {"open_price", fmt.Sprintf("%f", open)}, {"high_price", fmt.Sprintf("%f", high)}, {"low_price", fmt.Sprintf("%f", low)}, {"close_price", fmt.Sprintf("%f", closeRes[0].StringVal)}, //{"up_down_rate", res[l].UpDownRate}, //{"up_down", res[l].UpDown}, //{"trade_v", res[l].TradeV}, //{"trade_k", res[l].TradeK}, {"vol", closeRes[0].Vol}, {"turnover_price_total", closeRes[0].TurnoverPriceTotal}, {"price_total", closeRes[0].PriceTotal}, //{"p_e", res[l].PE}, //{"eps", res[l].Eps}, //{"employees_number", res[l].EmployeesNumber}, //{"plate", res[l].Plate}, // {"desc", res[l].Desc}, {"price_code", closeRes[0].PriceCode}, {"country", closeRes[0].Country}, {"timestamp", ts}, }}} //applogger.Info("GetTimeNewPriceAll info: %v", update) models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) if err := data.MgoBulkWrite(data.GetStockSouthAsiaTableName(country, period), dataList); err != nil { applogger.Error("stock MgoInsertMany err:%v", err) } } func GetStockAll(country string, pageNum, pageSize int64) ([]stock.StockPolygon, int64, int64) { filter := bson.M{"Country": country, "YesterdayClose": bson.M{"$ne": ""}} projection := bson.M{"Code": 1, "Country": 1} res := make([]stock.StockPolygon, 0) total, _ := data.MgoFindTotal(data.StockList, filter) data.MgoPagingFindStructProjection(data.StockList, filter, projection, pageSize, pageNum, -1, &res) //印度股票过多 现在只画后台有权限得股票 //if country == "India" { // data := make([]stock.StockPolygon, 0) // for k, v := range res { // if common.IsExistStock(v.Locale, v.Code) { // data = append(data, res[k]) // } // } // return data, total, int64(math.Ceil(float64(total) / float64(pageSize))) //} return res, total, int64(math.Ceil(float64(total) / float64(pageSize))) } func getOpen(timestamp int64, country, period, symbol string, price string) string { filter := bson.M{"stock_code": 
symbol} projection := bson.M{"timestamp": 1, "open_price": 1, "close_price": 1} sort := bson.M{"timestamp": -1} res, _ := data.MgoFindProjection(data.GetStockSouthAsiaTableName(country, period), filter, projection, sort, int64(1)) open := price if len(res) > 0 { timestamps, _ := res[0]["timestamp"].(int64) switch timestamps { case timestamp: open = res[0]["open_price"].(string) default: open = res[0]["close_price"].(string) } } return open } func DeleteUs() { data.Mgo_init(config.Config.Mongodb) red.RedisClient = red.RedisInit(config.Config.Redis.DbEleven) filter := bson.M{"Country": "US", "YesterdayClose": ""} projection := bson.M{"Code": 1, "Country": 1} sort := bson.M{} result, _ := data.MgoFindProjection(data.StockList, filter, projection, sort, 0) fmt.Println(len(result)) country := "US" for _, v := range result { code := v["Code"].(string) red.Hset(StockClosingPrice[country], code, "0") red.Hset(StockClosingPrice[fmt.Sprintf("%sNew", country)], code, "0") red.Hset(StockClosingPrice[fmt.Sprintf("%sBeforeClose", country)], code, "0") } } // 只保留前半天数据 func DeleteSpot(param string) { red.RedisClient = red.RedisInit(config.Config.Redis.DbEleven) times := common.TimeToNows().Add(-10 * time.Minute).UnixMilli() filter := bson.M{"timestamp": bson.M{"$lte": times}} for _, country := range dictionary.StockCodeList { tableName := data.GetStockTableName(common.CapitalizeFirstLetter(country)) if err := data.MgoDeleteMany(tableName, filter); err != nil { applogger.Error(country, "err :", err.Error()) } //只清理数据 if param == "true" { continue } for _, v := range dictionary.StockSouthAsiaListTime { tableNames := data.GetStockSouthAsiaTableName(country, v) applogger.Debug(tableNames, "start") timeHour := common.TimeToNows().Add(-48 * time.Hour).UnixMilli() switch v { case "15min": timeHour = common.TimeToNows().Add(-75 * time.Hour).UnixMilli() case "30min": timeHour = common.TimeToNows().Add(-75 * time.Hour * 2).UnixMilli() case "1hour": timeHour = common.TimeToNows().Add(-7 * 
time.Hour * 24 * 2).UnixMilli() case "1day": timeHour = common.TimeToNows().Add(-365 * time.Hour * 24).UnixMilli() case "1week": timeHour = common.TimeToNows().Add(-365 * time.Hour * 24).UnixMilli() case "1mon": timeHour = common.TimeToNows().Add(-365 * time.Hour * 24 * 2).UnixMilli() } fmt.Println(timeHour) filter = bson.M{"timestamp": bson.M{"$lte": timeHour}} if err := data.MgoDeleteMany(tableNames, filter); err != nil { applogger.Error(country, "err :", err.Error()) } } } applogger.Debug("delete run end") } func DeleteSpotDay(times string, ts int64) { data.Mgo_init(config.Config.Mongodb) // var stockTime = []string{ "1day", "1week", "1mon"} stocks, _, _ := GetStockAll("India", 1, 7001) for _, stock := range stocks { // for _,times:= range stockTime { tableNames := data.GetStockSouthAsiaTableName("India", times) filter := bson.M{"symbol": stock.Code, "timestamp": ts} projection := bson.M{"stringVal": 1} sort := bson.M{"stringVal": -1} highRes := data.MgoFindProjectionAggregate(tableNames, "high_price", filter, projection, sort, 1) fmt.Printf("%+v", highRes) if len(highRes) <= 0 { applogger.Info(times, ts, stock.Code, "id empty ") continue } if highRes[0].ID.String() == "" { applogger.Info(times, ts, stock.Code, "id kong ") continue } filter = bson.M{"symbol": stock.Code, "timestamp": ts, "_id": bson.M{"$ne": highRes[0].ID}} if err := data.MgoDeleteMany(tableNames, filter); err != nil { applogger.Error(times, ts, stock.Code, "err :", err.Error()) } applogger.Info(times, ts, stock.Code) // os.Exit(111) } } // 清理外汇成交报价数据信息 func DeleteForexTrade() { data.Mgo_init(config.Config.Mongodb) // 当前时间 now := time.Now() // 时间戳格式化函数 timestamp := func(t time.Time) int64 { return t.UnixNano() / int64(time.Millisecond) } // 计算5分钟前的时间戳 fiveMinutesAgo := now.Add(time.Duration(-3) * time.Minute) fiveMinutesAgoMillis := timestamp(fiveMinutesAgo) filter := bson.M{"tick_time": bson.M{"$lte": fiveMinutesAgoMillis}} if err := data.MgoDeleteMany(data.ForexTradeList, filter); err != nil { 
applogger.Error("DeleteForexTrade MgoDeleteMany err :", err.Error()) } } // 推送插针数据:[3:美股 4:印尼 5:马股 6:泰股 9:新加坡 11:期权印度 12:港股 14:英国 15:法国 16:德国 17:巴西 18:日本] func StockClosedData() { red.RedisClient = red.RedisInit(config.Config.Redis.DbTen) for k, v := range StockClosedDataList { // TODO: 盘前数据功能 实行全天更改数据 //if k == "US" && common.IsOpeningUS() { // applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), k, " it's opening time -----------------------------end") // continue //} else if (k == "Thailand" || k == "Indonesia" || k == "India" || k == "Singapore" || k == "Malaysia" || k == "HongKong" || k == "UK" || k == "France" || k == "Germany" || k == "Brazil") && common.IsOpening(k) { // applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), k, " it's opening time -----------------------------end") // continue //} // TODO: 插针 hashListName := fmt.Sprintf("STOCK_PRICES:%d", v) keys := red.Scan(hashListName) stockCodes := make(map[string]bool) for _, key := range keys { res, _ := red.HGetAll(key) status, _ := strconv.Atoi(res["status"]) code := res["stock_code"] applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), res) if status != StockStatusOn { continue } if k == "US" { UsStock(code, res["price"], k, true) } else { stockCode := common.GetOldCode(code) SouthAsiaSpot(code, stockCode, res["price"], k, true) } stockCodes[code] = true } FullPush(stockCodes, k, v) } } // 只全量推送旧数据,对数据不做修改 func FullPush(stockCodes map[string]bool, country string, countryNum int) { if config.Config.Redis.FullPush != 1 { return } res, _ := red.HGetAll(fmt.Sprintf("STOCK_MARKET:LIST:%d", countryNum)) applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), country, res) status, _ := strconv.Atoi(res["status"]) if status != StockStatusOn { return } if !common.IsPullOpen(country, res["am_open_time"], res["am_close_time"], res["pm_open_time"], res["pm_close_time"]) { applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), country, " it's not 
pull push time -----------------------------end") return } filter := bson.M{"Country": country, "YesterdayClose": bson.M{"$ne": ""}} projection := bson.M{"Code": 1, "YesterdayClose": 1} stockRes := make([]stock.StockPolygon, 0) data.MgoPagingFindStructProjection(data.StockList, filter, projection, 11000, 1, -1, &stockRes) for _, v := range stockRes { if !common.IsExistStock(country, v.Code) || stockCodes[v.Code] { applogger.Debug(country, v.Code, "not pin code") continue } key := StockClosingPrice[fmt.Sprintf("%sNew", country)] closePrice, _ := red.Hget(key, v.Code) if closePrice == "0" || closePrice == "" { closePrice = v.YesterdayClose } if country == "US" { UsStock(v.Code, closePrice, country, false) } else { stockCode := common.GetOldCode(v.Code) SouthAsiaSpot(v.Code, stockCode, closePrice, country, false) } } } func CheckNewPrice(code, country, price string) { key := StockClosingPrice[fmt.Sprintf("%sNew", country)] value, _ := red.Hget(key, code) applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), code, country, price, value, "CheckNewPrice") if value != "" && value != "0" && value == price { //value 不是 0 并且 跟盘前价格一致 red.Hset(key, code, "0") } } // 推送目标美股插针数据行情 func UsStock(code, price, country string, isUpdate bool) { message := &model.ClientMessage{ S: code, // 股票代码 C: []decimal.Decimal{decimal.NewFromInt(0), decimal.NewFromFloat(1)}, // 条件,有关更多信息,请参阅贸易条件术语表 V: common.CalculateContractPrices(decimal.NewFromInt(int64(100)), float64(0.02), 0, 1)[0].IntPart(), // 交易量,代表在相应时间戳处交易的股票数量 -- 报价交易量 Dp: true, // 暗池真/假 Ms: "open", // 市场状态,指示股票市场的当前状态(“开盘”、“收盘”、“延长交易时间”) T: time.Now().UnixMilli(), // 以毫秒为单位的时间戳 -- 此聚合窗口的结束时钟周期的时间戳(以 Unix 毫秒为单位) Cl: decimal.RequireFromString(price), // 此聚合窗口的收盘价 A: decimal.NewFromInt(11), // 今天的成交量加权平均价格 Se: time.Now().UnixMilli(), H: decimal.RequireFromString(price), // 此聚合窗口的最高逐笔报价 L: decimal.RequireFromString(price), // 此聚合窗口的最低价格变动价格 Op: decimal.RequireFromString(price), // 今天正式开盘价格 // P: 
decimal.RequireFromString(price), ClosingMarket: true, } msgStr, err := json.Marshal(message) if err != nil { applogger.Error("json.Marshal err: %v", err) return } if isUpdate { red.Hset(StockClosingPrice[fmt.Sprintf("%sNew", country)], code, price) } applogger.Info("last date info: %v", string(msgStr)) red.RedisClient.Publish(fmt.Sprintf("%s.US", message.S), string(msgStr)) } // 推送目标市场股票插针数据行情 func SouthAsiaSpot(symbol, stockCode, price, country string, isUpdate bool) { prices, _ := strconv.ParseFloat(price, 64) param := model.StockParam{ Symbol: symbol, StockCode: stockCode, StockName: "", Price: prices, UpDownRate: decimal.NewFromInt(0), UpDown: decimal.NewFromInt(0), TradeV: decimal.NewFromInt(0), TradeK: "买入", Country: strings.ToLower(country), Ts: time.Now().UnixMilli(), ClosingMarket: true, } param.Token = "" msgStr, err := json.Marshal(param) if err != nil { applogger.Error("json.Marshal err: %v", err) return } applogger.Info("last date info: %v", string(msgStr)) // Write to Redis for broadcasting red.RedisClient.Publish(fmt.Sprintf("%s.%s", param.Symbol, country), string(msgStr)) if isUpdate { red.Hset(StockClosingPrice[fmt.Sprintf("%sNew", country)], symbol, price) } } // 东南亚聚合行情 func TickSouthAsiaSpotKline(stockName string) { if !common.IsOpening(stockName) { applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), "it's not opening time -----------------------------end") return } start := time.Now() // 获取当前时间 applogger.Info(common.TimeToNows().Format("2006-01-02 15:04:05"), "start TickSouthAsiaSpotKline") // 五分钟聚合 FiveMinTo := common.GenerateSingaporeFiveMinTimestampOrigin() * 1000 FiveMinFrom := FiveMinTo - int64(5*60*1000) // 十五分钟聚合 FifteenMinTo := common.GenerateSingaporeFifteenMinTimestampOrigin() * 1000 FifteenMinFrom := FifteenMinTo - int64(15*60*1000) // 三十分钟聚合 ThirtyMinTo := common.GenerateSingaporeThirtyMinTimestampOrigin() * 1000 ThirtyMinToForm := ThirtyMinTo - int64(30*60*1000) // 小时聚合 HourFrom := 
common.GenerateSingaporeHourTimestampOrigin() * 1000 HourTo := HourFrom + int64(60*60*1000) // 天聚合 DayFrom := common.GenerateSingaporeDayTimestamp(stockName) * 1000 DayTo := common.TimeToNow() * 1000 // 周聚合 WeekFrom := common.GetWeekTimestamp() * 1000 WeekTo := common.TimeToNow() * 1000 // 月聚合 MonFrom := common.GenerateSingaporeMonTimestampStock(stockName) * 1000 MonTo := common.TimeToNow() * 1000 //并发过高 拆分 stocks, _, pageTotal := GetStockAll(stockName, 1, PageSize) wg := sync.WaitGroup{} for _, value := range stocks { wg.Add(1) go func(FiveMinTo, FiveMinFrom, FifteenMinTo, FifteenMinFrom, ThirtyMinTo, ThirtyMinToForm, HourFrom, HourTo, DayFrom, DayTo, WeekFrom, WeekTo, MonFrom, MonTo int64, value stock.StockPolygon) { defer wg.Done() applogger.Info(common.TimeToNows().Format("2006-01-02 15:04:05"), "start ", value.Code, value.Locale) if err := GetTimeNewPrice(value.Code, FiveMinFrom, FiveMinTo, value.Locale, "5min"); err != nil { applogger.Error(err.Error(), "run end") return } GetTimeNewPriceAll(value.Code, FifteenMinFrom, FifteenMinTo, value.Locale, "15min", "5min") GetTimeNewPriceAll(value.Code, ThirtyMinToForm, ThirtyMinTo, value.Locale, "30min", "15min") GetTimeNewPriceAll(value.Code, HourFrom, HourTo, value.Locale, "1hour", "30min") GetTimeNewPriceAll(value.Code, DayFrom, DayTo, value.Locale, "1day", "1hour") GetTimeNewPriceAll(value.Code, WeekFrom, WeekTo, value.Locale, "1week", "1day") GetTimeNewPriceAll(value.Code, MonFrom, MonTo, value.Locale, "1mon", "1week") }(FiveMinTo, FiveMinFrom, FifteenMinTo, FifteenMinFrom, ThirtyMinTo, ThirtyMinToForm, HourFrom, HourTo, DayFrom, DayTo, WeekFrom, WeekTo, MonFrom, MonTo, value) } wg.Wait() for i := int64(2); i <= pageTotal; i++ { stocks, _, _ := GetStockAll(stockName, i, PageSize) wg := sync.WaitGroup{} for _, value := range stocks { wg.Add(1) go func(FiveMinTo, FiveMinFrom, FifteenMinTo, FifteenMinFrom, ThirtyMinTo, ThirtyMinToForm, HourFrom, HourTo, DayFrom, DayTo, WeekFrom, WeekTo, MonFrom, MonTo int64, value 
stock.StockPolygon) { defer wg.Done() applogger.Info(common.TimeToNows().Format("2006-01-02 15:04:05"), "start ", value.Code, value.Locale) if err := GetTimeNewPrice(value.Code, FiveMinFrom, FiveMinTo, value.Locale, "5min"); err != nil { applogger.Error(err.Error(), "run end") return } GetTimeNewPriceAll(value.Code, FifteenMinFrom, FifteenMinTo, value.Locale, "15min", "5min") GetTimeNewPriceAll(value.Code, ThirtyMinToForm, ThirtyMinTo, value.Locale, "30min", "15min") GetTimeNewPriceAll(value.Code, HourFrom, HourTo, value.Locale, "1hour", "30min") GetTimeNewPriceAll(value.Code, DayFrom, DayTo, value.Locale, "1day", "1hour") GetTimeNewPriceAll(value.Code, WeekFrom, WeekTo, value.Locale, "1week", "1day") GetTimeNewPriceAll(value.Code, MonFrom, MonTo, value.Locale, "1mon", "1week") }(FiveMinTo, FiveMinFrom, FifteenMinTo, FifteenMinFrom, ThirtyMinTo, ThirtyMinToForm, HourFrom, HourTo, DayFrom, DayTo, WeekFrom, WeekTo, MonFrom, MonTo, value) } wg.Wait() } fmt.Println("Run time: ", time.Since(start)) // TODO: 删除已画好的数据,提高查询速度 filter := bson.M{"timestamp": bson.M{"$lte": FiveMinTo}} tableName := data.GetStockTableName(stockName) data.MgoDeleteMany(tableName, filter) applogger.Info(tableName, FiveMinTo, "DELETE INFO") } // 指数数据聚合 func TickSpotIndexKline() { start := time.Now() // 获取当前时间 applogger.Info(common.TimeToNows().Format("2006-01-02 15:04:05"), "start TickSpotIndexKline") // 五分钟聚合 FiveMinTo := common.GenerateSingaporeFiveMinTimestampOrigin() * 1000 FiveMinFrom := FiveMinTo - int64(5*60*1000) // 十五分钟聚合 FifteenMinTo := common.GenerateSingaporeFifteenMinTimestampOrigin() * 1000 FifteenMinFrom := FifteenMinTo - int64(15*60*1000) // 三十分钟聚合 ThirtyMinTo := common.GenerateSingaporeThirtyMinTimestampOrigin() * 1000 ThirtyMinToForm := ThirtyMinTo - int64(30*60*1000) // 小时聚合 HourFrom := common.GenerateSingaporeHourTimestampOrigin() * 1000 HourTo := HourFrom + int64(60*60*1000) // 天聚合 DayFrom := common.GenerateSingaporeDayTimestamp("") * 1000 DayTo := common.TimeToNow() * 1000 // 
周聚合 WeekFrom := common.GetWeekTimestamp() * 1000 WeekTo := common.TimeToNow() * 1000 // 月聚合 MonFrom := common.GenerateSingaporeMonTimestampStock("") * 1000 MonTo := common.TimeToNow() * 1000 //并发过高 拆分 filter := bson.M{"State": common.StockIndexOn} res := make([]stock.StockIndexPolygon, 0) data.MgoFindStockRes(data.StockIndexList, filter, &res) wg := sync.WaitGroup{} for _, value := range res { wg.Add(1) go func(FiveMinTo, FiveMinFrom, FifteenMinTo, FifteenMinFrom, ThirtyMinTo, ThirtyMinToForm, HourFrom, HourTo, DayFrom, DayTo, WeekFrom, WeekTo, MonFrom, MonTo int64, value stock.StockIndexPolygon) { defer wg.Done() applogger.Info(common.TimeToNows().Format("2006-01-02 15:04:05"), "start ", value.Code, value.Locale) if err := GetTimeNewIndexPrice(value.Code, FiveMinFrom, FiveMinTo, value.Locale, "5min"); err != nil { applogger.Error(err.Error(), "run end") return } GetTimeNewIndexPriceAll(value.Code, FifteenMinFrom, FifteenMinTo, value.Locale, "15min", "5min") GetTimeNewIndexPriceAll(value.Code, ThirtyMinToForm, ThirtyMinTo, value.Locale, "30min", "15min") GetTimeNewIndexPriceAll(value.Code, HourFrom, HourTo, value.Locale, "1hour", "30min") GetTimeNewIndexPriceAll(value.Code, DayFrom, DayTo, value.Locale, "1day", "1hour") GetTimeNewIndexPriceAll(value.Code, WeekFrom, WeekTo, value.Locale, "1week", "1day") GetTimeNewIndexPriceAll(value.Code, MonFrom, MonTo, value.Locale, "1mon", "1week") }(FiveMinTo, FiveMinFrom, FifteenMinTo, FifteenMinFrom, ThirtyMinTo, ThirtyMinToForm, HourFrom, HourTo, DayFrom, DayTo, WeekFrom, WeekTo, MonFrom, MonTo, value) } wg.Wait() fmt.Println("Run time: ", time.Since(start)) } func GetTimeNewIndexPrice(symbol string, from, to int64, country, period string) error { //fmt.Println("country", country, "period", period, "from", common.ConvertToTimeStr(from/1000), from, "to", common.ConvertToTimeStr(to/1000), to) filter := bson.M{"stock_code": symbol, "timestamp": bson.M{"$gte": from, "$lte": to}} tableName := data.GetStockIndexTableName() highRes 
:= make([]model.StockIndexParam, 0) projection := bson.M{"price": 1} sort := bson.M{"price": -1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &highRes, 1) if len(highRes) <= 0 { applogger.Error(symbol+" no data", period) return errors.New(symbol + " no data") } high := highRes[0].Price lowRes := make([]model.StockIndexParam, 0) sort = bson.M{"price": 1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &lowRes, 1) low := lowRes[0].Price openRes := make([]model.StockIndexParam, 0) sort = bson.M{"timestamp": 1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &openRes, 1) open := openRes[0].Price closeRes := make([]model.StockIndexParam, 0) projection = bson.M{} sort = bson.M{"timestamp": -1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &closeRes, 1) var dataList []mongo.WriteModel filter = bson.M{"timestamp": bson.M{"$eq": from}, "stock_code": bson.M{"$eq": symbol}} update := bson.D{{"$set", bson.D{ {"stock_code", closeRes[0].StockCode}, {"stock_name", closeRes[0].StockName}, {"open_price", fmt.Sprintf("%f", open)}, {"high_price", fmt.Sprintf("%f", high)}, {"low_price", fmt.Sprintf("%f", low)}, {"close_price", fmt.Sprintf("%f", closeRes[0].Price)}, {"up_down_rate", closeRes[0].UpDownRate}, {"up_down", closeRes[0].UpDown}, {"vol", closeRes[0].Vol}, {"country", closeRes[0].Country}, {"timestamp", from}, }}} applogger.Info("GetTimeNewPrice info: %v", update) models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) if err := data.MgoBulkWrite(data.GetStockIndixKlineTableName(period), dataList); err != nil { applogger.Error("stock MgoInsertMany err:%v", err) return err } return nil } func GetTimeNewIndexPriceAll(symbol string, from, to int64, country, period, periodPre string) { filter := bson.M{"stock_code": symbol, "timestamp": bson.M{"$gte": from, "$lte": to}} tableName := data.GetStockIndixKlineTableName(periodPre) res := 
make([]model.StockMogoParam, 0) projection := bson.M{"symbol": 1, "stock_code": 1, "stock_name": 1, "open_price": 1, "high_price": 1, "low_price": 1, "close_price": 1, "vol": 1, "country": 1, "timestamp": 1} sort := bson.M{"timestamp": 1} data.MgoFindProjectionRes(tableName, filter, projection, sort, &res, 0) if len(res) <= 0 { applogger.Error(symbol+" no data", period) return } var low, high, vol decimal.Decimal for key, v := range res { lows, _ := decimal.NewFromString(v.LowPrice) highs, _ := decimal.NewFromString(v.HighPrice) var vols decimal.Decimal if key == 0 { low = lows high = highs vol = vols continue } vol = vol.Add(vols) if low.GreaterThan(lows) { low = lows } if high.LessThan(highs) { high = highs } } l := len(res) - 1 var dataList []mongo.WriteModel ts := from open := res[0].OpenPrice opens, _ := decimal.NewFromString(open) if low.GreaterThan(opens) && !opens.Equal(decimal.NewFromInt(0)) { low = opens } if high.LessThan(opens) { high = opens } filter = bson.M{"timestamp": bson.M{"$eq": ts}, "stock_code": bson.M{"$eq": symbol}} update := bson.D{{"$set", bson.D{ {"symbol", res[l].Symbol}, {"stock_code", res[l].StockCode}, {"stock_name", res[l].StockName}, {"open_price", open}, {"high_price", high.String()}, {"low_price", low.String()}, {"close_price", res[l].ClosePrice}, {"vol", res[l].Vol}, {"country", res[l].Country}, {"timestamp", ts}, }}} applogger.Info("GetTimeNewPriceAll info: %v", update) models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) if err := data.MgoBulkWrite(data.GetStockIndixKlineTableName(period), dataList); err != nil { applogger.Error("stock MgoInsertMany err:%v", err) } } // 控制插针数据不推送 func NewPinStock(noPin map[string]bool) { for { fmt.Println(noPin) pinStockMutex.Lock() for k, v := range StockClosedDataList { hashListName := fmt.Sprintf("STOCK_PRICES:%d", v) dbs := red.ScanMap(hashListName) dbData := make(map[string]map[string]bool) for db, keys := range dbs { 
//不用插针服务 if noPin[db] { applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), db, "no pin 不用插针") continue } stockCode := make(map[string]bool) for _, key := range keys { res, _ := red.HGetAllMap(db, key) applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), db, res) status, _ := strconv.Atoi(res["status"]) code := res["stock_code"] if status == StockStatusOn { stockCode[code] = true } } dbData[db] = stockCode } pinStock[k] = dbData } pinStockMutex.Unlock() applogger.Info("pin stock :%v", pinStock) time.Sleep(1 * time.Minute) } } func getPinStock(db, country string) map[string]bool { pinStockMutex.RLock() defer pinStockMutex.RUnlock() return pinStock[country][db] } func JudgePublishMap(country, code, channel string, message interface{}) { for k, db := range red.RedisClientMap { if getPinStock(k, country)[code] { applogger.Debug(k, code, "pin stock", message) continue } applogger.Info("channel", channel, "DB", k) err := db.Publish(channel, message).Err() if err != nil { fmt.Println("db", k, "存储失败:", err) } } } func JudgeHsetMap(country, key, field string, value interface{}) { for k, db := range red.RedisClientMap { if getPinStock(k, country)[field] { applogger.Debug(k, field, "pin stock", value) continue } applogger.Info("key", key, "field", field, "value", value, "DB", k) err := db.HSet(key, field, value).Err() if err != nil { fmt.Println("db", k, "存储失败:", err) } } } func StockWs(param model.StockParam, country string) { param.Token = "" msgStr, err := json.Marshal(param) if err != nil { applogger.Error("json.Marshal err: %v", err) return } //applogger.Info("last date info: %v", string(msgStr)) // Write to Redis for broadcasting JudgePublishMap(country, param.Symbol, fmt.Sprintf("%s.%s", param.Symbol, country), string(msgStr)) }