You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

115 lines
3.1 KiB

2 months ago
package bawssclient
import (
"encoding/json"
"fmt"
"github.com/shopspring/decimal"
"wss-pool/dictionary"
"wss-pool/logging/applogger"
"wss-pool/pkg/client/bawebsocketclientbase"
"wss-pool/pkg/model/market"
)
// SubscribeDepthResponse is the decoded form of one depth message: an update
// id plus bid and ask levels, where every level is a list of strings.
//
// LastUpdateId is exposed as a string. Because the wire value may be a JSON
// number, decoding goes through UnmarshalJSON, which accepts either a number
// or a string for that field instead of failing with an UnmarshalTypeError.
type SubscribeDepthResponse struct {
	LastUpdateId string     `json:"lastUpdateId"`
	Bids         [][]string `json:"bids"`
	Asks         [][]string `json:"asks"`
}

// UnmarshalJSON implements json.Unmarshaler.
//
// "bids" and "asks" are decoded exactly as the default decoder would. For
// "lastUpdateId":
//   - a JSON number is stored as its literal text (no float conversion, so
//     large ids are preserved digit for digit);
//   - a JSON string is stored as-is;
//   - an absent or null value leaves the current LastUpdateId untouched;
//   - any other JSON type yields an error.
//
// Fields that decode successfully are kept even when an error is returned,
// and the first error encountered is the one reported.
func (r *SubscribeDepthResponse) UnmarshalJSON(data []byte) error {
	// Alias has the same fields but no methods, so decoding into it does not
	// call back into this function.
	type Alias SubscribeDepthResponse
	aux := struct {
		*Alias
		// Shadows Alias.LastUpdateId so the raw JSON token can be inspected.
		LastUpdateId json.RawMessage `json:"lastUpdateId"`
	}{Alias: (*Alias)(r)}

	decodeErr := json.Unmarshal(data, &aux)

	var idErr error
	raw := aux.LastUpdateId
	switch {
	case len(raw) == 0 || string(raw) == "null":
		// Nothing to store; keep whatever LastUpdateId already holds.
	case raw[0] == '"':
		idErr = json.Unmarshal(raw, &r.LastUpdateId)
	default:
		var id json.Number
		if idErr = json.Unmarshal(raw, &id); idErr == nil {
			r.LastUpdateId = id.String()
		}
	}

	if decodeErr != nil {
		return decodeErr
	}
	return idErr
}
// DepthWebSocketClient is the WebSocket client for depth data.
//
// It embeds bawebsocketclientbase.WebSocketClientBase, so the base client's
// exported methods are promoted onto this type. The methods in this file add
// the depth subscribe/unsubscribe requests and the depth message decoder.
type DepthWebSocketClient struct {
bawebsocketclientbase.WebSocketClientBase
}
// Init initializes the embedded base client with host and returns the
// receiver, so the call can be chained onto construction.
func (p *DepthWebSocketClient) Init(host string) *DepthWebSocketClient {
p.WebSocketClientBase.Init(host)
return p
}
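// Request (disabled): the commented-out draft below is not compiled. It still
// builds a kline topic and uses variables (period, from, to, clientId) that
// its signature does not declare, so it needs rework before being re-enabled.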
//func (p *DepthWebSocketClient) Request(symbol string) {
// topic := fmt.Sprintf("market.%s.kline.%s", symbol, period)
// req := fmt.Sprintf("{\"req\": \"%s\", \"from\":%d, \"to\":%d, \"id\": \"%s\" }", topic, from, to, clientId)
//
// p.Send(req)
//
// applogger.Info("WebSocket requested, topic=%s, clientId=%s", topic, clientId)
//}
// SetHandler registers the callbacks on the embedded base client.
//
// connectedHandler and responseHandler are passed through unchanged. The
// client's own handleMessage is supplied as the middle argument, so callers
// do not provide a message decoder themselves.
// NOTE(review): presumably the base client runs handleMessage on each incoming
// frame and hands its result to responseHandler — confirm in
// bawebsocketclientbase.
func (p *DepthWebSocketClient) SetHandler(
connectedHandler bawebsocketclientbase.ConnectedHandler,
responseHandler bawebsocketclientbase.ResponseHandler) {
p.WebSocketClientBase.SetHandler(connectedHandler, p.handleMessage, responseHandler)
}
// Subscribe sends a SUBSCRIBE request for the topic
// "<symbol>usdt@depth5@100ms" and logs the request id.
//
// The id is taken from bawebsocketclientbase.Randomnumber. The error returned
// by json.Marshal is discarded.
func (p *DepthWebSocketClient) Subscribe(symbol string) {
	topic := fmt.Sprintf("%susdt@depth5@100ms", symbol)
	req := BaKLineParam{
		Method: "SUBSCRIBE",
		Params: []string{topic},
		ID:     bawebsocketclientbase.Randomnumber(),
	}

	payload, _ := json.Marshal(req)
	p.Send(payload)

	applogger.Info("WebSocket subscribed, clientId=%s", req.ID)
}
// UnSubscribe sends an UNSUBSCRIBE request and logs the request id.
//
// The request lists one "<symbol>usdt@<cycle>" topic per entry of
// dictionary.BaTimeCycle, followed by the depth topic
// "<symbol>usdt@depth5@100ms". The error returned by json.Marshal is
// discarded.
//
// NOTE(review): Subscribe only subscribes to the depth topic, so the
// BaTimeCycle topics look like a leftover from another client — confirm
// whether they are needed here.
func (p *DepthWebSocketClient) UnSubscribe(symbol string) {
	topics := make([]string, 0)
	for _, cycle := range dictionary.BaTimeCycle {
		topics = append(topics, fmt.Sprintf("%susdt@%s", symbol, cycle))
	}
	topics = append(topics, fmt.Sprintf("%susdt@depth5@100ms", symbol))

	req := BaKLineParam{
		Method: "UNSUBSCRIBE",
		Params: topics,
		ID:     bawebsocketclientbase.Randomnumber(),
	}

	payload, _ := json.Marshal(req)
	p.Send(payload)

	applogger.Info("WebSocket unsubscribed, clientId=%s", req.ID)
}
// handleMessage decodes one raw depth message and converts it to the Huobi
// style market.SubscribeMarketByPriceResponse structure.
//
// Every bid/ask level with at least two entries becomes a {price, size} pair
// of decimals; shorter levels are dropped. Errors from decimal.NewFromString
// are discarded, so whatever value it returns on failure is kept.
//
// The converted result is returned together with the json.Unmarshal error,
// even when that error is non-nil.
func (p *DepthWebSocketClient) handleMessage(msg string) (interface{}, error) {
	var depth SubscribeDepthResponse
	err := json.Unmarshal([]byte(msg), &depth)

	// toLevels converts string levels to decimal levels. The result is always
	// a non-nil slice, even when there is nothing to convert.
	toLevels := func(rows [][]string) [][]decimal.Decimal {
		levels := make([][]decimal.Decimal, 0, len(rows))
		for _, row := range rows {
			if len(row) < 2 {
				continue
			}
			price, _ := decimal.NewFromString(row[0])
			size, _ := decimal.NewFromString(row[1])
			levels = append(levels, []decimal.Decimal{price, size})
		}
		return levels
	}

	tick := &market.MarketByPrice{
		Bids: toLevels(depth.Bids),
		Asks: toLevels(depth.Asks),
	}
	return market.SubscribeMarketByPriceResponse{Tick: tick}, err
}