Async Observer


What

Async Observer is a Rails plugin that integrates ActiveRecord with beanstalkd, so you can run model hooks and method calls asynchronously.

beanstalkd is a fast, distributed, in-memory work-queue service. Its interface is generic, but it is intended to reduce the latency of page views in high-volume web applications by running time-consuming tasks asynchronously.

Installing

First, download and install beanstalkd, the queue server.

This plugin uses the beanstalk-client gem, so install that next:

$ sudo gem install beanstalk-client

Then install the async_observer plugin itself into your Rails project:

$ ./script/plugin install git://github.com/kristjan/async_observer.git

How to use it

If you don't configure a queue, async_observer simply runs everything synchronously, in process. To run things asynchronously, first add the following lines to the relevant config/environments/*.rb file:

config.after_initialize do
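  # Connect to a beanstalkd server; 11300 is beanstalkd's default port.
  AsyncObserver::Queue.queue = Beanstalk::Pool.new(%w(localhost:11300))

  # This value should change every time you make a release of your app.
  # RELEASE_STAMP stands in for whatever release identifier your app defines.
  AsyncObserver::Queue.app_version = RELEASE_STAMP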
end
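The fallback described above, where work runs in process when no queue is set, amounts to a simple branch at submission time. The sketch below is plain Ruby with hypothetical names (`MiniQueue`, `submit`); it is not async_observer's implementation, and an Array stands in for beanstalkd.

```ruby
# Hypothetical sketch: MiniQueue is NOT part of async_observer.
# It mimics the documented behavior: with no queue configured, jobs
# run synchronously in process; otherwise they are stored for a worker.
class MiniQueue
  class << self
    attr_accessor :queue # nil means "no queue configured"

    # Submit a job described as receiver, method name, and arguments.
    def submit(receiver, method_name, *args)
      if queue.nil?
        receiver.public_send(method_name, *args) # run right now, in process
      else
        queue << { receiver: receiver, method: method_name, args: args }
        nil # an asynchronous submit has no immediate result
      end
    end
  end
end

# With no queue, the call runs immediately and returns its result.
MiniQueue.queue = nil
p MiniQueue.submit(2, :+, 3) # prints 5

# With a queue (an Array standing in for beanstalkd), the job is only stored.
MiniQueue.queue = []
MiniQueue.submit(2, :+, 3)
p MiniQueue.queue.size # prints 1
```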

Make sure you start some workers:

$ ./vendor/plugins/async_observer/bin/worker

Now running things asynchronously is a snap!

Hooks

You can define asynchronous hooks:

This existing synchronous hook:

class Person < ActiveRecord::Base
  after_create do |person|
    SiteStats.increment_members()
  end
end

Becomes an asynchronous hook:

class Person < ActiveRecord::Base
  async_after_create do |person|
    SiteStats.increment_members()
  end
end
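Conceptually, an asynchronous hook records work at create time instead of doing it. The plain-Ruby sketch below contrasts the two; `HookSketch`, `PENDING_JOBS`, and `STATS` are hypothetical, and unlike a real beanstalkd job, which has to be serialized, this sketch keeps the block itself in memory.

```ruby
# Hypothetical sketch: not how ActiveRecord or async_observer is built.
# PENDING_JOBS stands in for beanstalkd; a real queue would need a
# serializable job description rather than an in-memory block.
PENDING_JOBS = []
STATS = { members: 0 }

module HookSketch
  def self.included(base)
    base.extend(ClassMethods)
  end

  module ClassMethods
    def hooks
      @hooks ||= { sync: [], async: [] }
    end

    # Runs during create, in the same process as the caller.
    def after_create(&block)
      hooks[:sync] << block
    end

    # Only queues the block; some worker runs it later.
    def async_after_create(&block)
      hooks[:async] << block
    end

    def create
      record = new
      hooks[:sync].each { |hook| hook.call(record) }
      hooks[:async].each { |hook| PENDING_JOBS << [hook, record] }
      record
    end
  end
end

class Person
  include HookSketch
  async_after_create { |_person| STATS[:members] += 1 }
end

Person.create
p STATS[:members] # prints 0: the hook is only queued

hook, record = PENDING_JOBS.shift # a "worker" picks up the job
hook.call(record)
p STATS[:members] # prints 1
```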

Method Calls

You can also call nearly any method asynchronously:

This existing synchronous call:

class Person < ActiveRecord::Base
  def befriend(other_person)
    Friendship.create(self, other_person)
  end
end

Becomes an asynchronous call:

class Person < ActiveRecord::Base
  def befriend(other_person)
    Friendship.async_send(:create, self, other_person)
  end
end

One caveat: the receiver of an asynchronous call and each of its arguments must be an instance of one of the following types (see async_observer/lib/async_observer/queue.rb for details): Symbol, Module, NilClass, FalseClass, TrueClass, Numeric, String, Array, Hash, or ActiveRecord::Base.
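The type restriction can be checked before submitting a call. Here is a minimal sketch of such a check; `async_safe?` and `ALLOWED_SCALARS` are hypothetical names, not part of async_observer, and the sketch assumes that the contents of an Array or Hash must themselves be allowed types.

```ruby
# Hypothetical sketch: `async_safe?` is an illustrative helper, not part of
# async_observer. It checks a value against the types listed above and
# assumes the elements of Arrays and Hashes must be allowed types as well.
ALLOWED_SCALARS = [Symbol, Module, NilClass, FalseClass, TrueClass, Numeric, String].freeze

def async_safe?(value)
  case value
  when *ALLOWED_SCALARS
    true
  when Array
    value.all? { |element| async_safe?(element) }
  when Hash
    value.all? { |key, val| async_safe?(key) && async_safe?(val) }
  else
    # ActiveRecord records are allowed too; `defined?` returns nil when
    # ActiveRecord is not loaded, so the sketch also runs outside Rails.
    !!(defined?(ActiveRecord::Base) && value.is_a?(ActiveRecord::Base))
  end
end

p async_safe?([1, "two", :three, { four: 4.0 }]) # prints true
p async_safe?(String)                             # prints true (a Module)
p async_safe?(Time.now)                           # prints false
p async_safe?([1, Object.new])                    # prints false
```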

Mapping over all records in a model

There's a powerful class method called ActiveRecord::Base.async_each (and its sibling, async_each_opts) that lets you easily run a method on every instance of a model. For example:

Person.async_each(:something_cool)

If your people have ids 1 through 57, this translates to 57 separate jobs:

Person.find(1).something_cool
Person.find(2).something_cool
...
Person.find(57).something_cool

If you have a very large table, async_each does not block the submitting process while it enqueues a huge number of jobs. Instead, it submits a limited number of meta-jobs (by default 1000), each of which submits more meta-jobs, until the last level submits the actual jobs to run.

This easily scales up to tens of millions of records as long as you have sufficient memory on your beanstalkd server and enough workers to run the jobs in a timely fashion.
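The meta-job idea can be simulated in plain Ruby. This is a sketch under stated assumptions, not async_observer's actual splitting rule: `FANOUT`, `LEAF_LIMIT`, `split_range`, and `run_job` are hypothetical, the fan-out is set far below the plugin's default so the run stays small, and an Array stands in for beanstalkd. Each meta-job covers a range of ids and either splits it into smaller meta-jobs or, once the range is small enough, enqueues one real job per id.

```ruby
# Hypothetical sketch of the meta-job idea; the names and the split rule
# are illustrative assumptions, not async_observer's actual algorithm.
# An Array stands in for the beanstalkd queue.
FANOUT = 4     # how many pieces a meta-job splits its range into
LEAF_LIMIT = 4 # ranges this small are expanded into real jobs

# Split lo..hi into at most `parts` contiguous, non-empty sub-ranges.
def split_range(lo, hi, parts)
  size = hi - lo + 1
  step = (size + parts - 1) / parts # ceiling division
  (lo..hi).step(step).map { |start| start..[start + step - 1, hi].min }
end

# Process one job: meta-jobs enqueue more jobs, real jobs do the work.
def run_job(job, queue, done)
  case job[:type]
  when :meta
    range = job[:range]
    if range.size <= LEAF_LIMIT
      range.each { |id| queue << { type: :real, id: id } }
    else
      split_range(range.first, range.last, FANOUT).each do |piece|
        queue << { type: :meta, range: piece }
      end
    end
  when :real
    done << job[:id] # e.g. Person.find(id).something_cool
  end
end

queue = [{ type: :meta, range: 1..57 }] # submitting is a single push
done = []
run_job(queue.shift, queue, done) until queue.empty?
p done.size                 # prints 57
p done.sort == (1..57).to_a # prints true
```

Submitting costs one push regardless of table size; the rest of the fan-out happens inside whichever workers pick up the meta-jobs.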

Advanced Stuff

Range#async_each

If you want to do the same thing for many integers, you can use Range#async_each to generate one job for every integer in the range. This is the lower-level method used by ActiveRecord::Base.async_each. The Person example above runs code similar to this:

(Person.minimum(:id)..Person.maximum(:id)).async_each(Person, :something_cool)

Custom jobs

If you want to submit custom jobs that async_observer doesn't know how to process, you must teach async_observer how to handle them:

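config.after_initialize do
  AsyncObserver::Worker.handle = proc do |job|
    # Route custom jobs to your own handler and reject anything else.
    # (A bare `return` here would raise LocalJumpError when the worker
    # calls this proc, so the result comes from the if/else instead.)
    if job.ybody[:type] == :custom
      MyCustomJobHandler.run(job)
    else
      raise 'unknown job type'
    end
  end
end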
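Ruby's `return` inside a `proc` returns from the method in which the proc was created. A worker typically calls its handler long after that method has finished, so a handler should produce its result with an if/else or `next` rather than `return`, which would raise LocalJumpError. The plain-Ruby sketch below, with hypothetical `build_handler_*` methods and no async_observer involved, shows the difference.

```ruby
# Hypothetical sketch (plain Ruby, no async_observer): each proc is built
# inside a method and called later, as a worker would call its handler.
def build_handler_with_return
  proc do |job|
    return "custom" if job[:type] == :custom # returns from the *defining* method
    raise "unknown job type"
  end
end

def build_handler_with_next
  proc do |job|
    next "custom" if job[:type] == :custom # next ends only this proc call
    raise "unknown job type"
  end
end

job = { type: :custom }

begin
  build_handler_with_return.call(job)
rescue LocalJumpError => e
  puts "return: #{e.class}" # prints "return: LocalJumpError"
end

puts "next: #{build_handler_with_next.call(job)}" # prints "next: custom"
```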

Normally the beanstalk client library manages only its own TCP connections. If your worker has other connections to juggle, you can supply a select method for the beanstalk library to use:

config.after_initialize do
  Beanstalk.select = MyCustomJobHandler.method(:select)
end

Finally, you can register a callback that runs when the worker shuts down:

config.after_initialize do
  AsyncObserver::Worker.finish = MyCustomJobHandler.method(:finish)
end
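To see where `handle` and `finish` fit, here is a sketch of a generic worker loop. `SketchWorker` is hypothetical and is not async_observer's worker: a Thread::Queue stands in for beanstalkd, and a :shutdown sentinel stands in for whatever makes a real worker stop.

```ruby
# Hypothetical sketch of a worker loop with `handle` and `finish`
# callbacks. Thread::Queue stands in for beanstalkd and the :shutdown
# sentinel stands in for a stop signal; none of this is async_observer's code.
class SketchWorker
  attr_accessor :handle, :finish

  def initialize(queue)
    @queue = queue
  end

  def run
    loop do
      job = @queue.pop          # blocks until a job is available
      break if job == :shutdown # leave the loop and clean up
      handle.call(job)
    end
  ensure
    finish&.call                # runs on normal shutdown and on errors
  end
end

jobs = Thread::Queue.new
results = []
worker = SketchWorker.new(jobs)
worker.handle = proc { |job| results << job * 2 }
worker.finish = proc { results << :finished }

[1, 2, 3].each { |n| jobs << n }
jobs << :shutdown
worker.run
p results # prints [2, 4, 6, :finished]
```

Putting `finish` in an `ensure` clause means cleanup also happens when a handler raises, which is usually what a shutdown hook is for.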

Forum

There’s a Google Group called beanstalk-talk for all discussion regarding beanstalkd and its various client libraries.

How to submit patches

Read the 8 steps for fixing other people’s code; for section 8b (Submit patch to Google Groups), use the Google Group above.

The repository can be found at http://github.com/kristjan/async_observer.

License

You can share this code under the terms of the GPL version 3.

Contact

Comments are welcome. Send any questions or comments to the beanstalk-talk Google Group.

Kristján Pétursson, 13th November 2008