package business

import (
	"fmt"

	"go.mongodb.org/mongo-driver/bson"
	"wss-pool/config"
	"wss-pool/dictionary"
	"wss-pool/internal"
	"wss-pool/internal/data"
	"wss-pool/logging/applogger"
	"wss-pool/pkg/hbwssclient/marketwssclient"
	"wss-pool/pkg/model"
	"wss-pool/pkg/model/market"
)

// MgoSubscribeDepth subscribes to market depth data.
//
// For every symbol/value pair produced by model.SymbolListString(dictionary.Depth)
// it issues both a Request and a Subscribe on a DepthWebSocketClient, logs each
// received update, and blocks until a line is read from stdin. It then
// unsubscribes from every pair and closes the client.
func MgoSubscribeDepth() {
	symbolList := model.SymbolListString(dictionary.Depth)
	client := new(marketwssclient.DepthWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: issues the requests/subscriptions
		// (presumably invoked once the connection is up — confirm in marketwssclient).
		func() {
			for symbol, steps := range symbolList {
				for _, step := range steps {
					client.Request(symbol, step, config.Config.HbGather.HbSubUids)
					client.Subscribe(symbol, step, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Second handler: receives every decoded message.
		func(resp interface{}) {
			depthResponse, ok := resp.(market.SubscribeDepthResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			applogger.Info("subscribeDepth data,ServersId:%v,Content:%v-%v", depthResponse.Channel, depthResponse.Tick, depthResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol, steps := range symbolList {
		for _, step := range steps {
			client.UnSubscribe(symbol, step, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeLevelMbp subscribes to market-by-price depth data
// (incremental push, 150 levels).
//
// For every symbol key produced by model.SymbolListString it issues both a
// Request and a Subscribe on a MarketByPriceWebSocketClient, logs each received
// update, and blocks until a line is read from stdin. It then unsubscribes from
// every symbol and closes the client.
func MgoSubscribeLevelMbp() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol := range symbolList {
				client.Request(symbol, config.Config.HbGather.HbSubUids)
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		func(resp interface{}) {
			depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			applogger.Info("subscribeLevelMbp data,ServersId:%v,Content:%v-%v", depthResponse.Channel, depthResponse.Tick, depthResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeFullMbp subscribes to market-by-price depth data (full push).
//
// For every symbol/value pair produced by
// model.SymbolListInt(dictionary.LevelsRefresh) it calls SubscribeFull on a
// MarketByPriceWebSocketClient, logs each received update, and blocks until a
// line is read from stdin. It then calls UnSubscribeFull for every pair and
// closes the client.
func MgoSubscribeFullMbp() {
	symbolList := model.SymbolListInt(dictionary.LevelsRefresh)
	client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol, levels := range symbolList {
				for _, level := range levels {
					client.SubscribeFull(symbol, level, config.Config.HbGather.HbSubUids)
				}
			}
		},
		func(resp interface{}) {
			depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			applogger.Info("subscribeFullMbp data,ServersId:%v,Content:%v-%v", depthResponse.Channel, depthResponse.Tick, depthResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol, levels := range symbolList {
		for _, level := range levels {
			client.UnSubscribeFull(symbol, level, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeSubMbp subscribes to market-by-price depth data
// (incremental push).
//
// For every symbol/value pair produced by
// model.SymbolListInt(dictionary.LevelsMbp) it issues both a Request and a
// Subscribe on a MarketByPriceTickWebSocketClient, logs each received update,
// and blocks until a line is read from stdin. It then unsubscribes from every
// pair and closes the client.
func MgoSubscribeSubMbp() {
	symbolList := model.SymbolListInt(dictionary.LevelsMbp)
	client := new(marketwssclient.MarketByPriceTickWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol, levels := range symbolList {
				for _, level := range levels {
					client.Request(symbol, level, config.Config.HbGather.HbSubUids)
					client.Subscribe(symbol, level, config.Config.HbGather.HbSubUids)
				}
			}
		},
		func(resp interface{}) {
			depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// The format has exactly one verb per argument; the previous
			// "Sender:%v" verb had no matching argument.
			applogger.Info("subscribeSubMbp data,ServersId:%v,Content:%v-%v", depthResponse.Channel, depthResponse.Tick, depthResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol, levels := range symbolList {
		for _, level := range levels {
			client.UnSubscribe(symbol, level, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeBbo subscribes to best-bid/best-offer tick-by-tick data.
//
// For every symbol key produced by model.SymbolListString it calls Subscribe on
// a BestBidOfferWebSocketClient, logs each received update, and blocks until a
// line is read from stdin. It then unsubscribes from every symbol and closes
// the client.
func MgoSubscribeBbo() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.BestBidOfferWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol := range symbolList {
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		func(resp interface{}) {
			bboResponse, ok := resp.(market.SubscribeBestBidOfferResponse)
			if !ok {
				// Report unexpected payloads the same way the sibling subscribers do.
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// Logged for every typed message, whether or not Tick is set.
			applogger.Info("subscribeBbo data,ServersId:%v,Content:%v", bboResponse.Channel, bboResponse.Tick)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Connection closed")
}

// MgoSubscribeKLine subscribes to candlestick (K-line) data and stores each
// update through data.MgoInsertOne.
//
// For every symbol/period pair produced by
// model.SymbolListString(dictionary.TimeCycle) it calls Subscribe on a
// CandlestickWebSocketClient. Each message that carries a Tick or Data is
// converted to a bson document and inserted, on its own goroutine, into the
// table named by internal.CheckKLineTable(channel). The function blocks until a
// line is read from stdin, then unsubscribes from every pair and closes the
// client.
func MgoSubscribeKLine() {
	symbolList := model.SymbolListString(dictionary.TimeCycle)
	client := new(marketwssclient.CandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol, periods := range symbolList {
				for _, period := range periods {
					client.Subscribe(symbol, period, config.Config.HbGather.HbSubUids)
				}
			}
		},
		func(response interface{}) {
			resp, ok := response.(market.SubscribeCandlestickResponse)
			if !ok {
				// Log the raw payload: resp is only the zero value when the
				// assertion fails.
				applogger.Warn("Unknown response: %v", response)
				return
			}

			if resp.Tick != nil || resp.Data != nil {
				// The insert runs on its own goroutine so the handler is not
				// held up by the database write.
				go func() {
					// Tick may be nil when only Data is present; leave the field
					// nil in that case instead of dereferencing a nil pointer.
					var tick interface{}
					if t := resp.Tick; t != nil {
						tick = bson.D{
							{Key: "Id", Value: t.Id},
							{Key: "Close", Value: t.Close.String()},
							{Key: "Low", Value: t.Low.String()},
							{Key: "Amount", Value: t.Amount.String()},
							{Key: "High", Value: t.High.String()},
							{Key: "Count", Value: t.Count},
							{Key: "Open", Value: t.Open.String()},
							{Key: "Vol", Value: t.Vol.String()},
						}
					}

					doc := bson.D{
						{Key: "Channel", Value: resp.Channel},
						{Key: "Tick", Value: tick},
						{Key: "Data", Value: resp.Data},
						{Key: "Timestamp", Value: resp.Timestamp},
					}

					table := internal.CheckKLineTable(resp.Channel)
					applogger.Debug("数据库表名称:%v", table)

					if err := data.MgoInsertOne(table, doc); err != nil {
						applogger.Error("writeKLine err:%v", err)
					}
				}()
			}

			applogger.Info("subscribeKLine data Key:%v,time:%v,tick:%v,data:%v", resp.Channel, resp.Timestamp, resp.Tick, resp.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	// Unsubscribe from exactly the pairs that were subscribed above.
	for symbol, periods := range symbolList {
		for _, period := range periods {
			client.UnSubscribe(symbol, period, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeTrade subscribes to trade detail data.
//
// For every symbol key produced by model.SymbolListString it issues both a
// Request and a Subscribe on a TradeWebSocketClient, logs each received update,
// and blocks until a line is read from stdin. It then unsubscribes from every
// symbol and closes the client.
func MgoSubscribeTrade() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.TradeWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol := range symbolList {
				client.Request(symbol, config.Config.HbGather.HbSubUids)
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		func(resp interface{}) {
			tradeResponse, ok := resp.(market.SubscribeTradeResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// One verb per argument; the previous "Sender:%v" verb had none.
			applogger.Info("subscribeTrade data,ServersId:%v,Content:%v-%v", tradeResponse.Channel, tradeResponse.Tick, tradeResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeLast24h subscribes to the market summary (last 24h candlestick).
//
// For every symbol key produced by model.SymbolListString it issues both a
// Request and a Subscribe on a Last24hCandlestickWebSocketClient, logs each
// received update, and blocks until a line is read from stdin. It then
// unsubscribes from every symbol and closes the client.
func MgoSubscribeLast24h() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.Last24hCandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol := range symbolList {
				client.Request(symbol, config.Config.HbGather.HbSubUids)
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		func(resp interface{}) {
			candlestickResponse, ok := resp.(market.SubscribeLast24hCandlestickResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// One verb per argument; the previous "Sender:%v" verb had none.
			applogger.Info("subscribeLast24h data,ServersId:%v,Content:%v-%v", candlestickResponse.Channel, candlestickResponse.Tick, candlestickResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}

// MgoSubscribeTicker subscribes to aggregated ticker data.
//
// For every symbol key produced by model.SymbolListString it calls Subscribe on
// a TickerWebSocketClient, logs each received update that carries a Tick or
// Data, and blocks until a line is read from stdin. It then unsubscribes from
// every symbol and closes the client.
func MgoSubscribeTicker() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.TickerWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		func() {
			for symbol := range symbolList {
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		func(response interface{}) {
			resp, ok := response.(market.TickerWebsocketResponse)
			if !ok {
				// Log the raw payload: resp is only the zero value when the
				// assertion fails.
				applogger.Warn("Unknown response: %v", response)
				return
			}
			if resp.Tick != nil || resp.Data != nil {
				// One verb per argument; the previous "Sender:%v" verb had none.
				applogger.Info("subscribeTicker data,ServersId:%v,Content:%v-%v", resp.Channel, resp.Tick, resp.Data)
			}
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// The result is irrelevant: any input, EOF or read error ends the wait.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}