You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

86 lines
2.4 KiB

package business
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson"
"strings"
"wss-pool/config"
"wss-pool/internal"
"wss-pool/internal/data"
"wss-pool/logging/applogger"
"wss-pool/pkg/model/stock"
)
// ShareUsData TODO: 美股数据采集 修改为分布式多节点
func ShareUsData() {
url := fmt.Sprintf("https://%v/api/exchange-symbol-list/us?api_token=%v&fmt=json", config.Config.ShareGather.FinancialHost, config.Config.ShareGather.FinancialKey)
applogger.Debug("select info:%v", url)
bodyStr, err := internal.HttpGet(url)
if err != nil {
applogger.Error("HttpGet err:%v", err)
return
}
var shareModel []stock.StockShare
if err := json.Unmarshal([]byte(bodyStr), &shareModel); err != nil {
applogger.Error("Unmarshal err:%v", err)
return
}
for _, value := range shareModel {
applogger.Info("select data info:%v", value)
go func() {
url := fmt.Sprintf("wss://%v/ws/%v?api_token=%v", config.Config.ShareGather.FinancialWsUs, "us", config.Config.ShareGather.FinancialKey)
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
applogger.Error("链接wss服务器失败:%v", err)
return
}
defer conn.Close()
subscribe := fmt.Sprintf("{\"action\": \"subscribe\", \"symbols\": \"%v\"}", value.Code)
if err := conn.WriteMessage(websocket.TextMessage, []byte(subscribe)); err != nil {
applogger.Error("send connSub WriteMessage err::%v", err)
return
}
for {
_, msg, err := conn.ReadMessage()
if err != nil {
applogger.Error("err info:%v", err)
return
}
applogger.Info("Subscribe date:%v", string(msg))
if !strings.Contains(string(msg), "{\"status_code\":200,\"message\":\"Authorized\"}") {
go func() {
// 持久化数据信息
var msgC stock.RealTimeMessage
if err := bson.Unmarshal(msg, &msgC); err != nil {
applogger.Error("Insert info err:%v", err)
return
}
applogger.Debug("date info:%v", msgC)
var c = []string{}
for _, vc := range msgC.C {
c = append(c, vc.String())
}
cxt := bson.D{
{"S", msgC.S},
{"Dp", msgC.Dp},
{"C", c},
{"T", msgC.T},
{"Ms", msgC.Ms},
{"V", msgC.V},
{"P", msgC.P.String()},
}
if err := data.MgoInsertOne(data.StockUs, cxt); err != nil {
applogger.Error("Insert info err:%v", err)
return
}
}()
}
}
}()
}
}