You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
415 lines
15 KiB
415 lines
15 KiB
2 months ago
|
package processor
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"github.com/gin-gonic/gin"
|
||
|
"github.com/shopspring/decimal"
|
||
|
"go.mongodb.org/mongo-driver/bson"
|
||
|
"go.mongodb.org/mongo-driver/mongo"
|
||
|
"net/http"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"wss-pool/cmd/common"
|
||
|
"wss-pool/internal"
|
||
|
"wss-pool/internal/data"
|
||
|
"wss-pool/internal/data/business"
|
||
|
red "wss-pool/internal/redis"
|
||
|
"wss-pool/logging/applogger"
|
||
|
"wss-pool/pkg/model"
|
||
|
"wss-pool/pkg/model/stock"
|
||
|
)
|
||
|
|
||
|
func StockIndexInfoAdd(c *gin.Context) {
|
||
|
param := make([]model.StockIndexParam, 0)
|
||
|
err := c.BindJSON(¶m)
|
||
|
total := len(param)
|
||
|
if err != nil {
|
||
|
applogger.Error("BindJSON", err.Error())
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError))
|
||
|
return
|
||
|
} else if total <= 0 {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError))
|
||
|
return
|
||
|
} else if param[0].Token != Token {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError))
|
||
|
return
|
||
|
} else if param[0].Country == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "country error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
country := common.CapitalizeFirstLetter(param[0].Country)
|
||
|
if country == "Us" {
|
||
|
country = "US"
|
||
|
}
|
||
|
var dataList []mongo.WriteModel
|
||
|
applogger.Debug("stock info add param total: %d", total)
|
||
|
for _, v := range param {
|
||
|
price := strconv.FormatFloat(v.Price, 'f', 4, 64)
|
||
|
v.Price, err = strconv.ParseFloat(price, 64)
|
||
|
if err != nil {
|
||
|
applogger.Error(v.Country, v.StockCode, err)
|
||
|
}
|
||
|
if !business.IsPriceTime(v.StockCode, price, common.StockIndexPrefix) {
|
||
|
continue
|
||
|
}
|
||
|
business.StockPyWsStockIndex(v, common.StockIndexPrefix)
|
||
|
//更新最新价格
|
||
|
red.HsetMap(business.StockClosingPrice["StockIndexNew"], v.StockCode, price)
|
||
|
filter := bson.M{"timestamp": bson.M{"$eq": v.Ts}, "stock_code": bson.M{"$eq": v.StockCode}}
|
||
|
update := bson.D{{"$set", bson.D{
|
||
|
{"stock_code", v.StockCode},
|
||
|
{"stock_name", v.StockName},
|
||
|
{"price", v.Price},
|
||
|
{"high_price", v.HighPrice},
|
||
|
{"low_price", v.LowPrice},
|
||
|
{"open_price", v.OpenPrice},
|
||
|
{"up_down_rate", v.UpDownRate},
|
||
|
{"up_down", v.UpDown},
|
||
|
{"vol", v.Vol},
|
||
|
{"price_code", v.PriceCode},
|
||
|
{"country", v.Country},
|
||
|
{"timestamp", v.Ts},
|
||
|
}}}
|
||
|
models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true)
|
||
|
dataList = append(dataList, models)
|
||
|
}
|
||
|
if len(dataList) > 0 {
|
||
|
if err := data.MgoBulkWrite(data.GetStockIndexTableName(), dataList); err != nil {
|
||
|
applogger.Error("stock MgoInsertMany err:%v", err)
|
||
|
}
|
||
|
}
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok"))
|
||
|
}
|
||
|
|
||
|
func StockIndexListAdd(c *gin.Context) {
|
||
|
param := stock.StockIndexList{}
|
||
|
err := c.BindJSON(¶m)
|
||
|
fmt.Println(param)
|
||
|
if err != nil {
|
||
|
applogger.Error("BindJSON", err.Error())
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError))
|
||
|
return
|
||
|
} else if len(param.Results) <= 0 {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Token != Token {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Results[0].Code == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "ticker error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Results[0].Locale == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "locale error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
var dataList []mongo.WriteModel
|
||
|
for _, value := range param.Results {
|
||
|
locale := common.CapitalizeFirstLetter(value.Locale)
|
||
|
if locale == "Us" {
|
||
|
locale = "US"
|
||
|
}
|
||
|
if !common.IsLetter(value.Code) {
|
||
|
applogger.Debug(value.Code, "包含其他符号")
|
||
|
continue
|
||
|
}
|
||
|
filter := bson.D{{"Code", bson.M{
|
||
|
"$eq": value.Code,
|
||
|
}}, {"Country", bson.M{
|
||
|
"$eq": locale,
|
||
|
}}}
|
||
|
update := bson.D{{"$set", bson.D{
|
||
|
{"Code", value.Code},
|
||
|
{"Name", value.Name},
|
||
|
{"Country", locale},
|
||
|
{"Exchange", value.PrimaryExchange},
|
||
|
{"Currency", value.Currency},
|
||
|
{"State", common.StockIndexOn},
|
||
|
{"Intro", value.Intro}}}}
|
||
|
//第二次
|
||
|
if value.Currency == "" && value.YesterdayClose != "" {
|
||
|
_, err := decimal.NewFromString(value.YesterdayClose)
|
||
|
if err != nil {
|
||
|
applogger.Debug(value.Code, value.YesterdayClose, err.Error())
|
||
|
continue
|
||
|
}
|
||
|
value.DateStr = strings.TrimSpace(value.DateStr)
|
||
|
//if common.TimeToNows().Format("2006-01-02") != value.DateStr {
|
||
|
// applogger.Error(value.Code, "不是今日的闭盘价")
|
||
|
// continue
|
||
|
//}
|
||
|
update = bson.D{{"$set", bson.D{
|
||
|
{"YesterdayClose", value.YesterdayClose}, //上一次 闭盘价
|
||
|
{"BeforeClose", value.BeforeClose},
|
||
|
{"ClosePrice", "0"},
|
||
|
{"Vol", value.Vol},
|
||
|
{"DateStr", value.DateStr},
|
||
|
}}}
|
||
|
red.HsetMap(business.StockClosingPrice["StockIndex"], value.Code, value.YesterdayClose)
|
||
|
red.HsetMap(business.StockClosingPrice["StockIndexBeforeClose"], value.Code, value.BeforeClose)
|
||
|
red.HsetMap(business.StockClosingPrice["StockIndexNew"], value.Code, "0")
|
||
|
}
|
||
|
models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true)
|
||
|
dataList = append(dataList, models)
|
||
|
}
|
||
|
if len(dataList) > 0 {
|
||
|
if err := data.MgoBulkWrite(data.StockIndexList, dataList); err != nil {
|
||
|
applogger.Error("MgoInsertMany err:%v", err)
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "operation failure", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
} else {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "error operation repetition", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok"))
|
||
|
}
|
||
|
|
||
|
func StockIndexListUpdate(c *gin.Context) {
|
||
|
param := stock.StockIndexList{}
|
||
|
err := c.BindJSON(¶m)
|
||
|
if err != nil {
|
||
|
applogger.Error("BindJSON", err.Error())
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError))
|
||
|
return
|
||
|
} else if len(param.Results) <= 0 {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Token != Token {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Results[0].Code == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "ticker error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Results[0].Locale == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "locale error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
for _, value := range param.Results {
|
||
|
locale := common.CapitalizeFirstLetter(value.Locale)
|
||
|
if locale == "Us" {
|
||
|
locale = "US"
|
||
|
}
|
||
|
filter := bson.D{{"Code", bson.M{
|
||
|
"$eq": value.Code,
|
||
|
}}, {"Country", bson.M{
|
||
|
"$eq": locale,
|
||
|
}}}
|
||
|
updateData := bson.D{{"$set", bson.D{
|
||
|
{"Code", value.Code},
|
||
|
{"Name", value.Name},
|
||
|
{"Currency", value.Currency},
|
||
|
{"Exchange", value.PrimaryExchange},
|
||
|
{"Intro", value.Intro}}}}
|
||
|
if err := data.MgoUpdateOne(data.StockIndexList, filter, updateData); err != nil {
|
||
|
applogger.Error("MgoInsertMany info err: %v", err)
|
||
|
}
|
||
|
}
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok"))
|
||
|
}
|
||
|
|
||
|
func StockIndexListGet(c *gin.Context) {
|
||
|
token := internal.ReplaceStr(c.Query("token"))
|
||
|
if token != Token {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
filter := bson.M{}
|
||
|
projection := bson.M{"Code": 1, "Country": 1}
|
||
|
sort := bson.M{}
|
||
|
var md stock.MgoPageSize
|
||
|
md.Data, _ = data.MgoFindProjection(data.StockIndexList, filter, projection, sort, 0)
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, "ok"))
|
||
|
}
|
||
|
|
||
|
func StockIndexInfoMon(c *gin.Context) {
|
||
|
param := model.StockMonRes{}
|
||
|
err := c.BindJSON(¶m)
|
||
|
if err != nil {
|
||
|
applogger.Error("BindJSON", err.Error())
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError))
|
||
|
return
|
||
|
} else if len(param.Result) <= 0 {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param null", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Token != Token {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "token error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Status == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "status error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
var dataList []mongo.WriteModel
|
||
|
for _, v := range param.Result {
|
||
|
filter := bson.M{"timestamp": bson.M{"$eq": v.Ts}, "stock_code": bson.M{"$eq": v.StockCode}}
|
||
|
update := bson.D{{"$set", bson.D{
|
||
|
{"stock_code", v.StockCode},
|
||
|
{"stock_name", v.StockName},
|
||
|
{"open_price", v.OpenPrice.String()},
|
||
|
{"high_price", v.HighPrice.String()},
|
||
|
{"low_price", v.LowPrice.String()},
|
||
|
{"close_price", v.ClosePrice.String()},
|
||
|
{"country", v.Country},
|
||
|
{"vol", v.Vol},
|
||
|
{"timestamp", v.Ts},
|
||
|
}}}
|
||
|
models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true)
|
||
|
dataList = append(dataList, models)
|
||
|
}
|
||
|
if err := data.MgoBulkWrite(data.GetStockIndixKlineTableName(param.Status), dataList); err != nil {
|
||
|
applogger.Error("stock MgoInsertMany err:%v", err)
|
||
|
}
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok"))
|
||
|
}
|
||
|
|
||
|
func StockIndexKLineList(c *gin.Context) {
|
||
|
symbol := internal.ReplaceStr(c.Query("symbol"))
|
||
|
from := internal.IntegerInit(internal.ReplaceStr(c.Query("from"))) // 开始时间
|
||
|
to := internal.IntegerInit(internal.ReplaceStr(c.Query("to"))) // 结束时间
|
||
|
period := internal.ReplaceStr(c.Query("period")) // 时间颗粒度
|
||
|
size := internal.IntegerInit(internal.ReplaceStr(c.Query("size"))) // 结束时间
|
||
|
if period == "60min" {
|
||
|
period = "1hour"
|
||
|
}
|
||
|
if symbol == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError))
|
||
|
return
|
||
|
} else if period == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "period error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
filter := bson.M{"stock_code": symbol, "timestamp": bson.M{"$gte": from, "$lte": to}}
|
||
|
if size > 0 {
|
||
|
filter = bson.M{"stock_code": symbol}
|
||
|
}
|
||
|
tableName := data.GetStockIndixKlineTableName(period)
|
||
|
|
||
|
pagedData, err := data.MgoFinds(tableName, filter, int64(size))
|
||
|
if err != nil {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
var md stock.MgoPageSize
|
||
|
md.Data = pagedData
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess))
|
||
|
}
|
||
|
|
||
|
func StockIndexInfo(c *gin.Context) {
|
||
|
symbol := internal.ReplaceStr(c.Query("symbol"))
|
||
|
if symbol == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "symbol error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
var pageData = model.StockMogoParam{}
|
||
|
filter := bson.M{"stock_code": symbol}
|
||
|
res, _ := data.MgoFinds(data.GetStockIndixKlineTableName("1day"), filter, int64(1))
|
||
|
pageData.StockCode = symbol
|
||
|
if len(res) > 0 {
|
||
|
pageData.HighPrice = business.TypeCheck(res[0]["high_price"])
|
||
|
pageData.LowPrice = business.TypeCheck(res[0]["low_price"])
|
||
|
pageData.Vol = res[0]["vol"]
|
||
|
pageData.PriceTotal = business.TypeCheck(res[0]["price_total"])
|
||
|
pageData.TurnoverPriceTotal = business.TypeCheck(res[0]["turnover_price_total"])
|
||
|
}
|
||
|
pageData.OpenPrice, _ = red.Hget(business.StockClosingPrice["StockIndex"], symbol)
|
||
|
pageData.ClosePrice, _ = red.Hget(business.StockClosingPrice["StockIndexNew"], symbol)
|
||
|
if pageData.ClosePrice == "" || pageData.ClosePrice == "0" { //闭盘期间
|
||
|
pageData.ClosePrice = pageData.OpenPrice
|
||
|
pageData.OpenPrice, _ = red.Hget(business.StockClosingPrice["StockIndexBeforeClose"], symbol)
|
||
|
}
|
||
|
var md stock.MgoPageSize
|
||
|
md.Data = pageData
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess))
|
||
|
}
|
||
|
|
||
|
func StockIndexListUpdateToPHP(c *gin.Context) {
|
||
|
param := stock.StockIndexPolygon{}
|
||
|
err := c.BindJSON(¶m)
|
||
|
fmt.Println(param)
|
||
|
if err != nil {
|
||
|
applogger.Error("BindJSON", err.Error())
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "param error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Code == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "ticker error", internal.QueryError))
|
||
|
return
|
||
|
} else if param.Locale == "" {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "locale error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
var dataList []mongo.WriteModel
|
||
|
if !common.IsLetter(param.Code) {
|
||
|
applogger.Debug(param.Code, "包含其他符号")
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "code error", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
filter := bson.D{{"Code", bson.M{
|
||
|
"$eq": param.Code,
|
||
|
}}, {"Country", bson.M{
|
||
|
"$eq": param.Locale,
|
||
|
}}}
|
||
|
update := bson.D{{"$set", bson.D{
|
||
|
{"Code", param.Code},
|
||
|
{"Country", param.Locale},
|
||
|
{"State", param.State},
|
||
|
{"Sort", param.Sort}}}}
|
||
|
models := mongo.NewUpdateOneModel().SetFilter(filter).SetUpdate(update).SetUpsert(true)
|
||
|
dataList = append(dataList, models)
|
||
|
if len(dataList) > 0 {
|
||
|
if err := data.MgoBulkWrite(data.StockIndexList, dataList); err != nil {
|
||
|
applogger.Error("MgoInsertMany err:%v", err)
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "operation failure", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
} else {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "error operation repetition", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, "", "ok"))
|
||
|
}
|
||
|
|
||
|
func ExchangeSymbolIndexList(c *gin.Context) {
|
||
|
pageSize := internal.IntegerInit(internal.ReplaceStr(c.Query("pageSize"))) // 每页显示多少条数据
|
||
|
pageNumber := internal.IntegerInit(internal.ReplaceStr(c.Query("pageNumber"))) // 第几页
|
||
|
search := internal.ReplaceStr(c.Query("search")) // 搜索数据(模糊
|
||
|
code := internal.ReplaceStr(c.Query("code"))
|
||
|
|
||
|
if pageSize <= 0 || pageNumber <= 0 {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, "分页参数不能为零", internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
var filter bson.M
|
||
|
if len(search) > 0 {
|
||
|
filter = bson.M{"$or": []bson.M{{"Code": bson.M{"$regex": search}}, {"Name": bson.M{"$regex": search}}}, "State": common.StockIndexOn}
|
||
|
} else {
|
||
|
filter = bson.M{"State": common.StockIndexOn}
|
||
|
}
|
||
|
|
||
|
strs := strings.Split(code, "-")
|
||
|
if len(code) > 0 {
|
||
|
filter = bson.M{"State": common.StockIndexOn, "Code": bson.M{"$in": strs}}
|
||
|
}
|
||
|
//fmt.Println(filter)
|
||
|
total, err := data.MgoFindTotal(data.StockIndexList, filter)
|
||
|
if err != nil {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusBadRequest, err, internal.QueryError))
|
||
|
return
|
||
|
}
|
||
|
pageData := make([]stock.StockIndexPolygon, 0)
|
||
|
data.MgoPagingFindStructSort(data.StockIndexList, filter, int64(pageSize), int64(pageNumber), bson.M{"Sort": -1}, &pageData)
|
||
|
var md stock.MgoPageSize
|
||
|
md.PageSize = pageSize
|
||
|
md.PageNumber = pageNumber
|
||
|
md.Total = total
|
||
|
if len(pageData) <= 0 {
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess))
|
||
|
return
|
||
|
}
|
||
|
for k, v := range pageData {
|
||
|
key := business.StockClosingPrice["StockIndexNew"]
|
||
|
pageData[k].ClosePrice, _ = red.Hget(key, v.Code)
|
||
|
}
|
||
|
md.Data = pageData
|
||
|
c.JSON(http.StatusOK, internal.GinResult(http.StatusOK, md, internal.QuerySuccess))
|
||
|
}
|