package bawssclient import ( "encoding/json" "fmt" "github.com/shopspring/decimal" "wss-pool/dictionary" "wss-pool/logging/applogger" "wss-pool/pkg/client/bawebsocketclientbase" "wss-pool/pkg/model/market" ) type SubscribeDepthResponse struct { LastUpdateId string `json:"lastUpdateId"` Bids [][]string `json:"bids"` Asks [][]string `json:"asks"` } // Responsible to handle Depth data from WebSocket type DepthWebSocketClient struct { bawebsocketclientbase.WebSocketClientBase } // Initializer func (p *DepthWebSocketClient) Init(host string) *DepthWebSocketClient { p.WebSocketClientBase.Init(host) return p } // Request the full Depth data according to specified criteria //func (p *DepthWebSocketClient) Request(symbol string) { // topic := fmt.Sprintf("market.%s.kline.%s", symbol, period) // req := fmt.Sprintf("{\"req\": \"%s\", \"from\":%d, \"to\":%d, \"id\": \"%s\" }", topic, from, to, clientId) // // p.Send(req) // // applogger.Info("WebSocket requested, topic=%s, clientId=%s", topic, clientId) //} // Set callback handler func (p *DepthWebSocketClient) SetHandler( connectedHandler bawebsocketclientbase.ConnectedHandler, responseHandler bawebsocketclientbase.ResponseHandler) { p.WebSocketClientBase.SetHandler(connectedHandler, p.handleMessage, responseHandler) } // Request the full Depth data according to specified criteria // Subscribe Depth data func (p *DepthWebSocketClient) Subscribe(symbol string) { topis := make([]string, 0) sub := BaKLineParam{ Method: "SUBSCRIBE", Params: append(topis, fmt.Sprintf("%susdt@depth5@100ms", symbol)), ID: bawebsocketclientbase.Randomnumber(), } data, _ := json.Marshal(sub) p.Send(data) applogger.Info("WebSocket subscribed, clientId=%s", sub.ID) } // Unsubscribe Depth data func (p *DepthWebSocketClient) UnSubscribe(symbol string) { topis := make([]string, 0) for _, v := range dictionary.BaTimeCycle { topis = append(topis, fmt.Sprintf("%susdt@%s", symbol, v)) } sub := BaKLineParam{ Method: "UNSUBSCRIBE", Params: append(topis, fmt.Sprintf("%susdt@depth5@100ms", symbol)), ID: bawebsocketclientbase.Randomnumber(), } data, _ := json.Marshal(sub) p.Send(data) applogger.Info("WebSocket unsubscribed, clientId=%s", sub.ID) } func (p *DepthWebSocketClient) handleMessage(msg string) (interface{}, error) { baRes := SubscribeDepthResponse{} err := json.Unmarshal([]byte(msg), &baRes) //转火币数据结构 bids := make([][]decimal.Decimal, 0) for _, v := range baRes.Bids { if len(v) >= 2 { info := make([]decimal.Decimal, 0) price, _ := decimal.NewFromString(v[0]) info = append(info, price) size, _ := decimal.NewFromString(v[1]) info = append(info, size) bids = append(bids, info) } } asks := make([][]decimal.Decimal, 0) for _, v := range baRes.Asks { if len(v) >= 2 { info := make([]decimal.Decimal, 0) price, _ := decimal.NewFromString(v[0]) info = append(info, price) size, _ := decimal.NewFromString(v[1]) info = append(info, size) asks = append(asks, info) } } result := market.SubscribeMarketByPriceResponse{ Tick: &market.MarketByPrice{ Bids: bids, Asks: asks, }, } return result, err }