Redis和NSQ都有完善的订阅和发布实现,但参考它们的源码实现,做个实际例子,对两边的异同和这种机制会更有印象。
练习实现简单的 订阅/取消订阅/发布信息 功能,足够了。
Server.go
Server结构中的Dict用map保存了Channel的相关信息,而Channel结构中则用一个map保存了订阅这个Channel的Client.
这个与Redis中不太一样,Redis中的Dict用Channel的名字作为map的key,value则是其对应的Client列表。而Client中则保
存了其所有订阅的Channel信息,无所谓好坏,Redis相对更方便Client统计信息,练习中的这种则更简明些。
package pubsub import ( "errors" "sync" ) type Client struct { Id int Ip string } type Server struct { Dict map[string]*Channel //map[Channel.Name]*Channel sync.RWMutex } func NewServer() *Server { s := &Server{} s.Dict = make(map[string]*Channel) //所有channel return s } //订阅 func (srv *Server) Subscribe(client *Client, channelName string) { // 客户是否在Channel的客户列表中 srv.RLock() ch, found := srv.Dict[channelName] srv.RUnlock() if !found { ch = NewChannel(channelName) ch.AddClient(client) srv.Lock() srv.Dict[channelName] = ch srv.Unlock() } else { ch.AddClient(client) } } //取消订阅 func (srv *Server) Unsubscribe(client *Client, channelName string) { srv.RLock() ch, found := srv.Dict[channelName] srv.RUnlock() if found { if ch.DeleteClient(client) == 0 { ch.Exit() srv.Lock() delete(srv.Dict, channelName) srv.Unlock() } } } //发布消息 func (srv *Server) PublishMessage(channelName, message string) (bool, error) { srv.RLock() ch, found := srv.Dict[channelName] if !found { srv.RUnlock() return false, errors.New("channelName不存在!") } srv.RUnlock() ch.Notify(message) ch.Wait() return true, nil }
Channel.go
每个Channel 负责将信息放入WaitGroup,发送到Client或队列,例子中是打印一条信息。 当clients为空时,则exit().
import ( "fmt" "sync" "sync/atomic" ) type Channel struct { Name string clients map[int]*Client // exitChan chan int sync.RWMutex waitGroup WaitGroupWrapper messageCount uint64 exitFlag int32 } func NewChannel(channelName string) *Channel { return &Channel{ Name: channelName, // exitChan: make(chan int), clients: make(map[int]*Client), } } func (ch *Channel) AddClient(client *Client) bool { ch.RLock() _, found := ch.clients[client.Id] ch.RUnlock() ch.Lock() if !found { ch.clients[client.Id] = client } ch.Unlock() return found } func (ch *Channel) DeleteClient(client *Client) int { var ret int ch.ReplyMsg( fmt.Sprintf("从channel:%s 中删除client:%d ", ch.Name, client.Id)) ch.Lock() delete(ch.clients, client.Id) ch.Unlock() ch.RLock() ret = len(ch.clients) ch.RUnlock() return ret } func (ch *Channel) Notify(message string) bool { ch.RLock() defer ch.RUnlock() for cid, _ := range ch.clients { ch.ReplyMsg( fmt.Sprintf("channel:%s client:%d message:%s", ch.Name, cid, message)) } return true } func (ch *Channel) ReplyMsg(message string) { ch.waitGroup.Wrap(func() { fmt.Println(message) }) } func (ch *Channel) Wait() { ch.waitGroup.Wait() } func (ch *Channel) Exiting() bool { return atomic.LoadInt32(&ch.exitFlag) == 1 } func (ch *Channel) Exit() { if !atomic.CompareAndSwapInt32(&ch.exitFlag, 0, 1) { return } //close(ch.exitChan) ch.Wait() } func (ch *Channel) PutMessage(clientID int, message string) { ch.RLock() defer ch.RUnlock() if ch.Exiting() { return } //select { // case <-t.exitChan: // return //} fmt.Println(ch.Name, ":", message) atomic.AddUint64(&ch.messageCount, 1) return }
主程序:
//订阅/发布 练习 //author: Xiong Chuan Liang //date: 2015-3-17 package main import ( . "pubsub" ) func main(){ c1 := &Client{Id:100,Ip:"172.18.1.1"} c3:= &Client{Id:300,Ip:"172.18.1.3"} srv := NewServer() srv.Subscribe(c1,"Topic") srv.Subscribe(c3,"Topic") srv.PublishMessage("Topic","测试信息1") srv.Unsubscribe(c3,"Topic") srv.PublishMessage("Topic","测试信息2222") srv.Subscribe(c1,"Topic2") srv.Subscribe(c3,"Topic2") srv.PublishMessage("Topic2"," Topic2的测试信息") } /* 运行结果: channel:Topic client:100 message:测试信息1 channel:Topic client:300 message:测试信息1 从channel:Topic 中删除client:300 channel:Topic client:100 message:测试信息2222 channel:Topic2 client:100 message: Topic2的测试信息 channel:Topic2 client:300 message: Topic2的测试信息 */
没做太复杂的测试,粗略看好像没有问题。
MAIL: xcl_168@aliyun.com
BLOG: http://blog.csdn.net/xcl168
原文:http://blog.csdn.net/xcl168/article/details/44355611