package socket import ( "fmt" "github.com/gorilla/websocket" "matchmaking-system/internal/pkg/flags" "matchmaking-system/internal/pkg/logging/applogger" "matchmaking-system/internal/pkg/logging/common" "matchmaking-system/internal/pkg/setting" "net/http" "strconv" "strings" "sync" "time" uuid "github.com/satori/go.uuid" ) // Client // @Description: wss client type Client struct { Id string // Client ID conn *websocket.Conn // Define websocket link object msg chan []byte // Define messages received and distributed symbol sync.Map // Concurrent Security - Manage User Subscription Types mux sync.Mutex // Mutual exclusive lock (used for single message sending) token string // Obtain token information (subscribe to user pending and holding orders through token information: order userId) } // WsHandlerOrder // // @Description: wss start // @param w // @param r func WsHandlerOrder(w http.ResponseWriter, r *http.Request) { // Start order WSS subscription service conn, err := upGraderOrder.Upgrade(w, r, nil) if err != nil { applogger.Error("upGrader_Order Upgrade err:%v", err) return } queryParams := r.URL.Query() token := queryParams.Get("token") tokenUserId := fmt.Sprintf("%v%v", flags.UserToken, token) if !flags.CheckSetting { applogger.Debug("WsHandlerOrder info %v", tokenUserId) } // Register users after successful connection client := &Client{ Id: uuid.NewV4().String(), conn: conn, msg: make(chan []byte), symbol: sync.Map{}, mux: sync.Mutex{}, token: tokenUserId, } go readShare(client) go writeShare(client) } // readShare // // @Description: read message // @param cl func readShare(cl *Client) { defer func() { if err := cl.conn.Close(); err != nil { return } }() for { _, msg, err := cl.conn.ReadMessage() if err != nil { applogger.Error("%v readShare.ReadMessage:%v", common.ErrContract, err) // Link failure cleaning subscription book cleanSubscriptionKey(cl) applogger.Warn("Cleanup subscription key, successful.") return } // Process business logic psgMsg := SubMessage(string(msg)) if psgMsg != nil { switch psgMsg.Type { case "ping": // Receiving ping cl.msg <- []byte(ReturnValue("pong")) case "subscribe": // Receive subscription applogger.Info("Receive client Subscribe type:%v", string(msg)) // Handle duplicate subscriptions, handle link limits _, ok := cl.symbol.Load(psgMsg.Symbol) if ok { applogger.Error("Cannot repeat subscription.") } else { // Write subscription topic manager cl.symbol.Store(psgMsg.Symbol, psgMsg.Order) // TODO: 特殊处理p1的pc端的订单列表订阅问题 var makeType int if strings.Contains(psgMsg.Symbol, "-") { splitStr := strings.Split(psgMsg.Symbol, "-") if len(splitStr) == 2 { psgMsg.Symbol = splitStr[0] makeType, err = strconv.Atoi(splitStr[1]) if err != nil { makeType = 0 } } } else { makeType = 0 } switch psgMsg.Symbol { case setting.SpotsSubscribe: // Spots go cl.orderSubSpotsSubscribe(psgMsg) case setting.ContractSubscribe: // Contract go cl.orderSubContractSubscribe(psgMsg) case setting.SecondSubscribe: // Second go cl.orderSubSecondSubscribe(psgMsg) case setting.ForexSubscribe: // Forex go cl.orderSubForexSubscribe(psgMsg) case setting.MoneySubscribe: // Money go cl.orderSubMoneySubscribe(psgMsg, makeType) case setting.ShareUsSubscribe: // ShareUs go cl.orderSubShareUsSubscribe(psgMsg) case setting.ShareMysSubscribe: // ShareMys go cl.orderSubShareMysSubscribe(psgMsg) case setting.ShareThaSubscribe: // ShareTha go cl.orderSubShareThaSubscribe(psgMsg) case setting.ShareIdnSubscribe: // ShareIdn go cl.orderSubShareIdnSubscribe(psgMsg) case setting.ShareInrSubscribe: // ShareInr go cl.orderSubShareInrSubscribe(psgMsg) case setting.ShareSgdSubscribe: // shareGbx go cl.orderSubShareSgdSubscribe(psgMsg) case setting.ShareGbxSubscribe: // ShareSgd go cl.orderSubShareGbxSubscribe(psgMsg) case setting.ShareEurSubscribe: // ShareEur go cl.orderSubShareEurSubscribe(psgMsg) case setting.ShareFurSubscribe: // ShareFur go cl.orderSubShareFurSubscribe(psgMsg) case setting.ShareJpySubscribe: // ShareJpy go cl.orderSubShareJpySubscribe(psgMsg) case setting.ShareBrlSubscribe: // ShareBrl go cl.orderSubShareBrlSubscribe(psgMsg) case setting.SubscribeShareHkd: // ShareHkd go cl.orderSubShareHkdSubscribe(psgMsg) case setting.SubscribeOptionInr: // OptionInr go cl.orderSubOptionInrSubscribe(psgMsg) case setting.SubscribeSumOptionInr: // OptionInrSum go cl.orderSubSumOptionInrSubscribe(psgMsg) case setting.SubscribeShareBlk: // shareBlk go cl.orderSubShareBlkSubscribe(psgMsg) case setting.SpotsMarketSubscribe: // spots Market go cl.orderSubSpotsMarketSubscribe(psgMsg) case setting.ContractMarketSubscribe: // contract Market go cl.orderSubContractMarketSubscribe(psgMsg) case setting.SecondMarketSubscribe: // second Market go cl.orderSubSecondMarketSubscribe(psgMsg) case setting.ShareForexMarketSubscribe: // shareForex Market cl.orderSubShareForexMarketSubscribe(psgMsg) case setting.ShareMoneyMarketSubscribe: // shareMoney Market cl.orderSubShareMoneyMarketSubscribe(psgMsg) case setting.ShareUsMarketSubscribe: // shareUs Market go cl.orderSubShareUsMarketSubscribe(psgMsg) case setting.ShareInrMarketSubscribe: // shareInr Market go cl.orderSubShareInrMarketSubscribe(psgMsg) case setting.ShareMysMarketSubscribe: // shareMys Market go cl.orderSubShareMysMarketSubscribe(psgMsg) case setting.ShareThaMarketSubscribe: // shareTha Market go cl.orderSubShareThaMarketSubscribe(psgMsg) case setting.ShareSgdMarketSubscribe: // shareSgd Market go cl.orderSubShareSgdMarketSubscribe(psgMsg) case setting.ShareFurMarketSubscribe: // shareFur Market go cl.orderSubShareFurMarketSubscribe(psgMsg) case setting.ShareEurMarketSubscribe: // shareEur Market go cl.orderSubShareEurMarketSubscribe(psgMsg) case setting.ShareBrlMarketSubscribe: // shareBrl Market go cl.orderSubShareBrlMarketSubscribe(psgMsg) case setting.ShareIdnMarketSubscribe: // shareIdn Market go cl.orderSubShareIdnMarketSubscribe(psgMsg) case setting.ShareGbxMarketSubscribe: // shareGbx Market go cl.orderSubShareGbxMarketSubscribe(psgMsg) case setting.ShareHkdMarketSubscribe: // shareHkd Market go cl.orderSubShareHkdMarketSubscribe(psgMsg) case setting.ShareJpyMarketSubscribe: // shareJpy Market go cl.orderSubShareJpyMarketSubscribe(psgMsg) case setting.AdminContractSubscribe: // Admin Contract go cl.orderSubAdminContractSubscribeByOrder(psgMsg) case setting.AdminSecondSubscribe: // Admin Second go cl.orderSubAdminSecondSubscribeByOrder(psgMsg) case setting.AdminForexSubscribe: // Admin Forex go cl.orderSubAdminForexSubscribeByOrder(psgMsg) case setting.AdminMoneySubscribe: // Admin Money go cl.orderSubAdminMoneySubscribeByOrder(psgMsg) case setting.AdminShareUsSubscribe: // Admin ShareUs go cl.orderSubAdminShareUsSubscribeByOrder(psgMsg) case setting.AdminShareMysSubscribe: // Admin ShareMys go cl.orderSubAdminShareMysSubscribeByOrder(psgMsg) case setting.AdminShareThaSubscribe: // Admin ShareTha go cl.orderSubAdminShareThaSubscribeByOrder(psgMsg) case setting.AdminShareIdnSubscribe: // Admin ShareIdn go cl.orderSubAdminShareIdnSubscribeByOrder(psgMsg) case setting.AdminShareInrSubscribe: // Admin ShareInr go cl.orderSubAdminShareInrSubscribeByOrder(psgMsg) case setting.AdminShareSgdSubscribe: // admin ShareSgd go cl.orderSubAdminShareSgdSubscribeByOrder(psgMsg) case setting.AdminShareGbxSubscribe: // admin ShareGbx go cl.orderSubAdminShareGbxSubscribeByOrder(psgMsg) case setting.AdminShareEurSubscribe: // admin ShareEur go cl.orderSubAdminShareEurSubscribeByOrder(psgMsg) case setting.AdminShareFurSubscribe: // admin ShareFur go cl.orderSubAdminShareFurSubscribeByOrder(psgMsg) case setting.AdminShareJpySubscribe: // admin ShareJpy go cl.orderSubAdminShareJpySubscribeByOrder(psgMsg) case setting.AdminShareBrlSubscribe: // admin ShareBrl go cl.orderSubAdminShareBrlSubscribeByOrder(psgMsg) case setting.SubscribeAdminShareHkd: // admin ShareHkd go cl.orderSubAdminShareHkdSubscribeByOrder(psgMsg) case setting.SubscribeAdminOptionInr: // admin OptionInr go cl.orderSubAdminOptionInrSubscribeByOrder(psgMsg) case setting.SubscribeAdminShareBlk: // admin ShareBlk go cl.orderSubAdminShareBlkSubscribeByOrder(psgMsg) case setting.AdminContractSumSubscribe: // Admin Contract Sum go cl.orderSubAdminContractSubscribeBySum(psgMsg) case setting.AdminForexSumSubscribe: // Admin Forex Sum go cl.orderSubAdminForexSubscribeBySum(psgMsg) case setting.AdminMoneySumSubscribe: // Admin Money Sum go cl.orderSubAdminMoneySubscribeBySum(psgMsg) case setting.AdminShareUsSumSubscribe: // Admin ShareUs Sum go cl.orderSubAdminShareUsSubscribeBySum(psgMsg) case setting.AdminShareMysSumSubscribe: // Admin ShareMys Sum go cl.orderSubAdminShareMysSubscribeBySum(psgMsg) case setting.AdminShareThaSumSubscribe: // Admin ShareTha Sum go cl.orderSubAdminShareThaSubscribeBySum(psgMsg) case setting.AdminShareIdnSumSubscribe: // Admin ShareIdn Sum go cl.orderSubAdminShareIdnSubscribeBySum(psgMsg) case setting.AdminShareInrSumSubscribe: // Admin ShareInr Sum go cl.orderSubAdminShareInrSubscribeBySum(psgMsg) case setting.AdminShareSgdSumSubscribe: // Admin ShareSgd Sum go cl.orderSubAdminShareSgdSubscribeBySum(psgMsg) case setting.AdminShareGbxSumSubscribe: // Admin ShareGbx Sum go cl.orderSubAdminShareGbxSubscribeBySum(psgMsg) case setting.AdminShareHkdSumSubscribe: // Admin ShareHkd Sum go cl.orderSubAdminShareHkdSubscribeBySum(psgMsg) case setting.AdminShareEurSumSubscribe: // Admin ShareEur Sum go cl.orderSubAdminShareEurSubscribeBySum(psgMsg) case setting.AdminShareFurSumSubscribe: // Admin ShareFur Sum go cl.orderSubAdminShareFurSubscribeBySum(psgMsg) case setting.AdminShareJpySumSubscribe: // Admin ShareJpy Sum go cl.orderSubAdminShareJpySubscribeBySum(psgMsg) case setting.AdminShareBrlSumSubscribe: // Admin ShareBrl Sum go cl.orderSubAdminShareBrlSubscribeBySum(psgMsg) case setting.AdminOptionInrSumSubscribe: // Admin OptionInr Sum go cl.orderSubAdminOptionInrSubscribeBySum(psgMsg) case setting.AdminShareBlkSumSubscribe: // Admin ShareBlk Sum go cl.orderSubAdminShareBlkSubscribeBySum(psgMsg) case setting.AdminShareUserSumSubscribe: // Admin UserShare Sum go cl.orderSubAdminUserShareSubscribeBySum(psgMsg) default: cl.msg <- []byte(ReturnValue("Please enter the correct subscription key.")) } } // Notify user of successful subscription cl.msg <- []byte(ReturnValue("subscribe success")) case "unSubscribe": // Receive unsubscribe applogger.Info("Receive client unSubscribe type:%v,%v", psgMsg.Symbol, cl.token) symbol, _ := cl.symbol.Load(psgMsg.Symbol) applogger.Info("Get current subscription type:%v", symbol) // Delete the specified topic in the subscription topic manager cl.symbol.Delete(psgMsg.Symbol) symbolD, _ := cl.symbol.Load(psgMsg.Symbol) applogger.Info("Subscription type after current user deletion:%v", symbolD) cl.msg <- []byte(ReturnValue("unSubscribe success")) default: // TODO: Handling other situations transmitted by customers applogger.Info("Please provide accurate instructions......") } } } } // writeShare // // @Description: write message // @param cl func writeShare(cl *Client) { ticker := time.NewTicker(pingPeriod) defer func() { ticker.Stop() if err := cl.conn.Close(); err != nil { return } }() for { select { case message, ok := <-cl.msg: // send a message // Set the dead time for writing, equivalent to HTTP timeout if !ok { // If the value is incorrect, close the connection, set the write status, and corresponding data if err := cl.send(flags.SetNull); err != nil { applogger.Error("%v writeShare.send:%v", common.ErrContract, err) } cleanSubscriptionKey(cl) return } // Write data in the form of io, with parameters of data type w, err := cl.conn.NextWriter(websocket.TextMessage) if err != nil { return } // Write data, this function is truly used to transmit data to the foreground _, err = w.Write(message) if err = w.Close(); err != nil { // Close write stream applogger.Error("%v writeShare.Close:%v", common.ErrContract, err) cleanSubscriptionKey(cl) return } case <-ticker.C: // TODO: Need to ping if the client is online } } } // cleanSubscriptionKey // // @Description: clean sub key // @param cl func cleanSubscriptionKey(cl *Client) { // 用户订单列表订阅 cl.symbol.Delete(setting.SpotsSubscribe) // 现货 cl.symbol.Delete(setting.ContractSubscribe) // 合约 cl.symbol.Delete(setting.SecondSubscribe) // 秒合约 cl.symbol.Delete(setting.ForexSubscribe) // 外汇 cl.symbol.Delete(setting.MoneySubscribe) // 综合(现货|合约|外汇) cl.symbol.Delete(setting.ShareUsSubscribe) // 美股 cl.symbol.Delete(setting.ShareIdnSubscribe) // 印尼股 cl.symbol.Delete(setting.ShareThaSubscribe) // 泰股 cl.symbol.Delete(setting.ShareInrSubscribe) // 印度股 cl.symbol.Delete(setting.ShareMysSubscribe) // 马股 cl.symbol.Delete(setting.ShareSgdSubscribe) // 新加坡股 cl.symbol.Delete(setting.ShareGbxSubscribe) // 英股 cl.symbol.Delete(setting.ShareEurSubscribe) // 德股 cl.symbol.Delete(setting.ShareFurSubscribe) // 法股 cl.symbol.Delete(setting.ShareBrlSubscribe) // 巴西股 cl.symbol.Delete(setting.ShareJpySubscribe) // 日股 cl.symbol.Delete(setting.SubscribeShareHkd) // 港股 cl.symbol.Delete(setting.SubscribeOptionInr) // 印度期权股 cl.symbol.Delete(setting.SubscribeSumOptionInr) // 印度期权总浮动盈亏 cl.symbol.Delete(setting.SubscribeShareBlk) // 股票大宗 // 管理员订单列表订阅 cl.symbol.Delete(setting.AdminContractSubscribe) // 管理员合约 cl.symbol.Delete(setting.AdminSecondSubscribe) // 管理员秒合约 cl.symbol.Delete(setting.AdminForexSubscribe) // 管理员外汇 cl.symbol.Delete(setting.AdminMoneySubscribe) // 管理员综合(现货|合约|外汇) cl.symbol.Delete(setting.AdminShareUsSubscribe) // 管理员美股 cl.symbol.Delete(setting.AdminShareThaSubscribe) // 管理员泰股 cl.symbol.Delete(setting.AdminShareIdnSubscribe) // 管理员印尼股 cl.symbol.Delete(setting.AdminShareInrSubscribe) // 管理员印度股 cl.symbol.Delete(setting.AdminShareMysSubscribe) // 管理员马股 cl.symbol.Delete(setting.AdminShareSgdSubscribe) // 管理员新加坡股 cl.symbol.Delete(setting.AdminShareGbxSubscribe) // 管理员英股 cl.symbol.Delete(setting.AdminShareEurSubscribe) // 管理员德股 cl.symbol.Delete(setting.AdminShareFurSubscribe) // 管理员法股 cl.symbol.Delete(setting.AdminShareBrlSubscribe) // 管理员巴西股 cl.symbol.Delete(setting.AdminShareJpySubscribe) // 管理员日股 cl.symbol.Delete(setting.SubscribeAdminShareHkd) // 管理员港股 cl.symbol.Delete(setting.SubscribeAdminOptionInr) // 管理员印度期权股 cl.symbol.Delete(setting.SubscribeAdminShareBlk) // 管理员股票大宗股 // 管理员持仓订单浮动盈亏 cl.symbol.Delete(setting.AdminContractSumSubscribe) // 管理员合约持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminForexSumSubscribe) // 管理员外汇持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminMoneySumSubscribe) // 管理员综合(现货|合约|外汇)持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareUsSumSubscribe) // 管理员美股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareThaSumSubscribe) // 管理员泰股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareIdnSumSubscribe) // 管理员印尼股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareInrSumSubscribe) // 管理员印度股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareMysSumSubscribe) // 管理员马股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareSgdSumSubscribe) // 管理员新加坡股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareGbxSumSubscribe) // 管理员英股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareEurSumSubscribe) // 管理员德股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareFurSumSubscribe) // 管理员法股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareBrlSumSubscribe) // 管理员巴西股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareJpySumSubscribe) // 管理员日股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareHkdSumSubscribe) // 管理员港股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminOptionInrSumSubscribe) // 管理员印度期权股持仓订单浮动盈亏订阅 cl.symbol.Delete(setting.AdminShareBlkSumSubscribe) // 管理员大宗股持仓订单浮动盈亏订阅 // 管理员用户持仓浮动盈亏 cl.symbol.Delete(setting.AdminShareUserSumSubscribe) // 用户股票-外汇市场统计 cl.symbol.Delete(setting.ShareUsMarketSubscribe) // 美股市场资产统计订阅 cl.symbol.Delete(setting.ShareMysMarketSubscribe) // 马股市场资产统计订阅 cl.symbol.Delete(setting.ShareThaMarketSubscribe) // 泰股市场资产统计订阅 cl.symbol.Delete(setting.ShareSgdMarketSubscribe) // 新加坡股市场资产统计订阅 cl.symbol.Delete(setting.ShareInrMarketSubscribe) // 印度股市场资产统计订阅 cl.symbol.Delete(setting.ShareIdnMarketSubscribe) // 印尼股市场资产统计订阅 cl.symbol.Delete(setting.ShareGbxMarketSubscribe) // 英股市场资产统计订阅 cl.symbol.Delete(setting.ShareEurMarketSubscribe) // 德股市场资产统计订阅 cl.symbol.Delete(setting.ShareFurMarketSubscribe) // 法股市场资产统计订阅 cl.symbol.Delete(setting.ShareHkdMarketSubscribe) // 港股市场资产统计订阅 cl.symbol.Delete(setting.ShareBrlMarketSubscribe) // 巴西股市场资产统计订阅 cl.symbol.Delete(setting.ShareJpyMarketSubscribe) // 日股市场资产统计订阅 cl.symbol.Delete(setting.ShareForexMarketSubscribe) // 外汇市场资产统计订阅 cl.symbol.Delete(setting.ShareMoneyMarketSubscribe) // 综合(现货|合约|外汇)市场资产统计订阅 cl.symbol.Delete(setting.ContractMarketSubscribe) // 合约市场资产统计订阅 cl.symbol.Delete(setting.SpotsMarketSubscribe) // 现货市场资产统计订阅 cl.symbol.Delete(setting.SecondMarketSubscribe) // 秒合约市场资产统计订阅 } // send // // @Description: message send // @receiver u // @param data // @return err func (u *Client) send(data string) (err error) { if u.conn == nil { applogger.Error("WebSocket sent error: no connection available") return err } u.mux.Lock() err = u.conn.WriteMessage(websocket.TextMessage, []byte(data)) u.mux.Unlock() return err }