
Rails, Active Record, concurrency and scaling

Coming from a Java background, I'm confused by Ruby's poor support for concurrency.

The problem…

I’m rewriting weathersupermarket, a screen scraper for weather forecasts, using Rails. This is the basic logic of the server’s forecast service:

user requests forecast for location 123
get location 123 from database
if last forecast retrieved more than N seconds ago
  retrieve new forecast
  save forecast
  update last retrieved time
end
return results from database
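
In Ruby that ends up as something like the sketch below. The Forecast model, the FetchService scraper and the staleness window are assumptions for illustration, not the real app code:

class ForecastService
  STALE_AFTER = 600 # seconds; the "N" above, value assumed

  def request(location_id)
    forecast = Forecast.find_by!(location_id: location_id)
    if forecast.retrieved_at.nil? || forecast.retrieved_at < Time.now - STALE_AFTER
      forecast.data = FetchService.fetch(location_id) # the slow, IO-bound scrape
      forecast.retrieved_at = Time.now
      forecast.save!
    end
    forecast
  end
end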

This works fine, but when it comes to handling requests from different users concurrently it sucks. Why? By default Rails runs in single-threaded mode, serving one request, then the next.

Can more threads help?

I realise that throwing more threads at a problem doesn’t help unless you’ve got cores to service those threads. However, in my case a lot of the blocking work is IO bound, i.e. requesting data from remote sites. For example, if two users request data for London and Manchester, in theory the two forecasts can be downloaded in parallel…
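
As a toy illustration (the URLs are made up), two IO-bound downloads can overlap when each runs in its own thread:

require 'net/http'

# Each Net::HTTP.get blocks on network IO, so the two downloads can overlap
# even under MRI's GIL; the URLs here are placeholders.
urls = {
  'London'     => URI('http://example.com/forecast/london'),
  'Manchester' => URI('http://example.com/forecast/manchester')
}

threads = urls.map do |city, uri|
  Thread.new { [city, Net::HTTP.get(uri)] }
end

results = Hash[threads.map(&:value)] # Thread#value joins and returns each result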

Enabling multithreading

Usefully, Rails 4.0 enables config.threadsafe! by default, so no work is required there. However, the default development server, WEBrick, does not support it, whereas the lightweight server thin does. By default thin supports 1024 concurrent connections – my app will break long before that!

thin --threaded --max-conns 30 start
# do some requests in parallel
wget http://localhost:3000/app/forecast/100 &
wget http://localhost:3000/app/forecast/101 &
wget http://localhost:3000/app/forecast/102 &
wget http://localhost:3000/app/forecast/103 &

Next problem

Enabling multi-threading creates a new problem for my app. Some requests can take 5-10 seconds to complete. If another request for the same data arrives during that period, the work is done again, in parallel, which is not ideal.

My solution is to introduce a proxy around my service class with the following logic.

if this request is already being processed
  wait for other thread to complete
  return results from other thread
else
  add request to hash map with current thread
  do the work
  save the result
end

This is the implementation, with the necessary Mutex to protect the internal state:

class RequestProxy
  def initialize(target)
    @queue = Hash.new
    @queue_lock = Mutex.new
    @target = target
  end

  def get_current_request(key)
    @queue[key]
  end

  def wait_for_request(request)
    # Block until the owning thread finishes, unless we are that thread.
    if Thread.current != request[:thread]
      request[:thread].join
    end
    self.remove_request(request)
  end

  def remove_request(request)
    @queue.delete request[:key]
  end

  def save_request(key)
    new_request = { key: key, thread: Thread.current }
    @queue[key] = new_request
    new_request
  end

  def request(key)
    current_request = nil
    new_request = nil
    # Check and claim the key atomically so only one thread does the work.
    @queue_lock.synchronize {
      current_request = self.get_current_request(key)
      if current_request == nil
        new_request = self.save_request(key)
      end
    }
    if current_request != nil
      # Another thread is already working on this key: wait and reuse its result.
      self.wait_for_request(current_request)
      result = current_request[:result]
    else
      # This thread claimed the key: do the work and record the result.
      begin
        result = @target.request(key)
        new_request[:result] = result
      rescue Exception => e
        new_request[:result] = nil
        raise e
      ensure
        @queue_lock.synchronize {
          self.remove_request(new_request)
        }
      end
    end

    result
  end

end
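
Wiring it in is then a one-liner. A sketch, reusing the hypothetical ForecastService from my earlier example:

# Share a single proxy instance so all request threads see the same queue.
FORECAST_PROXY = RequestProxy.new(ForecastService.new)

# In a controller action, something like:
# @forecast = FORECAST_PROXY.request(params[:id].to_i)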

Next problem

So everything is great now. Well, not really… I’m using ActiveRecord…

…and the problem is actually slightly more complicated than I explained above. As an optimization to simplify the client and improve its speed, the service interface supports requests for up to 6 locations in parallel – the worst case. This happens when a search for, say, ‘London’ returns results from all 6 forecast providers.
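
Under the hood that fan-out looks roughly like this sketch (the method name and the FORECAST_PROXY constant are assumptions carried over from the sketches above):

def forecasts_for(location_ids)
  threads = location_ids.map do |id|
    # One thread per location; Thread#value joins and returns the block's result.
    Thread.new { [id, FORECAST_PROXY.request(id)] }
  end
  Hash[threads.map(&:value)]
end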

So with ActiveRecord a database connection is required for every thread that accesses the database. Requests from the threads coming from the controllers seem fine, as the same connections are reused over and over again. However, when I started accessing the database from newly spawned threads I got Timeout errors: it seems that connections are held until they are released or the reaper collects them. You can help ActiveRecord out by telling it when you’re done with database access on the current thread, using release_connection:

ActiveRecord::Base.connection_pool.release_connection
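
In the spawned threads that looks roughly like the sketch below (the fan-out shape and names follow my earlier sketches, so treat them as assumptions rather than the real code):

threads = location_ids.map do |id|
  Thread.new do
    begin
      [id, FORECAST_PROXY.request(id)] # any ActiveRecord access checks a connection out
    ensure
      # Hand the connection back as soon as this thread is done with it,
      # instead of waiting for the reaper or a timeout.
      ActiveRecord::Base.connection_pool.release_connection
    end
  end
end
Hash[threads.map(&:value)]

An alternative with the same effect is to wrap the database work in ActiveRecord::Base.connection_pool.with_connection { ... }, which checks a connection out for the block and gives it back afterwards.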