You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

402 lines
12 KiB

2 months ago
package marketwsscliert
import (
"encoding/json"
"fmt"
"wss-pool/cmd/websocketservice"
"wss-pool/config"
"wss-pool/internal/data/business"
red "wss-pool/internal/redis"
"wss-pool/logging/applogger"
"wss-pool/pkg/hbwssclient/marketwssclient"
"wss-pool/pkg/model/market"
)
/* Huobi spot market data source:
https://huobiapi.github.io/docs/spot/v1/cn/#k-2
*/
// subscribeDepth streams market depth data from the host configured in
// config.Config.HbGather.HbHost and republishes every update to Redis.
//
// Each key of symbolList is combined with each of its values and passed to the
// client's Request and Subscribe calls (presumably symbol and depth step;
// confirm against marketwssclient). Every decoded update is wrapped in a
// websocketservice.Message and published on the Redis channel named after the
// update's Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every pair and closes the client.
func subscribeDepth(symbolList map[string][]string) {
	client := new(marketwssclient.DepthWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: request and subscribe for every key/value pair.
		func() {
			for key, value := range symbolList {
				for _, vue := range value {
					client.Request(key, vue, config.Config.HbGather.HbSubUids)
					client.Subscribe(key, vue, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Response handler: forward decoded depth updates to Redis.
		func(resp interface{}) {
			depthResponse, ok := resp.(market.SubscribeDepthResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: depthResponse.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    depthResponse.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeDepth marshal failed,ServersId:%v,err:%v", depthResponse.Channel, err)
				return
			}
			red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage))
			applogger.Info("subscribeDepth data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key, value := range symbolList {
		for _, vue := range value {
			client.UnSubscribe(key, vue, config.Config.HbGather.HbSubUids)
		}
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeLevelMbp streams market-by-price (MBP) depth data and republishes
// every update to Redis. The original note describes this feed as incremental
// push with 150 levels.
//
// Only the keys of symbolList are used; each key is passed to the client's
// Request and Subscribe calls, and the map values are ignored. Every decoded
// update is wrapped in a websocketservice.Message and published on the Redis
// channel named after the update's Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every key and closes the client.
func subscribeLevelMbp(symbolList map[string][]string) {
	client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: request and subscribe for every key.
		func() {
			for key := range symbolList {
				client.Request(key, config.Config.HbGather.HbSubUids)
				client.Subscribe(key, config.Config.HbGather.HbSubUids)
			}
		},
		// Response handler: forward decoded MBP updates to Redis.
		func(resp interface{}) {
			mbpResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: mbpResponse.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    mbpResponse.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeLevelMbp marshal failed,ServersId:%v,err:%v", mbpResponse.Channel, err)
				return
			}
			red.RedisClient.Publish(mbpResponse.Channel, string(jsonMessage))
			applogger.Info("subscribeLevelMbp data,ServersId:%v,Sender:%v,Content:%v-%v", mbpResponse.Channel, Cli.Id, mbpResponse.Tick, mbpResponse.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key := range symbolList {
		client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeFullMbp streams market-by-price (MBP) depth data and republishes
// every update to Redis. The original note describes this feed as full
// (non-incremental) push.
//
// Each key of symbolList is combined with each of its int values and passed to
// the client's SubscribeFull call (presumably symbol and number of levels;
// confirm against marketwssclient). Every decoded update is wrapped in a
// websocketservice.Message and published on the Redis channel named after the
// update's Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every pair and closes the client.
func subscribeFullMbp(symbolList map[string][]int) {
	client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: subscribe to the full feed for every key/value pair.
		func() {
			for key, value := range symbolList {
				for _, vue := range value {
					client.SubscribeFull(key, vue, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Response handler: forward decoded MBP updates to Redis.
		func(resp interface{}) {
			mbpResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: mbpResponse.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    mbpResponse.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeFullMbp marshal failed,ServersId:%v,err:%v", mbpResponse.Channel, err)
				return
			}
			red.RedisClient.Publish(mbpResponse.Channel, string(jsonMessage))
			applogger.Info("subscribeFullMbp data,ServersId:%v,Sender:%v,Content:%v-%v", mbpResponse.Channel, Cli.Id, mbpResponse.Tick, mbpResponse.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key, value := range symbolList {
		for _, vue := range value {
			client.UnSubscribeFull(key, vue, config.Config.HbGather.HbSubUids)
		}
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeSubMbp streams market-by-price (MBP) depth data through the tick
// client and republishes every update to Redis. The original note describes
// this feed as incremental push.
//
// Each key of symbolList is combined with each of its int values and passed to
// the client's Request and Subscribe calls (presumably symbol and number of
// levels; confirm against marketwssclient). Every decoded update is wrapped in
// a websocketservice.Message and published on the Redis channel named after
// the update's Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every pair and closes the client.
func subscribeSubMbp(symbolList map[string][]int) {
	client := new(marketwssclient.MarketByPriceTickWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: request and subscribe for every key/value pair.
		func() {
			for key, value := range symbolList {
				for _, vue := range value {
					client.Request(key, vue, config.Config.HbGather.HbSubUids)
					client.Subscribe(key, vue, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Response handler: forward decoded MBP updates to Redis.
		func(resp interface{}) {
			mbpResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: mbpResponse.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    mbpResponse.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeSubMbp marshal failed,ServersId:%v,err:%v", mbpResponse.Channel, err)
				return
			}
			red.RedisClient.Publish(mbpResponse.Channel, string(jsonMessage))
			applogger.Info("subscribeSubMbp data,ServersId:%v,Sender:%v,Content:%v-%v", mbpResponse.Channel, Cli.Id, mbpResponse.Tick, mbpResponse.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key, value := range symbolList {
		for _, vue := range value {
			client.UnSubscribe(key, vue, config.Config.HbGather.HbSubUids)
		}
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeBbo streams tick-by-tick best bid/offer (BBO) quotes and
// republishes them to Redis.
//
// Only the keys of symbolList are used; each key is passed to the client's
// Subscribe call, and the map values are ignored. An update is published only
// when its Tick is non-nil, wrapped in a websocketservice.Message on the Redis
// channel named after the update's Channel field. Every decoded update is
// logged whether or not it was published.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every key and closes the client.
func subscribeBbo(symbolList map[string][]string) {
	client := new(marketwssclient.BestBidOfferWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: subscribe for every key.
		func() {
			for key := range symbolList {
				client.Subscribe(key, config.Config.HbGather.HbSubUids)
			}
		},
		// Response handler: forward decoded BBO updates to Redis.
		func(resp interface{}) {
			bboResponse, ok := resp.(market.SubscribeBestBidOfferResponse)
			if !ok {
				// The sibling handlers in this file all warn on unexpected payloads; do the same here.
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			if bboResponse.Tick != nil {
				jsonMessage, err := json.Marshal(websocketservice.Message{
					ServersId: bboResponse.Channel,
					Sender:    Cli.Id,
					Content:   resp,
					Symbol:    bboResponse.Channel,
				})
				if err != nil {
					// Skip publishing rather than push an empty payload to subscribers.
					applogger.Warn("subscribeBbo marshal failed,ServersId:%v,err:%v", bboResponse.Channel, err)
				} else {
					red.RedisClient.Publish(bboResponse.Channel, string(jsonMessage))
				}
			}
			applogger.Info("subscribeBbo data,ServersId:%v,Sender:%v,Content:%v-%v", bboResponse.Channel, Cli.Id, bboResponse.Tick, nil)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key := range symbolList {
		client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
	}
	client.Close()
	applogger.Info("Connection closed")
}
// subscribeKLine streams candlestick (K-line) data, republishes it to Redis
// and hands each update to business.UpdateWsMgo for persistence.
//
// Each key of symbolList is a symbol and each of its values a period; every
// symbol/period pair is passed to the client's Subscribe call. An update is
// processed only when its Tick or Data is non-nil: it is wrapped in a
// websocketservice.Message, published on the Redis channel named after the
// update's Channel field, and passed to business.UpdateWsMgo in a new
// goroutine. Every decoded update is logged whether or not it was processed.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every pair and closes the client.
func subscribeKLine(symbolList map[string][]string) {
	client := new(marketwssclient.CandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: subscribe for every symbol/period pair.
		func() {
			for symbol, period := range symbolList {
				for _, value := range period {
					client.Subscribe(symbol, value, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Response handler: forward decoded candlesticks to Redis and storage.
		func(response interface{}) {
			resp, ok := response.(market.SubscribeCandlestickResponse)
			if !ok {
				// Log the raw payload: resp is only the zero value when the assertion fails.
				applogger.Warn("Unknown response: %v", response)
				return
			}
			if resp.Tick != nil || resp.Data != nil {
				// NOTE(review): the original comment marked this spot for price-spike
				// ("wick") handling; nothing is implemented here.
				jsonMessage, err := json.Marshal(websocketservice.Message{
					ServersId: resp.Channel,
					Sender:    Cli.Id,
					Content:   resp,
					Symbol:    resp.Channel,
				})
				if err != nil {
					// Skip publishing rather than push an empty payload; persistence below still runs.
					applogger.Warn("subscribeKLine marshal failed,ServersId:%v,err:%v", resp.Channel, err)
				} else {
					//applogger.Info("k line JSON --- %s",string(jsonMessage))
					red.RedisClient.Publish(resp.Channel, string(jsonMessage))
				}
				// Persist the real-time data (storage backend is whatever UpdateWsMgo writes to).
				applogger.Info("k line resp %v", resp.Data)
				go business.UpdateWsMgo(resp)
			}
			applogger.Info("subscribeKLine data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for symbol, period := range symbolList {
		for _, value := range period {
			client.UnSubscribe(symbol, value, config.Config.HbGather.HbSubUids)
		}
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeTrade streams trade detail data and republishes every update to
// Redis.
//
// Only the keys of symbolList are used; each key is passed to the client's
// Request and Subscribe calls, and the map values are ignored. Every decoded
// update is wrapped in a websocketservice.Message and published on the Redis
// channel named after the update's Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every key and closes the client.
func subscribeTrade(symbolList map[string][]string) {
	client := new(marketwssclient.TradeWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: request and subscribe for every key.
		func() {
			for key := range symbolList {
				client.Request(key, config.Config.HbGather.HbSubUids)
				client.Subscribe(key, config.Config.HbGather.HbSubUids)
			}
		},
		// Response handler: forward decoded trade updates to Redis.
		func(resp interface{}) {
			tradeResponse, ok := resp.(market.SubscribeTradeResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: tradeResponse.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    tradeResponse.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeTrade marshal failed,ServersId:%v,err:%v", tradeResponse.Channel, err)
				return
			}
			red.RedisClient.Publish(tradeResponse.Channel, string(jsonMessage))
			applogger.Info("subscribeTrade data,ServersId:%v,Sender:%v,Content:%v-%v", tradeResponse.Channel, Cli.Id, tradeResponse.Tick, tradeResponse.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key := range symbolList {
		client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeLast24h streams the last-24-hour market summary and republishes
// every update to Redis.
//
// Only the keys of symbolList are used; each key is passed to the client's
// Request and Subscribe calls, and the map values are ignored. Every decoded
// update is wrapped in a websocketservice.Message and published on the Redis
// channel named after the update's Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every key and closes the client.
func subscribeLast24h(symbolList map[string][]string) {
	// Initialize a new client instance.
	client := new(marketwssclient.Last24hCandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)
	// Set the callback handlers.
	client.SetHandler(
		// Connected handler: request and subscribe for every key.
		func() {
			for key := range symbolList {
				client.Request(key, config.Config.HbGather.HbSubUids)
				client.Subscribe(key, config.Config.HbGather.HbSubUids)
			}
		},
		// Response handler: forward decoded summaries to Redis.
		func(resp interface{}) {
			candlestickResponse, ok := resp.(market.SubscribeLast24hCandlestickResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: candlestickResponse.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    candlestickResponse.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeLast24h marshal failed,ServersId:%v,err:%v", candlestickResponse.Channel, err)
				return
			}
			red.RedisClient.Publish(candlestickResponse.Channel, string(jsonMessage))
			applogger.Info("subscribeLast24h data,ServersId:%v,Sender:%v,Content:%v-%v", candlestickResponse.Channel, Cli.Id, candlestickResponse.Tick, candlestickResponse.Data)
		})
	// Connect to the wss endpoint and let the handlers process the responses.
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for key := range symbolList {
		client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
	}
	client.Close()
	applogger.Info("Client closed")
}
// subscribeTicker streams aggregated market ticker data and republishes it to
// Redis.
//
// Only the keys of symbolList are used; each key is passed to the client's
// Subscribe call, and the map values are ignored. An update is published and
// logged only when its Tick or Data is non-nil; it is wrapped in a
// websocketservice.Message on the Redis channel named after the update's
// Channel field.
//
// The function blocks until fmt.Scanln returns (a line read from stdin, or a
// read error such as EOF), then unsubscribes every key and closes the client.
func subscribeTicker(symbolList map[string][]string) {
	client := new(marketwssclient.TickerWebSocketClient).Init(config.Config.HbGather.HbHost)
	client.SetHandler(
		// Connected handler: subscribe for every key.
		func() {
			for symbol := range symbolList {
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		// Response handler: forward decoded ticker updates to Redis.
		func(response interface{}) {
			resp, ok := response.(market.TickerWebsocketResponse)
			if !ok {
				// Log the raw payload: resp is only the zero value when the assertion fails.
				applogger.Warn("Unknown response: %v", response)
				return
			}
			if resp.Tick == nil && resp.Data == nil {
				// Nothing to forward for an update that carries no payload.
				return
			}
			jsonMessage, err := json.Marshal(websocketservice.Message{
				ServersId: resp.Channel,
				Sender:    Cli.Id,
				Content:   resp,
				Symbol:    resp.Channel,
			})
			if err != nil {
				// Drop the update rather than publish an empty payload to subscribers.
				applogger.Warn("subscribeTicker marshal failed,ServersId:%v,err:%v", resp.Channel, err)
				return
			}
			red.RedisClient.Publish(resp.Channel, string(jsonMessage))
			applogger.Info("subscribeTicker data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data)
		})
	client.Connect(true)

	// Block here until stdin yields a line (or fails), then tear everything down.
	fmt.Println("Press ENTER to unsubscribe and stop...")
	fmt.Scanln()
	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}
	client.Close()
	applogger.Info("Client closed")
}