You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
540 lines
14 KiB
540 lines
14 KiB
package websocketservice
|
|
|
|
import (
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/satori/go.uuid"
|
|
"go.uber.org/zap"
|
|
"log"
|
|
"net/http"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
"wss-pool/config"
|
|
"wss-pool/internal"
|
|
"wss-pool/internal/gzip"
|
|
"wss-pool/internal/model"
|
|
red "wss-pool/internal/redis"
|
|
"wss-pool/logging/applogger"
|
|
"wss-pool/pkg/model/market"
|
|
"wss-pool/pkg/model/stock"
|
|
)
|
|
|
|
const (
|
|
// Time allowed to write a message to the peer.
|
|
writeWait = 10 * time.Second
|
|
|
|
// Time allowed to read the next pong message from the peer.
|
|
pongWait = 60 * time.Second
|
|
|
|
// Send pings to peer with this period. Must be less than pongWait.
|
|
pingPeriod = (pongWait * 9) / 10
|
|
|
|
// Maximum message size allowed from peer.
|
|
maxMessageSize = 512
|
|
SpotsStatus int = 1
|
|
ContractStatus int = 2
|
|
RedisCONTRACT string = "CONTRACT:LIST"
|
|
RedisDIGITAL string = "DIGITAL:LIST"
|
|
StatusPass int = 1
|
|
)
|
|
|
|
// Define a websocket connection object that contains information for each connection
|
|
type User struct {
|
|
Id string // Client ID
|
|
conn *websocket.Conn // Define websocket link objects
|
|
msg chan []byte // Define messages received and distributed
|
|
symbol sync.Map // Define a collection of user subscription types
|
|
stop chan string
|
|
start chan string
|
|
mux sync.Mutex
|
|
}
|
|
|
|
var (
|
|
wsConMap = map[string][]*websocket.Conn{}
|
|
mutex = sync.RWMutex{}
|
|
msgChan = make(chan []byte)
|
|
mutexSpotConn = sync.RWMutex{}
|
|
)
|
|
|
|
// Define the message body for the output
|
|
type Message struct {
|
|
ServersId string `json:"serversId,omitempty"` // Service ID (used to receive the client push ID of the third-party push server)
|
|
Sender string `json:"sender,omitempty"` // sender
|
|
Recipient string `json:"recipient,omitempty"` // recipient
|
|
Content any `json:"content,omitempty"` // send content
|
|
Symbol string `json:"symbol,omitempty"` // Subscription type
|
|
Logo string `json:"logo_link,omitempty"` // Subscription type
|
|
KeepDecimal string `json:"keep_decimal"`
|
|
}
|
|
|
|
var SpotMarketCache = sync.Map{}
|
|
var ContractCache = sync.Map{}
|
|
|
|
// Define an UpGrader to upgrade a regular HTTP connection to a websocket connection
|
|
var up = &websocket.Upgrader{
|
|
// Define read/write buffer size
|
|
WriteBufferSize: 1024,
|
|
ReadBufferSize: 1024,
|
|
// Verification request
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
// If it is not a get request, return an error
|
|
if r.Method != "GET" {
|
|
fmt.Println("Request method error")
|
|
return false
|
|
}
|
|
// If chat is not included in the path, an error is returned
|
|
if r.URL.Path != "/quotes-wss" {
|
|
fmt.Println("Request path error")
|
|
return false
|
|
}
|
|
// Verification rules can also be customized according to other needs
|
|
return true
|
|
},
|
|
}
|
|
|
|
func Connect(host, addr string) {
|
|
//广播启动
|
|
go write()
|
|
http.HandleFunc("/quotes-wss", wsHandle)
|
|
url := fmt.Sprintf("%v%v", host, addr)
|
|
fmt.Println(url)
|
|
err := http.ListenAndServe(url, nil)
|
|
if err != nil {
|
|
log.Fatal("ListenAndServe: ", err)
|
|
}
|
|
}
|
|
|
|
func wsHandle(w http.ResponseWriter, r *http.Request) {
|
|
// Obtain a link through the upgraded upgrade tool
|
|
conn, err := up.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
applogger.Info("获取连接失败:%v", err)
|
|
return
|
|
}
|
|
// Register users after successful connection
|
|
user := &User{
|
|
Id: uuid.NewV4().String(),
|
|
conn: conn,
|
|
msg: make(chan []byte),
|
|
stop: make(chan string),
|
|
start: make(chan string),
|
|
symbol: sync.Map{},
|
|
mux: sync.Mutex{},
|
|
}
|
|
read(user)
|
|
//go hearBeat(user)
|
|
}
|
|
|
|
// Read the message sent by the client and process the return response
|
|
func read(user *User) {
|
|
defer func() {
|
|
user.conn.Close()
|
|
}()
|
|
for {
|
|
_, msg, err := user.conn.ReadMessage()
|
|
if err != nil {
|
|
offLine(user.conn)
|
|
applogger.Info("user exit:%v", user.conn.RemoteAddr().String())
|
|
return
|
|
}
|
|
// Process business logic
|
|
psgMsg := model.SubMessage(string(msg))
|
|
fmt.Println(psgMsg)
|
|
if psgMsg != nil {
|
|
switch psgMsg.Type {
|
|
case "ping": // Receiving ping
|
|
aloneSend(user.conn, []byte(model.ReturnValue("pong")))
|
|
case "subscribe": // Receive subscription
|
|
applogger.Info("接收到订阅消息体:", string(msg))
|
|
aloneSend(user.conn, []byte(model.ReturnValue("subscribe success")))
|
|
//获取 服务是否 订阅该数据
|
|
mutex.RLock()
|
|
conns, ok := wsConMap[psgMsg.Symbol]
|
|
mutex.RUnlock()
|
|
if ok {
|
|
//查询client是否订阅
|
|
conns = checkClient(conns, user.conn)
|
|
} else {
|
|
//添加订阅
|
|
conns = make([]*websocket.Conn, 0)
|
|
conns = append(conns, user.conn)
|
|
}
|
|
applogger.Info("psgMsg.Symbol", psgMsg.Symbol)
|
|
mutex.Lock()
|
|
wsConMap[psgMsg.Symbol] = conns
|
|
mutex.Unlock()
|
|
if !ok {
|
|
go user.userSubscribe(psgMsg.Symbol)
|
|
}
|
|
case "unSubscribe": // Receive unsubscribe
|
|
applogger.Info("Received unsubscribe message body:", string(msg))
|
|
mutex.RLock()
|
|
conns, ok := wsConMap[psgMsg.Symbol]
|
|
mutex.RUnlock()
|
|
if ok {
|
|
//取消订阅
|
|
removeWsconn(conns, user.conn, psgMsg.Symbol)
|
|
}
|
|
aloneSend(user.conn, []byte(model.ReturnValue("unSubscribe success")))
|
|
applogger.Debug("Subscription type after current user deletion:%v", ok)
|
|
default:
|
|
// TODO: Handling other situations transmitted by customers
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// 清理客户端
|
|
func removeWsconn(conn []*websocket.Conn, userConn *websocket.Conn, symbol string) error {
|
|
index := -1
|
|
for i, v := range conn {
|
|
if v == userConn {
|
|
index = i
|
|
break
|
|
}
|
|
}
|
|
if index >= 0 {
|
|
conn = append(conn[:index], conn[index+1:]...)
|
|
mutex.Lock()
|
|
wsConMap[symbol] = conn
|
|
mutex.Unlock()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func deleteWsconn(symbol string) {
|
|
mutex.Lock()
|
|
defer mutex.Unlock()
|
|
delete(wsConMap, symbol)
|
|
}
|
|
|
|
// 检查是否订阅过
|
|
func checkClient(conns []*websocket.Conn, conn *websocket.Conn) []*websocket.Conn {
|
|
for _, v := range conns {
|
|
if v == conn {
|
|
return conns
|
|
}
|
|
}
|
|
conns = append(conns, conn)
|
|
return conns
|
|
}
|
|
|
|
// 广播
|
|
func broadcastWebSocket(msg []byte, symbol string) {
|
|
mutex.RLock()
|
|
conns, ok := wsConMap[symbol]
|
|
mutex.RUnlock()
|
|
if !ok {
|
|
applogger.Error("Parsing data information:%v")
|
|
return
|
|
}
|
|
for _, val := range conns {
|
|
if err := aloneSend(val, msg); err != nil {
|
|
removeWsconn(conns, val, symbol)
|
|
}
|
|
}
|
|
}
|
|
|
|
// 断开连接 清理订阅
|
|
func offLine(userConn *websocket.Conn) {
|
|
for key, v := range wsConMap {
|
|
removeWsconn(v, userConn, key)
|
|
}
|
|
}
|
|
|
|
// send message
|
|
func aloneSend(conn *websocket.Conn, message []byte) error {
|
|
mutexSpotConn.Lock()
|
|
defer mutexSpotConn.Unlock()
|
|
conn.SetWriteDeadline(time.Now().Add(writeWait))
|
|
w, err := conn.NextWriter(websocket.TextMessage) // Write data in the form of io, with parameters of data type
|
|
if err != nil {
|
|
//发送失败
|
|
applogger.Error("Failed to conn.NextWriter :%v", err)
|
|
return err
|
|
}
|
|
if _, err := w.Write(message); err != nil { // Write data, this function is truly used to transmit data to the foreground
|
|
applogger.Error("Failed Write message :%v", err)
|
|
return err
|
|
}
|
|
if err := w.Close(); err != nil { // Close write stream
|
|
applogger.Error("Failed to close write stream:%v", err)
|
|
return nil
|
|
}
|
|
return nil
|
|
}
|
|
|
|
//func hearBeat(user *User) {
|
|
// defer func() {
|
|
// user.conn.Close()
|
|
// }()
|
|
// ticker := time.NewTicker(writeWait) // Set timing
|
|
// for {
|
|
// select {
|
|
// case <-ticker.C:
|
|
// TODO: Need to add a server to ping the client
|
|
// //发送心跳
|
|
// if err := aloneSend(user.conn, []byte(model.ReturnValue("ping"))); err != nil {
|
|
// applogger.Error("Failed to send heartbeat message: ", err)
|
|
// //ping 不通 清理客户端
|
|
// offLine(user.conn)
|
|
// return
|
|
// }
|
|
// }
|
|
// }
|
|
//}
|
|
|
|
// Producer sends messages
|
|
func write() {
|
|
for message := range msgChan {
|
|
// TODO: 延迟200毫秒
|
|
time.Sleep(20 * time.Millisecond)
|
|
var subMsg Message
|
|
json.Unmarshal(message, &subMsg)
|
|
//广播
|
|
broadcastWebSocket(message, subMsg.Symbol)
|
|
applogger.Info("broadcast WebSocket Sub message info:%v", subMsg.Symbol)
|
|
}
|
|
}
|
|
|
|
// User subscription and cancellation
|
|
func (u *User) userSubscribe(symbol string) {
|
|
applogger.Info("新交易对", symbol)
|
|
pubSub := red.RedisClient.Subscribe(symbol)
|
|
defer func() {
|
|
pubSub.Close()
|
|
}()
|
|
_, err := pubSub.Receive()
|
|
if err != nil {
|
|
applogger.Error("failed to receive from control PubSub,%v", zap.Error(err))
|
|
return
|
|
}
|
|
//fmt.Println(pubSub)
|
|
ch := pubSub.Channel()
|
|
for msg := range ch {
|
|
var subMsg Message
|
|
//合约深度解压包
|
|
isSubscribeCtDepth := strings.Contains(symbol, "USDT.depth.step")
|
|
switch isSubscribeCtDepth {
|
|
case true:
|
|
content, err := gzip.GZipDecompress([]byte(msg.Payload))
|
|
if err != nil {
|
|
applogger.Error("UnGZip data error: %s", err)
|
|
return
|
|
}
|
|
result := market.SubscribeCtDepthResponse{}
|
|
if err := json.Unmarshal([]byte(content), &result); err != nil {
|
|
applogger.Error("Depth Unmarshal ", err)
|
|
close(u.msg)
|
|
return
|
|
}
|
|
//fmt.Println(result)
|
|
subMsg.Symbol = result.Channel
|
|
subMsg.Content = result
|
|
subMsg.ServersId = result.Channel
|
|
message, _ := json.Marshal(subMsg)
|
|
msg.Payload = string(message)
|
|
default:
|
|
//applogger.Info("Subscribe date:%v", msg.Payload)
|
|
if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
|
|
applogger.Error("Parsing data information:%v", err)
|
|
close(u.msg)
|
|
return
|
|
}
|
|
}
|
|
//fmt.Println("返回数据 : ", msg.Payload)
|
|
//交易全被被取消订阅
|
|
mutex.RLock()
|
|
conns := wsConMap[subMsg.Symbol]
|
|
mutex.RUnlock()
|
|
if len(conns) > 0 {
|
|
msgChan <- []byte(msg.Payload)
|
|
} else {
|
|
applogger.Debug("Starting unsubscribe.....", symbol)
|
|
deleteWsconn(symbol)
|
|
pubSub.Unsubscribe(symbol)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func HashValue(hashListName string) []stock.PHPData {
|
|
keys := red.Scan(hashListName)
|
|
result := make([]stock.PHPData, 0)
|
|
for _, v := range keys {
|
|
res, _ := red.HGetAll(v)
|
|
fmt.Println(res)
|
|
item := stock.PHPData{}
|
|
for field, value := range res {
|
|
switch field {
|
|
case "name":
|
|
item.Name = value
|
|
case "code":
|
|
item.Code = value
|
|
case "keep_decimal":
|
|
item.KeepDecimal = value
|
|
case "face_value":
|
|
item.FaceValue, _ = strconv.Atoi(value)
|
|
case "max_pry":
|
|
item.MaxPry, _ = strconv.Atoi(value)
|
|
case "min_pry":
|
|
item.MinPry, _ = strconv.Atoi(value)
|
|
case "logo_link":
|
|
item.LogoLink = value
|
|
case "status":
|
|
item.Status, _ = strconv.Atoi(value)
|
|
}
|
|
}
|
|
//if item.Status == StatusPass {
|
|
result = append(result, item)
|
|
//}
|
|
}
|
|
return result
|
|
}
|
|
|
|
func HashValueOnce(key string) (stock.PHPData, error) {
|
|
res, _ := red.HGetAll(key)
|
|
item := stock.PHPData{}
|
|
for field, value := range res {
|
|
switch field {
|
|
case "name":
|
|
item.Name = value
|
|
case "code":
|
|
item.Code = value
|
|
case "keep_decimal":
|
|
item.KeepDecimal = value
|
|
case "face_value":
|
|
item.FaceValue, _ = strconv.Atoi(value)
|
|
case "max_pry":
|
|
item.MaxPry, _ = strconv.Atoi(value)
|
|
case "min_pry":
|
|
item.MinPry, _ = strconv.Atoi(value)
|
|
case "logo_link":
|
|
item.LogoLink = value
|
|
case "status":
|
|
item.Status, _ = strconv.Atoi(value)
|
|
}
|
|
}
|
|
if item.Status == StatusPass {
|
|
return item, nil
|
|
}
|
|
return stock.PHPData{}, errors.New("data is null")
|
|
}
|
|
|
|
// market_type //1现货,2合约,3美股
|
|
func PHPResutl(market_type int) stock.PHPRes {
|
|
var eodModel stock.PHPRes
|
|
url := fmt.Sprintf("https://%v/bs/market",
|
|
config.Config.HbApi.PHPHost)
|
|
bodyStr, err := internal.HttpPost(url, fmt.Sprintf(`{"market_type":"%d","trade_name":"","page":1,"page_size":1000}`, market_type))
|
|
if err != nil {
|
|
applogger.Error("Failed to query data:%v", err)
|
|
return eodModel
|
|
}
|
|
if err = json.Unmarshal([]byte(bodyStr), &eodModel); err != nil {
|
|
applogger.Error("eodModel json Unmarshal err: %v", err)
|
|
return eodModel
|
|
}
|
|
return eodModel
|
|
}
|
|
|
|
// market_type 1现货,2合约,3美股
|
|
func PHPMarketTrade(market_type, num int, trade_name string) stock.PHPMarketTradeList {
|
|
var eodModel stock.PHPMarketTradeList
|
|
url := fmt.Sprintf("https://%v/bs/market_trade",
|
|
config.Config.HbApi.PHPHost)
|
|
fmt.Println(url)
|
|
bodyStr, err := internal.HttpPost(url, fmt.Sprintf(`{"market_type":%d,"trade_name":"%s","num":%d}`, market_type, trade_name, num))
|
|
fmt.Println(bodyStr)
|
|
if err != nil {
|
|
applogger.Error("Failed to query data:%v", err)
|
|
return eodModel
|
|
}
|
|
if err = json.Unmarshal([]byte(bodyStr), &eodModel); err != nil {
|
|
applogger.Error("eodModel json Unmarshal err: %v", err)
|
|
return eodModel
|
|
}
|
|
return eodModel
|
|
}
|
|
|
|
func SubscriptionCache() {
|
|
spot := HashValue(RedisDIGITAL)
|
|
contract := HashValue(RedisCONTRACT)
|
|
applogger.Info("spot :", spot, "contract :", contract)
|
|
for _, v := range spot {
|
|
symbol := fmt.Sprintf("market.%susdt.kline.1day", strings.ToLower(v.Name))
|
|
go UserMainSubscribe(symbol, SpotsStatus, v.Name)
|
|
}
|
|
for _, v := range contract {
|
|
symbol := fmt.Sprintf("market.%s.kline.1day", v.Name)
|
|
go UserMainSubscribe(symbol, ContractStatus, v.Name)
|
|
}
|
|
}
|
|
|
|
// 1 合约 2 现货
|
|
func UserMainSubscribe(symbol string, market_type int, name string) {
|
|
applogger.Info("新交易对", symbol)
|
|
var subMsg Message
|
|
pubSub := red.RedisClient.Subscribe(symbol)
|
|
defer func() {
|
|
pubSub.Close()
|
|
}()
|
|
_, err := pubSub.Receive()
|
|
if err != nil {
|
|
applogger.Error("failed to receive from control PubSub,%v", zap.Error(err))
|
|
return
|
|
}
|
|
ch := pubSub.Channel()
|
|
for msg := range ch {
|
|
//applogger.Info("Subscribe date:%v", msg.Payload)
|
|
if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
|
|
applogger.Error("Parsing data information:%v", err)
|
|
return
|
|
}
|
|
if subMsg.Symbol == "" {
|
|
applogger.Error("symbol not data :%v", err)
|
|
continue
|
|
}
|
|
if market_type == SpotsStatus {
|
|
val, err := HashValueOnce(fmt.Sprintf("%s:%s", RedisDIGITAL, name))
|
|
if err != nil {
|
|
//applogger.Error(name, err.Error())
|
|
SpotMarketCache.Delete(strings.ToLower(name))
|
|
continue
|
|
}
|
|
subMsg.Logo = val.LogoLink
|
|
subMsg.Symbol = strings.ToLower(val.Name)
|
|
subMsg.KeepDecimal = val.KeepDecimal
|
|
SpotMarketCache.Store(strings.ToLower(name), subMsg)
|
|
} else {
|
|
val, err := HashValueOnce(fmt.Sprintf("%s:%s", RedisCONTRACT, name))
|
|
if err != nil {
|
|
//applogger.Error(name, err.Error())
|
|
ContractCache.Delete(name)
|
|
continue
|
|
}
|
|
subMsg.Logo = val.LogoLink
|
|
subMsg.Symbol = val.Name
|
|
subMsg.KeepDecimal = val.KeepDecimal
|
|
ContractCache.Store(name, subMsg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// Send data to websocket websocketServer
|
|
func (u *User) send(data string) (err error) {
|
|
if u.conn == nil {
|
|
applogger.Error("WebSocket sent error: no connection available")
|
|
return err
|
|
}
|
|
|
|
u.mux.Lock()
|
|
err = u.conn.WriteMessage(websocket.TextMessage, []byte(data))
|
|
u.mux.Unlock()
|
|
|
|
return err
|
|
}
|
|
|