What is NSQ and How to Implement It with Go?
NSQ is an open-source real-time distributed message queue with no single point of failure. It is a reliable message delivery service with high availability.
NSQ is easy to use from Go with the official go-nsq package (Python has its own client, pynsq).
It is built to handle a very large number of messages every day while remaining fault tolerant.
To read more about features that NSQ provides: https://nsq.io/overview/features_and_guarantees.html#features
A single nsqd instance is designed to handle multiple streams of data at once. Streams are called “topics” and a topic has 1 or more “channels”. Each channel receives a copy of all the messages for a topic. In practice, a channel maps to a downstream service consuming a topic.
To summarise: a single nsqd instance can host multiple topics, each topic can have multiple channels, and every channel receives a copy of all messages for its topic. Within a channel, messages are distributed across the connected consumers, so each consumer receives only a portion of that channel's messages.
A channel can, and generally does, have multiple clients connected. Assuming all connected clients are in a state where they are ready to receive messages, each message will be delivered to a random client.
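The topic, channel, and consumer relationship described above can be sketched as a small in-memory model. This is not how nsqd is implemented; the `Topic`, `Channel`, and `Delivery` types and all names in it are hypothetical, and it uses round-robin selection to stay deterministic, whereas nsqd picks among the clients that are ready.

```go
package main

import "fmt"

// Channel models an NSQ channel: it gets its own copy of every message
// on the topic and hands each copy to exactly one of its consumers.
type Channel struct {
	Name      string
	Consumers []string
	next      int // round-robin cursor (a simplification of nsqd's choice)
}

// Topic models an NSQ topic: each published message is copied to all channels.
type Topic struct {
	Name     string
	Channels []*Channel
}

// Delivery records which consumer on which channel received a message.
type Delivery struct {
	Channel, Consumer, Body string
}

// Publish copies body to every channel; each channel then picks one consumer.
func (t *Topic) Publish(body string) []Delivery {
	var out []Delivery
	for _, ch := range t.Channels {
		if len(ch.Consumers) == 0 {
			continue // nsqd would queue the message; this sketch skips it
		}
		consumer := ch.Consumers[ch.next%len(ch.Consumers)]
		ch.next++
		out = append(out, Delivery{Channel: ch.Name, Consumer: consumer, Body: body})
	}
	return out
}

func main() {
	topic := &Topic{Name: "clicks", Channels: []*Channel{
		{Name: "metrics", Consumers: []string{"m1", "m2"}},
		{Name: "archive", Consumers: []string{"a1"}},
	}}
	for _, body := range []string{"msg-0", "msg-1"} {
		for _, d := range topic.Publish(body) {
			fmt.Printf("%s -> %s/%s\n", d.Body, d.Channel, d.Consumer)
		}
	}
}
```

Every message reaches both channels, but within the `metrics` channel the two consumers each see only part of the stream.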
There is also nsqlookupd, a directory service that consumers query to find the addresses of the nsqd instances providing the topic they want to subscribe to.
How does nsqlookupd know about the nsqd instances?
At a lower level, each nsqd has a long-lived TCP connection to nsqlookupd over which it periodically pushes its state. This data is used to inform which nsqd addresses nsqlookupd will give to consumers. For consumers, an HTTP /lookup endpoint is exposed for polling.
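To make the lookup step concrete, here is a minimal sketch of turning a /lookup response into nsqd TCP addresses. The JSON shape (a `producers` array with `broadcast_address` and `tcp_port` fields) is an assumption based on recent nsqlookupd versions, the sample payload is made up, and `nsqdAddrs` is a hypothetical helper, so check it against the response your own nsqlookupd returns.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net"
	"strconv"
)

// lookupResponse holds only the fields this sketch needs from a
// /lookup?topic=... response (assumed shape, not a full definition).
type lookupResponse struct {
	Producers []struct {
		BroadcastAddress string `json:"broadcast_address"`
		TCPPort          int    `json:"tcp_port"`
	} `json:"producers"`
}

// nsqdAddrs extracts the host:port TCP address of every nsqd listed in body.
func nsqdAddrs(body []byte) ([]string, error) {
	var resp lookupResponse
	if err := json.Unmarshal(body, &resp); err != nil {
		return nil, fmt.Errorf("decoding lookup response: %w", err)
	}
	addrs := make([]string, 0, len(resp.Producers))
	for _, p := range resp.Producers {
		addrs = append(addrs, net.JoinHostPort(p.BroadcastAddress, strconv.Itoa(p.TCPPort)))
	}
	return addrs, nil
}

func main() {
	// Hypothetical response body for a topic served by one local nsqd.
	sample := []byte(`{"channels":["channel"],"producers":[` +
		`{"broadcast_address":"127.0.0.1","tcp_port":4150,"http_port":4151}]}`)
	addrs, err := nsqdAddrs(sample)
	if err != nil {
		panic(err)
	}
	fmt.Println(addrs)
}
```

In practice you rarely need to do this by hand: go-nsq's Consumer has a ConnectToNSQLookupd method that polls nsqlookupd and connects to the discovered nsqd instances for you.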
NSQ is designed to be used in a distributed fashion. nsqd clients are connected (over TCP) to all instances providing the specified topic. There are no middle-men, no message brokers, and no SPOFs.
This topology eliminates the need to chain single, aggregated feeds. Instead, you consume directly from all producers. Technically, it doesn’t matter which client connects to which nsqd: as long as there are enough clients connected to all producers to satisfy the volume of messages, you’re guaranteed that all messages will eventually be processed.
NSQ guarantees that a message will be delivered at least once, though duplicate messages are possible. Consumers should expect this and handle the logic at their end.
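One common way to cope with at-least-once delivery is to make the consumer idempotent. The sketch below assumes each message carries an application-level identifier (for example an order ID in the body); that identifier and the `Deduper` type are hypothetical, and the in-memory map is unbounded, so a real service would more likely use a store with expiry.

```go
package main

import (
	"fmt"
	"sync"
)

// Deduper remembers identifiers of messages that were already processed.
// Hypothetical helper: in-memory only and never evicts entries.
type Deduper struct {
	mu   sync.Mutex
	seen map[string]struct{}
}

// NewDeduper returns an empty Deduper ready for use.
func NewDeduper() *Deduper {
	return &Deduper{seen: make(map[string]struct{})}
}

// FirstTime records id and reports whether it had not been seen before.
// It is safe to call from handlers running in multiple goroutines.
func (d *Deduper) FirstTime(id string) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if _, ok := d.seen[id]; ok {
		return false
	}
	d.seen[id] = struct{}{}
	return true
}

func main() {
	d := NewDeduper()
	// "order-1" arrives twice, as a redelivered message would.
	for _, id := range []string{"order-1", "order-2", "order-1"} {
		if d.FirstTime(id) {
			fmt.Println("processing", id)
		} else {
			fmt.Println("skipping duplicate", id)
		}
	}
}
```

A message handler would call `FirstTime` with the identifier before doing any work and simply return without side effects when it reports a duplicate.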
Implementing NSQ with Go/Golang
Follow this doc to run a small NSQ cluster on your local machine and walk through publishing and consuming messages:
https://nsq.io/overview/quick_start.html
Producer
A Producer instance is 1:1 with a destination nsqd and will lazily connect to that instance (and re-connect) when Publish commands are executed.
We use a topic named learningnsq and, with the default config settings, call the NewProducer function from the imported go-nsq package.
Here we pass the nsqd address as a string along with the initialized config.
Then we call Publish on the returned instance, passing the topic name and the message body as a slice of bytes.
Publish synchronously publishes a message body to the specified topic, returning an error if publish failed.
package main

import (
	"log"
	"strconv"

	"github.com/nsqio/go-nsq"
)

func main() {
	nsqtopic := "learningnsq"

	// NewProducer does not connect yet; the TCP connection to nsqd
	// is opened lazily on the first Publish.
	config := nsq.NewConfig()
	w, err := nsq.NewProducer("127.0.0.1:4150", config)
	if err != nil {
		log.Panicf("creating producer: %v", err)
	}
	// Close the connection to nsqd when main returns.
	defer w.Stop()

	for i := 0; i < 100; i++ {
		body := []byte("implementing nsq with golang step: " + strconv.Itoa(i))
		if err := w.Publish(nsqtopic, body); err != nil {
			log.Panicf("publishing to topic %s: %v", nsqtopic, err)
		}
	}
}
Consumer code
Consumer is a high-level type to consume from nsq.
NewConsumer creates a new instance of Consumer for the specified topic/channel.
A Consumer instance is supplied a Handler that will be executed concurrently via goroutines to handle processing the stream of messages consumed from the specified topic/channel.
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	"github.com/nsqio/go-nsq"
)

type myHandler struct{}

// HandleMessage is called for each message. Returning nil marks the
// message as finished; returning an error requeues it.
func (*myHandler) HandleMessage(message *nsq.Message) error {
	fmt.Println(string(message.Body))
	return nil
}

func main() {
	nsqtopic := "learningnsq"

	config := nsq.NewConfig()
	q, err := nsq.NewConsumer(nsqtopic, "channel", config)
	if err != nil {
		log.Panicf("creating consumer: %v", err)
	}

	// Set the handler before connecting so it can receive and process messages.
	q.AddHandler(&myHandler{})

	// Connect directly to the local nsqd.
	if err := q.ConnectToNSQD("127.0.0.1:4150"); err != nil {
		log.Panicf("connecting to nsqd: %v", err)
	}

	// Block until interrupted (Ctrl+C), then shut down gracefully.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan
	q.Stop()
	<-q.StopChan
}
Open the nsqadmin web UI at the URL and port printed in your terminal when nsqadmin starts (http://127.0.0.1:4171 by default). It will look something like the screenshot below.
After running the producer and consumer code above, you can see in nsqadmin that 100 messages were processed.
Screenshot of logs of the running consumer code: