You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
86 lines
2.4 KiB
86 lines
2.4 KiB
package business
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/gorilla/websocket"
|
|
"go.mongodb.org/mongo-driver/bson"
|
|
"strings"
|
|
"wss-pool/config"
|
|
"wss-pool/internal"
|
|
"wss-pool/internal/data"
|
|
"wss-pool/logging/applogger"
|
|
"wss-pool/pkg/model/stock"
|
|
)
|
|
|
|
// ShareUsData TODO: 美股数据采集 修改为分布式多节点
|
|
func ShareUsData() {
|
|
url := fmt.Sprintf("https://%v/api/exchange-symbol-list/us?api_token=%v&fmt=json", config.Config.ShareGather.FinancialHost, config.Config.ShareGather.FinancialKey)
|
|
applogger.Debug("select info:%v", url)
|
|
bodyStr, err := internal.HttpGet(url)
|
|
if err != nil {
|
|
applogger.Error("HttpGet err:%v", err)
|
|
return
|
|
}
|
|
var shareModel []stock.StockShare
|
|
if err := json.Unmarshal([]byte(bodyStr), &shareModel); err != nil {
|
|
applogger.Error("Unmarshal err:%v", err)
|
|
return
|
|
}
|
|
|
|
for _, value := range shareModel {
|
|
applogger.Info("select data info:%v", value)
|
|
go func() {
|
|
url := fmt.Sprintf("wss://%v/ws/%v?api_token=%v", config.Config.ShareGather.FinancialWsUs, "us", config.Config.ShareGather.FinancialKey)
|
|
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
|
|
if err != nil {
|
|
applogger.Error("链接wss服务器失败:%v", err)
|
|
return
|
|
}
|
|
defer conn.Close()
|
|
|
|
subscribe := fmt.Sprintf("{\"action\": \"subscribe\", \"symbols\": \"%v\"}", value.Code)
|
|
if err := conn.WriteMessage(websocket.TextMessage, []byte(subscribe)); err != nil {
|
|
applogger.Error("send connSub WriteMessage err::%v", err)
|
|
return
|
|
}
|
|
for {
|
|
_, msg, err := conn.ReadMessage()
|
|
if err != nil {
|
|
applogger.Error("err info:%v", err)
|
|
return
|
|
}
|
|
applogger.Info("Subscribe date:%v", string(msg))
|
|
if !strings.Contains(string(msg), "{\"status_code\":200,\"message\":\"Authorized\"}") {
|
|
go func() {
|
|
// 持久化数据信息
|
|
var msgC stock.RealTimeMessage
|
|
if err := bson.Unmarshal(msg, &msgC); err != nil {
|
|
applogger.Error("Insert info err:%v", err)
|
|
return
|
|
}
|
|
applogger.Debug("date info:%v", msgC)
|
|
|
|
var c = []string{}
|
|
for _, vc := range msgC.C {
|
|
c = append(c, vc.String())
|
|
}
|
|
cxt := bson.D{
|
|
{"S", msgC.S},
|
|
{"Dp", msgC.Dp},
|
|
{"C", c},
|
|
{"T", msgC.T},
|
|
{"Ms", msgC.Ms},
|
|
{"V", msgC.V},
|
|
{"P", msgC.P.String()},
|
|
}
|
|
if err := data.MgoInsertOne(data.StockUs, cxt); err != nil {
|
|
applogger.Error("Insert info err:%v", err)
|
|
return
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
}
|
|
|