You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

540 lines
14 KiB

package websocketservice
import (
"encoding/json"
"errors"
"fmt"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
"wss-pool/config"
"wss-pool/internal"
"wss-pool/internal/gzip"
"wss-pool/internal/model"
red "wss-pool/internal/redis"
"wss-pool/logging/applogger"
"wss-pool/pkg/model/market"
"wss-pool/pkg/model/stock"
)
// Websocket timing and size limits.
const (
	// writeWait is the time allowed to write a message to the peer.
	writeWait = 10 * time.Second

	// pongWait is the time allowed to read the next pong message from the peer.
	pongWait = 60 * time.Second

	// pingPeriod is the interval for sending pings to the peer.
	// It is nine tenths of pongWait and must stay below it.
	pingPeriod = pongWait / 10 * 9

	// maxMessageSize is the maximum message size allowed from the peer.
	maxMessageSize = 512
)

// Market type identifiers passed to UserMainSubscribe.
const (
	SpotsStatus    int = 1
	ContractStatus int = 2
)

// Redis key names for the contract and digital (spot) listings.
const (
	RedisCONTRACT string = "CONTRACT:LIST"
	RedisDIGITAL  string = "DIGITAL:LIST"
)

// StatusPass is the status value a listing must carry for HashValueOnce
// to return it.
const StatusPass int = 1
// User holds the per-connection state of one websocket client.
// wsHandle creates one User for every successfully upgraded connection.
type User struct {
Id string // Client ID; wsHandle assigns a freshly generated UUID string
conn *websocket.Conn // Websocket connection belonging to this client
msg chan []byte // Per-user message channel; in the visible code it is only created (wsHandle) and closed (userSubscribe)
symbol sync.Map // Collection of the user's subscription types; not read or written in the visible code
stop chan string // Created in wsHandle; not used in the visible code
start chan string // Created in wsHandle; not used in the visible code
mux sync.Mutex // Serializes writes performed by User.send
}
// Package-level subscription registry and the locks that protect it.
var (
	// wsConMap maps a subscription symbol to the connections subscribed to it.
	wsConMap = make(map[string][]*websocket.Conn)

	// mutex guards wsConMap.
	mutex sync.RWMutex

	// msgChan hands payloads from the Redis subscriber goroutines to the
	// broadcast loop in write. It is unbuffered.
	msgChan = make(chan []byte)

	// mutexSpotConn serializes every write performed by aloneSend.
	mutexSpotConn sync.RWMutex
)
// Message is the JSON envelope for payloads received from Redis and pushed to
// websocket subscribers.
type Message struct {
ServersId string `json:"serversId,omitempty"` // Service ID (used to receive the client push ID of the third-party push server)
Sender string `json:"sender,omitempty"` // Sender
Recipient string `json:"recipient,omitempty"` // Recipient
Content any `json:"content,omitempty"` // Payload content
Symbol string `json:"symbol,omitempty"` // Subscription symbol; used as the wsConMap key when broadcasting
Logo string `json:"logo_link,omitempty"` // Logo link; UserMainSubscribe fills it from stock.PHPData.LogoLink
KeepDecimal string `json:"keep_decimal"` // Filled from stock.PHPData.KeepDecimal; always serialized (no omitempty)
}
// Caches of the latest Message per instrument, maintained by UserMainSubscribe.
var (
	// SpotMarketCache is keyed by the lower-cased spot name.
	SpotMarketCache sync.Map

	// ContractCache is keyed by the contract name as given.
	ContractCache sync.Map
)
// up upgrades a regular HTTP connection to a websocket connection.
var up = &websocket.Upgrader{
	// Read/write buffer sizes.
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// CheckOrigin accepts a request only when it is a GET for /quotes-wss.
	// NOTE(review): the Origin header itself is not inspected here.
	CheckOrigin: func(r *http.Request) bool {
		switch {
		case r.Method != "GET":
			// Only GET requests may be upgraded.
			fmt.Println("Request method error")
			return false
		case r.URL.Path != "/quotes-wss":
			// Only the quotes endpoint may be upgraded.
			fmt.Println("Request path error")
			return false
		default:
			// Further verification rules can be added here if needed.
			return true
		}
	},
}
// Connect starts the broadcast loop, registers the /quotes-wss handler on the
// default mux and serves HTTP on the address formed by concatenating host and
// addr. It blocks while serving and terminates the process through log.Fatal
// when the server returns an error.
func Connect(host, addr string) {
	// Start the broadcaster.
	go write()

	http.HandleFunc("/quotes-wss", wsHandle)

	listenAddr := fmt.Sprintf("%v%v", host, addr)
	fmt.Println(listenAddr)

	if err := http.ListenAndServe(listenAddr, nil); err != nil {
		log.Fatal("ListenAndServe: ", err)
	}
}
// wsHandle upgrades the incoming HTTP request to a websocket connection,
// builds the User record for it and then runs the read loop on the calling
// goroutine until the client goes away.
func wsHandle(w http.ResponseWriter, r *http.Request) {
	// Upgrade the request; nothing more to do when the upgrade fails.
	conn, err := up.Upgrade(w, r, nil)
	if err != nil {
		applogger.Info("获取连接失败:%v", err)
		return
	}

	// Register the user for the new connection. The symbol map and the
	// mutex are left at their ready-to-use zero values.
	client := &User{
		conn:  conn,
		Id:    uuid.NewV4().String(),
		msg:   make(chan []byte),
		start: make(chan string),
		stop:  make(chan string),
	}

	// The server-side heartbeat (hearBeat) is currently disabled.
	read(client)
}
// read runs the receive loop for one client until ReadMessage fails, then
// drops the client from every subscription and closes the connection.
//
// Supported message types (psgMsg.Type):
//   - "ping":        replies with "pong".
//   - "subscribe":   registers the connection under psgMsg.Symbol and, when the
//     symbol had no registry entry yet, starts a Redis subscriber goroutine.
//   - "unSubscribe": removes the connection from psgMsg.Symbol.
//
// Messages that model.SubMessage cannot parse (nil result) are ignored.
func read(user *User) {
	defer user.conn.Close()

	for {
		_, msg, err := user.conn.ReadMessage()
		if err != nil {
			// The peer is gone: clean up all of its subscriptions.
			offLine(user.conn)
			applogger.Info("user exit:%v", user.conn.RemoteAddr().String())
			return
		}

		// Process business logic.
		psgMsg := model.SubMessage(string(msg))
		fmt.Println(psgMsg)
		if psgMsg == nil {
			continue
		}

		switch psgMsg.Type {
		case "ping": // Receiving ping
			aloneSend(user.conn, []byte(model.ReturnValue("pong")))

		case "subscribe": // Receive subscription
			applogger.Info("接收到订阅消息体:", string(msg))
			aloneSend(user.conn, []byte(model.ReturnValue("subscribe success")))

			// Look up, extend and store the subscriber list in ONE critical
			// section. Doing the read and the write under separate locks lets
			// two concurrent subscribers overwrite each other's registration
			// or both start a Redis subscriber for the same symbol.
			mutex.Lock()
			conns, ok := wsConMap[psgMsg.Symbol]
			if ok {
				// Symbol already known: add this client unless already present.
				conns = checkClient(conns, user.conn)
			} else {
				// First subscriber for this symbol.
				conns = []*websocket.Conn{user.conn}
			}
			wsConMap[psgMsg.Symbol] = conns
			mutex.Unlock()

			applogger.Info("psgMsg.Symbol", psgMsg.Symbol)
			if !ok {
				// No registry entry existed, so no Redis subscriber is running yet.
				go user.userSubscribe(psgMsg.Symbol)
			}

		case "unSubscribe": // Receive unsubscribe
			applogger.Info("Received unsubscribe message body:", string(msg))
			mutex.RLock()
			conns, ok := wsConMap[psgMsg.Symbol]
			mutex.RUnlock()
			if ok {
				// Cancel the subscription for this client.
				removeWsconn(conns, user.conn, psgMsg.Symbol)
			}
			aloneSend(user.conn, []byte(model.ReturnValue("unSubscribe success")))
			applogger.Debug("Subscription type after current user deletion:%v", ok)

		default:
			// TODO: Handling other situations transmitted by customers
		}
	}
}
// removeWsconn removes userConn from the subscriber list registered for symbol.
//
// The list is always taken from wsConMap under the write lock and the result
// is written to a freshly allocated slice. The conn argument is kept for
// signature compatibility but is not used: callers pass a snapshot that may be
// stale, and editing that snapshot in place (as append(conn[:i], conn[i+1:]...)
// does) corrupts slices other goroutines are iterating and can re-register
// connections that were removed in the meantime.
//
// Only the first occurrence of userConn is removed. When symbol has no entry,
// or userConn is not in it, the registry is left untouched. The returned error
// is always nil.
//
// The caller must NOT hold mutex.
func removeWsconn(conn []*websocket.Conn, userConn *websocket.Conn, symbol string) error {
	mutex.Lock()
	defer mutex.Unlock()

	current, ok := wsConMap[symbol]
	if !ok {
		return nil
	}

	index := -1
	for i, c := range current {
		if c == userConn {
			index = i
			break
		}
	}
	if index < 0 {
		return nil
	}

	// Build a new backing array so existing snapshots stay intact.
	updated := make([]*websocket.Conn, 0, len(current)-1)
	updated = append(updated, current[:index]...)
	updated = append(updated, current[index+1:]...)
	wsConMap[symbol] = updated
	return nil
}
// deleteWsconn drops the whole registry entry for symbol while holding the
// write lock on wsConMap.
func deleteWsconn(symbol string) {
	mutex.Lock()
	delete(wsConMap, symbol)
	mutex.Unlock()
}
// checkClient reports the subscriber list with conn included: when conn is
// already present the list is returned unchanged, otherwise conn is appended.
func checkClient(conns []*websocket.Conn, conn *websocket.Conn) []*websocket.Conn {
	for i := range conns {
		if conns[i] == conn {
			// Already subscribed.
			return conns
		}
	}
	return append(conns, conn)
}
// broadcastWebSocket sends msg to every connection subscribed to symbol and
// unregisters the connections whose send failed.
//
// The subscriber list is copied under the read lock and the copy is iterated,
// so registry updates made during the broadcast cannot skip or repeat
// recipients. Failed connections are removed only after the loop, each time
// against the list currently stored in the registry.
func broadcastWebSocket(msg []byte, symbol string) {
	mutex.RLock()
	conns, ok := wsConMap[symbol]
	targets := append([]*websocket.Conn(nil), conns...)
	mutex.RUnlock()

	if !ok {
		applogger.Error("No subscribers registered for symbol:%v", symbol)
		return
	}

	var failed []*websocket.Conn
	for _, c := range targets {
		if err := aloneSend(c, msg); err != nil {
			failed = append(failed, c)
		}
	}

	for _, c := range failed {
		// Re-read the list so a stale snapshot is never written back.
		mutex.RLock()
		current := wsConMap[symbol]
		mutex.RUnlock()
		removeWsconn(current, c, symbol)
	}
}
// offLine removes a disconnected client from every subscription list.
//
// The registry is snapshotted under the read lock first. Ranging over wsConMap
// directly while removeWsconn (and other goroutines) write to it is a data
// race and can abort the process with a concurrent map access fault. The lock
// is released before calling removeWsconn, which takes the write lock itself.
func offLine(userConn *websocket.Conn) {
	mutex.RLock()
	snapshot := make(map[string][]*websocket.Conn, len(wsConMap))
	for symbol, conns := range wsConMap {
		snapshot[symbol] = conns
	}
	mutex.RUnlock()

	for symbol, conns := range snapshot {
		removeWsconn(conns, userConn, symbol)
	}
}
// aloneSend writes message to conn as a single text frame.
//
// All sends share the package-level mutexSpotConn, so at most one write is in
// flight across all connections. Each write gets a deadline of writeWait.
//
// It returns the first error from obtaining the writer, writing the payload or
// closing the writer. The close step is what completes the message, so its
// failure is reported to the caller; returning nil there would make a failed
// send look successful and keep broken connections registered.
func aloneSend(conn *websocket.Conn, message []byte) error {
	mutexSpotConn.Lock()
	defer mutexSpotConn.Unlock()

	conn.SetWriteDeadline(time.Now().Add(writeWait))

	// Obtain a writer for one text message.
	w, err := conn.NextWriter(websocket.TextMessage)
	if err != nil {
		// Sending failed.
		applogger.Error("Failed to conn.NextWriter :%v", err)
		return err
	}

	// Write the payload into the message.
	if _, err := w.Write(message); err != nil {
		applogger.Error("Failed Write message :%v", err)
		return err
	}

	// Close the write stream, which finishes the message.
	if err := w.Close(); err != nil {
		applogger.Error("Failed to close write stream:%v", err)
		return err
	}
	return nil
}
//func hearBeat(user *User) {
// defer func() {
// user.conn.Close()
// }()
// ticker := time.NewTicker(writeWait) // Set timing
// for {
// select {
// case <-ticker.C:
// TODO: Need to add a server to ping the client
// //发送心跳
// if err := aloneSend(user.conn, []byte(model.ReturnValue("ping"))); err != nil {
// applogger.Error("Failed to send heartbeat message: ", err)
// //ping 不通 清理客户端
// offLine(user.conn)
// return
// }
// }
// }
//}
// write is the broadcast loop started by Connect. It drains msgChan, decodes
// each payload just far enough to learn its symbol and forwards the raw
// payload to every subscriber of that symbol. It returns when msgChan is
// closed.
//
// Payloads that are not valid Message JSON are logged and skipped rather than
// broadcast under an empty or partially decoded symbol.
func write() {
	for message := range msgChan {
		// Throttle: pause 20 milliseconds before each broadcast.
		time.Sleep(20 * time.Millisecond)

		var subMsg Message
		if err := json.Unmarshal(message, &subMsg); err != nil {
			applogger.Error("Parsing data information:%v", err)
			continue
		}

		// Broadcast to the subscribers of this symbol.
		broadcastWebSocket(message, subMsg.Symbol)
		applogger.Info("broadcast WebSocket Sub message info:%v", subMsg.Symbol)
	}
}
// userSubscribe subscribes to the Redis channel named symbol and forwards
// every received payload to msgChan for broadcasting. It runs until the
// channel closes, a payload cannot be decoded, or no connection is registered
// any more for the symbol carried by a message.
//
// Symbols containing "USDT.depth.step" (contract depth) carry gzip-compressed
// payloads; these are decompressed, decoded and re-wrapped into a Message
// before being forwarded. All other payloads must already be Message JSON.
//
// NOTE(review): two of the decode-error paths close u.msg, the gzip-error path
// does not — confirm this asymmetry is intended.
func (u *User) userSubscribe(symbol string) {
	applogger.Info("新交易对", symbol)

	pubSub := red.RedisClient.Subscribe(symbol)
	defer pubSub.Close()

	if _, err := pubSub.Receive(); err != nil {
		applogger.Error("failed to receive from control PubSub,%v", zap.Error(err))
		return
	}

	// Contract depth payloads need to be unpacked first.
	compressedDepth := strings.Contains(symbol, "USDT.depth.step")

	for msg := range pubSub.Channel() {
		var envelope Message

		if compressedDepth {
			raw, err := gzip.GZipDecompress([]byte(msg.Payload))
			if err != nil {
				applogger.Error("UnGZip data error: %s", err)
				return
			}

			depth := market.SubscribeCtDepthResponse{}
			if err := json.Unmarshal([]byte(raw), &depth); err != nil {
				applogger.Error("Depth Unmarshal ", err)
				close(u.msg)
				return
			}

			// Wrap the depth response and replace the payload with the
			// re-encoded envelope.
			envelope.Symbol = depth.Channel
			envelope.Content = depth
			envelope.ServersId = depth.Channel
			encoded, _ := json.Marshal(envelope)
			msg.Payload = string(encoded)
		} else {
			if err := json.Unmarshal([]byte(msg.Payload), &envelope); err != nil {
				applogger.Error("Parsing data information:%v", err)
				close(u.msg)
				return
			}
		}

		// Count the connections currently registered for the message's symbol.
		mutex.RLock()
		subscriberCount := len(wsConMap[envelope.Symbol])
		mutex.RUnlock()

		if subscriberCount == 0 {
			// Everyone has unsubscribed: tear the subscription down.
			applogger.Debug("Starting unsubscribe.....", symbol)
			deleteWsconn(symbol)
			pubSub.Unsubscribe(symbol)
			return
		}

		msgChan <- []byte(msg.Payload)
	}
}
// phpDataFromHash maps the fields of one Redis hash onto a stock.PHPData.
// Unknown fields are ignored. Integer fields whose value is not a valid
// number are left at the zero value that strconv.Atoi yields on failure.
func phpDataFromHash(fields map[string]string) stock.PHPData {
	var item stock.PHPData
	for field, value := range fields {
		switch field {
		case "name":
			item.Name = value
		case "code":
			item.Code = value
		case "keep_decimal":
			item.KeepDecimal = value
		case "logo_link":
			item.LogoLink = value
		// Parse errors are deliberately ignored for the numeric fields below.
		case "face_value":
			item.FaceValue, _ = strconv.Atoi(value)
		case "max_pry":
			item.MaxPry, _ = strconv.Atoi(value)
		case "min_pry":
			item.MinPry, _ = strconv.Atoi(value)
		case "status":
			item.Status, _ = strconv.Atoi(value)
		}
	}
	return item
}

