You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

86 lines
2.0 KiB

2 months ago
package pubsub
import (
"sync"
"time"
)
type (
subscriber chan interface{} //订阅者,类型为管道
topicFunc func(v interface{}) bool //主题,是一个过滤器函数
)
// 发布者对象
type publisher struct {
m sync.RWMutex //读写锁
buffer int //订阅队列缓存大小
timeout time.Duration //发布超时时间
subscribers map[subscriber]topicFunc //订阅者信息
}
// 构建一个新的发布者对象
func NewPublisher(buffer int, publishTimeout time.Duration) *publisher {
return &publisher{
m: sync.RWMutex{},
buffer: buffer,
timeout: publishTimeout,
subscribers: make(map[subscriber]topicFunc),
}
}
// 添加一个新的订阅者,订阅过滤器筛选后的主题
func (p *publisher) SubscriberTopic(topic topicFunc) chan interface{} {
ch := make(chan interface{}, p.buffer)
p.m.Lock()
defer p.m.Unlock()
p.subscribers[ch] = topic
return ch
}
// 添加一个订阅者,订阅所有主题
func (p *publisher) SubscriberAllTopic() chan interface{} {
return p.SubscriberTopic(nil)
}
// 退出订阅
func (p *publisher) Exict(sub chan interface{}) {
p.m.Lock()
defer p.m.Unlock()
delete(p.subscribers, sub)
close(sub)
}
// 关闭发布者对象,同时关闭所有订阅者管道
func (p *publisher) Close() {
p.m.Lock()
defer p.m.Unlock()
for sub := range p.subscribers {
close(sub)
delete(p.subscribers, sub)
}
}
// 发布一个主题
func (p *publisher) Publish(v interface{}) {
p.m.RLock()
defer p.m.RUnlock()
wg := sync.WaitGroup{}
for sub, topic := range p.subscribers { //向所有的订阅者管道发送主题
wg.Add(1)
go p.SendTopic(sub, topic, v, &wg)
}
}
// 向订阅者发送主题
func (p *publisher) SendTopic(sub subscriber, topic topicFunc, v interface{}, wg *sync.WaitGroup) {
defer wg.Done()
if topic != nil && !topic(v) { //订阅者未订阅这个主题,不发送
return
}
select {
case sub <- v:
case <-time.After(p.timeout): //超时后就不再发送
}
}