What is NSQ and how to Implement it with Go?

Rahul Kapoor
4 min read · Jul 4, 2023


Implementing NSQ with Go

NSQ is an open-source real-time distributed message queue with no single point of failure. It is a reliable message delivery service with high availability.

NSQ is easy to use from Go via the official go-nsq client package, and from Python via the pynsq client library.

It is built to handle a massive number of messages every day, and its decentralized topology provides fault tolerance and high availability together with reliable message delivery.

To read more about features that NSQ provides: https://nsq.io/overview/features_and_guarantees.html#features

A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.

nsqd instance showing topics and channels

To summarise: a single nsqd instance can hold multiple topics, and each topic can have multiple channels. Messages are copied from a topic to every one of its channels (every channel receives a copy of all messages for that topic), but are distributed from a channel across its consumers (each consumer receives a portion of the messages for that channel).

A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client.
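The delivery rules above can be illustrated with a small in-memory model. This is only a sketch of the semantics, not how nsqd is implemented: the `topic` and `channel` types and the `publish` and `total` helpers are hypothetical names invented for this example, and every consumer is assumed to be ready to receive at all times.

```go
package main

import (
	"fmt"
	"math/rand"
)

// channel is a toy stand-in for an NSQ channel. consumers[i] holds the
// messages delivered to the i-th connected consumer.
type channel struct {
	name      string
	consumers [][]string
}

// topic is a toy stand-in for an NSQ topic with one or more channels.
type topic struct {
	channels []*channel
}

// publish copies msg to every channel of the topic. Within each channel the
// message goes to exactly one consumer, picked at random.
func (tp *topic) publish(msg string) {
	for _, ch := range tp.channels {
		i := rand.Intn(len(ch.consumers))
		ch.consumers[i] = append(ch.consumers[i], msg)
	}
}

// total returns the number of messages delivered through the channel,
// summed over all of its consumers.
func (ch *channel) total() int {
	n := 0
	for _, received := range ch.consumers {
		n += len(received)
	}
	return n
}

func main() {
	tp := &topic{channels: []*channel{
		{name: "metrics", consumers: make([][]string, 3)},
		{name: "archive", consumers: make([][]string, 1)},
	}}

	for i := 0; i < 100; i++ {
		tp.publish(fmt.Sprintf("msg %d", i))
	}

	// Every channel sees all 100 messages; the split between the consumers
	// of a channel varies from run to run.
	for _, ch := range tp.channels {
		counts := make([]int, len(ch.consumers))
		for i, received := range ch.consumers {
			counts[i] = len(received)
		}
		fmt.Printf("channel %q: total=%d per-consumer=%v\n", ch.name, ch.total(), counts)
	}
}
```

Each channel reports a total of 100, while the per-consumer counts of the three-consumer channel add up to 100 in a different split on each run.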

There is also nsqlookupd, a directory service: consumers query it to discover the addresses of the nsqd instances that provide the topic they want to subscribe to.

How does nsqlookupd know about the nsqd instances?
At a lower level, each nsqd has a long-lived TCP connection to nsqlookupd over which it periodically pushes its state. This data is used to inform which nsqd addresses nsqlookupd will give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.
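To make the consumer side of this concrete, here is a minimal sketch of turning a /lookup reply into a list of nsqd TCP addresses. It assumes the reply is a JSON object with a `producers` array whose entries carry `broadcast_address` and `tcp_port` fields, and that nsqlookupd listens for HTTP on port 4161; check both against the NSQ version you run. The `lookupResponse` type and the `nsqdAddresses` helper are hypothetical names for this example, and the sample body is hand-written rather than captured from a real server.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net"
	"strconv"
)

// lookupResponse holds the fields of interest from a /lookup reply.
// The field names are an assumption about the reply format.
type lookupResponse struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

// nsqdAddresses extracts "host:port" TCP addresses of the nsqd instances
// listed in a /lookup reply body.
func nsqdAddresses(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, fmt.Errorf("decode lookup response: %w", err)
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	// Hand-written sample shaped like a reply to
	// GET http://127.0.0.1:4161/lookup?topic=learningnsq
	sample := []byte(`{
		"channels": ["channel"],
		"producers": [
			{"broadcast_address": "127.0.0.1", "tcp_port": 4150, "http_port": 4151}
		]
	}`)

	addrs, err := nsqdAddresses(sample)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(addrs) // prints [127.0.0.1:4150]
}
```

In practice you rarely need to do this by hand: the go-nsq Consumer has a ConnectToNSQLookupd method that polls nsqlookupd and connects to the discovered nsqd instances for you.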

NSQ is designed to be used in a distributed fashion. nsqd clients are connected (over TCP) to all instances providing the specified topic. There are no middle-men, no message brokers, and no SPOFs.

nsqd topology which eliminates SPOF (Single Point Of Failure)

This topology eliminates the need to chain single, aggregated feeds. Instead, you consume directly from all producers. Technically, it doesn’t matter which client connects to which nsqd: as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all will eventually be processed.

NSQ guarantees that a message will be delivered at least once, though duplicate messages are possible. Consumers should expect this and handle the logic at their end.
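One common way to cope with duplicates is to make processing idempotent by remembering which messages were already handled. The sketch below is one possible approach, not something NSQ prescribes: `seenSet`, `firstTime` and `process` are hypothetical names, the set lives only in memory and grows without bound, and it assumes each message carries a stable key (for example an application-level ID inside the body).

```go
package main

import (
	"fmt"
	"sync"
)

// seenSet remembers the keys of messages that were already processed.
// It is safe for concurrent use, since handlers may run in several goroutines.
type seenSet struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

func newSeenSet() *seenSet {
	return &seenSet{seen: make(map[string]struct{})}
}

// firstTime records key and reports whether it had not been seen before.
func (s *seenSet) firstTime(key string) bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	if _, ok := s.seen[key]; ok {
		return false
	}
	s.seen[key] = struct{}{}
	return true
}

// process calls handle for the first delivery of a key and silently skips
// any later delivery with the same key.
func process(s *seenSet, key, body string, handle func(body string)) {
	if s.firstTime(key) {
		handle(body)
	}
}

func main() {
	s := newSeenSet()
	handle := func(body string) { fmt.Println("processing:", body) }

	// The second delivery of "order-1" simulates a duplicate and is skipped.
	process(s, "order-1", "charge customer 42", handle)
	process(s, "order-1", "charge customer 42", handle)
	process(s, "order-2", "charge customer 43", handle)
}
```

Note that this sketch marks a key as seen before the work is done, so a failure inside `handle` would need extra care. A production version would typically also persist the keys or expire old ones.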

Implementing NSQ with Go/Golang

Follow this doc to run a small NSQ cluster on your local machine and walk through publishing and consuming messages:
https://nsq.io/overview/quick_start.html

Producer

A Producer instance is 1:1 with a destination nsqd and will lazily connect to that instance (and re-connect) when Publish commands are executed.

We use a topic named learningnsq and, with the default config settings, call the NewProducer function from the imported go-nsq package.

We pass it the nsqd address as a string, along with the initialized config.

Then we call Publish on the returned Producer, passing the topic name and the message body as a slice of bytes.

Publish synchronously publishes a message body to the specified topic, returning an error if publish failed.

package main

import (
	"log"
	"strconv"

	"github.com/nsqio/go-nsq"
)

func main() {
	nsqtopic := "learningnsq"

	// NewProducer does not open a connection yet; the producer connects
	// lazily to nsqd on the first Publish.
	config := nsq.NewConfig()
	w, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Panicf("could not create producer: %v", err)
	}

	// Publish 100 numbered messages to the topic, one at a time.
	for i := 0; i < 100; i++ {
		body := []byte("implementing nsq with golang step: " + strconv.Itoa(i))
		if err := w.Publish(nsqtopic, body); err != nil {
			log.Panicf("publishing to topic %q failed: %v", nsqtopic, err)
		}
	}

	// Gracefully stop the producer once everything is published.
	w.Stop()
}

Consumer

Consumer is a high-level type to consume from nsq.

NewConsumer creates a new instance of Consumer for the specified topic/channel.

A Consumer instance is supplied a Handler that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel.

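package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nsqio/go-nsq"
)

// myHandler implements the nsq.Handler interface.
type myHandler struct{}

// HandleMessage prints the message body. Returning nil marks the message
// as finished; returning an error causes it to be requeued.
func (*myHandler) HandleMessage(message *nsq.Message) error {
	fmt.Println(string(message.Body))
	return nil
}

func main() {
	nsqtopic := "learningnsq"

	config := nsq.NewConfig()
	q, err := nsq.NewConsumer(nsqtopic, "channel", config)
	if err != nil {
		log.Panicf("could not create consumer: %v", err)
	}

	// Set the handler on the consumer to receive and process messages.
	// This must happen before connecting.
	q.AddHandler(&myHandler{})

	// Connect directly to the local nsqd.
	if err := q.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Panicf("could not connect to nsqd: %v", err)
	}

	// Keep consuming until the process is interrupted (for example Ctrl+C).
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	// Stop the consumer gracefully and wait for it to finish.
	q.Stop()
	<-q.StopChan
}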

Open the nsqadmin web UI in your browser; its address and port are printed in the terminal when nsqadmin starts (http://127.0.0.1:4171/ if you followed the quick start). It will look something like the screenshot below.

nsqadmin Web UI with details of topic and messages

After running the producer and consumer code above, you can see in nsqadmin that 100 messages were processed.

Counter of messages processed at nsqadmin Web UI

Screenshot of logs of the running consumer code:

Snapshot showing the messages printed by myHandler
