You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

412 lines
12 KiB

package websocketservice
import (
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"github.com/satori/go.uuid"
"go.uber.org/zap"
"log"
"math"
"net/http"
"strings"
"sync"
"time"
"wss-pool/cmd/common"
"wss-pool/dictionary"
"wss-pool/internal/model"
red "wss-pool/internal/redis"
"wss-pool/logging/applogger"
mol "wss-pool/pkg/model"
)
// Define a websocket connection object that contains information for each connection
type Client struct {
Id string // Client ID
conn *websocket.Conn // Define websocket link objects
msg chan []byte // Define messages received and distributed
symbol sync.Map // Concurrent Security - Manage User Subscription Types
mux sync.Mutex
}
type StockMessage struct {
S string `json:"s,omitempty"` // 股票代码
Country string `json:"country"` //国家
StockCode string `json:"stock_code" bson:"stock_code"` // 股票代码
Symbol string `json:"symbol"`
Stock string `json:"stock"` // 期权代码
IsStockIndex bool `json:"is_stock_index"`
IsOptionList bool `json:"is_option_list"`
IsOptionInfo bool `json:"is_option_info"`
}
var (
wsStockConMap = map[string][]*websocket.Conn{}
mutexStock = sync.RWMutex{}
msgStockChan = make(chan []byte)
mutexConn = sync.RWMutex{}
TotalNum int
mutexTotal = sync.RWMutex{}
countryMap = make(map[string][]string)
mutexCountry = sync.RWMutex{}
clearClientChan = make(chan *websocket.Conn)
)
const (
stockConnNum int = 20
)
// Define an UpGrader to upgrade a regular HTTP connection to a websocket connection
var upServer = &websocket.Upgrader{
// Define read/write buffer size
WriteBufferSize: 1024,
ReadBufferSize: 1024,
// Verification request
CheckOrigin: func(r *http.Request) bool {
// If it is not a get request, return an error
if r.Method != "GET" {
fmt.Println("Request method error")
return false
}
// If chat is not included in the path, an error is returned
if r.URL.Path != "/quotes-share-wss" {
fmt.Println("Request path error")
return false
}
token := r.URL.Query().Get("token")
if !common.CheckToken(token) {
applogger.Debug("token expired")
return false
}
// Verification rules can also be customized according to other needs
return true
},
}
// ShareConnect
func ShareConnect(host, addr string) {
go writeShare()
go offLineStock()
http.HandleFunc("/quotes-share-wss", wsHandleShare)
url := fmt.Sprintf("%v%v", host, addr)
err := http.ListenAndServe(url, nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}
func wsHandleShare(w http.ResponseWriter, r *http.Request) {
// Obtain a link through the upgraded upgrade tool
conn, err := upServer.Upgrade(w, r, nil)
if err != nil {
applogger.Info("Failed to obtain connection:%v", err)
return
}
// Register users after successful connection
client := &Client{
Id: uuid.NewV4().String(),
conn: conn,
msg: make(chan []byte),
symbol: sync.Map{},
mux: sync.Mutex{},
}
readShare(client)
}
func setTotalNum(num int) {
mutexTotal.Lock()
defer mutexTotal.Unlock()
TotalNum += num
}
func getTotalNum() {
mutexTotal.RLock()
defer mutexTotal.RUnlock()
applogger.Debug("number of colleagues online :%v", TotalNum)
}
// Read the message sent by the client and process the return response
func readShare(cl *Client) {
defer cl.conn.Close()
setTotalNum(1)
getTotalNum()
for {
_, msg, err := cl.conn.ReadMessage()
if err != nil {
clearClientChan <- cl.conn
applogger.Debug("user exit:%v", cl.conn.RemoteAddr().String())
return
}
// Process business logic
psgMsg := model.SubMessage(string(msg))
if psgMsg != nil {
switch psgMsg.Type {
case "ping": // Receiving ping
aloneSendStock(cl.conn, []byte(model.ReturnValue("pong")))
case "subscribe": // Receive subscription
country, stock := getCountry(psgMsg.Symbol)
if !dictionary.StockCountryMap[country] {
applogger.Error(country, "incorrect subscription information 不属于合规的股票市场")
aloneSendStock(cl.conn, []byte(model.ReturnValue("incorrect subscription information")))
clearClientChan <- cl.conn
return
}
aloneSendStock(cl.conn, []byte(model.ReturnValue("subscribe success")))
//获取 服务是否 订阅该数据
mutexStock.RLock()
conns, ok := wsStockConMap[psgMsg.Symbol]
mutexStock.RUnlock()
if ok {
//查询client是否订阅
conns = checkClient(conns, cl.conn)
} else {
//添加订阅
conns = make([]*websocket.Conn, 0)
conns = append(conns, cl.conn)
}
mutexStock.Lock()
wsStockConMap[psgMsg.Symbol] = conns
mutexStock.Unlock()
applogger.Info("psgMsg.Symbol:%v,wsStockClient:%v", psgMsg.Symbol, wsStockConMap)
if !ok {
go cl.userPSubscribeUs(country, stock)
}
case "unSubscribe": // Receive unsubscribe
applogger.Info("Received unsubscribe message body:", string(msg))
mutexStock.RLock()
conns, ok := wsStockConMap[psgMsg.Symbol]
mutexStock.RUnlock()
if ok {
//取消订阅
removeWsconnStock(conns, cl.conn, psgMsg.Symbol)
}
aloneSendStock(cl.conn, []byte(model.ReturnValue("unSubscribe success")))
applogger.Debug("Subscription type after current user deletion:%v", ok)
default:
// TODO: Handling other situations transmitted by customers
applogger.Debug("Please provide accurate instructions......")
}
}
}
}
// 废弃 客户端
func offLineStock() {
for userConn := range clearClientChan {
setTotalNum(-1)
getTotalNum()
for key, v := range wsStockConMap {
removeWsconnStock(v, userConn, key)
}
}
}
// send message
func aloneSendStock(conn *websocket.Conn, message []byte) error {
mutexConn.Lock()
defer mutexConn.Unlock()
//applogger.Debug("aloneSendStock :%v",conn,message)
conn.SetWriteDeadline(time.Now().Add(writeWait))
w, err := conn.NextWriter(websocket.TextMessage) // Write data in the form of io, with parameters of data type
if err != nil {
//发送失败
applogger.Error("Failed to conn.NextWriter :%v", err)
return err
}
if _, err := w.Write(message); err != nil { // Write data, this function is truly used to transmit data to the foreground
applogger.Error("Failed Write message :%v", err)
return err
}
if err := w.Close(); err != nil { // Close write stream
applogger.Error("Failed to close write stream:%v", err)
return nil
}
return nil
}
// 清理客户端
func removeWsconnStock(conn []*websocket.Conn, userConn *websocket.Conn, symbol string) error {
index := -1
for i, v := range conn {
if v == userConn {
index = i
break
}
}
if index >= 0 {
conn = append(conn[:index], conn[index+1:]...)
mutexStock.Lock()
wsStockConMap[symbol] = conn
mutexStock.Unlock()
}
return nil
}
func deleteWsconnStock(symbol string) {
mutexStock.Lock()
defer mutexStock.Unlock()
delete(wsStockConMap, symbol)
}
// 广播
func broadcastWebSocketStock(msg []byte, symbol string) {
//applogger.Debug("订阅客户端:%v---%v", wsStockConMap, symbol)
mutexStock.RLock()
conns, ok := wsStockConMap[symbol]
mutexStock.RUnlock()
if !ok {
applogger.Error("Parsing data information:%v")
return
}
total := len(conns)
if total <= 0 {
return
}
start := time.Now()
connDiv := int(math.Ceil(float64(total) / float64(stockConnNum)))
//分批并发推送
for i := 0; i < connDiv; i++ {
startIndex := i * stockConnNum
endIndex := (i + 1) * stockConnNum
if endIndex > total {
endIndex = total
}
wg := sync.WaitGroup{}
for _, val := range conns[startIndex:endIndex] {
wg.Add(1)
go func(val *websocket.Conn, msg []byte) {
defer wg.Done()
aloneSendStock(val, msg)
}(val, msg)
}
wg.Wait()
}
applogger.Info("broadcast WebSocket info : %v ;total:%v;time-consuming %v ", symbol, total, time.Since(start))
}
func writeShare() {
for message := range msgStockChan {
if strings.Contains(string(message), "\"ev\":\"CAS\"") { // 外汇行情订阅
var subMsg mol.ForexJsonData
if err := json.Unmarshal(message, &subMsg); err != nil {
applogger.Error(err.Error())
}
broadcastWebSocketStock(message, fmt.Sprintf("%s.Forex", subMsg.Pair))
} else if strings.Contains(string(message), "\"ev\":\"CAS-D\"") { // 外汇行情天订阅
var subMsg mol.ForexJsonData
if err := json.Unmarshal(message, &subMsg); err != nil {
applogger.Error(err.Error())
}
broadcastWebSocketStock(message, fmt.Sprintf("%s.DayForex", subMsg.Pair))
} else if strings.Contains(string(message), "\"ev\":\"C\"") { // 外汇买一卖一报价
var subMsg mol.ForexLastQuote
if err := json.Unmarshal(message, &subMsg); err != nil {
applogger.Error(err.Error())
}
broadcastWebSocketStock(message, fmt.Sprintf("%s.LastForex", subMsg.P))
} else if strings.Contains(string(message), "\"ev\":\"T\"") {
var subMsg mol.ForexTrade
if err := json.Unmarshal(message, &subMsg); err != nil {
applogger.Error(err.Error())
}
broadcastWebSocketStock(message, fmt.Sprintf("%s.TradeForex", subMsg.Code))
} else {
var subMsg StockMessage
if err := json.Unmarshal(message, &subMsg); err != nil {
applogger.Error(err.Error())
}
if subMsg.S == "" {
if subMsg.IsStockIndex { // 指数行情订阅
broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.StockCode, common.StockIndexPrefix))
} else if subMsg.IsOptionList { // 期权列表行情订阅
broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Stock, fmt.Sprintf("%s%s%s", common.StockOption, common.CapitalizeFirstLetter(subMsg.Country), common.StockOptionList)))
} else if subMsg.IsOptionInfo { // 期权详情订阅
broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Stock, fmt.Sprintf("%s%s%s", common.StockOption, common.CapitalizeFirstLetter(subMsg.Country), common.StockOptionInfo)))
} else { // 东南亚股票市场行情订阅(tradingView)
broadcastWebSocketStock(message, fmt.Sprintf("%s.%s", subMsg.Symbol, common.CapitalizeFirstLetter(subMsg.Country)))
}
continue
}
// 美股行情订阅
broadcastWebSocketStock(message, fmt.Sprintf("%s.US", subMsg.S))
}
}
}
func getCountry(symbol string) (string, string) {
symbolArr := strings.Split(symbol, ".")
if len(symbolArr) < 2 {
applogger.Error("symbol 有误")
return "", ""
}
county := symbolArr[len(symbolArr)-1]
return county, symbol[0 : strings.Index(symbol, county)-1]
}
// 按市场订阅
func (cl *Client) userPSubscribeUs(country, symbol string) {
mutexCountry.RLock()
symbols, ok := countryMap[country]
mutexCountry.RUnlock()
//提加
mutexCountry.Lock()
countryMap[country] = append(symbols, symbol)
mutexCountry.Unlock()
if ok {
//applogger.Error(country, "已订阅")
return
}
applogger.Debug(country, "start a stock subscription")
pubSub := red.RedisClient.PSubscribe(fmt.Sprintf("*.%s", country))
defer func() {
pubSub.Close()
}()
_, err := pubSub.Receive()
if err != nil {
applogger.Error("failed to receive from control PubSub,%v", zap.Error(err))
return
}
ch := pubSub.Channel()
for msg := range ch {
mutexStock.RLock()
conns, ok := wsStockConMap[msg.Channel]
mutexStock.RUnlock()
//未订阅股票跳出
if !ok {
continue
}
if len(conns) > 0 {
msgStockChan <- []byte(msg.Payload)
} else {
deleteWsconnStock(msg.Channel)
mutexCountry.RLock()
symbols := countryMap[country]
mutexCountry.RUnlock()
index := -1
msgChannel := strings.Split(msg.Channel, ".")
for i, v := range symbols {
if v == msgChannel[0] {
index = i
break
}
}
if index >= 0 {
symbols = append(symbols[:index], symbols[index+1:]...)
mutexCountry.Lock()
countryMap[country] = symbols
mutexCountry.Unlock()
}
// 退订
if len(symbols) <= 0 {
mutexCountry.Lock()
delete(countryMap, country)
mutexCountry.Unlock()
applogger.Debug("Starting unsubscribe.....", country)
pubSub.PUnsubscribe(fmt.Sprintf("*.%s", country))
return
}
}
}
}