// HashValue loads every hash found by red.Scan(hashListName) and returns the
// decoded entries. Entries are returned regardless of their status (the
// StatusPass filter is disabled). A hash that cannot be read is logged and
// skipped instead of contributing an empty entry. The result is never nil.
func HashValue(hashListName string) []stock.PHPData {
	keys := red.Scan(hashListName)
	result := make([]stock.PHPData, 0, len(keys))
	for _, key := range keys {
		res, err := red.HGetAll(key)
		if err != nil {
			applogger.Error("Failed to query data:%v", err)
			continue
		}
		fmt.Println(res)
		result = append(result, phpDataFromHash(res))
	}
	return result
}

// HashValueOnce loads the hash stored at key and returns it only when its
// status equals StatusPass.
//
// It returns a zero stock.PHPData together with a non-nil error when the hash
// cannot be read, or when the status does not match ("data is null").
func HashValueOnce(key string) (stock.PHPData, error) {
	res, err := red.HGetAll(key)
	if err != nil {
		return stock.PHPData{}, fmt.Errorf("reading hash %q: %w", key, err)
	}

	item := phpDataFromHash(res)
	if item.Status != StatusPass {
		return stock.PHPData{}, errors.New("data is null")
	}
	return item, nil
}
// PHPResutl posts a market listing query (page 1, page size 1000) to the
// /bs/market endpoint of the configured PHP host and decodes the response.
//
// market_type: 1 = spot, 2 = contract, 3 = US stocks.
//
// Request and decode failures are logged; the value decoded so far (zero value
// on request failure) is returned.
func PHPResutl(market_type int) stock.PHPRes {
	var eodModel stock.PHPRes

	endpoint := fmt.Sprintf("https://%v/bs/market", config.Config.HbApi.PHPHost)
	payload := fmt.Sprintf(`{"market_type":"%d","trade_name":"","page":1,"page_size":1000}`, market_type)

	bodyStr, err := internal.HttpPost(endpoint, payload)
	if err != nil {
		applogger.Error("Failed to query data:%v", err)
		return eodModel
	}

	if err = json.Unmarshal([]byte(bodyStr), &eodModel); err != nil {
		applogger.Error("eodModel json Unmarshal err: %v", err)
	}
	return eodModel
}
// PHPMarketTrade posts a trade query to the /bs/market_trade endpoint of the
// configured PHP host and decodes the response.
//
// market_type: 1 = spot, 2 = contract, 3 = US stocks.
// num and trade_name are forwarded as the "num" and "trade_name" fields.
//
// trade_name is JSON-encoded before being placed in the request body, so names
// containing quotes, backslashes or control characters still produce a valid
// body. Request and decode failures are logged; the value decoded so far (zero
// value on request failure) is returned.
func PHPMarketTrade(market_type, num int, trade_name string) stock.PHPMarketTradeList {
	var eodModel stock.PHPMarketTradeList

	url := fmt.Sprintf("https://%v/bs/market_trade",
		config.Config.HbApi.PHPHost)
	fmt.Println(url)

	// json.Marshal yields the quoted, escaped JSON string literal.
	nameJSON, err := json.Marshal(trade_name)
	if err != nil {
		applogger.Error("Failed to query data:%v", err)
		return eodModel
	}
	payload := fmt.Sprintf(`{"market_type":%d,"trade_name":%s,"num":%d}`, market_type, nameJSON, num)

	bodyStr, err := internal.HttpPost(url, payload)
	fmt.Println(bodyStr)
	if err != nil {
		applogger.Error("Failed to query data:%v", err)
		return eodModel
	}

	if err = json.Unmarshal([]byte(bodyStr), &eodModel); err != nil {
		applogger.Error("eodModel json Unmarshal err: %v", err)
	}
	return eodModel
}
// SubscriptionCache loads the spot and contract listings from Redis and starts
// one UserMainSubscribe goroutine per listing on its 1-day kline channel:
// "market.<lower-cased name>usdt.kline.1day" for spot entries and
// "market.<name>.kline.1day" for contract entries.
func SubscriptionCache() {
	spot := HashValue(RedisDIGITAL)
	contract := HashValue(RedisCONTRACT)
	applogger.Info("spot :", spot, "contract :", contract)

	for i := range spot {
		name := spot[i].Name
		channel := "market." + strings.ToLower(name) + "usdt.kline.1day"
		go UserMainSubscribe(channel, SpotsStatus, name)
	}

	for i := range contract {
		name := contract[i].Name
		channel := "market." + name + ".kline.1day"
		go UserMainSubscribe(channel, ContractStatus, name)
	}
}
// UserMainSubscribe subscribes to the Redis channel named symbol and keeps the
// matching cache entry for name up to date with the latest received Message.
//
// market_type selects the cache: SpotsStatus (1) updates SpotMarketCache under
// the lower-cased name, any other value updates ContractCache under name.
//
// For every message the listing is re-read with HashValueOnce; when that fails
// (for example the status is no longer StatusPass) the cache entry is deleted.
// Otherwise the message is enriched with the listing's logo, name and decimal
// setting and stored.
//
// Each message is decoded into a fresh Message so fields from an earlier
// message cannot leak into a later one; in particular a message without a
// symbol is always skipped.
//
// NOTE(review): a single undecodable payload ends the subscription; confirm
// that stopping (rather than skipping the message) is intended.
func UserMainSubscribe(symbol string, market_type int, name string) {
	applogger.Info("新交易对", symbol)

	pubSub := red.RedisClient.Subscribe(symbol)
	defer pubSub.Close()

	if _, err := pubSub.Receive(); err != nil {
		applogger.Error("failed to receive from control PubSub,%v", zap.Error(err))
		return
	}

	for msg := range pubSub.Channel() {
		var subMsg Message
		if err := json.Unmarshal([]byte(msg.Payload), &subMsg); err != nil {
			applogger.Error("Parsing data information:%v", err)
			return
		}
		if subMsg.Symbol == "" {
			// Log the offending payload; there is no error value to report here.
			applogger.Error("symbol not data :%v", msg.Payload)
			continue
		}

		if market_type == SpotsStatus {
			cacheKey := strings.ToLower(name)
			val, err := HashValueOnce(fmt.Sprintf("%s:%s", RedisDIGITAL, name))
			if err != nil {
				SpotMarketCache.Delete(cacheKey)
				continue
			}
			subMsg.Logo = val.LogoLink
			subMsg.Symbol = strings.ToLower(val.Name)
			subMsg.KeepDecimal = val.KeepDecimal
			SpotMarketCache.Store(cacheKey, subMsg)
			continue
		}

		val, err := HashValueOnce(fmt.Sprintf("%s:%s", RedisCONTRACT, name))
		if err != nil {
			ContractCache.Delete(name)
			continue
		}
		subMsg.Logo = val.LogoLink
		subMsg.Symbol = val.Name
		subMsg.KeepDecimal = val.KeepDecimal
		ContractCache.Store(name, subMsg)
	}
}
// send writes data to the user's websocket connection as one text message.
// Writes made through this method are serialized by u.mux.
//
// It returns an error when the user has no connection, or the error reported
// by the underlying write.
//
// NOTE(review): aloneSend writes to the same connections under a different
// lock (mutexSpotConn); the two paths do not exclude each other.
func (u *User) send(data string) (err error) {
	if u.conn == nil {
		applogger.Error("WebSocket sent error: no connection available")
		return errors.New("websocket send: no connection available")
	}

	u.mux.Lock()
	defer u.mux.Unlock()
	return u.conn.WriteMessage(websocket.TextMessage, []byte(data))
}