package websocketservice import ( "encoding/json" "fmt" "github.com/gorilla/websocket" "github.com/satori/go.uuid" "go.uber.org/zap" "log" "math" "net/http" "strings" "sync" "time" "wss-pool/cmd/common" "wss-pool/dictionary" "wss-pool/internal/model" red "wss-pool/internal/redis" "wss-pool/logging/applogger" mol "wss-pool/pkg/model" ) // Define a websocket connection object that contains information for each connection type Client struct { Id string // Client ID conn *websocket.Conn // Define websocket link objects msg chan []byte // Define messages received and distributed symbol sync.Map // Concurrent Security - Manage User Subscription Types mux sync.Mutex } type StockMessage struct { S string `json:"s,omitempty"` // 股票代码 Country string `json:"country"` //国家 StockCode string `json:"stock_code" bson:"stock_code"` // 股票代码 Symbol string `json:"symbol"` Stock string `json:"stock"` // 期权代码 IsStockIndex bool `json:"is_stock_index"` IsOptionList bool `json:"is_option_list"` IsOptionInfo bool `json:"is_option_info"` } var ( wsStockConMap = map[string][]*websocket.Conn{} mutexStock = sync.RWMutex{} msgStockChan = make(chan []byte) mutexConn = sync.RWMutex{} TotalNum int mutexTotal = sync.RWMutex{} countryMap = make(map[string][]string) mutexCountry = sync.RWMutex{} clearClientChan = make(chan *websocket.Conn) ) const ( stockConnNum int = 20 ) // Define an UpGrader to upgrade a regular HTTP connection to a websocket connection var upServer = &websocket.Upgrader{ // Define read/write buffer size WriteBufferSize: 1024, ReadBufferSize: 1024, // Verification request CheckOrigin: func(r *http.Request) bool { // If it is not a get request, return an error if r.Method != "GET" { fmt.Println("Request method error") return false } // If chat is not included in the path, an error is returned if r.URL.Path != "/quotes-share-wss" { fmt.Println("Request path error") return false } token := r.URL.Query().Get("token") if !common.CheckToken(token) { applogger.Debug("token expired") return false } // Verification rules can also be customized according to other needs return true }, } // ShareConnect func ShareConnect(host, addr string) { go writeShare() go offLineStock() http.HandleFunc("/quotes-share-wss", wsHandleShare) url := fmt.Sprintf("%v%v", host, addr) err := http.ListenAndServe(url, nil) if err != nil { log.Fatal("ListenAndServe: ", err) } } func wsHandleShare(w http.ResponseWriter, r *http.Request) { // Obtain a link through the upgraded upgrade tool conn, err := upServer.Upgrade(w, r, nil) if err != nil { applogger.Info("Failed to obtain connection:%v", err) return } // Register users after successful connection client := &Client{ Id: uuid.NewV4().String(), conn: conn, msg: make(chan []byte), symbol: sync.Map{}, mux: sync.Mutex{}, } readShare(client) } func setTotalNum(num int) { mutexTotal.Lock() defer mutexTotal.Unlock() TotalNum += num } func getTotalNum() { mutexTotal.RLock() defer mutexTotal.RUnlock() applogger.Debug("number of colleagues online :%v", TotalNum) } // Read the message sent by the client and process the return response func readShare(cl *Client) { defer cl.conn.Close() setTotalNum(1) getTotalNum() for { _, msg, err := cl.conn.ReadMessage() if err != nil { clearClientChan <- cl.conn applogger.Debug("user exit:%v", cl.conn.RemoteAddr().String()) return } // Process business logic psgMsg := model.SubMessage(string(msg)) if psgMsg != nil { switch psgMsg.Type { case "ping": // Receiving ping aloneSendStock(cl.conn, []byte(model.ReturnValue("pong"))) case "subscribe": // Receive subscription country, stock := getCountry(psgMsg.Symbol) if !dictionary.StockCountryMap[country] { applogger.Error(country, "incorrect subscription information 不属于合规的股票市场") aloneSendStock(cl.conn, []byte(model.ReturnValue("incorrect subscription information"))) clearClientChan <- cl.conn return } aloneSendStock(cl.conn, []byte(model.ReturnValue("subscribe success"))) //获取 服务是否 订阅该数据 mutexStock.RLock() conns, ok := wsStockConMap[psgMsg.Symbol] mutexStock.RUnlock() if ok { //查询client是否订阅 conns = checkClient(conns, cl.conn) } else { //添加订阅 conns = make([]*websocket.Conn, 0) conns = append(conns, cl.conn) } mutexStock.Lock() wsStockConMap[psgMsg.Symbol] = conns mutexStock.Unlock() applogger.Info("psgMsg.Symbol:%v,wsStockClient:%v", psgMsg.Symbol, wsStockConMap) if !ok { go cl.userPSubscribeUs(country, stock) } case "unSubscribe": // Receive unsubscribe applogger.Info("Received unsubscribe message body:", string(msg)) mutexStock.RLock() conns, ok := wsStockConMap[psgMsg.Symbol] mutexStock.RUnlock() if ok { //取消订阅 removeWsconnStock(conns, cl.conn, psgMsg.Symbol) } aloneSendStock(cl.conn, []byte(model.ReturnValue("unSubscribe success"))) applogger.Debug("Subscription type after current user deletion:%v", ok) default: // TODO: Handling other situations transmitted by customers applogger.Debug("Please provide accurate instructions......") } } } } // 废弃 客户端 func offLineStock() { for userConn := range clearClientChan { setTotalNum(-1) getTotalNum() for key, v := range wsStockConMap { removeWsconnStock(v, userConn, key) } } } // send message func aloneSendStock(conn *websocket.Conn, message []byte) error { mutexConn.Lock() defer mutexConn.Unlock() //applogger.Debug("aloneSendStock :%v",conn,message) conn.SetWriteDeadline(time.Now().Add(writeWait)) w, err := conn.NextWriter(websocket.TextMessage) // Write data in the form of io, with parameters of data type if err != nil { //发送失败 applogger.Error("Failed to conn.NextWriter :%v", err) return err } if _, err := w.Write(message); err != nil { // Write data, this function is truly used to transmit data to the foreground applogger.Error("Failed Write message :%v", err) return err } if err := w.Close(); err != nil { // Close write stream applogger.Error("Failed to close write stream:%v", err) return nil } return nil } // 清理客户端 func removeWsconnStock(conn []*websocket.Conn, userConn *websocket.Conn, symbol string) error { index := -1 for i, v := range conn { if v == userConn { index = i break } } if index >= 0 { conn = append(conn[:index], conn[index+1:]...) mutexStock.Lock() wsStockConMap[symbol] = conn mutexStock.Unlock() } return nil } func deleteWsconnStock(symbol string) { mutexStock.Lock() defer mutexStock.Unlock() delete(wsStockConMap, symbol) } // 广播 func broadcastWebSocketStock(msg []byte, symbol string) { //applogger.Debug("订阅客户端:%v---%v", wsStockConMap, symbol) mutexStock.RLock() conns, ok := wsStockConMap[symbol] mutexStock.RUnlock() if !ok { applogger.Error("Parsing data information:%v") return } total := len(conns) if total <= 0 { return } start := time.Now() connDiv := int(math.Ceil(float64(total) / float64(stockConnNum))) //分批并发推送 for i := 0; i < connDiv; i++ { startIndex := i * stockConnNum endIndex := (i + 1) * stockConnNum if endIndex > total { endIndex = total } wg := sync.WaitGroup{} for _, val := range conns[startIndex:endIndex] { wg.Add(1) go func(val *websocket.Conn, msg []byte) { defer wg.Done() aloneSendStock(val, msg) }(val, msg) } wg.Wait() } applogger.Info("broadcast WebSocket info : %v ;total:%v;time-consuming %v ", symbol, total, time.Since(start)) } func writeShare() { for message := range msgStockChan { if strings.Contains(string(message), "\"ev\":\"CAS\"") { // 外汇行情订阅 var subMsg mol.ForexJsonData if err := json.Unmarshal(message, &subMsg); err != nil { applogger.Error(err.Error()) } broadcastWebSocketStock(message, fmt.Sprintf("%s.Forex", subMsg.Pair)) } else if strings.Contains(string(message), "\"ev\":\"CAS-D\"") { // 外汇行情天订阅 var subMsg mol.ForexJsonData if err := json.Unmarshal(message, &subMsg); err != nil { applogger.Error(err.Error()) } broadcastWebSocketStock(message, fmt.Sprintf("%s.DayForex", subMsg.Pair)) } else if strings.Contains(string(message), "\"ev\":\"C\"") { // 外汇买一卖一报价 var subMsg mol.ForexLastQuote if err := json.Unmarshal(message, &subMsg); err != nil { applogger.Error(err.Error()) } broadcastWebSocketStock(message, fmt.Sprintf("%s.LastForex", subMsg.P)) } else if strings.Contains(string(message), "\"ev\":\"T\"") { var subMsg mol.ForexTrade if err := json.Unmarshal(message, &subMsg); err != nil { applogger.Error(err.Error()) } broadcastWebSocketStock(message, fmt.Sprintf("%s.TradeForex", subMsg.Code)) } else { var subMsg StockMessage if err := json.Unmarshal(message, &subMsg); err != nil { applogger.Error(err.Error()) } if subMsg.S == "" { if subMsg.IsStockIndex { // 指数行情订阅 broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.StockCode, common.StockIndexPrefix)) } else if subMsg.IsOptionList { // 期权列表行情订阅 broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Stock, fmt.Sprintf("%s%s%s", common.StockOption, common.CapitalizeFirstLetter(subMsg.Country), common.StockOptionList))) } else if subMsg.IsOptionInfo { // 期权详情订阅 broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Stock, fmt.Sprintf("%s%s%s", common.StockOption, common.CapitalizeFirstLetter(subMsg.Country), common.StockOptionInfo))) } else { // 东南亚股票市场行情订阅(tradingView) broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Symbol, common.CapitalizeFirstLetter(subMsg.Country))) } continue } // 美股行情订阅 broadcastWebSocketStock(message, fmt.Sprintf("%s.US", subMsg.S)) } } } func getCountry(symbol string) (string, string) { symbolArr := strings.Split(symbol, ".") if len(symbolArr) < 2 { applogger.Error("symbol 有误") return "", "" } county := symbolArr[len(symbolArr)-1] return county, symbol[0 : strings.Index(symbol, county)-1] } // 按市场订阅 func (cl *Client) userPSubscribeUs(country, symbol string) { mutexCountry.RLock() symbols, ok := countryMap[country] mutexCountry.RUnlock() //提加 mutexCountry.Lock() countryMap[country] = append(symbols, symbol) mutexCountry.Unlock() if ok { //applogger.Error(country, "已订阅") return } applogger.Debug(country, "start a stock subscription") pubSub := red.RedisClient.PSubscribe(fmt.Sprintf("*.%s", country)) defer func() { pubSub.Close() }() _, err := pubSub.Receive() if err != nil { applogger.Error("failed to receive from control PubSub,%v", zap.Error(err)) return } ch := pubSub.Channel() for msg := range ch { mutexStock.RLock() conns, ok := wsStockConMap[msg.Channel] mutexStock.RUnlock() //未订阅股票跳出 if !ok { continue } if len(conns) > 0 { msgStockChan <- []byte(msg.Payload) } else { deleteWsconnStock(msg.Channel) mutexCountry.RLock() symbols := countryMap[country] mutexCountry.RUnlock() index := -1 msgChannel := strings.Split(msg.Channel, ".") for i, v := range symbols { if v == msgChannel[0] { index = i break } } if index >= 0 { symbols = append(symbols[:index], symbols[index+1:]...) mutexCountry.Lock() countryMap[country] = symbols mutexCountry.Unlock() } // 退订 if len(symbols) <= 0 { mutexCountry.Lock() delete(countryMap, country) mutexCountry.Unlock() applogger.Debug("Starting unsubscribe.....", country) pubSub.PUnsubscribe(fmt.Sprintf("*.%s", country)) return } } } }