If, like me, you use Go and github.com/streadway/amqp to access RabbitMQ, you might wonder how to make sure your program reconnects to RabbitMQ after the connection was lost. Here’s how:
There’s a method called NotifyClose that belongs to amqp.Connection:
func NotifyClose(c chan *amqp.Error) (chan *amqp.Error)
This method registers a listener for close events caused by errors while the connection should be active. The listener can be used to re-establish the connection and to re-run the setup process.
Here’s one way to use the listener:
package main
import (
"flag"
"github.com/streadway/amqp"
"log"
"time"
)
var amqpUri = flag.String("r", "amqp://guest:[email protected]/", "RabbitMQ URI")
var (
rabbitConn *amqp.Connection
rabbitCloseError chan *amqp.Error
)
// Try to connect to the RabbitMQ server as
// long as it takes to establish a connection
//
func connectToRabbitMQ(uri string) *amqp.Connection {
for {
conn, err := amqp.Dial(uri)
if err == nil {
return conn
}
log.Println(err)
log.Printf("Trying to reconnect to RabbitMQ at %s\n", uri)
time.Sleep(500 * time.Millisecond)
}
}
// re-establish the connection to RabbitMQ in case
// the connection has died
//
func rabbitConnector(uri string) {
var rabbitErr *amqp.Error
for {
rabbitErr = <-rabbitCloseError
if rabbitErr != nil {
log.Printf("Connecting to %s\n", *amqpUri)
rabbitConn = connectToRabbitMQ(uri)
rabbitCloseError = make(chan *amqp.Error)
rabbitConn.NotifyClose(rabbitCloseError)
// run your setup process here
}
}
}
func main() {
flag.Parse()
// create the rabbitmq error channel
rabbitCloseError = make(chan *amqp.Error)
// run the callback in a separate thread
go rabbitConnector(*amqpUri)
// establish the rabbitmq connection by sending
// an error and thus calling the error callback
rabbitCloseError <- amqp.ErrClosed
}
Photo Credits
Rabbit! / Kaninchen! by Robobobobo (cropped, colors mapped to monochrome range, vignetted, licensed under CC BY-SA 2.0).