package websocketservice import ( "encoding/json" "errors" "fmt" "github.com/gorilla/websocket" "github.com/satori/go.uuid" "go.uber.org/zap" "log" "net/http" "strconv" "strings" "sync" "time" "wss-pool/config" "wss-pool/internal" "wss-pool/internal/gzip" "wss-pool/internal/model" red "wss-pool/internal/redis" "wss-pool/logging/applogger" "wss-pool/pkg/model/market" "wss-pool/pkg/model/stock" ) const ( // Time allowed to write a message to the peer. writeWait = 10 * time.Second // Time allowed to read the next pong message from the peer. pongWait = 60 * time.Second // Send pings to peer with this period. Must be less than pongWait. pingPeriod = (pongWait * 9) / 10 // Maximum message size allowed from peer. maxMessageSize = 512 SpotsStatus int = 1 ContractStatus int = 2 RedisCONTRACT string = "CONTRACT:LIST" RedisDIGITAL string = "DIGITAL:LIST" StatusPass int = 1 ) // Define a websocket connection object that contains information for each connection type User struct { Id string // Client ID conn *websocket.Conn // Define websocket link objects msg chan []byte // Define messages received and distributed symbol sync.Map // Define a collection of user subscription types stop chan string start chan string mux sync.Mutex } var ( wsConMap = map[string][]*websocket.Conn{} mutex = sync.RWMutex{} msgChan = make(chan []byte) mutexSpotConn = sync.RWMutex{} ) // Define the message body for the output type Message struct { ServersId string `json:"serversId,omitempty"` // Service ID (used to receive the client push ID of the third-party push server) Sender string `json:"sender,omitempty"` // sender Recipient string `json:"recipient,omitempty"` // recipient Content any `json:"content,omitempty"` // send content Symbol string `json:"symbol,omitempty"` // Subscription type Logo string `json:"logo_link,omitempty"` // Subscription type KeepDecimal string `json:"keep_decimal"` } var SpotMarketCache = sync.Map{} var ContractCache = sync.Map{} // Define an UpGrader to upgrade a regular HTTP connection to a websocket connection var up = &websocket.Upgrader{ // Define read/write buffer size WriteBufferSize: 1024, ReadBufferSize: 1024, // Verification request CheckOrigin: func(r *http.Request) bool { // If it is not a get request, return an error if r.Method != "GET" { fmt.Println("Request method error") return false } // If chat is not included in the path, an error is returned if r.URL.Path != "/quotes-wss" { fmt.Println("Request path error") return false } // Verification rules can also be customized according to other needs return true }, } func Connect(host, addr string) { //广播启动 go write() http.HandleFunc("/quotes-wss", wsHandle) url := fmt.Sprintf("%v%v", host, addr) fmt.Println(url) err := http.ListenAndServe(url, nil) if err != nil { log.Fatal("ListenAndServe: ", err) } } func wsHandle(w http.ResponseWriter, r *http.Request) { // Obtain a link through the upgraded upgrade tool conn, err := up.Upgrade(w, r, nil) if err != nil { applogger.Info("获取连接失败:%v", err) return } // Register users after successful connection user := &User{ Id: uuid.NewV4().String(), conn: conn, msg: make(chan []byte), stop: make(chan string), start: make(chan string), symbol: sync.Map{}, mux: sync.Mutex{}, } read(user) //go hearBeat(user) } // Read the message sent by the client and process the return response func read(user *User) { defer func() { user.conn.Close() }() for { _, msg, err := user.conn.ReadMessage() if err != nil { offLine(user.conn) applogger.Info("user exit:%v", user.conn.RemoteAddr().String()) return } // Process business logic psgMsg := model.SubMessage(string(msg)) fmt.Println(psgMsg) if psgMsg != nil { switch psgMsg.Type { case "ping": // Receiving ping aloneSend(user.conn, []byte(model.ReturnValue("pong"))) case "subscribe": // Receive subscription applogger.Info("接收到订阅消息体:", string(msg)) aloneSend(user.conn, []byte(model.ReturnValue("subscribe success"))) //获取 服务是否 订阅该数据 mutex.RLock() conns, ok := wsConMap[psgMsg.Symbol] mutex.RUnlock() if ok { //查询client是否订阅 conns = checkClient(conns, user.conn) } else { //添加订阅 conns = make([]*websocket.Conn, 0) conns = append(conns, user.conn) } applogger.Info("psgMsg.Symbol", psgMsg.Symbol) mutex.Lock() wsConMap[psgMsg.Symbol] = conns mutex.Unlock() if !ok { go user.userSubscribe(psgMsg.Symbol) } case "unSubscribe": // Receive unsubscribe applogger.Info("Received unsubscribe message body:", string(msg)) mutex.RLock() conns, ok := wsConMap[psgMsg.Symbol] mutex.RUnlock() if ok { //取消订阅 removeWsconn(conns, user.conn, psgMsg.Symbol) } aloneSend(user.conn, []byte(model.ReturnValue("unSubscribe success"))) applogger.Debug("Subscription type after current user deletion:%v", ok) default: // TODO: Handling other situations transmitted by customers } } } } // 清理客户端 func removeWsconn(conn []*websocket.Conn, userConn *websocket.Conn, symbol string) error { index := -1 for i, v := range conn { if v == userConn { index = i break } } if index >= 0 { conn = append(conn[:index], conn[index+1:]...) mutex.Lock() wsConMap[symbol] = conn mutex.Unlock() } return nil } func deleteWsconn(symbol string) { mutex.Lock() defer mutex.Unlock() delete(wsConMap, symbol) } // 检查是否订阅过 func checkClient(conns []*websocket.Conn, conn *websocket.Conn) []*websocket.Conn { for _, v := range conns { if v == conn { return conns } } conns = append(conns, conn) return conns } // 广播 func broadcastWebSocket(msg []byte, symbol string) { mutex.RLock() conns, ok := wsConMap[symbol] mutex.RUnlock() if !ok { applogger.Error("Parsing data information:%v") return } for _, val := range conns { if err := aloneSend(val, msg); err != nil { removeWsconn(conns, val, symbol) } } } // 断开连接 清理订阅 func offLine(userConn *websocket.Conn) { for key, v := range wsConMap { removeWsconn(v, userConn, key) } } // send message func aloneSend(conn *websocket.Conn, message []byte) error { mutexSpotConn.Lock() defer mutexSpotConn.Unlock() conn.SetWriteDeadline(time.Now().Add(writeWait)) w, err := conn.NextWriter(websocket.TextMessage) // Write data in the form of io, with parameters of data type if err != nil { //发送失败 applogger.Error("Failed to conn.NextWriter :%v", err) return err } if _, err := w.Write(message); err != nil { // Write data, this function is truly used to transmit data to the foreground applogger.Error("Failed Write message :%v", err) return err } if err := w.Close(); err != nil { // Close write stream applogger.Error("Failed to close write stream:%v", err) return nil } return nil } //func hearBeat(user *User) { // defer func() { // user.conn.Close() // }() // ticker := time.NewTicker(writeWait) // Set timing // for { // select { // case <-ticker.C: // TODO: Need to add a server to ping the client // //发送心跳 // if err := aloneSend(user.conn, []byte(model.ReturnValue("ping"))); err != nil { // applogger.Error("Failed to send heartbeat message: ", err) // //ping 不通 清理客户端 // offLine(user.conn) // return // } // } // } //} // Producer sends messages func write() { for message := range msgChan { // TODO: 延迟200毫秒 time.Sleep(20 * time.Millisecond) var subMsg Message json.Unmarshal(message, &subMsg) //广播 broadcastWebSocket(message, subMsg.Symbol) applogger.Info("broadcast WebSocket Sub message info:%v", subMsg.Symbol) } } // User subscription and cancellation func (u *User) userSubscribe(symbol string) { applogger.Info("新交易对", symbol) pubSub := red.RedisClient.Subscribe(symbol) defer func() { pubSub.Close() }() _, err := pubSub.Receive() if err != nil { applogger.Error("failed to receive from control PubSub,%v", zap.Error(err)) return } //fmt.Println(pubSub) ch := pubSub.Channel() for msg := range ch { var subMsg Message //合约深度解压包 isSubscribeCtDepth := strings.Contains(symbol, "USDT.depth.step") switch isSubscribeCtDepth { case true: content, err := gzip.GZipDecompress([]byte(msg.Payload)) if err != nil { applogger.Error("UnGZip data error: %s", err) return } result := market.SubscribeCtDepthResponse{} if err := json.Unmarshal([]byte(content), &result); err != nil { applogger.Error("Depth Unmarshal ", err) close(u.msg) return } //fmt.Println(result) subMsg.Symbol = result.Channel subMsg.Content = result subMsg.ServersId = result.Channel message, _ := json.Marshal(subMsg) msg.Payload = string(message) default: //applogger.Info("Subscribe date:%v", msg.Payload) if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { applogger.Error("Parsing data information:%v", err) close(u.msg) return } } //fmt.Println("返回数据 : ", msg.Payload) //交易全被被取消订阅 mutex.RLock() conns := wsConMap[subMsg.Symbol] mutex.RUnlock() if len(conns) > 0 { msgChan <- []byte(msg.Payload) } else { applogger.Debug("Starting unsubscribe.....", symbol) deleteWsconn(symbol) pubSub.Unsubscribe(symbol) return } } } func HashValue(hashListName string) []stock.PHPData { keys := red.Scan(hashListName) result := make([]stock.PHPData, 0) for _, v := range keys { res, _ := red.HGetAll(v) fmt.Println(res) item := stock.PHPData{} for field, value := range res { switch field { case "name": item.Name = value case "code": item.Code = value case "keep_decimal": item.KeepDecimal = value case "face_value": item.FaceValue, _ = strconv.Atoi(value) case "max_pry": item.MaxPry, _ = strconv.Atoi(value) case "min_pry": item.MinPry, _ = strconv.Atoi(value) case "logo_link": item.LogoLink = value case "status": item.Status, _ = strconv.Atoi(value) } } //if item.Status == StatusPass { result = append(result, item) //} } return result } func HashValueOnce(key string) (stock.PHPData, error) { res, _ := red.HGetAll(key) item := stock.PHPData{} for field, value := range res { switch field { case "name": item.Name = value case "code": item.Code = value case "keep_decimal": item.KeepDecimal = value case "face_value": item.FaceValue, _ = strconv.Atoi(value) case "max_pry": item.MaxPry, _ = strconv.Atoi(value) case "min_pry": item.MinPry, _ = strconv.Atoi(value) case "logo_link": item.LogoLink = value case "status": item.Status, _ = strconv.Atoi(value) } } if item.Status == StatusPass { return item, nil } return stock.PHPData{}, errors.New("data is null") } // market_type //1现货,2合约,3美股 func PHPResutl(market_type int) stock.PHPRes { var eodModel stock.PHPRes url := fmt.Sprintf("https://%v/bs/market", config.Config.HbApi.PHPHost) bodyStr, err := internal.HttpPost(url, fmt.Sprintf(`{"market_type":"%d","trade_name":"","page":1,"page_size":1000}`, market_type)) if err != nil { applogger.Error("Failed to query data:%v", err) return eodModel } if err = json.Unmarshal([]byte(bodyStr), &eodModel); err != nil { applogger.Error("eodModel json Unmarshal err: %v", err) return eodModel } return eodModel } // market_type 1现货,2合约,3美股 func PHPMarketTrade(market_type, num int, trade_name string) stock.PHPMarketTradeList { var eodModel stock.PHPMarketTradeList url := fmt.Sprintf("https://%v/bs/market_trade", config.Config.HbApi.PHPHost) fmt.Println(url) bodyStr, err := internal.HttpPost(url, fmt.Sprintf(`{"market_type":%d,"trade_name":"%s","num":%d}`, market_type, trade_name, num)) fmt.Println(bodyStr) if err != nil { applogger.Error("Failed to query data:%v", err) return eodModel } if err = json.Unmarshal([]byte(bodyStr), &eodModel); err != nil { applogger.Error("eodModel json Unmarshal err: %v", err) return eodModel } return eodModel } func SubscriptionCache() { spot := HashValue(RedisDIGITAL) contract := HashValue(RedisCONTRACT) applogger.Info("spot :", spot, "contract :", contract) for _, v := range spot { symbol := fmt.Sprintf("market.%susdt.kline.1day", strings.ToLower(v.Name)) go UserMainSubscribe(symbol, SpotsStatus, v.Name) } for _, v := range contract { symbol := fmt.Sprintf("market.%s.kline.1day", v.Name) go UserMainSubscribe(symbol, ContractStatus, v.Name) } } // 1 合约 2 现货 func UserMainSubscribe(symbol string, market_type int, name string) { applogger.Info("新交易对", symbol) var subMsg Message pubSub := red.RedisClient.Subscribe(symbol) defer func() { pubSub.Close() }() _, err := pubSub.Receive() if err != nil { applogger.Error("failed to receive from control PubSub,%v", zap.Error(err)) return } ch := pubSub.Channel() for msg := range ch { //applogger.Info("Subscribe date:%v", msg.Payload) if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil { applogger.Error("Parsing data information:%v", err) return } if subMsg.Symbol == "" { applogger.Error("symbol not data :%v", err) continue } if market_type == SpotsStatus { val, err := HashValueOnce(fmt.Sprintf("%s:%s", RedisDIGITAL, name)) if err != nil { //applogger.Error(name, err.Error()) SpotMarketCache.Delete(strings.ToLower(name)) continue } subMsg.Logo = val.LogoLink subMsg.Symbol = strings.ToLower(val.Name) subMsg.KeepDecimal = val.KeepDecimal SpotMarketCache.Store(strings.ToLower(name), subMsg) } else { val, err := HashValueOnce(fmt.Sprintf("%s:%s", RedisCONTRACT, name)) if err != nil { //applogger.Error(name, err.Error()) ContractCache.Delete(name) continue } subMsg.Logo = val.LogoLink subMsg.Symbol = val.Name subMsg.KeepDecimal = val.KeepDecimal ContractCache.Store(name, subMsg) } } } // Send data to websocket websocketServer func (u *User) send(data string) (err error) { if u.conn == nil { applogger.Error("WebSocket sent error: no connection available") return err } u.mux.Lock() err = u.conn.WriteMessage(websocket.TextMessage, []byte(data)) u.mux.Unlock() return err }