nsq的使用

雨中笑 goland 525热度

简介有点久没有写文章了,分享下之前做项目用到的nsq吧!

一、NSQ是什么

NSQ是一个基于Go语言的分布式实时消息平台,可用于大规模系统中的实时消息服务。源码

二、部署

1、首先在电脑里安装docker

这里的步骤可以看我之前记录的Windows10下安装Docker

2、docker下部署NSQ集群

通过 Docker Compose 进行启动docker-compose.yml

version: '3'
services:
nsqlookupd:
restart: always
image: nsqio/nsq
command: /nsqlookupd --broadcast-address=192.168.31.xx
ports:
- "4160:4160"
- "4161:4161"
nsqd:
restart: always
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=192.168.31.xx:4160 --broadcast-address=192.168.31.xx --tcp-address=:4150 --http-address=:4151
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151:4151"
nsqd2:
restart: always
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=192.168.31.xx:4160 --broadcast-address=192.168.31.xx --tcp-address=:4250 --http-address=:4251
depends_on:
- nsqlookupd
ports:
- "4250:4250"
- "4251:4251"
nsqadmin:
restart: always
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=192.168.31.xx:4161
depends_on:
- nsqlookupd
ports:
- "4171:4171"

在docker-compose.yml目录下运行

docker-compose up -d

3、查看部署情况

浏览器输入http:// 192.168.31.xx/nodes


出现这个界面就成功了

三、使用

1、生产者

production.go

package main

import (
"fmt"
jsoniter "github.com/json-iterator/go"
"time"

nsq "github.com/nsqio/go-nsq"
)

func main() {
// 定义nsq生产者
var producer *nsq.Producer
// 初始化生产者
producer, err := nsq.NewProducer("127.0.0.1:4150", nsq.NewConfig())
if err != nil {
panic(err)
}

err = producer.Ping()
if nil != err {
// 关闭生产者
producer.Stop()
producer = nil
}

topic := "test"
message := map[string]interface{}{
"type": "HighRiskUser",
"create_time": time.Now().Unix(),
"user_id": 123456,
}
j := jsoniter.ConfigCompatibleWithStandardLibrary
b, err := j.Marshal(message)
if err != nil {
fmt.Println("authent/data_serialize_error", "event msg serialzie error")
}
// 发布消息
err = producer.Publish(topic, b)
if err != nil {
fmt.Printf("producer.Publish,err : %v", err)
}
fmt.Println(message)

fmt.Println("producer.Publish success")
}

在命令行运行 go  run production.go

2、消费者

consume.go

package main

import (
"fmt"
"time"

"github.com/nsqio/go-nsq"
)

// ConsumerT 消费者
type ConsumerT struct{}

// 主函数
func main() {
InitConsumer("test", "test-channel", "127.0.0.1:4161")
for {
time.Sleep(time.Second * 10)
}
}

// HandleMessage 处理消息
func (*ConsumerT) HandleMessage(msg *nsq.Message) error {
fmt.Println("receive", msg.NSQDAddress, "message:", string(msg.Body))
return nil
}

// InitConsumer 初始化消费者
func InitConsumer(topic string, channel string, address string) {
cfg := nsq.NewConfig()
cfg.LookupdPollInterval = time.Second //设置重连时间
c, err := nsq.NewConsumer(topic, channel, cfg) // 新建一个消费者
if err != nil {
panic(err)
}
c.SetLogger(nil, 0) //屏蔽系统日志
c.AddHandler(&ConsumerT{}) // 添加消费者接口

// 建立NSQLookupd连接
if err := c.ConnectToNSQLookupd(address); err != nil {
panic(err)
}
}

在另外一个命令行下运行 go  run consume.go

3、查看情况

可看到1步骤输出:

2022/04/11 20:09:44 INF    1 (127.0.0.1:4150) connecting to nsqd
map[create_time:1649678984 type:HighRiskUser user_id:123456]
producer.Publish success

2步骤输出:

receive 192.168.31.64:4150 message: {"create_time":1649678981,"type":"HighRiskUser","user_id":123456}


很赞哦!(1)

本文阅读量 1011发布于 2022年4月11日

您的访问IP 3.141.47.221最早于 2024年5月1日 8时03分45秒 阅读过本文 为本文提供了 1 热度 1 阅读量

文章评论
回帖