package processor import ( "fmt" "github.com/gin-gonic/gin" "github.com/shopspring/decimal" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "net/http" "strconv" "strings" "wss-pool/cmd/common" "wss-pool/internal" "wss-pool/internal/data" "wss-pool/internal/data/business" red "wss-pool/internal/redis" "wss-pool/logging/applogger" "wss-pool/pkg/model" "wss-pool/pkg/model/stock" ) func StockIndexInfoAdd(c *gin.Context) { param := make([]model.StockIndexParam, 0) err := c.BindJSON(¶m) total := len(param) if err != nil { applogger.Error("BindJSON", err.Error()) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError)) return } else if total <= 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError)) return } else if param[0].Token != Token { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError)) return } else if param[0].Country == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "country error", internal.QueryError)) return } country := common.CapitalizeFirstLetter(param[0].Country) if country == "Us" { country = "US" } var dataList []mongo.WriteModel applogger.Debug("stock info add param total: %d", total) for _, v := range param { price := strconv.FormatFloat(v.Price, 'f', 4, 64) v.Price, err = strconv.ParseFloat(price, 64) if err != nil { applogger.Error(v.Country, v.StockCode, err) } if !business.IsPriceTime(v.StockCode, price, common.StockIndexPrefix) { continue } business.StockPyWsStockIndex(v, common.StockIndexPrefix) //更新最新价格 red.HsetMap(business.StockClosingPrice["StockIndexNew"], v.StockCode, price) filter := bson.M{"timestamp": bson.M{"$eq": v.Ts}, "stock_code": bson.M{"$eq": v.StockCode}} update := bson.D{{"$set", bson.D{ {"stock_code", v.StockCode}, {"stock_name", v.StockName}, {"price", v.Price}, {"high_price", v.HighPrice}, {"low_price", v.LowPrice}, {"open_price", v.OpenPrice}, {"up_down_rate", v.UpDownRate}, {"up_down", v.UpDown}, {"vol", v.Vol}, {"price_code", v.PriceCode}, {"country", v.Country}, {"timestamp", v.Ts}, }}} models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) } if len(dataList) > 0 { if err := data.MgoBulkWrite(data.GetStockIndexTableName(), dataList); err != nil { applogger.Error("stock MgoInsertMany err:%v", err) } } c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok")) } func StockIndexListAdd(c *gin.Context) { param := stock.StockIndexList{} err := c.BindJSON(¶m) fmt.Println(param) if err != nil { applogger.Error("BindJSON", err.Error()) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError)) return } else if len(param.Results) <= 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError)) return } else if param.Token != Token { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError)) return } else if param.Results[0].Code == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "ticker error", internal.QueryError)) return } else if param.Results[0].Locale == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "locale error", internal.QueryError)) return } var dataList []mongo.WriteModel for _, value := range param.Results { locale := common.CapitalizeFirstLetter(value.Locale) if locale == "Us" { locale = "US" } if !common.IsLetter(value.Code) { applogger.Debug(value.Code, "包含其他符号") continue } filter := bson.D{{"Code", bson.M{ "$eq": value.Code, }}, {"Country", bson.M{ "$eq": locale, }}} update := bson.D{{"$set", bson.D{ {"Code", value.Code}, {"Name", value.Name}, {"Country", locale}, {"Exchange", value.PrimaryExchange}, {"Currency", value.Currency}, {"State", common.StockIndexOn}, {"Intro", value.Intro}}}} //第二次 if value.Currency == "" && value.YesterdayClose != "" { _, err := decimal.NewFromString(value.YesterdayClose) if err != nil { applogger.Debug(value.Code, value.YesterdayClose, err.Error()) continue } value.DateStr = strings.TrimSpace(value.DateStr) //if common.TimeToNows().Format("2006-01-02") != value.DateStr { // applogger.Error(value.Code, "不是今日的闭盘价") // continue //} update = bson.D{{"$set", bson.D{ {"YesterdayClose", value.YesterdayClose}, //上一次 闭盘价 {"BeforeClose", value.BeforeClose}, {"ClosePrice", "0"}, {"Vol", value.Vol}, {"DateStr", value.DateStr}, }}} red.HsetMap(business.StockClosingPrice["StockIndex"], value.Code, value.YesterdayClose) red.HsetMap(business.StockClosingPrice["StockIndexBeforeClose"], value.Code, value.BeforeClose) red.HsetMap(business.StockClosingPrice["StockIndexNew"], value.Code, "0") } models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) } if len(dataList) > 0 { if err := data.MgoBulkWrite(data.StockIndexList, dataList); err != nil { applogger.Error("MgoInsertMany err:%v", err) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "operation failure", internal.QueryError)) return } } else { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "error operation repetition", internal.QueryError)) return } c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok")) } func StockIndexListUpdate(c *gin.Context) { param := stock.StockIndexList{} err := c.BindJSON(¶m) if err != nil { applogger.Error("BindJSON", err.Error()) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError)) return } else if len(param.Results) <= 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError)) return } else if param.Token != Token { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError)) return } else if param.Results[0].Code == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "ticker error", internal.QueryError)) return } else if param.Results[0].Locale == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "locale error", internal.QueryError)) return } for _, value := range param.Results { locale := common.CapitalizeFirstLetter(value.Locale) if locale == "Us" { locale = "US" } filter := bson.D{{"Code", bson.M{ "$eq": value.Code, }}, {"Country", bson.M{ "$eq": locale, }}} updateData := bson.D{{"$set", bson.D{ {"Code", value.Code}, {"Name", value.Name}, {"Currency", value.Currency}, {"Exchange", value.PrimaryExchange}, {"Intro", value.Intro}}}} if err := data.MgoUpdateOne(data.StockIndexList, filter, updateData); err != nil { applogger.Error("MgoInsertMany info err: %v", err) } } c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok")) } func StockIndexListGet(c *gin.Context) { token := internal.ReplaceStr(c.Query("token")) if token != Token { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError)) return } filter := bson.M{} projection := bson.M{"Code": 1, "Country": 1} sort := bson.M{} var md stock.MgoPageSize md.Data, _ = data.MgoFindProjection(data.StockIndexList, filter, projection, sort, 0) c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, "ok")) } func StockIndexInfoMon(c *gin.Context) { param := model.StockMonRes{} err := c.BindJSON(¶m) if err != nil { applogger.Error("BindJSON", err.Error()) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError)) return } else if len(param.Result) <= 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError)) return } else if param.Token != Token { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError)) return } else if param.Status == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "status error", internal.QueryError)) return } var dataList []mongo.WriteModel for _, v := range param.Result { filter := bson.M{"timestamp": bson.M{"$eq": v.Ts}, "stock_code": bson.M{"$eq": v.StockCode}} update := bson.D{{"$set", bson.D{ {"stock_code", v.StockCode}, {"stock_name", v.StockName}, {"open_price", v.OpenPrice.String()}, {"high_price", v.HighPrice.String()}, {"low_price", v.LowPrice.String()}, {"close_price", v.ClosePrice.String()}, {"country", v.Country}, {"vol", v.Vol}, {"timestamp", v.Ts}, }}} models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) } if err := data.MgoBulkWrite(data.GetStockIndixKlineTableName(param.Status), dataList); err != nil { applogger.Error("stock MgoInsertMany err:%v", err) } c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok")) } func StockIndexKLineList(c *gin.Context) { symbol := internal.ReplaceStr(c.Query("symbol")) from := internal.IntegerInit(internal.ReplaceStr(c.Query("from"))) // 开始时间 to := internal.IntegerInit(internal.ReplaceStr(c.Query("to"))) // 结束时间 period := internal.ReplaceStr(c.Query("period")) // 时间颗粒度 size := internal.IntegerInit(internal.ReplaceStr(c.Query("size"))) // 结束时间 if period == "60min" { period = "1hour" } if symbol == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } else if period == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "period error", internal.QueryError)) return } filter := bson.M{"stock_code": symbol, "timestamp": bson.M{"$gte": from, "$lte": to}} if size > 0 { filter = bson.M{"stock_code": symbol} } tableName := data.GetStockIndixKlineTableName(period) pagedData, err := data.MgoFinds(tableName, filter, int64(size)) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } var md stock.MgoPageSize md.Data = pagedData c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } func StockIndexInfo(c *gin.Context) { symbol := internal.ReplaceStr(c.Query("symbol")) if symbol == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError)) return } var pageData = model.StockMogoParam{} filter := bson.M{"stock_code": symbol} res, _ := data.MgoFinds(data.GetStockIndixKlineTableName("1day"), filter, int64(1)) pageData.StockCode = symbol if len(res) > 0 { pageData.HighPrice = business.TypeCheck(res[0]["high_price"]) pageData.LowPrice = business.TypeCheck(res[0]["low_price"]) pageData.Vol = res[0]["vol"] pageData.PriceTotal = business.TypeCheck(res[0]["price_total"]) pageData.TurnoverPriceTotal = business.TypeCheck(res[0]["turnover_price_total"]) } pageData.OpenPrice, _ = red.Hget(business.StockClosingPrice["StockIndex"], symbol) pageData.ClosePrice, _ = red.Hget(business.StockClosingPrice["StockIndexNew"], symbol) if pageData.ClosePrice == "" || pageData.ClosePrice == "0" { //闭盘期间 pageData.ClosePrice = pageData.OpenPrice pageData.OpenPrice, _ = red.Hget(business.StockClosingPrice["StockIndexBeforeClose"], symbol) } var md stock.MgoPageSize md.Data = pageData c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) } func StockIndexListUpdateToPHP(c *gin.Context) { param := stock.StockIndexPolygon{} err := c.BindJSON(¶m) fmt.Println(param) if err != nil { applogger.Error("BindJSON", err.Error()) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError)) return } else if param.Code == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "ticker error", internal.QueryError)) return } else if param.Locale == "" { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "locale error", internal.QueryError)) return } var dataList []mongo.WriteModel if !common.IsLetter(param.Code) { applogger.Debug(param.Code, "包含其他符号") c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "code error", internal.QueryError)) return } filter := bson.D{{"Code", bson.M{ "$eq": param.Code, }}, {"Country", bson.M{ "$eq": param.Locale, }}} update := bson.D{{"$set", bson.D{ {"Code", param.Code}, {"Country", param.Locale}, {"State", param.State}, {"Sort", param.Sort}}}} models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true) dataList = append(dataList, models) if len(dataList) > 0 { if err := data.MgoBulkWrite(data.StockIndexList, dataList); err != nil { applogger.Error("MgoInsertMany err:%v", err) c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "operation failure", internal.QueryError)) return } } else { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "error operation repetition", internal.QueryError)) return } c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok")) } func ExchangeSymbolIndexList(c *gin.Context) { pageSize := internal.IntegerInit(internal.ReplaceStr(c.Query("pageSize"))) // 每页显示多少条数据 pageNumber := internal.IntegerInit(internal.ReplaceStr(c.Query("pageNumber"))) // 第几页 search := internal.ReplaceStr(c.Query("search")) // 搜索数据(模糊 code := internal.ReplaceStr(c.Query("code")) if pageSize <= 0 || pageNumber <= 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "分页参数不能为零", internal.QueryError)) return } var filter bson.M if len(search) > 0 { filter = bson.M{"$or": []bson.M{{"Code": bson.M{"$regex": search}}, {"Name": bson.M{"$regex": search}}}, "State": common.StockIndexOn} } else { filter = bson.M{"State": common.StockIndexOn} } strs := strings.Split(code, "-") if len(code) > 0 { filter = bson.M{"State": common.StockIndexOn, "Code": bson.M{"$in": strs}} } //fmt.Println(filter) total, err := data.MgoFindTotal(data.StockIndexList, filter) if err != nil { c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError)) return } pageData := make([]stock.StockIndexPolygon, 0) data.MgoPagingFindStructSort(data.StockIndexList, filter, int64(pageSize), int64(pageNumber), bson.M{"Sort": -1}, &pageData) var md stock.MgoPageSize md.PageSize = pageSize md.PageNumber = pageNumber md.Total = total if len(pageData) <= 0 { c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) return } for k, v := range pageData { key := business.StockClosingPrice["StockIndexNew"] pageData[k].ClosePrice, _ = red.Hget(key, v.Code) } md.Data = pageData c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess)) }