Let’s say you need to fetch a lot of data from an upstream API, manipulate that data, maybe even enrich it, and then send it downstream to a database or another API. You want high concurrency when fetching, since the upstream allows it, but you need to be cautious when sending the enriched data downstream due to system limits: you can’t emit events at the same rate you fetch them. At the same time, you want the overall system to stay quick, processing, enriching, and emitting each event as soon as possible.
In a traditional Rails/Sidekiq application, you can manage this to some extent by configuring Sidekiq worker sizes and scaling out horizontally. However, since most of these operations are I/O bound, I was curious whether we could handle some of them inline and use resources optimally in addition to scaling out. I explored various approaches and ended up with a worker pool model using a semaphore. Here they are:
Approach 1: Sequential Processing (The Simple Start)
resources.each do |resource|
  data = fetch_data_from_api(resource)
  event = enrich_data(data)
  update_database(event)
end
This method processes one resource at a time: it makes an API call, enriches the data, and then updates the database before moving on to the next. It works! However, it’s slow. So let’s keep exploring.
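To put a rough number on “slow”: with sequential I/O, the total time is roughly the per-resource latency multiplied by the resource count. Here is a minimal sketch to measure it, using Ruby’s standard Benchmark module (the latency figures in the comments are made-up assumptions for illustration):

require 'benchmark'

elapsed = Benchmark.realtime do
  resources.each do |resource|
    data = fetch_data_from_api(resource) # e.g. ~150ms network call (assumed)
    event = enrich_data(data)
    update_database(event)               # e.g. ~50ms write (assumed)
  end
end
puts "Processed #{resources.size} resources in #{elapsed.round(1)}s"
# At ~200ms per resource, 10,000 resources take ~2,000s, over half an hour.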
Approach 2: Full Concurrency with Parallel (The Quick Fix)
require 'parallel'

Parallel.each(resources, in_threads: 100) do |resource|
  data = fetch_data_from_api(resource)
  event = enrich_data(data)
  update_database(event)
end
Having known about the Parallel gem, I was tempted to try it. This method allows both API calls and database updates to run concurrently, aiming to maximize throughput. However, it eventually overwhelmed the ActiveRecord connection pool, with 100 concurrent workloads trying to fetch from and update the database at the same time. While this approach works most of the time and is faster, I am concerned about the risk of exhausting all the connections in the local ActiveRecord connection pool. I was not interested in significantly increasing the pool size either; I want to be able to scale this service horizontally without exhausting the database. That said, we’ve made an interesting discovery: I now know the “upper limit” for this task. It’s determined by the number of connections I can check out from the ActiveRecord pool. As long as I leave enough connections for the rest of the application and still achieve a healthy level of concurrency, I am satisfied with this setup.
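If you do run threaded code against ActiveRecord, one way to be gentler on the pool is to check a connection out only for the duration of the write and return it immediately, and to keep an eye on the pool’s live stats. A rough sketch, assuming a standard Rails setup (ConnectionPool#stat is available in Rails 5.1+; the output shown is illustrative):

# Hold a connection only while the database work runs,
# returning it to the pool immediately afterwards
ActiveRecord::Base.connection_pool.with_connection do
  update_database(event)
end

# Inspect pool pressure at runtime, e.g. from a console or a log line
ActiveRecord::Base.connection_pool.stat
# => {size: 25, connections: 25, busy: 25, dead: 0, idle: 0, waiting: 12, checkout_timeout: 5}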
Approach 3: Metered Concurrency with Semaphores (The Balance)
require 'concurrent'

fetch_pool = Concurrent::ThreadPoolExecutor.new(min_threads: 50, max_threads: 100, max_queue: 10_000)
# Separate pool for database writes; the semaphore below is what meters them
db_pool = Concurrent::ThreadPoolExecutor.new(min_threads: 10, max_threads: 50, max_queue: 10_000)
db_semaphore = Concurrent::Semaphore.new(50)

resources.each do |resource|
  fetch_pool.post do
    # High-concurrency I/O: up to 100 threads fetch and enrich at once
    data = fetch_data_from_api(resource)
    event = enrich_data(data)

    db_pool.post do
      # Metered section: at most 50 concurrent database updates
      db_semaphore.acquire
      begin
        update_database(event)
      ensure
        db_semaphore.release
      end
    end
  end
end

# Drain both pools before exiting; fetch_pool first, since its
# tasks are the ones that enqueue work onto db_pool
fetch_pool.shutdown
fetch_pool.wait_for_termination
db_pool.shutdown
db_pool.wait_for_termination
This setup introduces two levels of concurrency management (via the concurrent-ruby gem): one thread pool for fetching data and another for database updates, with a semaphore capping the database side. The fetch pool operates with high concurrency, optimizing the time spent on API calls, while the semaphore limits database operations to 50 concurrent updates to protect the application from overwhelming the local ActiveRecord connection pool. This straightforward strategy allows me to balance load and efficiency, reducing total processing time without risking system stability.
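Since the real upper limit is the ActiveRecord pool size, one refinement would be to derive the semaphore’s permit count from the configured pool instead of hard-coding 50, keeping some headroom for the rest of the application. A sketch of that idea (the 10-connection reserve is an arbitrary assumption):

# Leave some connections free for the rest of the application
pool_size = ActiveRecord::Base.connection_pool.size
reserved = 10
db_semaphore = Concurrent::Semaphore.new([pool_size - reserved, 1].max)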
While I am sure there are many other ways to address a problem of this nature, these approaches have helped me step back and identify simpler techniques to maximize system resources while building a simple, fast, and metered concurrent application in Ruby.
If you are not familiar with the gem, I recommend you check it out: https://github.com/ruby-concurrency/concurrent-ruby