Nov

Workling 0.4.1 is out, now with RabbitMQ support and simple extensions
I did some refactoring this week, and the result is a much improved workling. Adding new work brokers is a snap now. I’ll show you how I added RabbitMQ, so that you can go about adding your own brokers. So without further ado…
RabbitMQ, a quick introduction
A lot of Ruby people have been talking about using RabbitMQ as their queue of choice. Soundcloud.com are using it, as is New Bamboo founder Johnathan Conway, who is using it at his video startup http://www.vzaar.com/. He says:
RabbitMQ – Now this is the matron’s knockers when it comes to kick ass, ultra fast and scalable messaging. It simply rocks, with performance off the hook. It’s written in Erlang and supports the AMQP protocol.
Follow the instructions here to get this beauty installed.
Adding AMQP to Workling
There are two new base classes you can extend to add new brokers. I’ll describe how this is done, but the code I show is already a part of workling. Skip to ‘Configuring your application to use AMQP (with RabbitMQ)’ to see how this is activated.
Clients
Clients help workling connect to job brokers. To add an AmqpClient, we need to extend Workling::Clients::Base and implement a handful of methods.
require 'workling/clients/base'
require 'mq'

#
# An AMQP client
#
module Workling
  module Clients
    class AmqpClient < Workling::Clients::Base

      # starts the client.
      def connect
        @amq = MQ.new
      end

      # stops the client.
      def close
        @amq.close
      end

      # request work
      def request(queue, value)
        @amq.queue(queue).publish(value)
      end

      # retrieve work
      def retrieve(queue)
        @amq.queue(queue)
      end

      # subscribe to a queue
      def subscribe(queue)
        @amq.queue(queue).subscribe do |value|
          yield value
        end
      end
    end
  end
end
We’re using the eventmachine amqp client for this; you can find it up on github. connect and close do exactly what they say on the tin: connecting to RabbitMQ and closing the connection.
request places work on RabbitMQ, and retrieve is how work is fetched from it again. Both methods are passed the correct queue; request is also passed a value that contains the worker method arguments. In the client above, retrieve simply returns the amqp queue object. If you need control over the queue names, look at the RDoc for Workling::Routing::Base. In our case, there’s no special requirement here.
Finally, we implement a subscribe method. Use this if your broker supports callbacks, as is the case with amqp. This method expects a block, which we call from inside the amqp subscribe callback here. The block is called whenever a message is available on the queue, and the message is yielded into it.
Having subscription callbacks is very nice, because this way, we don’t need to keep calling get on the queue to see if something new is waiting.
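To make the contract of these five methods a bit more concrete, here is a minimal sketch of a client backed by plain Ruby arrays instead of a broker. InMemoryClient and the queue name are made up for illustration and are not part of workling. To stay runnable on its own, the class does not subclass Workling::Clients::Base, and its retrieve hands back the next message rather than a queue object as the AMQP client does.

```ruby
# Hypothetical stand-in for a broker client; not part of workling.
# It mirrors the five methods AmqpClient implements, backed by plain arrays.
class InMemoryClient
  # starts the client: set up empty queues and an empty subscriber table
  def connect
    @queues = Hash.new { |hash, name| hash[name] = [] }
    @subscribers = {}
  end

  # stops the client
  def close
    @queues = nil
    @subscribers = nil
  end

  # place work on the named queue, or hand it straight to a subscriber
  def request(queue, value)
    if (callback = @subscribers[queue])
      callback.call(value)
    else
      @queues[queue] << value
    end
  end

  # take the oldest piece of work off the queue, or nil if it is empty
  def retrieve(queue)
    @queues[queue].shift
  end

  # register a callback; anything already waiting is delivered first
  def subscribe(queue, &block)
    @subscribers[queue] = block
    block.call(@queues[queue].shift) until @queues[queue].empty?
  end
end

client = InMemoryClient.new
client.connect

# polling style: put work on the queue, then ask for it
client.request("cow_workers__moo", { :id => 123 })
polled = client.retrieve("cow_workers__moo")

# callback style: work arrives in the block as soon as it is requested
received = []
client.subscribe("cow_workers__moo") { |args| received << args }
client.request("cow_workers__moo", { :id => 456 })

client.close
```

The second half is the part that matters for amqp: once a block is subscribed, nobody has to ask the queue whether something is waiting.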
So now we’re done! That’s all you need to add RabbitMQ to workling. Configure it in your application as described below.
Invokers
There’s still potential to improve things though. Workling 0.4.0 introduces the idea of invokers. Invokers grab work off a job broker, using a client (see above). They subclass Workling::Remote::Invokers::Base. Read the RDoc for a description of the methods.
Workling comes with a couple of standard invokers, like the BasicPoller. This invoker simply keeps hitting the broker every n seconds, checking for new work and executing it immediately. The ThreadedInvoker does the same, but spawns a Thread for every Worker class the project defines.
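As a rough illustration of the polling idea, and not the actual BasicPoller source, a polling loop might look something like this. PollingSketch, its constructor arguments and the "moo" queue are all assumptions made up for the example; the real invokers talk to a client rather than to a hash of arrays.

```ruby
# Hypothetical sketch of a polling invoker; names are illustrative, not workling's.
class PollingSketch
  def initialize(queues, handlers, sleep_time)
    @queues = queues          # queue name => array of pending argument hashes
    @handlers = handlers      # queue name => callable that does the work
    @sleep_time = sleep_time  # seconds to wait between polls
    @running = false
  end

  # check every queue once and run whatever is waiting, oldest first
  def poll_once
    @handlers.each do |name, handler|
      while (args = @queues[name].shift)
        handler.call(args)
      end
    end
  end

  # keep polling every sleep_time seconds until stop is called
  def listen
    @running = true
    while @running
      poll_once
      sleep @sleep_time
    end
  end

  def stop
    @running = false
  end
end

queues = { "moo" => [{ :id => 1 }, { :id => 2 }] }
done = []
handlers = { "moo" => lambda { |args| done << args[:id] } }

poller = PollingSketch.new(queues, handlers, 0.1)
poller.poll_once  # a single pass; listen would repeat this until stopped
```

The cost of this approach is visible in listen: the loop wakes up every n seconds whether or not there is anything to do.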
So, AMQP: it would be nice if we had an invoker that makes use of the subscription callbacks. Easily done, let’s have a look:
require 'eventmachine'
require 'workling/remote/invokers/base'

#
# Subscribes the workers to the correct queues.
#
module Workling
  module Remote
    module Invokers
      class EventmachineSubscriber < Workling::Remote::Invokers::Base

        def initialize(routing, client_class)
          super
        end

        #
        # Starts EM loop and sets up subscription callbacks for workers.
        #
        def listen
          EM.run do
            connect do
              routes.each do |queue|
                @client.subscribe(queue) do |args|
                  run(queue, args)
                end
              end
            end
          end
        end

        def stop
          EM.stop if EM.reactor_running?
        end
      end
    end
  end
end
Invokers have to implement two methods, listen and stop. Listen starts the main listener loop, which is responsible for starting work when it becomes available.
In our case, we need to start an EM loop around listen. This is because the Ruby AMQP library needs to run inside of an eventmachine reactor loop.
Next, inside of listen, we need to iterate through all defined routes, which the helper method routes gives us. There is a route for each worker method you defined in your application, and the routes double as queue names. We then attach a callback to each queue. The callback uses the helper method run, which executes the worker method associated with the queue, passing along any supplied arguments.
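The route-to-callback wiring can be tried out without a broker or an EM loop. The following is only a sketch under made-up names: FakeClient, CowWorker and the route names are hypothetical, and worker.send stands in for what the run helper does in the real invoker.

```ruby
# Hypothetical stand-ins: FakeClient and CowWorker are not part of workling.
class FakeClient
  def initialize
    @callbacks = {}
  end

  # remember the block to call when a message arrives on the queue
  def subscribe(queue, &block)
    @callbacks[queue] = block
  end

  # simulate the broker pushing a message to the subscriber
  def deliver(queue, args)
    @callbacks.fetch(queue).call(args)
  end
end

class CowWorker
  attr_reader :log

  def initialize
    @log = []
  end

  def moo(args)
    @log << "moo #{args[:id]}"
  end

  def graze(args)
    @log << "graze #{args[:id]}"
  end
end

worker = CowWorker.new

# one route per worker method; the route name doubles as the queue name
routes = { "cow_worker__moo" => :moo, "cow_worker__graze" => :graze }

client = FakeClient.new
routes.each do |queue, method_name|
  client.subscribe(queue) do |args|
    worker.send(method_name, args)  # plays the role of run(queue, args)
  end
end

# messages are dispatched to the matching worker method as they arrive
client.deliver("cow_worker__graze", { :id => 7 })
client.deliver("cow_worker__moo", { :id => 8 })
```

Each queue gets exactly one callback, so the order in which work runs is decided by the order in which messages arrive, not by a polling schedule.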
That’s it! We now have a more effective Invoker, which we can activate in our application like this…
Configuring your application to use AMQP (with RabbitMQ)
Again, follow the instructions here to get RabbitMQ running on your OS X development machine.
Workling::Remote.invoker = Workling::Remote::Invokers::EventmachineSubscriber
Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
Workling::Remote.dispatcher.client = Workling::Clients::AmqpClient.new
enjoy!



Comments
There are 16 Comments for this post.
Hi,
Is there any way that this works with JRuby?
Regards,
Carl
@Carl I don’t see why not. Out of the box, you’re going to have problems installing the ruby amqp client on github (tmm1-amqp), because the eventmachine dependency requires you to compile native code.
but it should be easy to add a pure java amqp client implementation to workling. check it out here: http://www.rabbitmq.com/java-client.html
you’d need to write a workling client using that library. it’s very easy, you only have to implement a handful of methods, which you’ll find described in Workling::Clients::Base. Check out the AMQP client for an example implementation.
once you have the client going, set it up in your app like this:
should all work fine.
i’d love to know how this works out for you. if you have any problems, don’t hesitate to contact me.
/r
@Carl ymmv: http://groups.google.com/group/ruby-amqp/browse_thread/thread/c83a50248b69fe4c
Hi,
Great refactoring, thx!
On the RabbitMQ side, with the tmm1-amqp AMQP driver for Ruby/EventMachine it looks like evented mongrel is required on the web server side, right? Because MQ.new refuses to initialize outside an EM reactor loop.
I am running my workers on a separate machine and the worker side is fine since it has its EM loop in EventmachineSubscriber. I am trying to figure how to have the web server side working for posting job requests.
Thanks,
Colin.
@colin yeah you need an evented app container to get this running. i msgd tmm1 to ask if it was possible to run this outside of an EM loop, but looks like a no. so go with ebb or thin or evented mongrel. if you want to use this without switching servers, just write a new AMQP client that works without EM.
check out the other libraries you could use to connect to rabbit, i’m sure there’s one out there. then follow the article about how to add a new client, it’s pretty easy.
I decided not to go the evented mongrel way and found Apache Qpid http://cwiki.apache.org/qpid/ and they have a Ruby client compatible with AMQP 0.8 so it should work with RabbitMQ. I had lots of problems with the current trunk and finally reverted to the M3 release which so far seems to work. I am currently gemifying this lib and will probably make it available on github. I’ll keep you posted here on the progress.
Thanks,
Colin.
@colin that’s excellent :) this is exactly the kind of collaboration i was hoping for when i refactored recently. looking forward to your gem!
It’s not clear to me what the purpose of retrieve(queue) is in your first code snippet. It doesn’t seem to match up with the description given underneath:
“request and retrieve are responsible for placing work on rabbitmq. The methods are passed the correct queue, and a value that contains the worker method arguments.”
That seems to describe the ‘request’ method, but not the ‘retrieve’ method.
Minor nitpick: “ampq” is used a few times in this post, rather than “amqp”
It looks like there are at least two good options out there now for scheduling background jobs over an amqp broker in a rails application. This one, using workling, and Ezra’s recent project, Nanite.
From the reading I’ve done so far, it seems like the main differences between these options are:
o Workling is designed to allow a number of different methods of queuing or dispatching jobs, while Nanite is meant specifically for AMQP brokers (such as RabbitMQ).
o Nanite incorporates heartbeat monitoring from worker processes. The information given in each heartbeat message can include numbers useful in determining how jobs should be scheduled (system load average by default). That information is made available to job publishers, which may use it to help decide which process gets the next job. Workling doesn’t appear to have a similar feature.
o Nanite allows the process dispatching a job to choose how the dispatching decision will be made (eg round robin, least-busy, or any arbitrary comparison function). With workling, jobs are published along with a ‘route’ which in AMQP corresponds to a queue. So, if a dispatcher using Workling wants to specify something about the way a job will be assigned, that would be the lever it should use.
o Nanite expects a job dispatcher to supply a proc which the worker process will run. Workling could be used that way, but the most natural fit is to write the logic for each Workling process in its own class file, and to keep that file in a ‘workers’ directory next to ‘models’, ‘controllers’ etc.
Does this seem about right? Please let me know if I’ve left something out, or left a misunderstanding in…
So, I’m getting an error when using amqp…
Workling::WorklingError: couldn’t start amq client. if you’re running this in a server environment, then make sure the server is evented (ie use thin or evented mongrel, not normal mongrel.)
Could you go into some detail as to how to go about implementing this environment? Am I going to need to have our sysadmins/ops-guys switch over to evented mongrel?
Hi,
I got Workling talking to RabbitMQ but the message coming out appears to be in String format (and not a hash) -
in –
{:uid=>"cow_workers:moo:adbe0c613edddd1c5cf5bac2b5e24fa0", :id=>123}
out –
"uidcow_workers:moo:4668b451883b843759ce7e1fde566d5fid123"
So Workling appears to be unable to extract the uid and the other values from the message.
Any ideas what I may be doing wrong?
This was from a minimal rails app setup according to the current instructions, single cow controller, single cow worker.
The following config in development.rb –
Workling::Remote.invoker = Workling::Remote::Invokers::EventmachineSubscriber
Workling::Remote.dispatcher = Workling::Remote::Runners::ClientRunner.new
Workling::Remote.dispatcher.client = Workling::Clients::AmqpClient.new
Rails is running under “thin start”
Thanks,
Simon
I’m having the same issue as William. I am semi-expecting it to need at least “guest”, “guest” credentials to pass to RabbitMQ or something but where should that be configured in Workling?
I’m seeing the same issue as Simon. Hash parameters come through as strings. It works when I use starling so it definitely seems RabbitMQ is the problem.
I would recommend against going this route for Workling because synchronous error handling with the (albeit beautiful) tmm1-amqp library is very difficult. For example, in the client you’ve posted here, if the broker goes down, your Workling request method isn’t going to get an exception because the exception will be off in the EventMachine event loop. Rails won’t know the publish failed without some serious hacking.
Using synchronous calls to publish for a Workling client makes more sense when you want to detect errors—which I’d think is most of the time.
Aman Gupta (tmm1) recommended to me to look into the newer “Bunny” or “Carrot” amqp clients, which are both synchronous.
Bunny is working well for me so far, and a basic workling client with Bunny is pretty simple to whip up.
By the way, I also spent some time with Colin’s Qpid client (mentioned above), but the Ruby Qpid library itself (no fault of Colin’s) is pretty terrible to work with, and should be avoided unless you are doing Qpid stuff. I was unable to get messages to be persistent with Qpid client, so dropped it.
Hope this helps. Thanks for the code and all these articles. Great blog!
I’ve got workling working with rabbitmq and passenger using qusion, but I’m completely stumped on how to get it to work from script/console, and more importantly, testunit! Almost all of my tests are now broken. Could someone take a look at this and point out what I’m doing wrong http://gist.github.com/176465