package processor import ( "encoding/json" "fmt" "" "" "" "net/http" "reflect" "strings" "time" "wss-pool/cmd/common" "wss-pool/cmd/websocketservice" "wss-pool/internal" "wss-pool/internal/data" red "wss-pool/internal/redis" "wss-pool/pkg/model/stock" ) const ( KlinePageSize = 100 SpotsStatus int = 1 ContractStatus int = 2 ) // 现货历史记录 // contract_code 合约代码 或 合约标识 // period K线类型 "1min", "5min", "15min", "30min", "60min", "4hour", "1day", "1mon", "1week", "1year" // size 获取数量,默认150 [1,2000] // from 开始时间戳 10位 单位S // to 结束时间戳 10位 单位S func SpotKLineList(c *gin.Context) { symbol := internal.ReplaceStr(c.Query("symbol")) // 币种 period := internal.ReplaceStr(c.Query("period")) // 时间颗粒度 size := internal.IntegerInit(internal.ReplaceStr(c.Query("size"))) from := internal.IntegerInit(internal.ReplaceStr(c.Query("from"))) // 开始时间 to := internal.IntegerInit(internal.ReplaceStr(c.Query("to"))) // 结束时间 if symbol == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } else if period == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "period error", internal.QueryError)) return } filter := bson.M{"channel": fmt.Sprintf("market.%susdt.kline.%s", symbol, period), "code": bson.M{"$gte": from, "$lte": to}} if size > 0 { filter = bson.M{"channel": fmt.Sprintf("market.%susdt.kline.%s", symbol, period)} } tableName := data.GetStockKLineTableName(period) //applogger.Debug("查询数据总数: %v", total) pagedData, err := data.MgoLimitFind(tableName, filter, int64(size)) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } var md stock.MgoPageSize md.Data = removeDuplicates(pagedData) md.Data = common.MarshalToJsonWithGzip(md.Data) c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } func removeDuplicates(pagedData []data.MongoTick) []data.MongoTick { result := make([]data.MongoTick, 0) keys := make([]int64, 0) resMap := make(map[int64]data.MongoTick) for _, doc := range pagedData { resMap[doc.Code] = doc keys = append(keys, doc.Code) } for _, v := range keys { result = append(result, resMap[v]) } return result } // 合约价格历史记录 func ContractPriceKLineList(c *gin.Context) { contractCode := strings.ToUpper(internal.ReplaceStr(c.Query("symbol"))) // 合约 from := internal.IntegerInit(internal.ReplaceStr(c.Query("from"))) // 开始时间 to := internal.IntegerInit(internal.ReplaceStr(c.Query("to"))) // 结束时间 period := internal.ReplaceStr(c.Query("period")) // 时间颗粒度 size := internal.IntegerInit(internal.ReplaceStr(c.Query("size"))) // 结束时间 if contractCode == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } else if period == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "period error", internal.QueryError)) return } //bodyStr, err := internal.HttpGet(fmt.Sprintf("", contractCode, period)) //if err != nil { // c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) // return //} //result := business.SpotKlineRes{} //if err = json.Unmarshal([]byte(bodyStr), &result); err != nil { // applogger.Error("Unmarshal err: %v---%v", err) //} //var md stock.MgoPageSize //md.Data = result //c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) //else if from == 0 { // c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "from error", internal.QueryError)) // return //} else if to == 0 { // c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "to error", internal.QueryError)) // return //} filter := bson.M{"channel": fmt.Sprintf("market.%s-USDT.mark_price.%s", contractCode, period), "code": bson.M{"$gte": from, "$lte": to}} if size > 0 { filter = bson.M{"channel": fmt.Sprintf("market.%s-USDT.mark_price.%s", contractCode, period)} } tableName := data.GetContractPriceKLineTableName(period) //applogger.Debug("查询数据总数: %v", total) pagedData, err := data.MgoLimitFind(tableName, filter, int64(size)) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } var md stock.MgoPageSize md.Data = removeDuplicates(pagedData) c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } // 合约历史记录 func ContractKLineList(c *gin.Context) { contractCode := strings.ToUpper(internal.ReplaceStr(c.Query("symbol"))) // 合约 from := internal.IntegerInit(internal.ReplaceStr(c.Query("from"))) // 开始时间 to := internal.IntegerInit(internal.ReplaceStr(c.Query("to"))) // 结束时间 period := internal.ReplaceStr(c.Query("period")) // 时间颗粒度 size := internal.IntegerInit(internal.ReplaceStr(c.Query("size"))) // 结束时间 if contractCode == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } else if period == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "period error", internal.QueryError)) return } //url := fmt.Sprintf("", contractCode, period, from, to) //if size > 0 { // url = fmt.Sprintf("", contractCode, period, size) //} //bodyStr, err := internal.HttpGet(url) //if err != nil { // c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) // return //} //result := business.ContractKlineRes{} //if err = json.Unmarshal([]byte(bodyStr), &result); err != nil { // applogger.Error("Unmarshal err: %v---%v", err) //} //var md stock.MgoPageSize //md.Data = result //c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) filter := bson.M{"channel": fmt.Sprintf("market.%s-USDT.kline.%s", contractCode, period), "code": bson.M{"$gte": from, "$lte": to}} if size > 0 { filter = bson.M{"channel": fmt.Sprintf("market.%s-USDT.kline.%s", contractCode, period)} } tableName := data.GetContractKLineTableName(period) pagedData, err := data.MgoLimitFind(tableName, filter, int64(size)) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } var md stock.MgoPageSize md.Data = removeDuplicates(pagedData) md.Data = common.MarshalToJsonWithGzip(md.Data) c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } // 查询价格 func InquiryPrice(c *gin.Context) { contractCode := internal.ReplaceStr(c.Query("symbol")) timeStr := c.Query("time") // 结束时间 if contractCode == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } else if timeStr == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "timeStr error", internal.QueryError)) return } timestamp, err := common.TimeStrToTimestamp(timeStr) fmt.Println(timestamp) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err.Error(), internal.QueryError)) return } ok := strings.Contains(contractCode, "-USDT") filter := bson.M{"channel": fmt.Sprintf("market.%s.kline.1min", contractCode), "code": timestamp} tableName := data.GetContractKLineTableName("1min") if !ok { tableName = data.GetStockKLineTableName("1min") } pagedData, err := data.MgoLimitFind(tableName, filter, int64(0)) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } var md stock.MgoPageSize md.Data = removeDuplicates(pagedData) c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } // 美股查询记录 func HistoryUsList(c *gin.Context) { code := strings.ToUpper(internal.ReplaceStr(c.Query("code"))) // 股票代码 pageSize := internal.IntegerInit(internal.ReplaceStr(c.Query("pageSize"))) // 每页显示多少条数据 pageNumber := internal.IntegerInit(internal.ReplaceStr(c.Query("pageNumber"))) // 第几页 period := internal.ReplaceStr(c.Query("period")) // 时间颗粒度 from := internal.IntegerInit(internal.ReplaceStr(c.Query("from"))) // 开始时间 if pageNumber <= 0 { pageNumber = 1 } if pageSize <= 0 { pageSize = KlinePageSize } if code == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "code error", internal.QueryError)) return } else if period == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "period error", internal.QueryError)) return } if from == 0 { from = int(time.Now().Unix()) } filter := bson.M{"code": code, "timestamp": bson.M{"$lte": from}} tableName := data.GetStockUsTableName(period) total, err := data.MgoFindTotal(tableName, filter) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } //applogger.Debug("查询数据总数: %v", total) pagedData, err := data.MgoPagingFind(tableName, filter, int64(pageSize), int64(pageNumber), -1) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } var md stock.MgoPageSize md.Data = pagedData md.PageSize = pageSize md.PageNumber = pageNumber md.Total = total c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } func removeWsconn(conn interface{}, userConn *websocket.Conn, symbol string) error { connRes := make([]*websocket.Conn, 0) value := reflect.ValueOf(conn) if value.Kind() != reflect.Slice && value.Kind() != reflect.Array { return nil } for i := 0; i < value.Len(); i++ { val := value.Index(i).Interface().(*websocket.Conn) if val != userConn { connRes = append(connRes, val) } } return nil } // 现货|合约|秒合约列表查询 func MainSpotList(c *gin.Context) { market_type := internal.IntegerInit(internal.ReplaceStr(c.Query("marketType"))) if market_type == 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "marketType error", internal.QueryError)) return } result := make([]interface{}, 0) var md stock.MgoPageSize resMap := make(map[string]interface{}) if market_type == SpotsStatus { // spot := websocketservice.HashValue(websocketservice.RedisDIGITAL) websocketservice.SpotMarketCache.Range( func(key, value any) bool { //for _, val := range spot { //if strings.Contains(key.(string), fmt.Sprintf("%susdt", strings.ToLower(val.Name))) && value != nil { resMap[key.(string)] = value //} return true }) if _, ok := resMap["btc"]; ok { result = append(result, resMap["btc"]) } if _, ok := resMap["eth"]; ok { result = append(result, resMap["eth"]) } if _, ok := resMap["bnb"]; ok { result = append(result, resMap["bnb"]) } if _, ok := resMap["usdc"]; ok { result = append(result, resMap["usdc"]) } if _, ok := resMap["xrp"]; ok { result = append(result, resMap["xrp"]) } if _, ok := resMap["ada"]; ok { result = append(result, resMap["ada"]) } if _, ok := resMap["doge"]; ok { result = append(result, resMap["doge"]) } if _, ok := resMap["sol"]; ok { result = append(result, resMap["sol"]) } if _, ok := resMap["trx"]; ok { result = append(result, resMap["trx"]) } if _, ok := resMap["ltc"]; ok { result = append(result, resMap["ltc"]) } if _, ok := resMap["dot"]; ok { result = append(result, resMap["dot"]) } if _, ok := resMap["matic"]; ok { result = append(result, resMap["matic"]) } if _, ok := resMap["bch"]; ok { result = append(result, resMap["bch"]) } if _, ok := resMap["eos"]; ok { result = append(result, resMap["eos"]) } if _, ok := resMap["ton"]; ok { result = append(result, resMap["ton"]) } if _, ok := resMap["avax"]; ok { result = append(result, resMap["avax"]) } if _, ok := resMap["shib"]; ok { result = append(result, resMap["shib"]) } if _, ok := resMap["invu"]; ok { result = append(result, resMap["invu"]) } if _, ok := resMap["osel"]; ok { result = append(result, resMap["osel"]) } if _, ok := resMap["fmd"]; ok { result = append(result, resMap["fmd"]) } if _, ok := resMap["dten"]; ok { result = append(result, resMap["dten"]) } if _, ok := resMap["xnsl"]; ok { result = append(result, resMap["xnsl"]) } if _, ok := resMap["kools"]; ok { result = append(result, resMap["kools"]) } md.Data = result c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) return } // contract := websocketservice.HashValue(websocketservice.RedisCONTRACT) websocketservice.ContractCache.Range( func(key, value any) bool { //for _, val := range contract { // if strings.Contains(key.(string), val.Name) && value != nil { resMap[key.(string)] = value // } // } return true }) if _, ok := resMap["BTC-USDT"]; ok { result = append(result, resMap["BTC-USDT"]) } if _, ok := resMap["ETH-USDT"]; ok { result = append(result, resMap["ETH-USDT"]) } if _, ok := resMap["BCH-USDT"]; ok { result = append(result, resMap["BCH-USDT"]) } if _, ok := resMap["XRP-USDT"]; ok { result = append(result, resMap["XRP-USDT"]) } if _, ok := resMap["EOS-USDT"]; ok { result = append(result, resMap["EOS-USDT"]) } if _, ok := resMap["LTC-USDT"]; ok { result = append(result, resMap["LTC-USDT"]) } if _, ok := resMap["TRX-USDT"]; ok { result = append(result, resMap["TRX-USDT"]) } if _, ok := resMap["ETC-USDT"]; ok { result = append(result, resMap["ETC-USDT"]) } if _, ok := resMap["LINK-USDT"]; ok { result = append(result, resMap["LINK-USDT"]) } if _, ok := resMap["BNB-USDT"]; ok { result = append(result, resMap["BNB-USDT"]) } if _, ok := resMap["ADA-USDT"]; ok { result = append(result, resMap["ADA-USDT"]) } if _, ok := resMap["DOGE-USDT"]; ok { result = append(result, resMap["DOGE-USDT"]) } if _, ok := resMap["SOL-USDT"]; ok { result = append(result, resMap["SOL-USDT"]) } if _, ok := resMap["DOT-USDT"]; ok { result = append(result, resMap["DOT-USDT"]) } if _, ok := resMap["MATIC-USDT"]; ok { result = append(result, resMap["MATIC-USDT"]) } if _, ok := resMap["AVAX-USDT"]; ok { result = append(result, resMap["AVAX-USDT"]) } if _, ok := resMap["SHIB-USDT"]; ok { result = append(result, resMap["SHIB-USDT"]) } if _, ok := resMap["BNBS-USDT"]; ok { result = append(result, resMap["BNBS-USDT"]) } if _, ok := resMap["INVU-USDT"]; ok { result = append(result, resMap["INVU-USDT"]) } if _, ok := resMap["OSEL-USDT"]; ok { result = append(result, resMap["OSEL-USDT"]) } if _, ok := resMap["FMD-USDT"]; ok { result = append(result, resMap["FMD-USDT"]) } if _, ok := resMap["DTEN-USDT"]; ok { result = append(result, resMap["DTEN-USDT"]) } if _, ok := resMap["XNSL-USDT"]; ok { result = append(result, resMap["XNSL-USDT"]) } if _, ok := resMap["KOOLS-USDT"]; ok { result = append(result, resMap["KOOLS-USDT"]) } md.Data = result c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } // 现货|合约|秒合约|自选列表查询 func MainFreeSpotList(c *gin.Context) { id := internal.ReplaceStr(c.Query("id")) market_type := internal.IntegerInit(internal.ReplaceStr(c.Query("marketType"))) if market_type == 0 || len(id) == 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "marketType or userId error", internal.QueryError)) return } var md stock.MgoPageSize result := make([]interface{}, 0) md.Data = result userIdKey := fmt.Sprintf("%v%v", FreeSymbolKey, id) resultStr, err := red.Get_Cache_Byte(userIdKey) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QueryError)) return } var freeList []StockSymbol if err = json.Unmarshal(resultStr, &freeList); err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QueryError)) return } // 组合需要查询自选缓存股票code var symbolList []string for _, value := range freeList { if market_type == value.MarketType { symbolList = append(symbolList, value.Code) } } resMap := make(map[string]interface{}) switch market_type { case SpotsStatus: websocketservice.SpotMarketCache.Range(func(key, value any) bool { for _, vue := range symbolList { if vue == key.(string) { resMap[vue] = value } } return true }) if _, ok := resMap["btc"]; ok { result = append(result, resMap["btc"]) } if _, ok := resMap["eth"]; ok { result = append(result, resMap["eth"]) } if _, ok := resMap["bnb"]; ok { result = append(result, resMap["bnb"]) } if _, ok := resMap["usdc"]; ok { result = append(result, resMap["usdc"]) } if _, ok := resMap["xrp"]; ok { result = append(result, resMap["xrp"]) } if _, ok := resMap["ada"]; ok { result = append(result, resMap["ada"]) } if _, ok := resMap["doge"]; ok { result = append(result, resMap["doge"]) } if _, ok := resMap["sol"]; ok { result = append(result, resMap["sol"]) } if _, ok := resMap["trx"]; ok { result = append(result, resMap["trx"]) } if _, ok := resMap["ltc"]; ok { result = append(result, resMap["ltc"]) } if _, ok := resMap["dot"]; ok { result = append(result, resMap["dot"]) } if _, ok := resMap["matic"]; ok { result = append(result, resMap["matic"]) } if _, ok := resMap["bch"]; ok { result = append(result, resMap["bch"]) } if _, ok := resMap["eos"]; ok { result = append(result, resMap["eos"]) } if _, ok := resMap["ton"]; ok { result = append(result, resMap["ton"]) } if _, ok := resMap["avax"]; ok { result = append(result, resMap["avax"]) } if _, ok := resMap["shib"]; ok { result = append(result, resMap["shib"]) } if _, ok := resMap["invu"]; ok { result = append(result, resMap["invu"]) } if _, ok := resMap["osel"]; ok { result = append(result, resMap["osel"]) } if _, ok := resMap["fmd"]; ok { result = append(result, resMap["fmd"]) } if _, ok := resMap["dten"]; ok { result = append(result, resMap["dten"]) } if _, ok := resMap["xnsl"]; ok { result = append(result, resMap["xnsl"]) } if _, ok := resMap["kools"]; ok { result = append(result, resMap["kools"]) } md.Data = result c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) default: websocketservice.ContractCache.Range( func(key, value any) bool { for _, vue := range symbolList { if vue == key.(string) { resMap[vue] = value } } return true }) if _, ok := resMap["BTC-USDT"]; ok { result = append(result, resMap["BTC-USDT"]) } if _, ok := resMap["ETH-USDT"]; ok { result = append(result, resMap["ETH-USDT"]) } if _, ok := resMap["BCH-USDT"]; ok { result = append(result, resMap["BCH-USDT"]) } if _, ok := resMap["XRP-USDT"]; ok { result = append(result, resMap["XRP-USDT"]) } if _, ok := resMap["EOS-USDT"]; ok { result = append(result, resMap["EOS-USDT"]) } if _, ok := resMap["LTC-USDT"]; ok { result = append(result, resMap["LTC-USDT"]) } if _, ok := resMap["TRX-USDT"]; ok { result = append(result, resMap["TRX-USDT"]) } if _, ok := resMap["ETC-USDT"]; ok { result = append(result, resMap["ETC-USDT"]) } if _, ok := resMap["LINK-USDT"]; ok { result = append(result, resMap["LINK-USDT"]) } if _, ok := resMap["BNB-USDT"]; ok { result = append(result, resMap["BNB-USDT"]) } if _, ok := resMap["ADA-USDT"]; ok { result = append(result, resMap["ADA-USDT"]) } if _, ok := resMap["DOGE-USDT"]; ok { result = append(result, resMap["DOGE-USDT"]) } if _, ok := resMap["SOL-USDT"]; ok { result = append(result, resMap["SOL-USDT"]) } if _, ok := resMap["DOT-USDT"]; ok { result = append(result, resMap["DOT-USDT"]) } if _, ok := resMap["MATIC-USDT"]; ok { result = append(result, resMap["MATIC-USDT"]) } if _, ok := resMap["AVAX-USDT"]; ok { result = append(result, resMap["AVAX-USDT"]) } if _, ok := resMap["SHIB-USDT"]; ok { result = append(result, resMap["SHIB-USDT"]) } if _, ok := resMap["BNBS-USDT"]; ok { result = append(result, resMap["BNBS-USDT"]) } if _, ok := resMap["INVU-USDT"]; ok { result = append(result, resMap["INVU-USDT"]) } if _, ok := resMap["OSEL-USDT"]; ok { result = append(result, resMap["OSEL-USDT"]) } if _, ok := resMap["FMD-USDT"]; ok { result = append(result, resMap["FMD-USDT"]) } if _, ok := resMap["DTEN-USDT"]; ok { result = append(result, resMap["DTEN-USDT"]) } if _, ok := resMap["XNSL-USDT"]; ok { result = append(result, resMap["XNSL-USDT"]) } if _, ok := resMap["KOOLS-USDT"]; ok { result = append(result, resMap["KOOLS-USDT"]) } md.Data = result c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } } func IntroList(c *gin.Context) { symbol := strings.ToUpper(internal.ReplaceStr(c.Query("symbol"))) // 交易对 if symbol == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } var md stock.MgoPageSize md.Data = Intro[symbol] c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) }