package marketwsscliert import ( "encoding/json" "fmt" "wss-pool/cmd/websocketservice" "wss-pool/config" "wss-pool/internal/data/business" red "wss-pool/internal/redis" "wss-pool/logging/applogger" "wss-pool/pkg/hbwssclient/marketwssclient" "wss-pool/pkg/model/market" ) /* 火币现货数据来源 https://huobiapi.github.io/docs/spot/v1/cn/#k-2 */ // subscribeDepth 市场深度行情数据 func subscribeDepth(symbolList map[string][]string) { client := new(marketwssclient.DepthWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for key, value := range symbolList { for _, vue := range value { client.Request(key, vue, config.Config.HbGather.HbSubUids) client.Subscribe(key, vue, config.Config.HbGather.HbSubUids) } } }, func(resp interface{}) { depthResponse, ok := resp.(market.SubscribeDepthResponse) if ok { if &depthResponse != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: depthResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: depthResponse.Channel}) red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage)) applogger.Info("subscribeDepth data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, value := range symbolList { for _, vue := range value { client.UnSubscribe(key, vue, config.Config.HbGather.HbSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeLevelMbp 市场深度MBP行情数据(增量推送)(150挡) func subscribeLevelMbp(symbolList map[string][]string) { client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for key, _ := range symbolList { client.Request(key, config.Config.HbGather.HbSubUids) client.Subscribe(key, config.Config.HbGather.HbSubUids) } }, func(resp interface{}) { depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse) if ok { if &depthResponse != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: depthResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: depthResponse.Channel}) red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage)) applogger.Info("subscribeLevelMbp data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, _ := range symbolList { client.UnSubscribe(key, config.Config.HbGather.HbSubUids) } client.Close() applogger.Info("Client closed") } // subscribeFullMbp 市场深度MBP行情数据(全量推送) func subscribeFullMbp(symbolList map[string][]int) { client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for key, value := range symbolList { for _, vue := range value { client.SubscribeFull(key, vue, config.Config.HbGather.HbSubUids) } } }, func(resp interface{}) { depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse) if ok { if &depthResponse != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: depthResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: depthResponse.Channel}) red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage)) applogger.Info("subscribeFullMbp data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, value := range symbolList { for _, vue := range value { client.UnSubscribeFull(key, vue, config.Config.HbGather.HbSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeSubMbp 市场深度MBP行情数据(增量推送) func subscribeSubMbp(symbolList map[string][]int) { client := new(marketwssclient.MarketByPriceTickWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for key, value := range symbolList { for _, vue := range value { client.Request(key, vue, config.Config.HbGather.HbSubUids) client.Subscribe(key, vue, config.Config.HbGather.HbSubUids) } } }, func(resp interface{}) { depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse) if ok { if &depthResponse != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: depthResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: depthResponse.Channel}) red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage)) applogger.Info("subscribeSubMbp data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, value := range symbolList { for _, vue := range value { client.UnSubscribe(key, vue, config.Config.HbGather.HbSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeBbo 买一卖一逐笔行情 func subscribeBbo(symbolList map[string][]string) { client := new(marketwssclient.BestBidOfferWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for key, _ := range symbolList { client.Subscribe(key, config.Config.HbGather.HbSubUids) } }, func(resp interface{}) { bboResponse, ok := resp.(market.SubscribeBestBidOfferResponse) if ok { if bboResponse.Tick != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: bboResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: bboResponse.Channel}) red.RedisClient.Publish(bboResponse.Channel, string(jsonMessage)) } applogger.Info("subscribeBbo data,ServersId:%v,Sender:%v,Content:%v-%v", bboResponse.Channel, Cli.Id, bboResponse.Tick, nil) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, _ := range symbolList { client.UnSubscribe(key, config.Config.HbGather.HbSubUids) } client.Close() applogger.Info("Connection closed") } // subscribeKLine K线数据 func subscribeKLine(symbolList map[string][]string) { client := new(marketwssclient.CandlestickWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for symbol, period := range symbolList { for _, value := range period { client.Subscribe(symbol, value, config.Config.HbGather.HbSubUids) } } }, func(response interface{}) { resp, ok := response.(market.SubscribeCandlestickResponse) if ok { if &resp != nil { if resp.Tick != nil || resp.Data != nil { //插针 jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel, }) //applogger.Info("k line JSON --- %s",string(jsonMessage)) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) //实时数据入库 applogger.Info("k line resp %v", resp.Data) go business.UpdateWsMgo(resp) } applogger.Info("subscribeKLine data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, period := range symbolList { for _, value := range period { client.UnSubscribe(symbol, value, config.Config.HbGather.HbSubUids) } } client.Close() applogger.Info("Client closed") } // subscribeTrade 成交明细 func subscribeTrade(symbolList map[string][]string) { client := new(marketwssclient.TradeWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for key, _ := range symbolList { client.Request(key, config.Config.HbGather.HbSubUids) client.Subscribe(key, config.Config.HbGather.HbSubUids) } }, func(resp interface{}) { depthResponse, ok := resp.(market.SubscribeTradeResponse) if ok { if &depthResponse != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: depthResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: depthResponse.Channel}) red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage)) applogger.Info("subscribeTrade data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, _ := range symbolList { client.UnSubscribe(key, config.Config.HbGather.HbSubUids) } client.Close() applogger.Info("Client closed") } // subscribeLast24h 市场概要 func subscribeLast24h(symbolList map[string][]string) { // Initialize a new instance client := new(marketwssclient.Last24hCandlestickWebSocketClient).Init(config.Config.HbGather.HbHost) // Set the callback handlers client.SetHandler( // Connected handler func() { for key, _ := range symbolList { client.Request(key, config.Config.HbGather.HbSubUids) client.Subscribe(key, config.Config.HbGather.HbSubUids) } }, // Response handler func(resp interface{}) { candlestickResponse, ok := resp.(market.SubscribeLast24hCandlestickResponse) if ok { if &candlestickResponse != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: candlestickResponse.Channel, Sender: Cli.Id, Content: resp, Symbol: candlestickResponse.Channel}) red.RedisClient.Publish(candlestickResponse.Channel, string(jsonMessage)) applogger.Info("subscribeLast24h data,ServersId:%v,Sender:%v,Content:%v-%v", candlestickResponse.Channel, Cli.Id, candlestickResponse.Tick, candlestickResponse.Data) } } else { applogger.Warn("Unknown response: %v", resp) } }) // Connect to the wss and wait for the handler to handle the response client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for key, _ := range symbolList { client.UnSubscribe(key, config.Config.HbGather.HbSubUids) } client.Close() applogger.Info("Client closed") } // subscribeTicker 市场聚合行情(ticker) func subscribeTicker(symbolList map[string][]string) { client := new(marketwssclient.TickerWebSocketClient).Init(config.Config.HbGather.HbHost) client.SetHandler( func() { for symbol, _ := range symbolList { client.Subscribe(symbol, config.Config.HbGather.HbSubUids) } }, func(response interface{}) { resp, ok := response.(market.TickerWebsocketResponse) if ok { if &resp != nil { if resp.Tick != nil || resp.Data != nil { jsonMessage, _ := json.Marshal(websocketservice.Message{ ServersId: resp.Channel, Sender: Cli.Id, Content: resp, Symbol: resp.Channel}) red.RedisClient.Publish(resp.Channel, string(jsonMessage)) applogger.Info("subscribeTicker data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data) } } } else { applogger.Warn("Unknown response: %v", resp) } }) client.Connect(true) fmt.Println("Press ENTER to unsubscribe and stop...") fmt.Scanln() for symbol, _ := range symbolList { client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids) } client.Close() applogger.Info("Client closed") }