Press "Enter" to skip to content

从nsq搭建到go-nsq的使用

内容目录

简介

NSQ是一个基于Go语言的分布式实时消息平台。具有分布式,易于水平扩展,易于安装,易于集成(主流语言都有对应的客户端库)的特点。

NSQ是由四个重要组件构成:

  • nsqd:一个负责接收、排队、转发消息到客户端的守护进程,它可以独立运行,不过通常它是由 nsqlookupd 实例所在集群配置的
  • nsqlookupd:管理拓扑信息并提供最终一致性的发现服务的守护进程
  • nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行各种各样的管理任务
  • utilities:常见基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq

安装

这里推荐docker-compose的方式非常方便运维(其他安装方法可从参考链接中查看)

我这里使用的自己的腾讯云,系统centos7

我们需要先建一个docker-compose.yml

官网推荐配置如下

version: '2'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160"
      - "4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    ports:
      - "4150"
      - "4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    ports:
      - "4171"

然后在该文件目录下执行

docker-compose up -d

之后我们会创建一个私有网络包含nsqd,nsqlookupd,nsqadmin三个容器,宿主机会产生随机端口映射到配置文件中各容器端口号,然后我们可以使用

docker-compose ps

查看当前容器运行状态和端口映射信息

假设nsqadmin 的宿主机映射端口为1033 ,浏览器访问宿主机ip:1033 可以视图观察nsq运行状态,创建处理topic

使用

这里有2种消费者的写法,第一种是直连nsqd(tcp长连接),第二种是通过nsqlookupd的http接口查询后长连接到nsqd, 显然第二种更易于分布式容错和高可用。这里我们都贴下代码

第一种nsqd直连

package main

import (
	"flag"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	go startConsumer()
	startProducer()
}

var url string

func init() {
        //具体ip,端口根据实际情况传入或者修改默认配置
	flag.StringVar(&url, "url", "127.0.0.1:4150", "nsqd")
	flag.Parse()
}

// 生产者
func startProducer() {
	cfg := nsq.NewConfig()
	producer, err := nsq.NewProducer(url, cfg)
	if err != nil {
		log.Fatal(err)
	}
	// 发布消息
	for {
		if err := producer.Publish("test", []byte("test message")); err != nil {
			log.Fatal("publish error: " + err.Error())
		}
		time.Sleep(1 * time.Second)
	}
}

// 消费者
func startConsumer() {
	cfg := nsq.NewConfig()
	consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
	if err != nil {
		log.Fatal(err)
	}
	// 设置消息处理函数
	consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Println(string(message.Body))
		return nil
	}))
	// 连接到单例nsqd
	if err := consumer.ConnectToNSQD(url); err != nil {
		log.Fatal(err)
	}
	<-consumer.StopChan
}

第二种nsqlookupd方式

package main

import (
	"flag"
	"log"
	"time"

	"github.com/nsqio/go-nsq"
)

func main() {
	go startConsumer()
	startProducer()
}

var url string
var url1 string

func init() {
        //具体ip,端口根据实际情况传入或者修改默认配置
	flag.StringVar(&url, "url", "127.0.0.1:4150", "nsqd")         //tcp
	flag.StringVar(&url1, "url1", "127.0.0.1:4161", "nsqlookupd") //http

	flag.Parse()
}

// 生产者
func startProducer() {
	cfg := nsq.NewConfig()
	producer, err := nsq.NewProducer(url, cfg)
	if err != nil {
		log.Fatal(err)
	}
	// 发布消息
	for {
		if err := producer.Publish("test", []byte("test message")); err != nil {
			log.Fatal("publish error: " + err.Error())
		}
		time.Sleep(1 * time.Second)
	}
}

// 消费者
func startConsumer() {
	cfg := nsq.NewConfig()
	consumer, err := nsq.NewConsumer("test", "sensor01", cfg)
	if err != nil {
		log.Fatal(err)
	}
	// 设置消息处理函数
	consumer.AddHandler(nsq.HandlerFunc(func(message *nsq.Message) error {
		log.Println(string(message.Body))
		return nil
	}))
	// nsqlookupd
	//[]string
	if err := consumer.ConnectToNSQLookupds([]string{url1}); err != nil {
		log.Fatal(err)
	}
	<-consumer.StopChan
}

两种方式最大的区别除了函数不同

consumer.ConnectToNSQLookupds([]string{url1})

nsqlookupd方式的参数传入的一个字符串的切片,这里要特别注意。

另外如果nsqlookupd的地址确定只有一个也可以使用

consumer.ConnectToNSQLookupd(url)

在这里因为我是本机调用腾讯云公网ip 在使用第二种方式的时候遇到一个问题,调用nsqlookupd的http接口后获取到的是容器暴露的4150 的nsq 的tcp端口 ,实际上宿主机的映射端口并不是4150,会连接失败

这里我的做法是修改之前docker-compose.yml 文件,增加显式端口映射规则,新配置如下

version: '2'
services:
  nsqlookupd:
    image: nsqio/nsq
    command: /nsqlookupd
    ports:
      - "4160:4160"
      - "4161:4161"
  nsqd:
    image: nsqio/nsq
    command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
    ports:
      - "4150:4150"
      - "4151:4151"
  nsqadmin:
    image: nsqio/nsq
    command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
    ports:
      - "4171:4171"

然后我们需要重启重新编译这些容器,这里需要注意使用命令 docker-compose restart 并不会像nginx reload一样重载文件达到配置更新的目的。这里我们需要运行

docker-compose down //停止容器并且删除

docker-compose up -d//编译运行

然后我们再来重新跑第二种方式的代码,会出现一个新错误,无法解析host name ,这里是因为nsqloopupd的http接口返回的是容器的host name ,和我本机访问不在一个私有网络,dns无法正确解析。这里我是在本机mac的 /etc/hosts 增加了一条host规则来粗暴解决。

提醒

  • Producer断线后不会重连,需要自己手动重连,Consumer断线后会自动重连
  • Consumer的重连时间配置项有两个功能
    • Consumer检测到与nsqd的连接断开后,每隔x秒向nsqd请求重连
    • Consumer每隔x秒,向nsqlookupd进行http轮询,用来更新自己的nsqd地址目录
    • Consumer的重连时间默认是60s
  • Consumer可以同时接收不同nsqd node的同名topic数据,为了避免混淆,就必须在客户端进行处理
  • 在AddHandler中设置的接口回调是在另外的goroutine中执行的
  • Producer不能发布(Publish)空message,否则会导致panic

参考

go-nsq分布式实时消息平台用法  https://www.yryz.net/post/go-nsq-usage.html

单机安装和快速入门(官网)  http://nsq.io/overview/quick_start.html

docker和docker-compose搭建nsq平台(官网)   http://nsq.io/deployment/docker.html

nsq指南(官网中文版)  http://wiki.jikexueyuan.com/project/nsq-guide

[译]我们是如何使用NSQ处理7500亿消息的 http://www.jointforce.com/jfperiodical/article/1949

An Example of Using NSQ From Go http://tleyden.github.io/blog/2014/11/12/an-example-of-using-nsq-from-go/

golang使用Nsq https://segmentfault.com/a/1190000009194607

One Comment

  1. HeminWon
    HeminWon 2019年6月29日

    hostname 了解一下

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注