// Package marketwsscliert contains the entry points that start the market-data
// collectors (crypto spot, crypto contracts, US shares, forex) and the
// periodic maintenance jobs that accompany them.
package marketwsscliert

import (
	"encoding/json"
	"time"

	"github.com/gorilla/websocket"
	"github.com/robfig/cron"

	"wss-pool/cmd/common"
	"wss-pool/cmd/websocketcollect/us"
	"wss-pool/cmd/websocketservice"
	"wss-pool/config"
	"wss-pool/dictionary"
	"wss-pool/internal/data/business"
	"wss-pool/logging/applogger"
	"wss-pool/pkg/model"
)

// Cli is the package-level websocket service user.
var Cli websocketservice.User

// RunHBDataRedis starts digital-currency market collection by passing
// checkData to both the spot dispatcher and the contract dispatcher.
//
// NOTE(review): every checkData value matches at most one of the two
// dispatchers, so whenever both calls return, one of them logs the
// "Please select the data source" message — confirm this is intended.
func RunHBDataRedis(checkData string) {
	HbMarketSpots(checkData)
	HbContract(checkData)
}

// HbMarketSpots starts the Huobi spot market subscription selected by
// checkData. An unrecognised value only logs a hint and returns.
func HbMarketSpots(checkData string) {
	switch checkData {
	case "subscribeDepth": // Market depth data, 8863
		subscribeDepth(model.SymbolListString(dictionary.Depth))
	case "subscribeLevelMbp": // Market depth MBP data (incremental push, 150 levels), 8864
		subscribeLevelMbp(model.SymbolListString([]string{}))
	case "subscribeFullMbp": // Market depth MBP data (full push), 8865
		subscribeFullMbp(model.SymbolListInt(dictionary.LevelsRefresh))
	case "subscribeSubMbp": // Market depth MBP data (incremental push), 8866
		subscribeSubMbp(model.SymbolListInt(dictionary.LevelsMbp))
	case "subscribeBbo": // Best bid/offer tick-by-tick data, 8867
		subscribeBbo(model.SymbolListString([]string{}))
	case "subscribeKLine": // K-line (candlestick) data, 8868
		subscribeKLine(model.SymbolListString(dictionary.TimeCycle))
	case "subscribeTrade": // Trade details, 8869
		subscribeTrade(model.SymbolListString([]string{}))
	case "subscribeLast24h": // Market summary, 8847
		subscribeLast24h(model.SymbolListString([]string{}))
	case "subscribeTicker": // Aggregated market data (ticker), 8848
		subscribeTicker(model.SymbolListString([]string{}))
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// HbContract starts the coin-margined perpetual contract subscription
// selected by checkData. An unrecognised value only logs a hint and returns.
func HbContract(checkData string) {
	switch checkData {
	case "subscribeCtKline": // K-line data, 8841
		// Load the configuration data in the background before subscribing.
		go GetModifyContract()
		subscribeCtKline(model.SymbolCtListString(dictionary.ContractTime))
	case "subscribeCtDepth": // Depth data, 8842
		subscribeCtDepth(model.SymbolCtListString(dictionary.ContractDepth))
	case "subscribeCtAddDepth": // Incremental depth data, 8843
		subscribeCtAddDepth(model.SymbolCtListString(dictionary.ContractAddDepth))
	case "subscribeCtBbo": // Best bid/offer data, 8844
		subscribeCtBbo(model.SymbolCtListString([]string{}))
	case "subscribeCtDetail": // Contract detail data, 8845
		subscribeCtDetail(model.SymbolCtListString([]string{}))
	case "subscribeCtTradeDetail": // Contract trade detail data, 8846
		subscribeCtTradeDetail(model.SymbolCtListString([]string{}))
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ShareMarket collects US share market data. For checkData == "usShare" it
// never returns: on every 10-second tick it calls subscribeFinnhub and, when
// a connection is obtained, hands it to subscribeMarketUsBak.
//
// NOTE(review): the connection is not closed here; presumably
// subscribeMarketUsBak owns it — confirm, otherwise each retry leaks one.
func ShareMarket(checkData string) {
	// TODO: pin (price-spike) data does not need to be pushed.
	// NOTE(review): this goroutine is started even when checkData is not
	// recognised below — confirm that is intended.
	go business.NewPinStock(common.GetRedisNoPin(config.Config.Redis.NoPinAss))

	switch checkData {
	case "usShare": // US
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			applogger.Info("Execute automatic subscription........")
			if conn := subscribeFinnhub(); conn != nil {
				subscribeMarketUsBak(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ShareMarketBak collects US share market data through the dispenser. For
// checkData == "usShare" it never returns: on every 10-second tick it calls
// subscribeDispense("ws", 7777), sends a "subscribe" message on topic "us"
// and hands the connection to subscribeMarketUsBakNew.
func ShareMarketBak(checkData string) {
	// TODO: pin (price-spike) data does not need to be pushed.
	go business.NewPinStock(common.GetRedisNoPin(config.Config.Redis.NoPinAss))

	switch checkData {
	case "usShare":
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			if conn := subscribeDispense("ws", 7777); conn != nil {
				webSocketsWrite(conn, "us", "subscribe")
				subscribeMarketUsBakNew(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ForexMarketBak collects forex market data through the dispenser. For
// checkData == "forex" it never returns: on every 10-second tick it calls
// subscribeDispense("forexWs", 7778), subscribes to topic "forex", runs
// pinInsertionRun and hands the connection to subscribeMarketForexBakNew.
func ForexMarketBak(checkData string) {
	switch checkData {
	case "forex":
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			if conn := subscribeDispense("forexWs", 7778); conn != nil {
				webSocketsWrite(conn, "forex", "subscribe")
				pinInsertionRun()
				subscribeMarketForexBakNew(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ForexMarketDayBak collects daily forex market data through the dispenser.
// For checkData == "forex" it never returns: on every 10-second tick it calls
// subscribeDispense("forexWs", 7778), subscribes to topic "forexDay", runs
// pinInsertionDayRun and hands the connection to subscribeMarketDayForexBakNew.
func ForexMarketDayBak(checkData string) {
	switch checkData {
	case "forex":
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			if conn := subscribeDispense("forexWs", 7778); conn != nil {
				webSocketsWrite(conn, "forexDay", "subscribe")
				pinInsertionDayRun()
				subscribeMarketDayForexBakNew(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ForexMarketQuoteBak collects forex best bid/offer quotes through the
// dispenser. For checkData == "forex" it never returns: on every 10-second
// tick it calls subscribeDispense("forexWs", 7778), subscribes to topic
// "quotes" and hands the connection to subscribeMarketForexQuoteBakNew.
func ForexMarketQuoteBak(checkData string) {
	switch checkData {
	case "forex":
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			if conn := subscribeDispense("forexWs", 7778); conn != nil {
				webSocketsWrite(conn, "quotes", "subscribe")
				subscribeMarketForexQuoteBakNew(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ForexMarketTradeBak collects forex trade quotes through the dispenser. For
// checkData == "forex" it never returns: on every 10-second tick it calls
// subscribeDispense("forexWs", 7778), subscribes to topic "trade" and hands
// the connection to subscribeMarketForexTradeBakNew.
func ForexMarketTradeBak(checkData string) {
	switch checkData {
	case "forex":
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			if conn := subscribeDispense("forexWs", 7778); conn != nil {
				webSocketsWrite(conn, "trade", "subscribe")
				subscribeMarketForexTradeBakNew(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ForexMarketTradeBak2 is the storage variant of the forex trade collector.
// For checkData == "forex" it never returns: on every 10-second tick it calls
// subscribeDispense("forexWs", 7778), subscribes to topic "tradeStorage" and
// hands the connection to subscribeMarketForexTradeBak2New.
func ForexMarketTradeBak2(checkData string) {
	switch checkData {
	case "forex":
		ticker := time.NewTicker(time.Second * 10)
		defer ticker.Stop()
		for range ticker.C {
			if conn := subscribeDispense("forexWs", 7778); conn != nil {
				webSocketsWrite(conn, "tradeStorage", "subscribe")
				subscribeMarketForexTradeBak2New(conn)
			}
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// ForexMarketClearTradeBak2 cleans up forex trade data. For
// checkData == "forex" it never returns and calls DeleteForexTrade on every
// 5-second tick.
func ForexMarketClearTradeBak2(checkData string) {
	switch checkData {
	case "forex":
		ticker := time.NewTicker(time.Second * 5)
		defer ticker.Stop()
		for range ticker.C {
			DeleteForexTrade()
		}
	default:
		applogger.Info("Please select the data source that needs to be connected......")
	}
}

// webSocketsWrite sends a us.Message with the given topic and content over
// conn as a JSON text frame. Marshal and write failures are logged and not
// reported to the caller.
func webSocketsWrite(conn *websocket.Conn, topic, content string) {
	payload, err := json.Marshal(us.Message{
		Topic:   topic,
		Content: content,
	})
	if err != nil {
		applogger.Error("marshal:%v", err)
		return
	}

	if err := conn.WriteMessage(websocket.TextMessage, payload); err != nil {
		applogger.Error("write:%v", err)
	}
}

// ForexUpdateClosePrice schedules ForexUpdateCode, which updates the closing
// price of the forex trading pairs, and then blocks forever so the process
// keeps running.
func ForexUpdateClosePrice() {
	scheduler := cron.New()

	// Six-field spec with a leading seconds field: fires at 00:00:00,
	// Monday through Friday only.
	err := scheduler.AddFunc("0 0 0 * * 1-5", func() {
		ForexUpdateCode()
	})
	if err != nil {
		// NOTE(review): nothing is scheduled on this path, yet the function
		// still blocks below — confirm whether it should return instead.
		applogger.Error("cron add func:%v", err)
	}

	scheduler.Start()

	// Block this goroutine indefinitely so the program does not exit.
	select {}
}