简介
NSQ是一个基于Go语言的分布式实时消息平台。具有分布式,易于水平扩展,易于安装,易于集成(主流语言都有对应的客户端库)的特点。
NSQ是由四个重要组件构成:
- nsqd:一个负责接收、排队、转发消息到客户端的守护进程,它可以独立运行,不过通常它是由
nsqlookupd
实例所在集群配置的 - nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
- nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
- utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
安装
这里推荐docker-compose的方式非常方便运维(其他安装方法可从参考链接中查看)
我这里使用的自己的腾讯云,系统centos7
我们需要先建一个docker-compose.yml
官网推荐配置如下
version: '2' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160" - "4161" nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 ports: - "4150" - "4151" nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 ports: - "4171"
然后在该文件目录下执行
docker-compose up -d
之后我们会创建一个私有网络包含nsqd,nsqlookupd,nsqadmin三个容器,宿主机会产生随机端口映射到配置文件中各容器端口号,然后我们可以使用
docker-compose ps
查看当前容器运行状态和端口映射信息
假设nsqadmin 的宿主机映射端口为1033 ,浏览器访问宿主机ip:1033 可以视图观察nsq运行状态,创建处理topic
使用
这里有2种消费者的写法,第一种是直连nsqd(tcp长连接),第二种是通过nsqlookupd的http接口查询后长连接到nsqd, 显然第二种更易于分布式容错和高可用。这里我们都贴下代码
第一种nsqd直连
package main import ( "flag" "log" "time" "github.com/nsqio/go-nsq" ) func main() { go startConsumer() startProducer() } var url string func init() { //具体ip,端口根据实际情况传入或者修改默认配置 flag.StringVar(&url, "url", "127.0.0.1:4150", "nsqd") flag.Parse() } // 生产者 func startProducer() { cfg := nsq.NewConfig() producer, err := nsq.NewProducer(url, cfg) if err != nil { log.Fatal(err) } // 发布消息 for { if err := producer.Publish("test", []byte("test message")); err != nil { log.Fatal("publish error: " + err.Error()) } time.Sleep(1 * time.Second) } } // 消费者 func startConsumer() { cfg := nsq.NewConfig() consumer, err := nsq.NewConsumer("test", "sensor01", cfg) if err != nil { log.Fatal(err) } // 设置消息处理函数 consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Println(string(message.Body)) return nil })) // 连接到单例nsqd if err := consumer.ConnectToNSQD(url); err != nil { log.Fatal(err) } <-consumer.StopChan }
第二种nsqlookupd方式
package main import ( "flag" "log" "time" "github.com/nsqio/go-nsq" ) func main() { go startConsumer() startProducer() } var url string var url1 string func init() { //具体ip,端口根据实际情况传入或者修改默认配置 flag.StringVar(&url, "url", "127.0.0.1:4150", "nsqd") //tcp flag.StringVar(&url1, "url1", "127.0.0.1:4161", "nsqlookupd") //http flag.Parse() } // 生产者 func startProducer() { cfg := nsq.NewConfig() producer, err := nsq.NewProducer(url, cfg) if err != nil { log.Fatal(err) } // 发布消息 for { if err := producer.Publish("test", []byte("test message")); err != nil { log.Fatal("publish error: " + err.Error()) } time.Sleep(1 * time.Second) } } // 消费者 func startConsumer() { cfg := nsq.NewConfig() consumer, err := nsq.NewConsumer("test", "sensor01", cfg) if err != nil { log.Fatal(err) } // 设置消息处理函数 consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error { log.Println(string(message.Body)) return nil })) // nsqlookupd //[]string if err := consumer.ConnectToNSQLookupds([]string{url1}); err != nil { log.Fatal(err) } <-consumer.StopChan }
两种方式最大的区别除了函数不同
consumer.ConnectToNSQLookupds([]string{url1})
nsqlookupd方式的参数传入的一个字符串的切片,这里要特别注意。
另外如果nsqlookupd的地址确定只有一个也可以使用
consumer.ConnectToNSQLookupd(url)
在这里因为我是本机调用腾讯云公网ip 在使用第二种方式的时候遇到一个问题,调用nsqlookupd的http接口后获取到的是容器暴露的4150 的nsq 的tcp端口 ,实际上宿主机的映射端口并不是4150,会连接失败
这里我的做法是修改之前docker-compose.yml 文件,增加显式端口映射规则,新配置如下
version: '2' services: nsqlookupd: image: nsqio/nsq command: /nsqlookupd ports: - "4160:4160" - "4161:4161" nsqd: image: nsqio/nsq command: /nsqd --lookupd-tcp-address=nsqlookupd:4160 ports: - "4150:4150" - "4151:4151" nsqadmin: image: nsqio/nsq command: /nsqadmin --lookupd-http-address=nsqlookupd:4161 ports: - "4171:4171"
然后我们需要重启重新编译这些容器,这里需要注意使用命令 docker-compose restart 并不会像nginx reload一样重载文件达到配置更新的目的。这里我们需要运行
docker-compose down //停止容器并且删除 docker-compose up -d//编译运行
然后我们再来重新跑第二种方式的代码,会出现一个新错误,无法解析host name ,这里是因为nsqloopupd的http接口返回的是容器的host name ,和我本机访问不在一个私有网络,dns无法正确解析。这里我是在本机mac的 /etc/hosts 增加了一条host规则来粗暴解决。
提醒
- Producer断线后不会重连,需要自己手动重连,Consumer断线后会自动重连
- Consumer的重连时间配置项有两个功能
- Consumer检测到与nsqd的连接断开后,每隔x秒向nsqd请求重连
- Consumer每隔x秒,向nsqlookupd进行http轮询,用来更新自己的nsqd地址目录
- Consumer的重连时间默认是60s
- Consumer可以同时接收不同nsqd node的同名topic数据,为了避免混淆,就必须在客户端进行处理
- 在AddHandler中设置的接口回调是在另外的goroutine中执行的
- Producer不能发布(Publish)空message,否则会导致panic
参考
go-nsq分布式实时消息平台用法 https://www.yryz.net/post/go-nsq-usage.html
单机安装和快速入门(官网) http://nsq.io/overview/quick_start.html
docker和docker-compose搭建nsq平台(官网) http://nsq.io/deployment/docker.html
nsq指南(官网中文版) http://wiki.jikexueyuan.com/project/nsq-guide
[译]我们是如何使用NSQ处理7500亿消息的 http://www.jointforce.com/jfperiodical/article/1949
An Example of Using NSQ From Go http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/
golang使用Nsq https://segmentfault.com/a/1190000009194607
hostname 了解一下