You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

217 lines
6.8 KiB

2 months ago
package marketwsscliert
import (
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"go.mongodb.org/mongo-driver/bson"
"strings"
"time"
"wss-pool/cmd/common"
"wss-pool/config"
"wss-pool/internal/data"
"wss-pool/internal/data/business"
"wss-pool/logging/applogger"
"wss-pool/pkg/model"
"wss-pool/pkg/model/stock"
)
// UsNewCode records the US stock codes for which sendUsCode has already sent
// a subscribe message; the key is the stock code.
// NOTE(review): this is a plain map with no locking visible in this file —
// confirm it is only touched from one goroutine at a time.
var UsNewCode = make(map[string]bool)
// subscribeFinnhub dials the Finnhub WebSocket endpoint for US stock quotes.
//
// The URL is built from the FinnhubWss host and FinnhubKey token found in
// config.Config.FinnhubUs. It returns the open connection, or nil after
// logging the error when the dial fails.
func subscribeFinnhub() *websocket.Conn {
	host := config.Config.FinnhubUs.FinnhubWss
	token := config.Config.FinnhubUs.FinnhubKey
	endpoint := fmt.Sprintf("wss://%v?token=%v", host, token)

	ws, _, dialErr := websocket.DefaultDialer.Dial(endpoint, nil)
	if dialErr == nil {
		return ws
	}
	applogger.Error("Failed to link to wss server:%v", dialErr)
	return nil
}
// subscribeMarketUsBak subscribes conn to the US stock symbols and relays the
// trades it receives until reading from the connection fails.
//
// It first calls sendUsCode to subscribe to every known symbol and starts
// singleSendUsCode in a goroutine to subscribe to symbols added later. Each
// incoming frame is decoded as a model.FinnhubMessage. While
// common.IsFinnhubOpeningUS reports true, every trade in the frame is turned
// into a model.ClientMessage, published via business.JudgePublishMap, and its
// price is stored via business.JudgeHsetMap.
//
// The function returns only when conn.ReadMessage fails. A trade that cannot
// be marshalled or that has an empty symbol is skipped instead of ending the
// whole subscription, matching subscribeMarketUsBakNew.
func subscribeMarketUsBak(conn *websocket.Conn) {
	// The result is ignored on purpose: Send logs write failures itself, and a
	// dead connection surfaces as a ReadMessage error in the loop below.
	_ = sendUsCode(conn, false) // subscribe to all stocks
	go singleSendUsCode(conn)   // background loop that subscribes to stocks added later
	// TODO: Verify if key permissions are passed
	//subAuh := fmt.Sprintf("{\"action\":\"auth\",\"params\":\"%v\"}", config.Config.ShareGather.PolygonKey)
	//Send(conn, subAuh)
	//// Initiate event subscription
	//subData := fmt.Sprintf("{\"action\":\"subscribe\",\"params\":\"A.*\"}")
	//Send(conn, subData)
	//usMessage := make([]model.ClientMessage, 0)
	for {
		_, msg, err := conn.ReadMessage()
		if err != nil {
			applogger.Error("err info:%v", err)
			return
		}
		applogger.Debug("ReadMessage data info:%v", string(msg))
		var subModel model.FinnhubMessage
		if err := json.Unmarshal(msg, &subModel); err != nil {
			applogger.Error("subModel Unmarshal info err: %v", err)
			continue
		}
		if !common.IsFinnhubOpeningUS() {
			applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), "us shareMarket it's not opening time -----------------------------end")
			continue
		}
		for _, value := range subModel.Data {
			message := model.ClientMessage{
				// Stock code.
				S: value.S,
				// Trade conditions; see the trade-conditions glossary for details.
				C: value.C,
				// Volume: number of shares traded at the given timestamp (quote volume).
				V: value.V,
				// Dark-pool flag (true/false); always set to true here.
				Dp: true,
				// Timestamp in Unix milliseconds (end of the aggregate window).
				T: value.T,
				// A single trade carries one price, so close/open/high/low are all
				// set to it.
				Cl: value.P,
				Op: value.P,
				H:  value.P,
				L:  value.P,
			}
			msgStr, err := json.Marshal(message)
			if err != nil {
				// One bad trade must not terminate the feed; skip it.
				applogger.Error("json.Marshal err: %v", err)
				continue
			}
			applogger.Debug("message info:%v", string(msgStr))
			// Write to Redis for broadcasting
			if len(message.S) == 0 {
				// Nothing to key the publish on; skip this trade only.
				continue
			}
			business.JudgePublishMap("US", message.S, fmt.Sprintf("%s.US", message.S), string(msgStr))
			business.JudgeHsetMap("US", business.StockClosingPrice["USNew"], message.S, message.Cl.String())
		}
	}
}
// sendUsCode sends a subscribe message on conn for US stock codes loaded from
// data.StockList.
//
// The query selects documents whose Country is "US" and whose YesterdayClose
// is not empty, projecting only Code (the call passes 20000, 1, -1 —
// presumably page size, page number and sort order; confirm against
// data.MgoPagingFindStructProjection). Codes rejected by common.IsExistStock
// are skipped.
//
// When isSingle is false every remaining code is subscribed, even if it is
// already in UsNewCode. When isSingle is true only codes not yet in UsNewCode
// are subscribed. A code is recorded in UsNewCode only after its message was
// written successfully.
//
// It returns the first error reported by Send, wrapped with the offending
// code, and stops there: Send has already logged the failure, and gorilla
// connections keep returning the same error once a write has failed. It
// returns nil when every message was sent.
func sendUsCode(conn *websocket.Conn, isSingle bool) error {
	filter := bson.M{"Country": "US", "YesterdayClose": bson.M{"$ne": ""}}
	projection := bson.M{"Code": 1}
	stockRes := make([]stock.StockPolygon, 0)
	data.MgoPagingFindStructProjection(data.StockList, filter, projection, 20000, 1, -1, &stockRes)
	old := len(UsNewCode)
	for _, v := range stockRes {
		if !common.IsExistStock("US", v.Code) {
			continue
		}
		// Incremental mode only subscribes codes that were not sent before.
		if isSingle && UsNewCode[v.Code] {
			continue
		}
		sendPing := fmt.Sprintf("{\"type\":\"subscribe\",\"symbol\":\"%v\"}", v.Code)
		if isSingle {
			applogger.Debug("new us stock:%v", sendPing)
		} else {
			fmt.Println(sendPing)
		}
		if err := Send(conn, sendPing); err != nil {
			// Do not mark the code as subscribed: the message never went out.
			return fmt.Errorf("subscribe us stock %v: %w", v.Code, err)
		}
		UsNewCode[v.Code] = true
	}
	applogger.Debug("old us stock num", old, len(UsNewCode))
	return nil
}
// singleSendUsCode loops forever, once per hour subscribing conn to US stock
// codes that are not yet in UsNewCode.
//
// Each round first sends a subscribe message for the placeholder symbol
// "testcode.ping" to check that the connection is still writable, then calls
// sendUsCode in incremental mode. The loop ends, and the function returns, as
// soon as either step reports an error. It is meant to be run in its own
// goroutine.
func singleSendUsCode(conn *websocket.Conn) {
	for {
		time.Sleep(1 * time.Hour)
		// Send a probe message to check whether the connection is still valid.
		if err := Send(conn, fmt.Sprintf("{\"type\":\"subscribe\",\"symbol\":\"%v\"}", "testcode.ping")); err != nil {
			applogger.Error("ws:%v", err)
			return
		}
		applogger.Debug("send new us stock start")
		if err := sendUsCode(conn, true); err != nil {
			// Report the failure as an error instead of logging "end" for it.
			applogger.Error("send new us stock err:%v", err)
			return
		}
		applogger.Debug("send new us stock end")
	}
}
// subscribeDispense dials the dispense WebSocket that forwards real-time
// stock quotes.
//
// The URL has the form ws://<DispenseWss>:<post>/<roue>, where DispenseWss
// comes from config.Config.FinnhubUs, post is the port and roue is the path.
// It returns the open connection, or nil after logging the error when the
// dial fails.
func subscribeDispense(roue string, post int) *websocket.Conn {
	host := config.Config.FinnhubUs.DispenseWss
	target := fmt.Sprintf("ws://%v:%v/%v", host, post, roue)
	applogger.Debug("dispense wss url:", target)

	ws, _, dialErr := websocket.DefaultDialer.Dial(target, nil)
	if dialErr == nil {
		applogger.Debug("dispense wss connect success......")
		return ws
	}
	applogger.Error("Failed to link to wss server:%v", dialErr)
	return nil
}
// subscribeMarketUsBakNew relays US stock trades read from conn.
//
// Every frame is read as text. Frames containing "ping" or
// "subscribe success" are ignored. Any other frame is decoded as a
// model.FinnhubMessageNew whose Content field is itself JSON for a
// model.FinnhubMessage. Each trade in that inner message becomes a
// model.ClientMessage, which is published via business.JudgePublishMap and
// whose price is stored via business.JudgeHsetMap. Frames or trades that fail
// to decode or encode, and trades with an empty symbol, are skipped.
//
// When reading from conn fails the error is logged, the function sleeps for
// 50 seconds and then returns.
func subscribeMarketUsBakNew(conn *websocket.Conn) {
	for {
		_, raw, readErr := conn.ReadMessage()
		if readErr != nil {
			applogger.Error("err info:%v", readErr)
			time.Sleep(50 * time.Second)
			return
		}

		text := string(raw)
		applogger.Debug("ReadMessage data info:%v", text)
		switch {
		case strings.Contains(text, "ping"), strings.Contains(text, "subscribe success"):
			// Keep-alive or acknowledgement frame: nothing to relay.
			continue
		}

		// Outer envelope first, then the trade payload carried in Content.
		var envelope model.FinnhubMessageNew
		if decodeErr := json.Unmarshal(raw, &envelope); decodeErr != nil {
			applogger.Error("subModel Unmarshal info111 err: %v", decodeErr)
			continue
		}
		var payload model.FinnhubMessage
		if decodeErr := json.Unmarshal([]byte(envelope.Content), &payload); decodeErr != nil {
			applogger.Error("subModel Unmarshal info222 err: %v", decodeErr)
			continue
		}

		// US market opening-hours check (currently disabled).
		//if !common.IsFinnhubOpeningUS() {
		//	applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), "us shareMarket it's not opening time -----------------------------end")
		//	continue
		//}
		for _, trade := range payload.Data {
			price := trade.P
			out := model.ClientMessage{
				// Stock code.
				S: trade.S,
				// Trade conditions; see the trade-conditions glossary for details.
				C: trade.C,
				// Volume: number of shares traded at the given timestamp (quote volume).
				V: trade.V,
				// Dark-pool flag (true/false); always set to true here.
				Dp: true,
				// Timestamp in Unix milliseconds (end of the aggregate window).
				T: trade.T,
				// The trade price fills close, open, high and low alike.
				Cl: price,
				Op: price,
				H:  price,
				L:  price,
			}

			encoded, marshalErr := json.Marshal(out)
			if marshalErr != nil {
				applogger.Error("json.Marshal err: %v", marshalErr)
				continue
			}
			applogger.Debug("message info:%v", string(encoded))

			// Write to Redis for broadcasting
			if len(out.S) == 0 {
				continue
			}
			channel := fmt.Sprintf("%s.US", out.S)
			business.JudgePublishMap("US", out.S, channel, string(encoded))
			business.JudgeHsetMap("US", business.StockClosingPrice["USNew"], out.S, out.Cl.String())
		}
	}
}
// Send writes data to conn as a single WebSocket text message.
//
// It returns an error when conn is nil or when the write fails; in both cases
// the failure is also logged through applogger. It returns nil on success.
func Send(conn *websocket.Conn, data string) error {
	const noConnection = "WebSocket sent error: no connection available"

	if conn == nil {
		applogger.Error(noConnection)
		return errors.New(noConnection)
	}

	payload := []byte(data)
	if writeErr := conn.WriteMessage(websocket.TextMessage, payload); writeErr != nil {
		applogger.Error("WebSocket sent error: data=%s, error=%s", data, writeErr)
		return writeErr
	}
	return nil
}