package marketwsscliert import ( "encoding/json" "fmt" "wss-pool/cmd/websocketservice" "wss-pool/config" "wss-pool/internal/data/business" red "wss-pool/internal/redis" "wss-pool/logging/applogger" "wss-pool/pkg/hbwssclient/marketwssclient" "wss-pool/pkg/model/market" ) /*U本币合约数据 https://huobiapi.github.io/docs/usdt_swap/v1/cn/ */ // subscribeCtKline 合约kline数据 func subscribeCtKline(symbolList map[string][]string) { client := new(marketwssclient.ContractKLineWebSocketClient).Init(config.Config.HbContract.HbContractHost) client.SetHandler( func() { for symbol, period := range symbolList { for _, value := range period { client.Subscribe(symbol, value, config.Config.HbContract.HbContractSubUids) } } }, func(response interface{}) { resp, ok := response.(market.SubscribeCtKlineResponse) if ok { if &resp != nil { if resp.Tick != nil || resp.Data != nil { //插针 resp = RunModify(resp) jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel}) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) //实时数据入库 go business.UpdateSubscribeCtKline(resp) applogger.Info("subscribeCtKline data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data) } } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, period := range symbolList { for _, value := range period { client.UnSubscribe(symbol, value, config.Config.HbContract.HbContractSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeCtDepth 合约深度数据源 func subscribeCtDepth(symbolList map[string][]string) { //改为一对一 发送 for symbol, period := range symbolList { for _, value := range period { topic := fmt.Sprintf("market.%s.depth.%s", symbol, value) client := new(marketwssclient.ContractDepthWebSocketClient).Init(config.Config.HbContract.HbContractHost) client.SetHandler( func() { client.Subscribe(symbol, value, config.Config.HbContract.HbContractSubUids) }, func(response interface{}) { resp, ok := response.(interface{}) if ok { if &resp != nil { //jsonMessage, _ := json.Marshal(websocketservice.Message{ // ServersId: topic, // Sender: Cli.Id, // Content: resp, // Symbol: topic}) //合约深度压缩数据 red.RedisClient.Publish(topic, resp) applogger.Info("subscribeCtDepth data,ServersId:%v,Sender:%v,Content:%v", topic, Cli.Id) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) defer client.Close() } } fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() client := new(marketwssclient.ContractDepthWebSocketClient).Init(config.Config.HbContract.HbContractHost) for symbol, period := range symbolList { for _, value := range period { client.UnSubscribe(symbol, value, config.Config.HbContract.HbContractSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeCtAddDepth 合约深度增量数据 func subscribeCtAddDepth(symbolList map[string][]string) { client := new(marketwssclient.ContractDepthSizeWebSocketClient).Init(config.Config.HbContract.HbContractHost) client.SetHandler( func() { for symbol, period := range symbolList { for _, value := range period { client.Subscribe(symbol, value, config.Config.HbContract.HbContractSubUids) } } }, func(response interface{}) { resp, ok := response.(market.SubscribeCtAddDepthResponse) if ok { if &resp != nil { if resp.Tick != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel}) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) applogger.Info("subscribeCtAddDepth data,ServersId:%v,Sender:%v,Content:%v", resp.Channel, Cli.Id, resp.Tick) } } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, period := range symbolList { for _, value := range period { client.UnSubscribe(symbol, value, config.Config.HbContract.HbContractSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeCtBbo TODO: 合约买一卖一逐笔行情数据 func subscribeCtBbo(symbolList map[string][]string) { client := new(marketwssclient.ContractBBOWebSocketClient).Init(config.Config.HbContract.HbContractHost) client.SetHandler( func() { for symbol, _ := range symbolList { client.Subscribe(symbol, config.Config.HbContract.HbContractSubUids) } }, func(response interface{}) { resp, ok := response.(market.SubscribeCtBboResponse) if ok { if &resp != nil { if resp.Tick != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel}) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) applogger.Info("subscribeCtBbo data,ServersId:%v,Sender:%v,Content:%v", resp.Channel, Cli.Id, resp.Tick) } } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, _ := range symbolList { client.UnSubscribe(symbol, config.Config.HbContract.HbContractSubUids) } client.Close() applogger.Info("Client closed") } // subscribeCtDetail TODO: 合约详情数据 func subscribeCtDetail(symbolList map[string][]string) { client := new(marketwssclient.ContractDetailWebSocketClient).Init(config.Config.HbContract.HbContractHost) client.SetHandler( func() { for symbol, _ := range symbolList { client.Subscribe(symbol, config.Config.HbContract.HbContractSubUids) } }, func(response interface{}) { resp, ok := response.(market.SubscribeCtDetailResponse) if ok { if &resp != nil { if resp.Tick != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel}) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) applogger.Info("subscribeCtDetail data,ServersId:%v,Sender:%v,Content:%v", resp.Channel, Cli.Id, resp.Tick) } } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, _ := range symbolList { client.UnSubscribe(symbol, config.Config.HbContract.HbContractSubUids) } client.Close() applogger.Info("Client closed") } // subscribeCtTradeDetail 合约贸易详情数据 func subscribeCtTradeDetail(symbolList map[string][]string) { client := new(marketwssclient.ContractTradeDetailWebSocketClient).Init(config.Config.HbContract.HbContractHost) client.SetHandler( func() { for symbol, _ := range symbolList { client.Subscribe(symbol, config.Config.HbContract.HbContractSubUids) } }, func(response interface{}) { resp, ok := response.(market.SubscribeCtTradeDetailResponse) if ok { if &resp != nil { if resp.Tick != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel}) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) applogger.Info("subscribeCtTradeDetail data,ServersId:%v,Sender:%v,Content:%v", resp.Channel, Cli.Id, resp.Tick) } } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, _ := range symbolList { client.UnSubscribe(symbol, config.Config.HbContract.HbContractSubUids) } client.Close() applogger.Info("Client closed") }