You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
6.4 KiB
241 lines
6.4 KiB
package us
import (
var (
UsNewCode = make(map[string]bool)
ctx, cancel = context.WithCancel(context.Background())
wg sync.WaitGroup
var upGrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
// Verification request`
CheckOrigin: func(r *http.Request) bool {
// If it is not a get request, return an error
if r.Method != "GET" {
fmt.Println("Request method error")
return false
// If chat is not included in the path, an error is returned
if r.URL.Path != "/ws" {
fmt.Println("Request path error")
return false
// Verification rules can also be customized according to other needs
return true
// Client represents a WebSocket client.
type Client struct {
conn *websocket.Conn
subscriptions map[string]bool // Tracks which topics the client is subscribed to
// Hub maintains the set of active clients and broadcasts messages to the clients.
type Hub struct {
clients map[*Client]bool // All connected clients
broadcast chan Message // Broadcast channel for messages
topics map[string][]*Client // Topic to clients mapping for subscriptions
mu sync.Mutex // Mutex for safe concurrent access
// Message structure to hold the message content and the topic
type Message struct {
Topic string `json:"topic"`
Content string `json:"content"`
// Initialize a new Hub
func newHub() *Hub {
return &Hub{
clients: make(map[*Client]bool),
broadcast: make(chan Message),
topics: make(map[string][]*Client),
// Start the Hub to listen for messages
func (h *Hub) run() {
for {
msg := <-h.broadcast
// Lock the hub to safely access the topics map
if clients, ok := h.topics[msg.Topic]; ok {
for _, client := range clients {
err := client.conn.WriteJSON(msg)
if err != nil {
log.Printf("Error writing message to client: %v", err)
delete(h.clients, client)
// Handle WebSocket connections
func handleConnection(hub *Hub, w http.ResponseWriter, r *http.Request) {
conn, err := upGrader.Upgrade(w, r, nil)
if err != nil {
log.Println("Error during connection upgrade:", err)
client := &Client{conn: conn, subscriptions: make(map[string]bool)}
hub.clients[client] = true
go func() {
defer func() {
delete(hub.clients, client)
for topic := range client.subscriptions {
hub.topics[topic] = removeClientFromTopic(hub.topics[topic], client)
for {
var msg Message
if err = conn.ReadJSON(&msg); err != nil {
log.Println("Error reading message:", err)
applogger.Info("message info:%v---%v", msg.Topic, msg.Content)
if msg.Content == "ping" || msg.Content == "subscribe" {
client.subscriptions[msg.Topic] = true
hub.topics[msg.Topic] = append(hub.topics[msg.Topic], client)
switch msg.Content {
case "subscribe":
hub.broadcast <- Message{Topic: msg.Topic, Content: "subscribe success"}
case "ping":
hub.broadcast <- Message{Topic: msg.Topic, Content: "pong"}
// TODO: 广播客户端订阅消息
//hub.broadcast <- msg
// ShareMarket content
func (h *Hub) ShareMarket() {
ticker := time.NewTicker(time.Second * 10)
defer ticker.Stop()
for {
select {
case <-ticker.C:
applogger.Info("Execute automatic subscription........")
if conn := subscribeFinnHub(); conn != nil {
// 客户端broadcast,消息推送
if err := h.SendAllClientNew(conn); err != nil {
func (h *Hub) SendAllClientNew(conn *websocket.Conn) error {
sendUsCodeNew(conn, false) // 初始化所有项目股票code订阅
go singleSendUsCodeNew(conn) // 单线程 订阅后续新增的股票
for {
_, msg, err := conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
applogger.Error("IsUnexpectedCloseError error: %v", err)
return errors.New("Error reading message")
applogger.Debug("ReadMessage data info:%v", string(msg))
var subModel model.FinnhubMessage
if err := json.Unmarshal(msg, &subModel); err != nil {
applogger.Error("subModel Unmarshal info err: %v", err)
// send to websocket
h.broadcast <- Message{Topic: "us", Content: string(msg)}
// Link to third-party services
func subscribeFinnHub() *websocket.Conn {
url := fmt.Sprintf("wss://%v?token=%v", config.Config.FinnhubUs.FinnhubWss, config.Config.FinnhubUs.FinnhubKey)
applogger.Info("subscribeFinnHub info Url:%v", url)
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
applogger.Error("Failed to link to wss server:%v", err)
return nil
return conn
func singleSendUsCodeNew(conn *websocket.Conn) {
for {
time.Sleep(10 * time.Minute)
//if err := Send(conn, fmt.Sprintf("{\"type\":\"subscribe\",\"symbol\":\"%v\"}", "")); err != nil {
// applogger.Error("ws:%v", err)
// return
applogger.Debug("send new us stock start")
sendUsCodeNew(conn, true)
func sendUsCodeNew(conn *websocket.Conn, isSingle bool) {
stockRes := cache.ReadListDB()
for _, code := range stockRes {
sendPing := fmt.Sprintf("{\"type\":\"subscribe\",\"symbol\":\"%v\"}", code)
if !isSingle { //订阅全部美股
fmt.Println("subscribe:", code)
Send(conn, sendPing)
UsNewCode[code] = true
if !UsNewCode[code] { //追加订阅新股票
applogger.Debug("new us stock:%v", sendPing)
Send(conn, sendPing)
UsNewCode[code] = true
func Send(conn *websocket.Conn, data string) error {
if conn == nil {
applogger.Error("WebSocket sent error: no connection available")
return errors.New("WebSocket sent error: no connection available")
if err := conn.WriteMessage(websocket.TextMessage, []byte(data)); err != nil {
applogger.Error("WebSocket sent error: data=%s, error=%s", data, err)
return err
return nil
// Remove a client from a topic
func removeClientFromTopic(clients []*Client, c *Client) []*Client {
for i, client := range clients {
if client == c {
return append(clients[:i], clients[i+1:]...)
return clients