You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
402 lines
12 KiB
402 lines
12 KiB
2 months ago
|
package marketwsscliert
|
||
|
|
||
|
import (
|
||
|
"encoding/json"
|
||
|
"fmt"
|
||
|
"wss-pool/cmd/websocketservice"
|
||
|
"wss-pool/config"
|
||
|
"wss-pool/internal/data/business"
|
||
|
red "wss-pool/internal/redis"
|
||
|
"wss-pool/logging/applogger"
|
||
|
"wss-pool/pkg/hbwssclient/marketwssclient"
|
||
|
"wss-pool/pkg/model/market"
|
||
|
)
|
||
|
|
||
|
/* 火币现货数据来源
|
||
|
https://huobiapi.github.io/docs/spot/v1/cn/#k-2
|
||
|
*/
|
||
|
// subscribeDepth 市场深度行情数据
|
||
|
func subscribeDepth(symbolList map[string][]string) {
|
||
|
client := new(marketwssclient.DepthWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for key, value := range symbolList {
|
||
|
for _, vue := range value {
|
||
|
client.Request(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
client.Subscribe(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
func(resp interface{}) {
|
||
|
depthResponse, ok := resp.(market.SubscribeDepthResponse)
|
||
|
if ok {
|
||
|
if &depthResponse != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: depthResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: depthResponse.Channel})
|
||
|
red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeDepth data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, value := range symbolList {
|
||
|
for _, vue := range value {
|
||
|
client.UnSubscribe(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeLevelMbp 市场深度MBP行情数据(增量推送)(150挡)
|
||
|
func subscribeLevelMbp(symbolList map[string][]string) {
|
||
|
client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for key, _ := range symbolList {
|
||
|
client.Request(key, config.Config.HbGather.HbSubUids)
|
||
|
client.Subscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
},
|
||
|
func(resp interface{}) {
|
||
|
depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
|
||
|
if ok {
|
||
|
if &depthResponse != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: depthResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: depthResponse.Channel})
|
||
|
red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeLevelMbp data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, _ := range symbolList {
|
||
|
client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeFullMbp 市场深度MBP行情数据(全量推送)
|
||
|
func subscribeFullMbp(symbolList map[string][]int) {
|
||
|
client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for key, value := range symbolList {
|
||
|
for _, vue := range value {
|
||
|
client.SubscribeFull(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
func(resp interface{}) {
|
||
|
depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
|
||
|
if ok {
|
||
|
if &depthResponse != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: depthResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: depthResponse.Channel})
|
||
|
red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeFullMbp data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, value := range symbolList {
|
||
|
for _, vue := range value {
|
||
|
client.UnSubscribeFull(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeSubMbp 市场深度MBP行情数据(增量推送)
|
||
|
func subscribeSubMbp(symbolList map[string][]int) {
|
||
|
client := new(marketwssclient.MarketByPriceTickWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for key, value := range symbolList {
|
||
|
for _, vue := range value {
|
||
|
client.Request(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
client.Subscribe(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
func(resp interface{}) {
|
||
|
depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
|
||
|
if ok {
|
||
|
if &depthResponse != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: depthResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: depthResponse.Channel})
|
||
|
red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeSubMbp data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, value := range symbolList {
|
||
|
for _, vue := range value {
|
||
|
client.UnSubscribe(key, vue, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeBbo 买一卖一逐笔行情
|
||
|
func subscribeBbo(symbolList map[string][]string) {
|
||
|
client := new(marketwssclient.BestBidOfferWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for key, _ := range symbolList {
|
||
|
client.Subscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
},
|
||
|
func(resp interface{}) {
|
||
|
bboResponse, ok := resp.(market.SubscribeBestBidOfferResponse)
|
||
|
if ok {
|
||
|
if bboResponse.Tick != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: bboResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: bboResponse.Channel})
|
||
|
red.RedisClient.Publish(bboResponse.Channel, string(jsonMessage))
|
||
|
}
|
||
|
applogger.Info("subscribeBbo data,ServersId:%v,Sender:%v,Content:%v-%v", bboResponse.Channel, Cli.Id, bboResponse.Tick, nil)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, _ := range symbolList {
|
||
|
client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Connection closed")
|
||
|
}
|
||
|
|
||
|
// subscribeKLine K线数据
|
||
|
func subscribeKLine(symbolList map[string][]string) {
|
||
|
client := new(marketwssclient.CandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for symbol, period := range symbolList {
|
||
|
for _, value := range period {
|
||
|
client.Subscribe(symbol, value, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
func(response interface{}) {
|
||
|
resp, ok := response.(market.SubscribeCandlestickResponse)
|
||
|
if ok {
|
||
|
if &resp != nil {
|
||
|
if resp.Tick != nil || resp.Data != nil {
|
||
|
//插针
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: resp.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: resp.Channel,
|
||
|
})
|
||
|
//applogger.Info("k line JSON --- %s",string(jsonMessage))
|
||
|
red.RedisClient.Publish(resp.Channel, string(jsonMessage))
|
||
|
//实时数据入库
|
||
|
applogger.Info("k line resp %v", resp.Data)
|
||
|
go business.UpdateWsMgo(resp)
|
||
|
}
|
||
|
applogger.Info("subscribeKLine data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for symbol, period := range symbolList {
|
||
|
for _, value := range period {
|
||
|
client.UnSubscribe(symbol, value, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeTrade 成交明细
|
||
|
func subscribeTrade(symbolList map[string][]string) {
|
||
|
client := new(marketwssclient.TradeWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for key, _ := range symbolList {
|
||
|
client.Request(key, config.Config.HbGather.HbSubUids)
|
||
|
client.Subscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
},
|
||
|
func(resp interface{}) {
|
||
|
depthResponse, ok := resp.(market.SubscribeTradeResponse)
|
||
|
if ok {
|
||
|
if &depthResponse != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: depthResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: depthResponse.Channel})
|
||
|
red.RedisClient.Publish(depthResponse.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeTrade data,ServersId:%v,Sender:%v,Content:%v-%v", depthResponse.Channel, Cli.Id, depthResponse.Tick, depthResponse.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, _ := range symbolList {
|
||
|
client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeLast24h 市场概要
|
||
|
func subscribeLast24h(symbolList map[string][]string) {
|
||
|
// Initialize a new instance
|
||
|
client := new(marketwssclient.Last24hCandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
// Set the callback handlers
|
||
|
client.SetHandler(
|
||
|
// Connected handler
|
||
|
func() {
|
||
|
for key, _ := range symbolList {
|
||
|
client.Request(key, config.Config.HbGather.HbSubUids)
|
||
|
client.Subscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
},
|
||
|
// Response handler
|
||
|
func(resp interface{}) {
|
||
|
candlestickResponse, ok := resp.(market.SubscribeLast24hCandlestickResponse)
|
||
|
if ok {
|
||
|
if &candlestickResponse != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: candlestickResponse.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: candlestickResponse.Channel})
|
||
|
red.RedisClient.Publish(candlestickResponse.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeLast24h data,ServersId:%v,Sender:%v,Content:%v-%v", candlestickResponse.Channel, Cli.Id, candlestickResponse.Tick, candlestickResponse.Data)
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
// Connect to the wss and wait for the handler to handle the response
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for key, _ := range symbolList {
|
||
|
client.UnSubscribe(key, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|
||
|
|
||
|
// subscribeTicker 市场聚合行情(ticker)
|
||
|
func subscribeTicker(symbolList map[string][]string) {
|
||
|
client := new(marketwssclient.TickerWebSocketClient).Init(config.Config.HbGather.HbHost)
|
||
|
client.SetHandler(
|
||
|
func() {
|
||
|
for symbol, _ := range symbolList {
|
||
|
client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
},
|
||
|
func(response interface{}) {
|
||
|
resp, ok := response.(market.TickerWebsocketResponse)
|
||
|
if ok {
|
||
|
if &resp != nil {
|
||
|
if resp.Tick != nil || resp.Data != nil {
|
||
|
jsonMessage, _ := json.Marshal(websocketservice.Message{
|
||
|
ServersId: resp.Channel,
|
||
|
Sender: Cli.Id,
|
||
|
Content: resp,
|
||
|
Symbol: resp.Channel})
|
||
|
red.RedisClient.Publish(resp.Channel, string(jsonMessage))
|
||
|
applogger.Info("subscribeTicker data,ServersId:%v,Sender:%v,Content:%v-%v", resp.Channel, Cli.Id, resp.Tick, resp.Data)
|
||
|
}
|
||
|
}
|
||
|
} else {
|
||
|
applogger.Warn("Unknown response: %v", resp)
|
||
|
}
|
||
|
})
|
||
|
|
||
|
client.Connect(true)
|
||
|
|
||
|
fmt.Println("Press ENTER to unsubscribe and stop...")
|
||
|
fmt.Scanln()
|
||
|
|
||
|
for symbol, _ := range symbolList {
|
||
|
client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
|
||
|
}
|
||
|
|
||
|
client.Close()
|
||
|
applogger.Info("Client closed")
|
||
|
}
|