You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
6.4 KiB
241 lines
6.4 KiB
package us
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"github.com/gorilla/websocket"
|
|
"log"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
"wss-pool/cmd/websocketcollect/cache"
|
|
"wss-pool/config"
|
|
"wss-pool/logging/applogger"
|
|
"wss-pool/pkg/model"
|
|
)
|
|
|
|
// Package-level state shared by the US market feed.
var (
	// UsNewCode is the set of stock codes for which a subscribe request has
	// already been issued upstream; sendUsCodeNew reads and updates it.
	UsNewCode = map[string]bool{}

	// ctx and cancel are a cancellable root context created when the package
	// is initialised.
	// NOTE(review): nothing in this part of the file reads them — confirm
	// where they are consumed.
	ctx, cancel = context.WithCancel(context.Background())

	// wg is a package-level wait group.
	// NOTE(review): not referenced in this part of the file — confirm usage.
	wg sync.WaitGroup
)
|
|
var upGrader = websocket.Upgrader{
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
// Verification request`
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
// If it is not a get request, return an error
|
|
if r.Method != "GET" {
|
|
fmt.Println("Request method error")
|
|
return false
|
|
}
|
|
// If chat is not included in the path, an error is returned
|
|
if r.URL.Path != "/ws" {
|
|
fmt.Println("Request path error")
|
|
return false
|
|
}
|
|
// Verification rules can also be customized according to other needs
|
|
return true
|
|
},
|
|
}
|
|
|
|
// Client represents a WebSocket client.
|
|
type Client struct {
|
|
conn *websocket.Conn
|
|
subscriptions map[string]bool // Tracks which topics the client is subscribed to
|
|
}
|
|
|
|
// Hub maintains the set of active clients and broadcasts messages to the clients.
|
|
type Hub struct {
|
|
clients map[*Client]bool // All connected clients
|
|
broadcast chan Message // Broadcast channel for messages
|
|
topics map[string][]*Client // Topic to clients mapping for subscriptions
|
|
mu sync.Mutex // Mutex for safe concurrent access
|
|
}
|
|
|
|
// Message is the JSON envelope exchanged with WebSocket clients: a topic name
// plus a free-form string payload.
type Message struct {
	// Topic names the channel the message belongs to; the hub uses it to pick
	// the receiving clients.
	Topic string `json:"topic"`
	// Content is the payload. From clients it carries commands such as
	// "subscribe" or "ping"; towards clients it carries the reply or data.
	Content string `json:"content"`
}
|
|
|
|
// Initialize a new Hub
|
|
func newHub() *Hub {
|
|
return &Hub{
|
|
clients: make(map[*Client]bool),
|
|
broadcast: make(chan Message),
|
|
topics: make(map[string][]*Client),
|
|
}
|
|
}
|
|
|
|
// Start the Hub to listen for messages
|
|
func (h *Hub) run() {
|
|
for {
|
|
msg := <-h.broadcast
|
|
// Lock the hub to safely access the topics map
|
|
h.mu.Lock()
|
|
if clients, ok := h.topics[msg.Topic]; ok {
|
|
for _, client := range clients {
|
|
err := client.conn.WriteJSON(msg)
|
|
if err != nil {
|
|
log.Printf("Error writing message to client: %v", err)
|
|
client.conn.Close()
|
|
delete(h.clients, client)
|
|
}
|
|
}
|
|
}
|
|
h.mu.Unlock()
|
|
}
|
|
}
|
|
|
|
// Handle WebSocket connections
|
|
func handleConnection(hub *Hub, w http.ResponseWriter, r *http.Request) {
|
|
conn, err := upGrader.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Println("Error during connection upgrade:", err)
|
|
return
|
|
}
|
|
client := &Client{conn: conn, subscriptions: make(map[string]bool)}
|
|
hub.clients[client] = true
|
|
|
|
go func() {
|
|
defer func() {
|
|
delete(hub.clients, client)
|
|
for topic := range client.subscriptions {
|
|
hub.mu.Lock()
|
|
hub.topics[topic] = removeClientFromTopic(hub.topics[topic], client)
|
|
hub.mu.Unlock()
|
|
}
|
|
}()
|
|
for {
|
|
var msg Message
|
|
if err = conn.ReadJSON(&msg); err != nil {
|
|
log.Println("Error reading message:", err)
|
|
break
|
|
}
|
|
applogger.Info("message info:%v---%v", msg.Topic, msg.Content)
|
|
|
|
if msg.Content == "ping" || msg.Content == "subscribe" {
|
|
client.subscriptions[msg.Topic] = true
|
|
hub.mu.Lock()
|
|
hub.topics[msg.Topic] = append(hub.topics[msg.Topic], client)
|
|
hub.mu.Unlock()
|
|
}
|
|
switch msg.Content {
|
|
case "subscribe":
|
|
hub.broadcast <- Message{Topic: msg.Topic, Content: "subscribe success"}
|
|
case "ping":
|
|
hub.broadcast <- Message{Topic: msg.Topic, Content: "pong"}
|
|
default:
|
|
// TODO: 广播客户端订阅消息
|
|
//hub.broadcast <- msg
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
// ShareMarket content
|
|
func (h *Hub) ShareMarket() {
|
|
ticker := time.NewTicker(time.Second * 10)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
applogger.Info("Execute automatic subscription........")
|
|
if conn := subscribeFinnHub(); conn != nil {
|
|
// 客户端broadcast,消息推送
|
|
if err := h.SendAllClientNew(conn); err != nil {
|
|
break
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
func (h *Hub) SendAllClientNew(conn *websocket.Conn) error {
|
|
sendUsCodeNew(conn, false) // 初始化所有项目股票code订阅
|
|
go singleSendUsCodeNew(conn) // 单线程 订阅后续新增的股票
|
|
for {
|
|
_, msg, err := conn.ReadMessage()
|
|
if err != nil {
|
|
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
|
|
applogger.Error("IsUnexpectedCloseError error: %v", err)
|
|
}
|
|
return errors.New("Error reading message")
|
|
}
|
|
|
|
applogger.Debug("ReadMessage data info:%v", string(msg))
|
|
|
|
var subModel model.FinnhubMessage
|
|
if err := json.Unmarshal(msg, &subModel); err != nil {
|
|
applogger.Error("subModel Unmarshal info err: %v", err)
|
|
continue
|
|
}
|
|
// send to websocket
|
|
h.broadcast <- Message{Topic: "us", Content: string(msg)}
|
|
}
|
|
}
|
|
|
|
// Link to third-party services
|
|
func subscribeFinnHub() *websocket.Conn {
|
|
url := fmt.Sprintf("wss://%v?token=%v", config.Config.FinnhubUs.FinnhubWss, config.Config.FinnhubUs.FinnhubKey)
|
|
applogger.Info("subscribeFinnHub info Url:%v", url)
|
|
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
|
|
if err != nil {
|
|
applogger.Error("Failed to link to wss server:%v", err)
|
|
return nil
|
|
}
|
|
|
|
return conn
|
|
}
|
|
func singleSendUsCodeNew(conn *websocket.Conn) {
|
|
for {
|
|
time.Sleep(10 * time.Minute)
|
|
//发送测试数据检查该连接是否有效
|
|
//if err := Send(conn, fmt.Sprintf("{\"type\":\"subscribe\",\"symbol\":\"%v\"}", "testcode.ping")); err != nil {
|
|
// applogger.Error("ws:%v", err)
|
|
// return
|
|
//}
|
|
applogger.Debug("send new us stock start")
|
|
sendUsCodeNew(conn, true)
|
|
}
|
|
}
|
|
func sendUsCodeNew(conn *websocket.Conn, isSingle bool) {
|
|
stockRes := cache.ReadListDB()
|
|
for _, code := range stockRes {
|
|
sendPing := fmt.Sprintf("{\"type\":\"subscribe\",\"symbol\":\"%v\"}", code)
|
|
if !isSingle { //订阅全部美股
|
|
fmt.Println("subscribe:", code)
|
|
Send(conn, sendPing)
|
|
UsNewCode[code] = true
|
|
continue
|
|
}
|
|
if !UsNewCode[code] { //追加订阅新股票
|
|
applogger.Debug("new us stock:%v", sendPing)
|
|
Send(conn, sendPing)
|
|
UsNewCode[code] = true
|
|
}
|
|
}
|
|
}
|
|
func Send(conn *websocket.Conn, data string) error {
|
|
if conn == nil {
|
|
applogger.Error("WebSocket sent error: no connection available")
|
|
return errors.New("WebSocket sent error: no connection available")
|
|
}
|
|
if err := conn.WriteMessage(websocket.TextMessage, []byte(data)); err != nil {
|
|
applogger.Error("WebSocket sent error: data=%s, error=%s", data, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Remove a client from a topic
|
|
func removeClientFromTopic(clients []*Client, c *Client) []*Client {
|
|
for i, client := range clients {
|
|
if client == c {
|
|
return append(clients[:i], clients[i+1:]...)
|
|
}
|
|
}
|
|
return clients
|
|
}
|
|
|