I did some refactoring this week, and the result is a much improved workling. Adding new work brokers is a snap now. I’ll show you how I added RabbitMQ, so that you can go about adding your own brokers. So without further ado…
RabbitMQ, a quick introduction
A lot of Ruby people have been talking about using RabbitMQ as their queue of choice. Soundcloud.com are using it, as is New Bamboo founder Johnathan Conway, who uses it at his video startup http://www.vzaar.com/. He says:
RabbitMQ – Now this is the matrons knockers when it comes to kick ass, ultra fast and scalable messaging. It simply rocks, with performance off the hook. It’s written in Erlang and supports the AMQP protocol.
Follow the instructions here to get this beauty installed.
Adding AMQP to Workling
There are two new base classes you can extend to add new brokers. I’ll describe how this is done, but the code I show is already a part of workling. Skip to ‘Configuring your application to use AMQP (with RabbitMQ)’ to see how this is activated.
Clients
Clients help workling to connect to job brokers. To add an AmqpClient, we need to extend from Workling::Clients::Base and implement a couple of methods.
require 'workling/clients/base'
require 'mq'

#
# An AMQP client
#
module Workling
  module Clients
    class AmqpClient < Workling::Clients::Base

      # starts the client.
      def connect
        @amq = MQ.new
      end

      # stops the client.
      def close
        @amq.close
      end

      # request work
      def request(queue, value)
        @amq.queue(queue).publish(value)
      end

      # retrieve work
      def retrieve(queue)
        @amq.queue(queue)
      end

      # subscribe to a queue
      def subscribe(queue)
        @amq.queue(queue).subscribe do |value|
          yield value
        end
      end
    end
  end
end
We’re using the EventMachine amqp client for this; you can find it up on github. connect and close do exactly what it says on the tin: they connect to RabbitMQ and close the connection.
request and retrieve are responsible for placing work on RabbitMQ and getting at it again. Both methods are passed the correct queue, and request also gets a value that contains the worker method arguments. If you need control over the queue names, look at the RDoc for Workling::Routing::Base. In our case, there’s no special requirement here.
Finally, we implement a subscribe method. Use this if your broker supports callbacks, as is the case with amqp. This method expects a block, which we pass into the amqp subscribe method here. The block will be called when a message is available on the queue, and the message is yielded into the block.
Having subscription callbacks is very nice, because this way, we don’t need to keep calling get on the queue to see if something new is waiting.
So now we’re done! That’s all you need to add RabbitMQ to workling. Configure it in your application as described below.
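If you want to get a feel for the client contract before wiring up a real broker, here is a minimal sketch of an in-memory client with the same five methods. InMemoryClient is a made-up name and not part of workling. It deliberately does not subclass Workling::Clients::Base, so it runs without workling installed. Unlike AmqpClient, its retrieve hands back the next value directly, which is an assumption of this sketch.

```ruby
# A hypothetical in-memory broker client. It mirrors the five methods of
# AmqpClient but has no dependency on workling or on a running broker.
class InMemoryClient
  # starts the client.
  def connect
    @queues = Hash.new { |hash, name| hash[name] = [] }
    @subscribers = {}
  end

  # stops the client.
  def close
    @queues = nil
    @subscribers = nil
  end

  # request work: hand the value to a subscriber if there is one,
  # otherwise keep it until someone retrieves it.
  def request(queue, value)
    callback = @subscribers[queue]
    callback ? callback.call(value) : @queues[queue] << value
  end

  # retrieve work: oldest value first, nil when the queue is empty.
  def retrieve(queue)
    @queues[queue].shift
  end

  # subscribe to a queue: the block is called for each value already waiting
  # and for every value requested afterwards.
  def subscribe(queue, &block)
    @subscribers[queue] = block
    block.call(@queues[queue].shift) until @queues[queue].empty?
  end
end

client = InMemoryClient.new
client.connect
client.request("noises", "moo")
puts client.retrieve("noises")
client.subscribe("noises") { |noise| puts "heard #{noise}" }
client.request("noises", "baa")
client.close
```

A real client would extend Workling::Clients::Base as shown above; the point here is only which methods a broker has to answer to.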
Invokers
There’s still potential to improve things though. Workling 0.4.0 introduces the idea of invokers. Invokers grab work off a job broker, using a client (see above). They subclass Workling::Remote::Invokers::Base. Read the RDoc for a description of the methods.
Workling comes with a couple of standard invokers, like the BasicPoller. This invoker simply keeps hitting the broker every n seconds, checking for new work and executing it immediately. The ThreadedInvoker does the same, but spawns a Thread for every Worker class the project defines.
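To make the polling idea concrete, here is a rough sketch of what such a loop boils down to. This is not workling’s BasicPoller: ArrayClient, PollingSketch and poll_once are names invented for this example, and the client is a plain in-memory stand-in so the code runs without a broker.

```ruby
# Hypothetical stand-in for a broker client: retrieve pops the next value
# from an in-memory array, or returns nil when nothing is waiting.
class ArrayClient
  def initialize
    @queues = Hash.new { |hash, name| hash[name] = [] }
  end

  def request(queue, value)
    @queues[queue] << value
  end

  def retrieve(queue)
    @queues[queue].shift
  end
end

# A sketch of a polling invoker (illustration only, not workling code).
class PollingSketch
  def initialize(client, routes, interval)
    @client = client
    @routes = routes      # queue name => callable that does the work
    @interval = interval  # seconds to sleep between polls
    @running = false
  end

  # Checks every queue once and runs whatever work it finds.
  # Returns the number of jobs executed.
  def poll_once
    @routes.sum do |queue, worker|
      args = @client.retrieve(queue)
      next 0 if args.nil?
      worker.call(args)
      1
    end
  end

  # Keeps hitting the broker until stop is called.
  def listen
    @running = true
    while @running
      poll_once
      sleep @interval
    end
  end

  def stop
    @running = false
  end
end

client = ArrayClient.new
done = []
poller = PollingSketch.new(client, { "noises" => ->(args) { done << args } }, 0.1)
client.request("noises", "moo")
poller.poll_once
p done
```

Note how the loop has to ask every queue on every pass, even when nothing is waiting. That is the cost the subscription callbacks avoid.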
So, AMQP: it would be nice if we had an invoker that makes use of the subscription callbacks. Easily done, let’s have a look:
require 'eventmachine'
require 'workling/remote/invokers/base'

#
# Subscribes the workers to the correct queues.
#
module Workling
  module Remote
    module Invokers
      class EventmachineSubscriber < Workling::Remote::Invokers::Base

        def initialize(routing, client_class)
          super
        end

        #
        # Starts EM loop and sets up subscription callbacks for workers.
        #
        def listen
          EM.run do
            connect do
              routes.each do |queue|
                @client.subscribe(queue) do |args|
                  run(queue, args)
                end
              end
            end
          end
        end

        def stop
          EM.stop if EM.reactor_running?
        end
      end
    end
  end
end
Invokers have to implement two methods, listen and stop. listen starts the main listener loop, which is responsible for starting work when it becomes available.
In our case, we need to start an EM loop inside listen. This is because the Ruby AMQP library needs to run inside of an EventMachine reactor loop.
Next, inside of listen, we need to iterate through all defined routes. There is a route for each worker method you defined in your application. The routes double as queue names. For this, you can use the helper method routes. Now we attach a callback to each queue. We can use the helper method run, which executes the worker method associated with the queue, passing along any supplied arguments.
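Stripped of EventMachine and the broker, the wiring described above looks roughly like the sketch below. Everything in it is hypothetical: NoiseWorker, CallbackClient, SubscriberSketch and the queue naming in ROUTES are made up for illustration, and the run method only imitates what workling’s helper is described as doing.

```ruby
# Hypothetical worker with two methods that can run in the background.
class NoiseWorker
  def moo(options)
    "moo #{options[:times]}"
  end

  def baa(options)
    "baa #{options[:times]}"
  end
end

# One route per worker method; the route name doubles as the queue name.
# The naming scheme is an assumption made for this sketch.
ROUTES = {
  "noise_worker__moo" => [NoiseWorker, :moo],
  "noise_worker__baa" => [NoiseWorker, :baa]
}

# Stand-in for a subscribing client: it remembers one callback per queue
# and lets us deliver a message by hand.
class CallbackClient
  def initialize
    @callbacks = {}
  end

  def subscribe(queue, &block)
    @callbacks[queue] = block
  end

  def deliver(queue, args)
    @callbacks.fetch(queue).call(args)
  end
end

# Imitates the invoker: attach one callback per route, and let run look up
# and call the worker method that belongs to the queue.
class SubscriberSketch
  attr_reader :results

  def initialize(client, routes)
    @client = client
    @routes = routes
    @results = []
  end

  def listen
    @routes.each_key do |queue|
      @client.subscribe(queue) { |args| @results << run(queue, args) }
    end
  end

  def run(queue, args)
    worker_class, method_name = @routes.fetch(queue)
    worker_class.new.public_send(method_name, args)
  end
end

client = CallbackClient.new
invoker = SubscriberSketch.new(client, ROUTES)
invoker.listen
client.deliver("noise_worker__moo", { times: 2 })
p invoker.results
```

Because the queue name alone is enough to find the worker method, the callback needs nothing but the queue and the arguments.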
That’s it! We now have a more effective Invoker, which we can activate in our application like this…
Configuring your application to use AMQP (with RabbitMQ)
Again, follow the instructions here to get RabbitMQ running on your OSX development machine.
Workling::Remote.invoker = Workling::Remote::Invokers::EventmachineSubscriber
Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
Workling::Remote.dispatcher.client = Workling::Clients::AmqpClient.new
Enjoy!
First off, you’ll need to install RabbitMQ. Instructions for OSX here. Once you’ve done that, install the pure ruby amqp library:
gem sources -a http://gems.github.com
gem install tmm1-amqp
You’re good to go. Now open up two IRB sessions. Paste the following code into the first session:
require 'mq'
EM.run {
amq = MQ.new
EM.add_periodic_timer(1) { amq.queue("noises").publish("moo") }
}
Your publishing code has to run inside of an EventMachine loop. You can start it using EM.run. If you’re running inside of an evented container such as Thin or Evented Mongrel, you can skip this. The meat of it is just amq.queue("noises").publish("moo").
Now open a second irb session in another terminal tab, and paste this in:
require 'mq'
EM.run {
amq = MQ.new
amq.queue("noises").subscribe { |noise|
puts noise
}
}
You’ll get the moos off the queue “noises”. No polling required, which is very nice: you simply register a callback with amqp, and it’ll be called with the message when one becomes available.
Niceness!
RabbitMQ is a reliable, high performance queue server written in Erlang.
Let’s fiddle.
Before you start
Before you can start, you have to have MacPorts installed. Just in case, here’s how it’s done:
- Register at the Apple Developer Connection. Go to downloads/developer tools, and download the latest version of the Xcode developer tools. Grab a flask of coffee.
- Download and run the latest MacPorts installer from http://svn.macports.org/repository/macports/downloads/. Grab another flask.
Installing Erlang
If you’re on the 10.5.3 update, you have to edit the erlang Portfile to avoid a bus error bug:
sudo vi /opt/local/var/macports/sources/rsync.macports.org/release/ports/lang/erlang/Portfile
Delete this line from the configure.args attribute:
--enable-hipe \
Now you can use port to install erlang:
sudo port install erlang
This is going to take a while.
Installing RabbitMQ
Grab and unpack the latest generic Unix version of RabbitMQ. The current version as of writing is 1.4.0.
mkdir /tmp/rabbit-mq && cd /tmp/rabbit-mq
wget http://www.rabbitmq.com/releases/rabbitmq-server/v1.4.0/rabbitmq-server-generic-unix-1.4.0.tar.gz
tar xvfz rabbitmq-server-generic-unix-1.4.0.tar.gz
Now move this stuff into erlang’s magical mystery directory:
sudo mv rabbitmq_server-1.4.0 /opt/local/lib/erlang/lib
You’re good to go! Start up like this:
sudo /opt/local/lib/erlang/lib/rabbitmq_server-1.4.0/sbin/rabbitmq-server
Change permissions if you don’t want to run this as root.
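If you’re not sure whether the server actually came up, a quick check from Ruby is to see if anything accepts connections on the AMQP port. This is only a sketch: broker_listening? is a made-up helper, and it assumes the broker runs locally on the default AMQP port 5672. It tells you that something is listening there, not that it is RabbitMQ.

```ruby
require 'socket'

# Returns true if something accepts TCP connections on host:port.
# Assumes the default AMQP port 5672 unless told otherwise.
def broker_listening?(host = "127.0.0.1", port = 5672, timeout = 2)
  Socket.tcp(host, port, connect_timeout: timeout) { true }
rescue SystemCallError, SocketError
  # Connection refused, timed out, or the host could not be resolved.
  false
end

if broker_listening?
  puts "something is listening on port 5672"
else
  puts "nothing is listening on port 5672, is rabbitmq-server running?"
end
```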
UPDATE: I’ve been having troubles starting the server up, since the tables in the mnesia database backing RabbitMQ are locked. I don’t know why this is the case. You can get this running again brute force styleee by deleting the database:
sudo rm -rf /var/lib/rabbitmq/mnesia



