package closingMarket import ( "encoding/json" "fmt" "github.com/gorilla/websocket" "github.com/satori/go.uuid" "github.com/shopspring/decimal" "go.uber.org/zap" "log" "math" "net/http" "strconv" "strings" "sync" "time" "wss-pool/cmd/common" "wss-pool/dictionary" "wss-pool/internal/data/business" "wss-pool/internal/model" red "wss-pool/internal/redis" "wss-pool/logging/applogger" models "wss-pool/pkg/model" ) // Define a websocket connection object that contains information for each connection type Client struct { Id string // Client ID conn *websocket.Conn // Define websocket link objects msg chan []byte // Define messages received and distributed symbol sync.Map // Concurrent Security - Manage User Subscription Types mux sync.Mutex } type StockMessage struct { S string `json:"s,omitempty"` // 股票代码 Country string `json:"country"` //国家 StockCode string `json:"stock_code" bson:"stock_code"` // 股票代码 Symbol string `json:"symbol"` Stock string `json:"stock"` // 期权代码 IsStockIndex bool `json:"is_stock_index"` IsOptionList bool `json:"is_option_list"` IsOptionInfo bool `json:"is_option_info"` } var ( wsStockConMap = map[string][]*websocket.Conn{} mutexStock = sync.RWMutex{} msgStockChan = make(chan []byte) mutexConn = sync.RWMutex{} TotalNum int mutexTotal = sync.RWMutex{} countryMap = make(map[string][]string) mutexCountry = sync.RWMutex{} clearClientChan = make(chan *websocket.Conn) pinStockMap = make(map[string]bool) mutexPinMap = sync.RWMutex{} ) const ( stockConnNum int = 20 writeWait = 10 * time.Second ) // Define an UpGrader to upgrade a regular HTTP connection to a websocket connection var upServer = &websocket.Upgrader{ // Define read/write buffer size WriteBufferSize: 1024, ReadBufferSize: 1024, // Verification request CheckOrigin: func(r *http.Request) bool { // If it is not a get request, return an error if r.Method != "GET" { fmt.Println("Request method error") return false } // If chat is not included in the path, an error is returned if r.URL.Path != "/quotes-pin-wss" { fmt.Println("Request path error") return false } token := r.URL.Query().Get("token") if !common.CheckToken(token) { applogger.Debug("token expired") return false } // Verification rules can also be customized according to other needs return true }, } // ShareConnect func ShareConnect(host, addr string) { go writeShare() go offLineStock() go pinStock() http.HandleFunc("/quotes-pin-wss", wsHandleShare) url := fmt.Sprintf("%v%v", host, addr) err := http.ListenAndServe(url, nil) if err != nil { log.Fatal("ListenAndServe: ", err) } } // 读取插针数据 func pinStock() { for { stockPinData() time.Sleep(20 * time.Second) } } func getPinMap(symbol string) bool { mutexStock.RLock() defer mutexStock.RUnlock() return pinStockMap[symbol] } func setPinMap(symbol string) { mutexPinMap.Lock() defer mutexPinMap.Unlock() pinStockMap[symbol] = true applogger.Info("set pinMap", symbol) } func initPinMap() { mutexPinMap.Lock() defer mutexPinMap.Unlock() pinStockMap = make(map[string]bool) } // 针对资产插针 func stockPinData() { //清理上一次数据 initPinMap() for k, v := range business.StockClosedDataList { hashListName := fmt.Sprintf("STOCK_PRICES:%d", v) keys := red.Scan(hashListName) for _, key := range keys { res, _ := red.HGetAll(key) status, _ := strconv.Atoi(res["status"]) code := res["stock_code"] applogger.Debug(common.TimeToNows().Format("2006-01-02 15:04:05"), res) if status != business.StockStatusOn { continue } symbol := fmt.Sprintf("%s.%s", code, k) setPinMap(symbol) mutexStock.RLock() _, ok := wsStockConMap[symbol] mutexStock.RUnlock() //没有订阅没必要推送 if !ok { applogger.Info("No subscription ", symbol) continue } if k == "US" { UsPinStock(code, res["price"], k) } else { stockCode := common.GetOldCode(code) SouthAsiaPinSpot(code, stockCode, res["price"], k) } } } } func SouthAsiaPinSpot(symbol, stockCode, price, country string) { prices, _ := strconv.ParseFloat(price, 64) param := models.StockParam{ Symbol: symbol, StockCode: stockCode, StockName: "", Price: prices, UpDownRate: decimal.NewFromInt(0), UpDown: decimal.NewFromInt(0), TradeV: decimal.NewFromInt(0), TradeK: "买入", Country: strings.ToLower(country), Ts: time.Now().UnixMilli(), ClosingMarket: true, } param.Token = "" msgStr, err := json.Marshal(param) if err != nil { applogger.Error("json.Marshal err: %v", err) return } msgStockChan <- msgStr } func UsPinStock(code, price, country string) { message := &models.ClientMessage{ S: code, // 股票代码 C: []decimal.Decimal{decimal.NewFromInt(0), decimal.NewFromFloat(1)}, // 条件,有关更多信息,请参阅贸易条件术语表 V: common.CalculateContractPrices(decimal.NewFromInt(int64(100)), float64(0.02), 0, 1)[0].IntPart(), // 交易量,代表在相应时间戳处交易的股票数量 -- 报价交易量 Dp: true, // 暗池真/假 Ms: "open", // 市场状态,指示股票市场的当前状态(“开盘”、“收盘”、“延长交易时间”) T: time.Now().UnixMilli(), // 以毫秒为单位的时间戳 -- 此聚合窗口的结束时钟周期的时间戳(以 Unix 毫秒为单位) Cl: decimal.RequireFromString(price), // 此聚合窗口的收盘价 A: decimal.NewFromInt(11), // 今天的成交量加权平均价格 Se: time.Now().UnixMilli(), H: decimal.RequireFromString(price), // 此聚合窗口的最高逐笔报价 L: decimal.RequireFromString(price), // 此聚合窗口的最低价格变动价格 Op: decimal.RequireFromString(price), // 今天正式开盘价格 // P: decimal.RequireFromString(price), ClosingMarket: true, } msgStr, err := json.Marshal(message) if err != nil { applogger.Error("json.Marshal err: %v", err) return } msgStockChan <- msgStr } func wsHandleShare(w http.ResponseWriter, r *http.Request) { // Obtain a link through the upgraded upgrade tool conn, err := upServer.Upgrade(w, r, nil) if err != nil { applogger.Info("Failed to obtain connection:%v", err) return } // Register users after successful connection client := &Client{ Id: uuid.NewV4().String(), conn: conn, msg: make(chan []byte), symbol: sync.Map{}, mux: sync.Mutex{}, } readShare(client) } func setTotalNum(num int) { mutexTotal.Lock() defer mutexTotal.Unlock() TotalNum += num } func getTotalNum() { mutexTotal.RLock() defer mutexTotal.RUnlock() applogger.Debug("number of colleagues online :%v", TotalNum) } // Read the message sent by the client and process the return response func readShare(cl *Client) { defer cl.conn.Close() setTotalNum(1) getTotalNum() for { _, msg, err := cl.conn.ReadMessage() if err != nil { clearClientChan <- cl.conn applogger.Debug("user exit:%v", cl.conn.RemoteAddr().String()) return } // Process business logic psgMsg := model.SubMessage(string(msg)) if psgMsg != nil { switch psgMsg.Type { case "ping": // Receiving ping aloneSendStock(cl.conn, []byte(model.ReturnValue("pong"))) case "subscribe": // Receive subscription country, stock := getCountry(psgMsg.Symbol) //applogger.Info(country, stock) if !dictionary.StockCountryMap[country] { applogger.Error(country, "incorrect subscription information 不属于合规的股票市场") aloneSendStock(cl.conn, []byte(model.ReturnValue("incorrect subscription information"))) clearClientChan <- cl.conn return } aloneSendStock(cl.conn, []byte(model.ReturnValue("subscribe success"))) //获取 服务是否 订阅该数据 mutexStock.RLock() conns, ok := wsStockConMap[psgMsg.Symbol] mutexStock.RUnlock() if ok { //查询client是否订阅 conns = checkClient(conns, cl.conn) } else { //添加订阅 conns = make([]*websocket.Conn, 0) conns = append(conns, cl.conn) } //applogger.Info("psgMsg.Symbol", psgMsg.Symbol) mutexStock.Lock() wsStockConMap[psgMsg.Symbol] = conns mutexStock.Unlock() if !ok { go cl.userPSubscribeUs(country, stock) } case "unSubscribe": // Receive unsubscribe applogger.Info("Received unsubscribe message body:", string(msg)) mutexStock.RLock() conns, ok := wsStockConMap[psgMsg.Symbol] mutexStock.RUnlock() if ok { //取消订阅 removeWsconnStock(conns, cl.conn, psgMsg.Symbol) } aloneSendStock(cl.conn, []byte(model.ReturnValue("unSubscribe success"))) applogger.Debug("Subscription type after current user deletion:%v", ok) default: // TODO: Handling other situations transmitted by customers applogger.Debug("Please provide accurate instructions......") } } } } // 废弃 客户端 func offLineStock() { for userConn := range clearClientChan { setTotalNum(-1) getTotalNum() for key, v := range wsStockConMap { removeWsconnStock(v, userConn, key) } } } // send message func aloneSendStock(conn *websocket.Conn, message []byte) error { mutexConn.Lock() defer mutexConn.Unlock() //applogger.Debug("aloneSendStock :%v",conn,message) conn.SetWriteDeadline(time.Now().Add(writeWait)) w, err := conn.NextWriter(websocket.TextMessage) // Write data in the form of io, with parameters of data type if err != nil { //发送失败 applogger.Error("Failed to conn.NextWriter :%v", err) return err } if _, err := w.Write(message); err != nil { // Write data, this function is truly used to transmit data to the foreground applogger.Error("Failed Write message :%v", err) return err } if err := w.Close(); err != nil { // Close write stream applogger.Error("Failed to close write stream:%v", err) return nil } return nil } // 清理客户端 func removeWsconnStock(conn []*websocket.Conn, userConn *websocket.Conn, symbol string) error { index := -1 for i, v := range conn { if v == userConn { index = i break } } if index >= 0 { conn = append(conn[:index], conn[index+1:]...) mutexStock.Lock() wsStockConMap[symbol] = conn mutexStock.Unlock() } return nil } func deleteWsconnStock(symbol string) { mutexStock.Lock() defer mutexStock.Unlock() delete(wsStockConMap, symbol) } // 广播 func broadcastWebSocketStock(msg []byte, symbol string) { mutexStock.RLock() conns, ok := wsStockConMap[symbol] mutexStock.RUnlock() if !ok { applogger.Error("Parsing data information:%v") return } total := len(conns) if total <= 0 { return } start := time.Now() connDiv := int(math.Ceil(float64(total) / float64(stockConnNum))) //分批并发推送 for i := 0; i < connDiv; i++ { startIndex := i * stockConnNum endIndex := (i + 1) * stockConnNum if endIndex > total { endIndex = total } wg := sync.WaitGroup{} for _, val := range conns[startIndex:endIndex] { wg.Add(1) go func(val *websocket.Conn, msg []byte) { defer wg.Done() aloneSendStock(val, msg) }(val, msg) } wg.Wait() } applogger.Info("broadcast WebSocket info : %v ;total:%v;time-consuming %v ", symbol, total, time.Since(start)) } func writeShare() { for message := range msgStockChan { var subMsg StockMessage if err := json.Unmarshal(message, &subMsg); err != nil { applogger.Error(err.Error()) } if subMsg.S == "" { if subMsg.IsStockIndex { //指数 broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.StockCode, common.StockIndexPrefix)) } else if subMsg.IsOptionList { //期权列表 broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Stock, fmt.Sprintf("%s%s%s", common.StockOption, common.CapitalizeFirstLetter(subMsg.Country), common.StockOptionList))) } else if subMsg.IsOptionInfo { //期权详情 broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Stock, fmt.Sprintf("%s%s%s", common.StockOption, common.CapitalizeFirstLetter(subMsg.Country), common.StockOptionInfo))) } else { //tradingview broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Symbol, common.CapitalizeFirstLetter(subMsg.Country))) } //applogger.Info("broadcast WebSocket Sub message %v info:%v", subMsg.Country, subMsg.StockCode) continue } //美股 broadcastWebSocketStock(message, fmt.Sprintf("%s.US", subMsg.S)) //applogger.Info("broadcast WebSocket Sub message US info:%v", subMsg.S) } } func checkClient(conns []*websocket.Conn, conn *websocket.Conn) []*websocket.Conn { for _, v := range conns { if v == conn { return conns } } conns = append(conns, conn) return conns } func getCountry(symbol string) (string, string) { symbolArr := strings.Split(symbol, ".") if len(symbolArr) < 2 { applogger.Error("symbol 有误") return "", "" } county := symbolArr[len(symbolArr)-1] return county, symbol[0 : strings.Index(symbol, county)-1] } // 按市场订阅 func (cl *Client) userPSubscribeUs(country, symbol string) { mutexCountry.RLock() symbols, ok := countryMap[country] mutexCountry.RUnlock() //提加 mutexCountry.Lock() countryMap[country] = append(symbols, symbol) mutexCountry.Unlock() if ok { //applogger.Error(country, "已订阅") return } applogger.Debug(country, "start a stock subscription") pubSub := red.RedisClient.PSubscribe(fmt.Sprintf("*.%s", country)) defer func() { pubSub.Close() }() _, err := pubSub.Receive() if err != nil { applogger.Error("failed to receive from control PubSub,%v", zap.Error(err)) return } ch := pubSub.Channel() for msg := range ch { mutexStock.RLock() conns, ok := wsStockConMap[msg.Channel] mutexStock.RUnlock() //未订阅股票跳出 if !ok { continue } // TODO: 是否还有客户端 if len(conns) > 0 { //是否有插针 必须放在下面判断,不然会被误判 客户端 断开链接 if !getPinMap(msg.Channel) { msgStockChan <- []byte(msg.Payload) } } else { deleteWsconnStock(msg.Channel) mutexCountry.RLock() symbols := countryMap[country] mutexCountry.RUnlock() index := -1 msgChannel := strings.Split(msg.Channel, ".") for i, v := range symbols { if v == msgChannel[0] { index = i break } } if index >= 0 { symbols = append(symbols[:index], symbols[index+1:]...) mutexCountry.Lock() countryMap[country] = symbols mutexCountry.Unlock() } // 退订 if len(symbols) <= 0 { mutexCountry.Lock() delete(countryMap, country) mutexCountry.Unlock() applogger.Debug("Starting unsubscribe.....", country) pubSub.PUnsubscribe(fmt.Sprintf("*.%s", country)) return } } } }