Make sure your connection to RabbitMQ stays active

How to reconnect to RabbitMQ in Sean Treadway's amqp library for Go

Posted by Tobias Begalke on Thu May 12 2016

If, like me, you use Go and github.com/streadway/amqp to access RabbitMQ, you might wonder how to make sure your program reconnects to RabbitMQ after the connection is lost. Here’s how:

There’s a method called NotifyClose that belongs to amqp.Connection:

func (c *Connection) NotifyClose(receiver chan *Error) chan *Error

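This method registers a listener for close events. When the connection is closed because of an error, the error is sent on the channel; on a normal shutdown the channel is closed without a value. A goroutine that reads from the channel can therefore re-establish the connection and re-run the setup process whenever an error arrives.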
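The difference between an error close and a graceful shutdown matters for a reconnect loop: according to the library's documentation, the channel passed to NotifyClose receives an error when the connection fails and is simply closed on a normal shutdown. Here is a minimal sketch of how a receiver can tell the two apart. To keep it runnable without a broker it uses a stand-in closeError type instead of *amqp.Error; that type and the waitForClose helper are hypothetical names for illustration only.

```go
package main

import "fmt"

// closeError is a hypothetical stand-in for *amqp.Error so that this
// sketch runs without the amqp package or a broker.
type closeError struct {
	Code   int
	Reason string
}

// waitForClose blocks until the notification channel delivers an error
// or is closed. It reports true when a reconnect is warranted.
func waitForClose(notify <-chan *closeError) (reconnect bool, reason string) {
	err, ok := <-notify
	if !ok || err == nil {
		// The channel was closed without a value: graceful shutdown.
		return false, "graceful shutdown"
	}
	return true, fmt.Sprintf("%d: %s", err.Code, err.Reason)
}

func main() {
	// Simulate a connection that was closed because of an error.
	failed := make(chan *closeError, 1)
	failed <- &closeError{Code: 320, Reason: "CONNECTION_FORCED"}
	reconnect, reason := waitForClose(failed)
	fmt.Println(reconnect, reason)

	// Simulate a graceful shutdown: the channel is closed, nothing is sent.
	graceful := make(chan *closeError)
	close(graceful)
	reconnect, reason = waitForClose(graceful)
	fmt.Println(reconnect, reason)
}
```

With the real library, the same two-value receive (err, ok := <-ch) should work on the channel returned by NotifyClose.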

Here’s one way to use the listener:

package main

import (
  "flag"
  "github.com/streadway/amqp"
  "log"
  "time"
)

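// the default assumes a local broker with RabbitMQ's
// stock guest account; adjust it to your setup
var amqpUri = flag.String("r", "amqp://guest:guest@localhost:5672/", "RabbitMQ URI")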

var (
  rabbitConn       *amqp.Connection
  rabbitCloseError chan *amqp.Error
)

// connectToRabbitMQ keeps trying to connect to the RabbitMQ
// server until a connection is established
func connectToRabbitMQ(uri string) *amqp.Connection {
  for {
    conn, err := amqp.Dial(uri)

    if err == nil {
      return conn
    }   

    log.Println(err)
    log.Printf("Trying to reconnect to RabbitMQ at %s\n", uri)
    time.Sleep(500 * time.Millisecond)
  }
}

// rabbitConnector re-establishes the connection to RabbitMQ
// whenever a close error arrives on rabbitCloseError
func rabbitConnector(uri string) {
  for {
    rabbitErr := <-rabbitCloseError
    if rabbitErr == nil {
      // the channel was closed without an error, which
      // signals a graceful shutdown: stop reconnecting
      return
    }

    log.Printf("Connecting to %s\n", uri)

    rabbitConn = connectToRabbitMQ(uri)
    rabbitCloseError = make(chan *amqp.Error)
    rabbitConn.NotifyClose(rabbitCloseError)

    // run your setup process here
  }
}

func main() {
  flag.Parse()

  // create the rabbitmq error channel
  rabbitCloseError = make(chan *amqp.Error)

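  // run the callback in a separate goroutine
  go rabbitConnector(*amqpUri)

  // establish the rabbitmq connection by sending
  // an error and thus calling the error callback
  rabbitCloseError <- amqp.ErrClosed

  // block forever so the program keeps running;
  // replace this with your application's main loop
  select {}
}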
