You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
86 lines
2.0 KiB
86 lines
2.0 KiB
2 months ago
|
package pubsub
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type (
|
||
|
subscriber chan interface{} //订阅者,类型为管道
|
||
|
topicFunc func(v interface{}) bool //主题,是一个过滤器函数
|
||
|
)
|
||
|
|
||
|
// 发布者对象
|
||
|
type publisher struct {
|
||
|
m sync.RWMutex //读写锁
|
||
|
buffer int //订阅队列缓存大小
|
||
|
timeout time.Duration //发布超时时间
|
||
|
subscribers map[subscriber]topicFunc //订阅者信息
|
||
|
}
|
||
|
|
||
|
// 构建一个新的发布者对象
|
||
|
func NewPublisher(buffer int, publishTimeout time.Duration) *publisher {
|
||
|
return &publisher{
|
||
|
m: sync.RWMutex{},
|
||
|
buffer: buffer,
|
||
|
timeout: publishTimeout,
|
||
|
subscribers: make(map[subscriber]topicFunc),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 添加一个新的订阅者,订阅过滤器筛选后的主题
|
||
|
func (p *publisher) SubscriberTopic(topic topicFunc) chan interface{} {
|
||
|
ch := make(chan interface{}, p.buffer)
|
||
|
p.m.Lock()
|
||
|
defer p.m.Unlock()
|
||
|
p.subscribers[ch] = topic
|
||
|
return ch
|
||
|
}
|
||
|
|
||
|
// 添加一个订阅者,订阅所有主题
|
||
|
func (p *publisher) SubscriberAllTopic() chan interface{} {
|
||
|
return p.SubscriberTopic(nil)
|
||
|
}
|
||
|
|
||
|
// 退出订阅
|
||
|
func (p *publisher) Exict(sub chan interface{}) {
|
||
|
p.m.Lock()
|
||
|
defer p.m.Unlock()
|
||
|
delete(p.subscribers, sub)
|
||
|
close(sub)
|
||
|
}
|
||
|
|
||
|
// 关闭发布者对象,同时关闭所有订阅者管道
|
||
|
func (p *publisher) Close() {
|
||
|
p.m.Lock()
|
||
|
defer p.m.Unlock()
|
||
|
|
||
|
for sub := range p.subscribers {
|
||
|
close(sub)
|
||
|
delete(p.subscribers, sub)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 发布一个主题
|
||
|
func (p *publisher) Publish(v interface{}) {
|
||
|
p.m.RLock()
|
||
|
defer p.m.RUnlock()
|
||
|
wg := sync.WaitGroup{}
|
||
|
for sub, topic := range p.subscribers { //向所有的订阅者管道发送主题
|
||
|
wg.Add(1)
|
||
|
go p.SendTopic(sub, topic, v, &wg)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// 向订阅者发送主题
|
||
|
func (p *publisher) SendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
|
||
|
defer wg.Done()
|
||
|
if topic != nil && !topic(v) { //订阅者未订阅这个主题,不发送
|
||
|
return
|
||
|
}
|
||
|
select {
|
||
|
case sub <- v:
|
||
|
case <-time.After(p.timeout): //超时后就不再发送
|
||
|
}
|
||
|
}
|