You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

382 lines
10 KiB

2 months ago
package business
import (
"fmt"
"go.mongodb.org/mongo-driver/bson"
"wss-pool/config"
"wss-pool/dictionary"
"wss-pool/internal"
"wss-pool/internal/data"
"wss-pool/logging/applogger"
"wss-pool/pkg/hbwssclient/marketwssclient"
"wss-pool/pkg/model"
"wss-pool/pkg/model/market"
)
// MgoSubscribeDepth streams market depth data for every symbol/parameter pair
// returned by model.SymbolListString(dictionary.Depth) and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every pair and closes the client.
func MgoSubscribeDepth() {
	pairs := model.SymbolListString(dictionary.Depth)
	client := new(marketwssclient.DepthWebSocketClient).Init(config.Config.HbGather.HbHost)

	// First handler handed to SetHandler: request and subscribe each pair.
	// NOTE(review): presumably invoked when the connection is established —
	// confirm against marketwssclient.
	subscribeAll := func() {
		for symbol, params := range pairs {
			for _, param := range params {
				client.Request(symbol, param, config.Config.HbGather.HbSubUids)
				client.Subscribe(symbol, param, config.Config.HbGather.HbSubUids)
			}
		}
	}

	// Second handler: log depth payloads, warn about anything of another type.
	onMessage := func(resp interface{}) {
		depth, isDepth := resp.(market.SubscribeDepthResponse)
		if !isDepth {
			applogger.Warn("Unknown response: %v", resp)
			return
		}
		applogger.Info("subscribeDepth data,ServersId:%v,Content:%v-%v", depth.Channel, depth.Tick, depth.Data)
	}

	client.SetHandler(subscribeAll, onMessage)
	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol, params := range pairs {
		for _, param := range params {
			client.UnSubscribe(symbol, param, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeLevelMbp streams market-by-price depth data (incremental push,
// 150 levels per the original note) for every symbol returned by
// model.SymbolListString and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every symbol and closes the client.
func MgoSubscribeLevelMbp() {
	symbols := model.SymbolListString([]string{})
	client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)

	// First handler handed to SetHandler: request and subscribe each symbol.
	subscribeAll := func() {
		for symbol := range symbols {
			client.Request(symbol, config.Config.HbGather.HbSubUids)
			client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
		}
	}

	// Second handler: log MBP payloads, warn about anything of another type.
	onMessage := func(resp interface{}) {
		mbp, isMbp := resp.(market.SubscribeMarketByPriceResponse)
		if !isMbp {
			applogger.Warn("Unknown response: %v", resp)
			return
		}
		applogger.Info("subscribeLevelMbp data,ServersId:%v,Content:%v-%v", mbp.Channel, mbp.Tick, mbp.Data)
	}

	client.SetHandler(subscribeAll, onMessage)
	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol := range symbols {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeFullMbp streams market-by-price depth data (full push per the
// original note) for every symbol/level pair returned by
// model.SymbolListInt(dictionary.LevelsRefresh) and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every pair and closes the client.
func MgoSubscribeFullMbp() {
	pairs := model.SymbolListInt(dictionary.LevelsRefresh)
	client := new(marketwssclient.MarketByPriceWebSocketClient).Init(config.Config.HbGather.HbHost)

	// First handler handed to SetHandler: open a full subscription per pair.
	subscribeAll := func() {
		for symbol, levels := range pairs {
			for _, level := range levels {
				client.SubscribeFull(symbol, level, config.Config.HbGather.HbSubUids)
			}
		}
	}

	// Second handler: log MBP payloads, warn about anything of another type.
	onMessage := func(resp interface{}) {
		mbp, isMbp := resp.(market.SubscribeMarketByPriceResponse)
		if !isMbp {
			applogger.Warn("Unknown response: %v", resp)
			return
		}
		applogger.Info("subscribeFullMbp data,ServersId:%v,Content:%v-%v", mbp.Channel, mbp.Tick, mbp.Data)
	}

	client.SetHandler(subscribeAll, onMessage)
	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol, levels := range pairs {
		for _, level := range levels {
			client.UnSubscribeFull(symbol, level, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeSubMbp streams market-by-price depth data (incremental push per
// the original note) for every symbol/level pair returned by
// model.SymbolListInt(dictionary.LevelsMbp) and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every pair and closes the client.
func MgoSubscribeSubMbp() {
	symbolList := model.SymbolListInt(dictionary.LevelsMbp)
	client := new(marketwssclient.MarketByPriceTickWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: request and subscribe each symbol/level pair.
		func() {
			for symbol, levels := range symbolList {
				for _, level := range levels {
					client.Request(symbol, level, config.Config.HbGather.HbSubUids)
					client.Subscribe(symbol, level, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Second handler: log MBP payloads, warn about anything of another type.
		func(resp interface{}) {
			depthResponse, ok := resp.(market.SubscribeMarketByPriceResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// One verb per argument, matching the other depth/MBP subscribers.
			applogger.Info("subscribeSubMbp data,ServersId:%v,Content:%v-%v", depthResponse.Channel, depthResponse.Tick, depthResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol, levels := range symbolList {
		for _, level := range levels {
			client.UnSubscribe(symbol, level, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeBbo streams best bid/offer tick-by-tick data for every symbol
// returned by model.SymbolListString and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every symbol and closes the client.
func MgoSubscribeBbo() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.BestBidOfferWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: subscribe each symbol.
		func() {
			for symbol := range symbolList {
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		// Second handler: log BBO payloads, warn about anything of another type.
		func(resp interface{}) {
			bboResponse, ok := resp.(market.SubscribeBestBidOfferResponse)
			if !ok {
				// Surface unexpected payloads like the other subscribers do
				// instead of dropping them silently.
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// Logged whether or not Tick is set; one verb per argument.
			applogger.Info("subscribeBbo data,ServersId:%v,Content:%v", bboResponse.Channel, bboResponse.Tick)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Connection closed")
}
// MgoSubscribeKLine streams K-line (candlestick) data for every symbol/period
// pair returned by model.SymbolListString(dictionary.TimeCycle). Each response
// that carries a Tick or Data payload is converted to a BSON document and
// handed to data.MgoInsertOne, using the table name that
// internal.CheckKLineTable derives from the response channel.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every pair and closes the client.
func MgoSubscribeKLine() {
	symbolList := model.SymbolListString(dictionary.TimeCycle)
	client := new(marketwssclient.CandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: subscribe each symbol/period pair.
		func() {
			for symbol, periods := range symbolList {
				for _, period := range periods {
					client.Subscribe(symbol, period, config.Config.HbGather.HbSubUids)
				}
			}
		},
		// Second handler: persist and log candlestick payloads.
		func(response interface{}) {
			resp, ok := response.(market.SubscribeCandlestickResponse)
			if !ok {
				// resp is only the zero value when the assertion fails, so log
				// what was actually received.
				applogger.Warn("Unknown response: %v", response)
				return
			}

			if resp.Tick != nil || resp.Data != nil {
				// The insert runs in its own goroutine, off the handler.
				go func() {
					// Tick can be nil when only Data is populated; build the
					// nested document only when there is a tick to read.
					var tick bson.D
					if t := resp.Tick; t != nil {
						tick = bson.D{
							{Key: "Id", Value: t.Id},
							{Key: "Close", Value: t.Close.String()},
							{Key: "Low", Value: t.Low.String()},
							{Key: "Amount", Value: t.Amount.String()},
							{Key: "High", Value: t.High.String()},
							{Key: "Count", Value: t.Count},
							{Key: "Open", Value: t.Open.String()},
							{Key: "Vol", Value: t.Vol.String()},
						}
					}
					doc := bson.D{
						{Key: "Channel", Value: resp.Channel},
						{Key: "Tick", Value: tick},
						{Key: "Data", Value: resp.Data},
						{Key: "Timestamp", Value: resp.Timestamp},
					}

					table := internal.CheckKLineTable(resp.Channel)
					applogger.Debug("数据库表名称:%v", table)
					if err := data.MgoInsertOne(table, doc); err != nil {
						applogger.Error("writeKLine err:%v", err)
					}
				}()
			}

			applogger.Info("subscribeKLine data Key:%v,time:%v,tick:%v,data:%v", resp.Channel, resp.Timestamp, resp.Tick, resp.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol, periods := range symbolList {
		for _, period := range periods {
			client.UnSubscribe(symbol, period, config.Config.HbGather.HbSubUids)
		}
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeTrade streams trade detail data for every symbol returned by
// model.SymbolListString and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every symbol and closes the client.
func MgoSubscribeTrade() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.TradeWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: request and subscribe each symbol.
		func() {
			for symbol := range symbolList {
				client.Request(symbol, config.Config.HbGather.HbSubUids)
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		// Second handler: log trade payloads, warn about anything of another type.
		func(resp interface{}) {
			tradeResponse, ok := resp.(market.SubscribeTradeResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// One verb per argument, matching the depth/MBP subscribers.
			applogger.Info("subscribeTrade data,ServersId:%v,Content:%v-%v", tradeResponse.Channel, tradeResponse.Tick, tradeResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeLast24h streams market summary (last-24h candlestick) data for
// every symbol returned by model.SymbolListString and logs each update.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every symbol and closes the client.
func MgoSubscribeLast24h() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.Last24hCandlestickWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: request and subscribe each symbol.
		func() {
			for symbol := range symbolList {
				client.Request(symbol, config.Config.HbGather.HbSubUids)
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		// Second handler: log summary payloads, warn about anything of another type.
		func(resp interface{}) {
			candlestickResponse, ok := resp.(market.SubscribeLast24hCandlestickResponse)
			if !ok {
				applogger.Warn("Unknown response: %v", resp)
				return
			}
			// One verb per argument, matching the depth/MBP subscribers.
			applogger.Info("subscribeLast24h data,ServersId:%v,Content:%v-%v", candlestickResponse.Channel, candlestickResponse.Tick, candlestickResponse.Data)
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}
// MgoSubscribeTicker streams aggregated ticker data for every symbol returned
// by model.SymbolListString and logs each update that carries a Tick or Data
// payload.
//
// The call blocks on standard input: once ENTER is pressed it unsubscribes
// from every symbol and closes the client.
func MgoSubscribeTicker() {
	symbolList := model.SymbolListString([]string{})
	client := new(marketwssclient.TickerWebSocketClient).Init(config.Config.HbGather.HbHost)

	client.SetHandler(
		// First handler: subscribe each symbol.
		func() {
			for symbol := range symbolList {
				client.Subscribe(symbol, config.Config.HbGather.HbSubUids)
			}
		},
		// Second handler: log ticker payloads, warn about anything of another type.
		func(response interface{}) {
			resp, ok := response.(market.TickerWebsocketResponse)
			if !ok {
				// resp is only the zero value when the assertion fails, so log
				// what was actually received.
				applogger.Warn("Unknown response: %v", response)
				return
			}
			if resp.Tick != nil || resp.Data != nil {
				// One verb per argument, matching the depth/MBP subscribers.
				applogger.Info("subscribeTicker data,ServersId:%v,Content:%v-%v", resp.Channel, resp.Tick, resp.Data)
			}
		})

	client.Connect(true)

	fmt.Println("Press ENTER to unsubscribe and stop...")
	// Only used as a pause; the input and any read error are ignored.
	_, _ = fmt.Scanln()

	for symbol := range symbolList {
		client.UnSubscribe(symbol, config.Config.HbGather.HbSubUids)
	}

	client.Close()
	applogger.Info("Client closed")
}