Coming from a Java background, I’m confused by Ruby’s poor support for concurrency
The problem…
I’m rewriting weathersupermarket, a screen scraper for weather forecasts, using Rails. This is the basic logic of the server's forecast service:
user requests forecast for location 123
  get location 123 from database
  if last forecast retrieved more than N seconds ago
    retrieve new forecast
    save forecast
    update last retrieved time
  end
  return results from database
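A minimal Ruby sketch of that logic might look like the following. It assumes a plain Hash in place of the database and a caller-supplied `fetcher` lambda in place of the screen scraper; `ForecastService`, `fetcher` and the injectable `clock` are hypothetical names, not the app's real classes. It exposes `request(key)`, the method the `RequestProxy` further down calls on its target.

```ruby
# Hypothetical sketch of the forecast-service logic. A Hash stands in for
# the database and `fetcher` stands in for the screen scraper.
class ForecastService
  Entry = Struct.new(:forecast, :retrieved_at)

  def initialize(max_age_seconds, fetcher,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @max_age = max_age_seconds
    @fetcher = fetcher # callable: location_id -> forecast
    @clock = clock     # injectable so staleness can be tested without sleeping
    @db = {}           # location_id -> Entry (stand-in for the database)
  end

  def request(location_id)
    entry = @db[location_id]
    now = @clock.call
    if entry.nil? || now - entry.retrieved_at > @max_age
      # Missing or stale: retrieve, save, and record the retrieval time.
      @db[location_id] = Entry.new(@fetcher.call(location_id), now)
    end
    # Always answer from the "database".
    @db[location_id].forecast
  end
end
```

Note that the check and the fetch are not atomic: two threads can both see a stale entry and both start a slow retrieval.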
This works fine, but when it comes to handling requests from different users concurrently it sucks. Why? By default, Rails runs in single-threaded mode, serving one request, then the next.
Can more threads help?
I realise that throwing more threads at a problem doesn’t help unless you’ve got cores to service those threads. However, in my case much of the blocking work is IO-bound, i.e. requesting data from remote sites, and MRI releases its global interpreter lock while a thread waits on IO, so other threads can run in the meantime. For example, if two users request data for London and Manchester, in theory the two forecasts can be downloaded in parallel…
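One way to check the IO-bound argument is to time the same two downloads sequentially and on threads. This sketch assumes `sleep` is a fair stand-in for a slow HTTP request, since both block without using the CPU; `fake_download` and `elapsed` are hypothetical helpers, not part of weathersupermarket.

```ruby
# `sleep` stands in for a blocking network call: like a socket read, it
# lets MRI schedule other Ruby threads while this one waits.
def fake_download(city)
  sleep 0.3 # placeholder for a slow HTTP request to a forecast site
  "forecast for #{city}"
end

# Wall-clock seconds taken by the given block.
def elapsed
  start = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  yield
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - start
end

cities = %w[London Manchester]

sequential = elapsed { cities.map { |c| fake_download(c) } }
# Thread#value joins each thread and returns what its block returned.
threaded = elapsed { cities.map { |c| Thread.new { fake_download(c) } }.map(&:value) }

puts format("sequential: %.2fs, threaded: %.2fs", sequential, threaded)
```

On MRI the threaded run should take roughly half as long, because the two waits overlap; a CPU-bound loop in place of `sleep` would show no such gain.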
Enabling multithreading
Usefully, Rails 4.0 enables the config.threadsafe! behaviour by default (the setting itself is deprecated), so no work is required. However, the default development server, WEBrick, does not support it, whereas the lightweight server thin does. By default thin accepts up to 1024 concurrent connections; my app will break long before that, so I limit it with --max-conns:
thin --threaded --max-conns 30 start
# do some requests in parallel
wget http://localhost:3000/app/forecast/100 &
wget http://localhost:3000/app/forecast/101 &
wget http://localhost:3000/app/forecast/102 &
wget http://localhost:3000/app/forecast/103 &
Next problem
Enabling multithreading creates a new problem for my app. Some requests can take 5–10 seconds to complete. If another request for the same data arrives during that period, the work is done again in parallel, which is wasteful.
My solution is to introduce a proxy around my service class with the following logic.
if this request is being processed already
  wait for the other thread to complete
  return results from the other thread
else
  add request to hash map with current thread
  do the work
  save the result
end
This is the implementation with the necessary Mutex to protect the internal state.
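require 'thread' # Mutex and ConditionVariable (harmless if already loaded)

# Wraps a target service so concurrent requests for the same key share
# one call to target.request(key) instead of repeating the work.
class RequestProxy
  def initialize(target)
    @queue = {}                       # key => in-flight request hash
    @queue_lock = Mutex.new           # guards @queue and each request's :done flag
    @finished = ConditionVariable.new # signalled whenever a request completes
    @target = target
  end

  def request(key)
    current_request = nil
    new_request = nil
    @queue_lock.synchronize do
      current_request = @queue[key]
      new_request = save_request(key) if current_request.nil?
    end

    if current_request
      wait_for_request(current_request)
      current_request[:result] # nil if the owning thread's call raised
    else
      begin
        new_request[:result] = @target.request(key)
      ensure
        # Runs on success or failure: unregister and wake any waiters.
        # Any exception still propagates to this thread's caller.
        @queue_lock.synchronize do
          new_request[:done] = true
          remove_request(new_request)
          @finished.broadcast
        end
      end
    end
  end

  private

  # Block until the thread that owns `request` marks it done. Thread#join is
  # not used because pooled server threads never exit, so join would hang.
  def wait_for_request(request)
    # A re-entrant call from the owning thread would wait on itself forever.
    return if request[:thread] == Thread.current
    @queue_lock.synchronize do
      @finished.wait(@queue_lock) until request[:done]
    end
  end

  # Caller must hold @queue_lock.
  def save_request(key)
    @queue[key] = { key: key, thread: Thread.current, done: false }
  end

  # Caller must hold @queue_lock.
  def remove_request(request)
    @queue.delete(request[:key])
  end
end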
Next problem
So everything is great now. Well, not really… I’m using ActiveRecord
…and the problem is actually slightly more complicated than I explained above. As an optimization that simplifies the client and improves its speed, the service interface supports requests for up to 6 locations in parallel, which is the worst case. This happens when a search for, say, ‘London’ returns results from all 6 forecast providers.
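A fan-out of that kind can be sketched with one thread per provider. This is a sketch under stated assumptions: `fetch_from_providers` and the `providers` hash of name-to-lambda are hypothetical, with each lambda standing in for one provider's IO-bound scrape. Every thread here is newly spawned, which matters once those threads start touching ActiveRecord.

```ruby
# Hypothetical sketch: query every forecast provider for one location at
# the same time. `providers` maps a provider name to a callable that does
# the (IO-bound) fetch for a location.
def fetch_from_providers(location, providers)
  threads = providers.map do |name, fetch|
    Thread.new { [name, fetch.call(location)] }
  end
  # Thread#value joins each thread and returns its block's result,
  # re-raising in the caller any exception the thread died with.
  threads.map(&:value).to_h
end

providers = {
  "provider_a" => ->(loc) { sleep 0.2; "#{loc}: sunny" },
  "provider_b" => ->(loc) { sleep 0.2; "#{loc}: rain" }
}
p fetch_from_providers("London", providers)
```

With six providers the total wait is roughly that of the slowest one rather than the sum of all six.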
With ActiveRecord, every thread that accesses the database needs its own connection, checked out from a connection pool. Requests on the controller threads seem fine, because the same threads (and so the same connections) are used over and over again. However, when I started accessing the database from newly spawned threads I got timeout errors: each new thread checks out a connection, and connections are held until they are released or the reaper reclaims them, so the pool soon runs dry. You can help ActiveRecord out by telling it when the current thread is done with the database, using release_connection:
ActiveRecord::Base.connection_pool.release_connection
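Calling release_connection in an `ensure` block makes sure the connection goes back even when the thread's work raises. The sketch below assumes a hypothetical helper, `in_db_thread`, that takes the pool as a parameter only so it can run without Rails; in an app you would pass `ActiveRecord::Base.connection_pool`. ActiveRecord's `connection_pool.with_connection { ... }` is an alternative that checks a connection in again when its block finishes.

```ruby
# Hypothetical helper: run `work` on a new thread and always hand that
# thread's database connection back to the pool afterwards, even if
# `work` raises. `pool` must respond to #release_connection, as
# ActiveRecord::Base.connection_pool does.
def in_db_thread(pool, &work)
  Thread.new do
    begin
      work.call
    ensure
      # Return this thread's connection instead of waiting for the reaper.
      pool.release_connection
    end
  end
end
```

In a Rails app this might be used as `in_db_thread(ActiveRecord::Base.connection_pool) { Forecast.find(123) }.value`, where `Forecast` is a hypothetical model.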