package business import ( "encoding/json" "fmt" "github.com/gorilla/websocket" "go.mongodb.org/mongo-driver/bson" "strings" "wss-pool/config" "wss-pool/internal" "wss-pool/internal/data" "wss-pool/logging/applogger" "wss-pool/pkg/model/stock" ) // ShareUsData TODO: 美股数据采集 修改为分布式多节点 func ShareUsData() { url := fmt.Sprintf("https://%v/api/exchange-symbol-list/us?api_token=%v&fmt=json", config.Config.ShareGather.FinancialHost, config.Config.ShareGather.FinancialKey) applogger.Debug("select info:%v", url) bodyStr, err := internal.HttpGet(url) if err != nil { applogger.Error("HttpGet err:%v", err) return } var shareModel []stock.StockShare if err := json.Unmarshal([]byte(bodyStr), &shareModel); err != nil { applogger.Error("Unmarshal err:%v", err) return } for _, value := range shareModel { applogger.Info("select data info:%v", value) go func() { url := fmt.Sprintf("wss://%v/ws/%v?api_token=%v", config.Config.ShareGather.FinancialWsUs, "us", config.Config.ShareGather.FinancialKey) conn, _, err := websocket.DefaultDialer.Dial(url, nil) if err != nil { applogger.Error("链接wss服务器失败:%v", err) return } defer conn.Close() subscribe := fmt.Sprintf("{\"action\": \"subscribe\", \"symbols\": \"%v\"}", value.Code) if err := conn.WriteMessage(websocket.TextMessage, []byte(subscribe)); err != nil { applogger.Error("send connSub WriteMessage err::%v", err) return } for { _, msg, err := conn.ReadMessage() if err != nil { applogger.Error("err info:%v", err) return } applogger.Info("Subscribe date:%v", string(msg)) if !strings.Contains(string(msg), "{\"status_code\":200,\"message\":\"Authorized\"}") { go func() { // 持久化数据信息 var msgC stock.RealTimeMessage if err := bson.Unmarshal(msg, &msgC); err != nil { applogger.Error("Insert info err:%v", err) return } applogger.Debug("date info:%v", msgC) var c = []string{} for _, vc := range msgC.C { c = append(c, vc.String()) } cxt := bson.D{ {"S", msgC.S}, {"Dp", msgC.Dp}, {"C", c}, {"T", msgC.T}, {"Ms", msgC.Ms}, {"V", msgC.V}, {"P", msgC.P.String()}, } if err := data.MgoInsertOne(data.StockUs, cxt); err != nil { applogger.Error("Insert info err:%v", err) return } }() } } }() } }