Async Observer
What
Async Observer is a Rails plugin that provides deep integration with Beanstalk.
beanstalkd is a fast, distributed, in-memory work-queue
service. Its interface is generic, but it is designed to reduce the
latency of page views in high-volume web applications by running most
time-consuming tasks asynchronously.
Installing
First you need to download and install beanstalkd, which is the queue server.
Then, this plugin uses the beanstalk-client gem, so you need to install that:
$ sudo gem install beanstalk-client
Then stick the async_observer plugin itself into your Rails
project:
$ ./script/plugin install git://github.com/kristjan/async_observer.git
How to use it
If you don't define a queue, async_observer will just run things
synchronously in-process. But assuming you want asynchrony, first you must put
some lines into config/environments/*.rb:
config.after_initialize do
  AsyncObserver::Queue.queue = Beanstalk::Pool.new(%w(localhost:11300))
  # This value should change every time you make a release of your app.
  AsyncObserver::Queue.app_version = RELEASE_STAMP
end
Make sure you start some workers:
$ ./vendor/plugins/async_observer/bin/worker
Now running things asynchronously is a snap!
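The fallback described above, where work runs in-process when no queue is defined, can be pictured with a minimal sketch. SketchQueue and dispatch are hypothetical names used only for illustration; this is not the plugin's actual code, and a plain Array stands in for the Beanstalk pool.

```ruby
# Hypothetical stand-in for the plugin's queue; illustration only.
module SketchQueue
  class << self
    # nil means "no queue configured".
    attr_accessor :queue

    def dispatch(receiver, method_name, *args)
      if queue.nil?
        # No queue: run the call right now, in this process.
        receiver.send(method_name, *args)
      else
        # Queue present: record the call so a worker can run it later.
        queue << [receiver, method_name, args]
        nil
      end
    end
  end
end

SketchQueue.queue = nil
SketchQueue.dispatch("abc", :upcase)   # runs immediately

SketchQueue.queue = []                 # an Array standing in for a real queue
SketchQueue.dispatch("abc", :upcase)   # deferred; nothing runs yet
```

The point of this shape is that calling code looks the same whether or not a queue is configured, which is convenient in development and test environments.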
Hooks
You can define asynchronous hooks:
Existing synchronous hook:
class Person < ActiveRecord::Base
  after_create do |person|
    SiteStats.increment_members()
  end
end
Becomes an asynchronous hook:
class Person < ActiveRecord::Base
  async_after_create do |person|
    SiteStats.increment_members()
  end
end
Method Calls
You can also call nearly any method asynchronously:
Existing synchronous call:
class Person < ActiveRecord::Base
  def befriend(other_person)
    Friendship.create(self, other_person)
  end
end
Becomes an asynchronous call:
class Person < ActiveRecord::Base
  def befriend(other_person)
    Friendship.async_send(:create, self, other_person)
  end
end
One caveat: the receiver of an asynchronous call and each of its arguments must be
an instance of one of the following types (see async_observer/lib/async_observer/queue.rb
for details): Symbol, Module, NilClass, FalseClass, TrueClass, Numeric, String,
Array, Hash, or ActiveRecord::Base.
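If you want to catch an unsupported value before it reaches the queue, a check along these lines may help. This is a minimal sketch: queueable? and QUEUEABLE_TYPES are hypothetical names, not part of async_observer, and the check only looks at the top-level value (whether the plugin also inspects the contents of Arrays and Hashes is not covered here).

```ruby
# Hypothetical helper; the type list is copied from the caveat above.
QUEUEABLE_TYPES = [Symbol, Module, NilClass, FalseClass, TrueClass,
                   Numeric, String, Array, Hash]
# ActiveRecord::Base is only added when Rails is loaded, so the sketch
# also runs in plain Ruby.
QUEUEABLE_TYPES << ActiveRecord::Base if defined?(ActiveRecord::Base)

def queueable?(value)
  QUEUEABLE_TYPES.any? { |type| value.is_a?(type) }
end

queueable?(:create)     # a Symbol is allowed
queueable?(String)      # a class is a Module, so it is allowed
queueable?(Time.now)    # a Time is not in the list
```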
Mapping over all records in a model
There's a powerful class method called ActiveRecord::Base.async_each
(and its sibling, async_each_opts) that lets you easily run a
method on every instance of a model. For example:
Person.async_each(:something_cool)
If you have 57 people, this translates to 57 separate jobs:
Person.find(1).something_cool
Person.find(2).something_cool
...
Person.find(57).something_cool
If you have a very large table, async_each will not block the
submitting process for a long time while it enqueues jobs. Instead, it
will submit a few meta-jobs (by default 1000), each of which will submit more
meta-jobs, until finally they submit actual jobs to run.
This easily scales up to tens of millions of records as long as you have sufficient memory on your beanstalkd server and enough workers to run the jobs in a timely fashion.
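The meta-job idea can be simulated in plain Ruby. This is only a sketch of the general fan-out technique: fan_out, its fanout parameter, and the even-chunk splitting strategy are assumptions made for illustration, not the plugin's actual algorithm. It counts jobs instead of submitting them to beanstalkd.

```ruby
# Hypothetical simulation of recursive fan-out over an id range.
# stats counts how many meta-jobs and real jobs would be created.
def fan_out(range, fanout, stats)
  size = range.last - range.first + 1
  if size <= fanout
    # Small enough: every id in the range becomes one real job.
    stats[:jobs] += size
    return
  end

  # Otherwise split into at most `fanout` chunks, one meta-job per chunk.
  chunk = (size.to_f / fanout).ceil
  range.step(chunk) do |start|
    stop = [start + chunk - 1, range.last].min
    stats[:meta_jobs] += 1
    fan_out(start..stop, fanout, stats)
  end
end

stats = { :meta_jobs => 0, :jobs => 0 }
fan_out(1..57, 10, stats)
```

In this simulation the caller creates at most fanout meta-jobs directly; the remaining splitting happens inside the meta-jobs, which in a real deployment would run on the workers.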
Advanced Stuff
Range#async_each
If you want to do the same thing for many integers, you can use
Range#async_each to generate one job for every integer in the
range. This is a lower-level method used by
ActiveRecord::Base.async_each. The example above runs code similar
to this:
(Person.minimum(:id)..Person.maximum(:id)).async_each(Person, :something_cool)
Custom jobs
If you want to submit custom jobs that async_observer doesn't know how to process, you must teach async_observer how to handle them:
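config.after_initialize do
  AsyncObserver::Worker.handle = proc do |job|
    # Use next, not return: return inside a proc tries to return from the
    # enclosing method and raises LocalJumpError when called later.
    next MyCustomJobHandler.run(job) if job.ybody[:type] == :custom
    raise 'unknown job type'
  end
end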
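To try a handler without a running beanstalkd, you can call the proc directly with a stand-in job. This is a sketch under stated assumptions: FakeJob, SketchCustomHandler, SKETCH_HANDLE, and the :payload key are hypothetical names, and the only thing assumed about a real job is that it responds to ybody with a Hash-like body, as the handler above does.

```ruby
# Hypothetical stand-in for a beanstalk job: just something with #ybody.
FakeJob = Struct.new(:ybody)

# Hypothetical custom handler; a real one would do actual work.
module SketchCustomHandler
  def self.run(job)
    "handled #{job.ybody[:payload]}"
  end
end

# The sketch uses next to hand back a value from the proc; return would
# try to return from the enclosing method instead.
SKETCH_HANDLE = proc do |job|
  next SketchCustomHandler.run(job) if job.ybody[:type] == :custom
  raise 'unknown job type'
end

SKETCH_HANDLE.call(FakeJob.new({ :type => :custom, :payload => 42 }))
```

Calling the proc with a job whose :type is anything other than :custom raises a RuntimeError, which mirrors the 'unknown job type' branch.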
Normally the beanstalk client library handles just its own TCP connections. If you have other connections to juggle, you can define a select method for the beanstalk library to use:
config.after_initialize do
  Beanstalk.select = MyCustomJobHandler.method(:select)
end
Finally, there's a callback that gets run when the worker wants to shut down:
config.after_initialize do
  AsyncObserver::Worker.finish = MyCustomJobHandler.method(:finish)
end
Forum
There’s a Google group called beanstalk-talk for
all discussion regarding beanstalkd and its various client
libraries.
How to submit patches
Read the 8 steps for fixing other people’s code. For step 8b (Submit patch to Google Groups), use the Google group above.
The repository can be found at http://github.com/kristjan/async_observer.
License
You can share this code under the terms of the GPL version 3.
Contact
Comments are welcome. Send any questions or comments to the beanstalk-talk Google group.
Kristján Pétursson, 13th November 2008