play/type blog

We are creating Germany's juiciest event platform, boomloop.com. Because we love the Internet more than our own mothers. See for yourself. check out boomloop.com


I did some refactoring this week, and the result is a much improved workling. Adding new work brokers is a snap now. I’ll show you how I added RabbitMQ, so that you can go about adding your own brokers. So without further ado…

RabbitMQ, a quick introduction

A lot of Ruby people have been talking about using RabbitMQ as their Queue of choice. Soundcloud.com are using it, as is new bamboo founder Johnathan Conway, who is using it at his video startup http://www.vzaar.com/. He says:

RabbitMQ – Now this is the matrons knockers when it comes to kick ass, ultra fast and scalable messaging. It simply rocks, with performance off the hook. It’s written in Erlang and supports the AMPQ protocol.

Follow the instructions here to get this beauty installed.

Adding Ampq to Workling

There are two new base classes you can extend to add new brokers. I’ll describe how this is done, but the code i show is already a part of workling. Skip to ‘activating amqp in your application’ to see how this is activated.

Clients

Clients help workling to connect to job brokers. To add an AmqpClient, we need to extend from Workling::Client::Base and implement a couple of methods.

require 'workling/clients/base'
require 'mq'

#
#  An Ampq client
#
module Workling
  module Clients
    class AmqpClient < Workling::Clients::Base

      # starts the client. 
      def connect
        @amq = MQ.new
      end

      # stops the client.
      def close
        @amq.close
      end

      # request work
      def request(queue, value)
        @amq.queue(queue).publish(value)
      end

      # retrieve work
      def retrieve(queue)
        @amq.queue(queue)
      end

      # subscribe to a queue
      def subscribe(queue)
        @amq.queue(queue).subscribe do |value|
          yield value
        end
      end

    end
  end
end

Were’s using the eventmachine amqp client for this, you can find it up on github. connect and close do exactly what it says on the tin: connecting to rabbitmq and closing the connection.

request and retrieve are responsible for placing work on rabbitmq. The methods are passed the correct queue, and a value that contains the worker method arguments. If you need control over the queue names, look at the RDoc for Workling::Routing::Base. In our case, there’s no special requirement here.

Finally, we implement a subscribe method. Use this if your broker supports callbacks, as is the case with amqp. This method expects to a block, which we pass into the amqp subscribe method here. The block will be called when a message is available on the queue, and the result is yielded into the block.

Having subscription callbacks is very nice, because this way, we don’t need to keep calling get on the queue to see if something new is waiting.

So now we’re done! That’s all you need to add RabbitMQ to workling. Configure it in your application as descibed below.

Invokers

There’s still potential to improve things though. Workling 0.4.0 introduces the idea of invokers. Invokers grab work off a job broker, using a client (see above). They subclass Workling::Remote::Invokers::Base. Read the RDoc for a description of the methods.

Workling comes with a couple of standard invokers, like the BasicPoller. This invoker simply keeps hitting the broker every n seconds, checking for new work and executing it immediately. The ThreadedInvoker does the same, but spawns a Thread for every Worker class the project defines.

So Amqp: it would be nice if we had an invoker that makes use of the subscription callbacks. Easily done, lets have a look:

require 'eventmachine'
require 'workling/remote/invokers/base'

#
#  Subscribes the workers to the correct queues. 
# 
module Workling
  module Remote
    module Invokers
      class EventmachineSubscriber < Workling::Remote::Invokers::Base

        def initialize(routing, client_class)
          super
        end

        #
        #  Starts EM loop and sets up subscription callbacks for workers. 
        #
        def listen
          EM.run do
            connect do
              routes.each do |queue|
                @client.subscribe(queue) do |args|
                  run(queue, args)
                end
              end
            end
          end
        end

        def stop
          EM.stop if EM.reactor_running?
        end
      end
    end
  end
end

Invokers have to implement two methods, listen and stop. Listen starts the main listener loop, which is responsible for starting work when it becomes available.

In our case, we need to start an EM loop around listen. This is because the Ruby AMQP library needs to run inside of an eventmachine reactor loop.

Next, inside of listen, we need to iterate through all defined routes. There is a route for each worker method you defined in your application. The routes double as queue names. For this, you can use the helper method routes. Now we attach a callback to each queue. We can use the helper method run, which executes the worker method associated with the queue, passing along any supplied arguments.

That’s it! We now have a more effective Invoker, which we can activate in our application like this…

Configuring your application to use AMQP (with RabbitMQ)

Again, follow the instructions here to get RabbitMQ running on your OSX development machine.

Workling::Remote.invoker = Workling::Remote::Invokers::EventmachineSubscriber
Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
Workling::Remote.dispatcher.client = Workling::Clients::AmqpClient.new

enjoy!

First off, you’ll need to install RabbitMQ. Instructions for OSX here. Once you’ve done that, install the pure ruby amqp library:

    gem sources -a http://gems.github.com
    gem install tmm1-amqp

You’re good to go. Now open up two IRB sessions. Paste the following code into the first session:

    require 'mq'

    EM.run {
      amq = MQ.new
      EM.add_periodic_timer(1) { amq.queue("noises").publish("moo") }
    }

Your publishing code has to run inside of an Event Machine loop. You can start it using EM.run. If you’re running inside of an Evented Container such as Thin or Evented Mongrel, you can skip this. The meat of it is just amq.queue("noises").publish("moo")that.

Now open a second irb session in another terminal tab, and paste this in:

    require 'mq'

    EM.run {
      amq = MQ.new
      amq.queue("noises").subscribe { |noise|
        puts noise
      }
    }

You’ll get the moos off the Queue “noises”. No polling required, which is very nice, you simply register a callback with amqp, and it’ll be called with the message when one becomes available.

Niceness!

RabbitMQ is a reliable, high performance queue Server written in Erlang.

Lets fiddle.

Before you start

Before you can start, you have to have installed Macports. Just in case, here’s how it’s done:

  • Register at the Apple Developers Connection. Go to downloads/developer tools, and download the latest version of the XCode Developer tools. Grab a Flask of coffee.
  • Download and run the latest macports installer http://svn.macports.org/repository/macports/downloads/. Grab another Flask.

Installing Erlang

If you’re on the 10.5.3 update, you have to edit the erlang portsfile to avoid a bus error bug:

    sudo vi /opt/local/var/macports/sources/rsync.macports.org/release/ports/lang/erlang/Portfile

Delete this line from configure.args attribute:

    --enable-hipe \

Now you can use port to install erlang:

    port install erlang

this is going to take a while.

Installing RabbitMQ

Grab and unpack the lastest generic Unix version of RabbitMQ. The current version as of writing is 1.4.0.

    mkdir /tmp/rabbit-mq && cd /tmp/rabbit-mq
    wget http://www.rabbitmq.com/releases/rabbitmq-server/v1.4.0/rabbitmq-server-generic-unix-1.4.0.tar.gz
    tar xvfz rabbitmq-server-generic-unix-1.4.0.tar.gz

Now move this stuff into erlang’s magical mystery directory:

    sudo mv rabbitmq_server-1.4.0 /opt/local/lib/erlang/lib

You’re good to go! Start up like this:

    sudo /opt/local/lib/erlang/lib/rabbitmq_server-1.4.0/sbin/rabbitmq-server

Change permissions if you don’t want to run this as root.

UPDATE: I’ve been having troubes starting the server up, since the tables in the mnesia database backing rabbitmq are locked. I don’t know why this is the case. You can get this running again brute force styleee by deleting the database:

    sudo rm -rf /var/lib/rabbitmq/mnesia