1. 简单的循环订阅
package main import ( "fmt" "time" "os" "strconv" "math/rand" "github.com/gomodule/redigo/redis" ) const RMQ string = "mqtest" func producer() { redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("yaoyao")) if err != nil { fmt.Println(err) return } defer redis_conn.Close() rand.Seed(time.Now().UnixNano()) var i = 1 for { _,err = redis_conn.Do("PUBLISH", RMQ, strconv.Itoa(i)) if(err!=nil) { fmt.Println("produce error") continue } fmt.Printf("produce element:%d\n", i) time.Sleep(time.Duration(rand.Intn(10))*time.Second) i++ } } func consumer() { redis_conn, err := redis.Dial("tcp", "127.0.0.1:6379", redis.DialPassword("yaoyao")) if err != nil { fmt.Println(err) return } defer redis_conn.Close() rand.Seed(time.Now().UnixNano()) for { ele,err := redis.String(redis_conn.Do("SUBSCRIBE", RMQ)) if(err != nil) { fmt.Println("no msg.sleep now") time.Sleep(time.Duration(rand.Intn(10))*time.Second) } else { fmt.Printf("cosume element:%s\n", ele) } } } func main() { list := os.Args if(list[1] == "pro") { go producer() } else if (list[1] == "con") { go consumer() } for { time.Sleep(time.Duration(10000)*time.Second) } }2. 回调函数订阅
发布端
package main import( //"github.com/go-redis/redis" "github.com/gomodule/redigo/redis" log "github.com/astaxie/beego/logs" ) func main() { client, err := redis.Dial("tcp", "127.0.0.1:6379",redis.DialPassword("yaoyao")) if err != nil { log.Critical("redis dial failed.") } defer client.Close() _, err = client.Do("Publish", "test_chan1", "hell one") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Do("Publish", "test_chan2", "hello2") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Do("Publish", "test_chan3", "hello3") if err != nil { log.Critical("redis Publish failed.") } _, err = client.Send( "test_chan1", "hell one one") if err != nil { log.Critical("redis Publish failed.") } }订阅端
package main import ( //"github.com/go-redis/redis" "fmt" "github.com/labstack/gommon/log" "github.com/gomodule/redigo/redis" "time" //"reflect" "unsafe" ) type SubscribeCallback func (channel, message string) type Subscriber struct { client redis.PubSubConn cbMap map[string]SubscribeCallback } func (c *Subscriber) Connect(ip string, port uint16) { conn, err := redis.Dial("tcp", "127.0.0.1:6379",redis.DialPassword("yaoyao")) if err != nil { log.Error("redis dial failed.") } c.client = redis.PubSubConn{conn} c.cbMap = make(map[string]SubscribeCallback) go func() { for { log.Info("wait...") switch res := c.client.Receive().(type) { case redis.Message: channel := (*string)(unsafe.Pointer(&res.Channel)) message := (*string)(unsafe.Pointer(&res.Data)) c.cbMap[*channel](*channel, *message) case redis.Subscription: fmt.Printf("%s: %s %d\n", res.Channel, res.Kind, res.Count) case error: log.Error("error handle...") continue } } }() } func (c *Subscriber) Close() { err := c.client.Close() if err != nil{ log.Error("redis close error.") } } func (c *Subscriber) Subscribe(channel interface{}, cb SubscribeCallback) { err := c.client.Subscribe(channel) if err != nil{ log.Error("redis Subscribe error.") } c.cbMap[channel.(string)] = cb } func TestCallback1(chann, msg string){ log.Info("TestCallback1 channel : ", chann, " message : ", msg) } func TestCallback2(chann, msg string){ log.Info("TestCallback2 channel : ", chann, " message : ", msg) } func TestCallback3(chann, msg string){ log.Info("TestCallback3 channel : ", chann, " message : ", msg) } func main() { log.Info("===========main start============") var sub Subscriber sub.Connect("127.0.0.1", 6397) sub.Subscribe("test_chan1", TestCallback1) sub.Subscribe("test_chan2", TestCallback2) sub.Subscribe("test_chan3", TestCallback3) for{ // 这段代码的作用就是 阻止线程结束 time.Sleep(1 * time.Second) } }