I've constructed a data structure (a Ruby hash mirroring my JSON) like this:
data = {
  "url" => "http://www.example.com/post",
  "postdata" => [
    { "key1" => "value1" },
    { "key2" => "value2" },
    { "key3" => "value3" },
    { "key4" => "value4" },
    { "key5" => "value5" },
    { "key6" => "value6" }
  ]
}
I'm attempting to post each element of 'postdata' in parallel (say, a pool of 2) to the same 'url'. I've tried Faraday, Typhoeus, and Parallel, but haven't been able to get a working example going.
Ideally I'd like to use Typhoeus::Hydra or Faraday, passing the 'data' object in and pooling over data['postdata'] with data['url'] as the endpoint, but I've come up empty-handed. Mostly I run into APIs that want a data array like:
[
  { "url" => "http://...", "data" => { "key1" => "value1" } },
  { "url" => "http://...", "data" => { "key2" => "value2" } },
  { "url" => "http://...", "data" => { "key3" => "value3" } }
]
But I'd obviously like to avoid such duplication.
End goal: post potentially hundreds of JSON bodies to the same URL in parallel, limited to a pool of, say, 10 at a time. Can anyone help guide me down the right path?
Disclaimer: This is an internal endpoint, so nothing nefarious.
Solution based on tadman's answer:
class BatchWrapper
  def initialize(data, offset)
    @data = data
    @offset = offset
  end

  def as_json
    @data['postdata'][@offset]
  end
end
q = Queue.new
data['postdata'].each_index do |i|
  q << BatchWrapper.new(data, i)
end

# http is an HTTP client, e.g. a Faraday connection; to send the payload
# as JSON, encode it and set the Content-Type header.
t = Thread.new do
  n = q.size
  n.times do
    value = q.pop
    res = http.post(data['url'], value.as_json.to_json, 'Content-Type' => 'application/json')
    puts res
    puts "consumed #{value.as_json}"
  end
end
t.join
The usual strategy here is to remap your source data into something more digestible, then split the work across a number of worker threads, or use an asynchronous, event-driven approach such as you'd use in EventMachine.
Threads are easier to understand, and you can use the Queue structure to batch up work for each thread to consume.
Break your jobs out into a series of objects, jam those objects into the queue, and then spin up N threads to work through it.
Since you're using threads, it's okay to use shared data. For example, you can have a thin wrapper object that, given a structure in that format, captures the offset of the payload you're sending:
class BatchWrapper
  def initialize(data, offset)
    @data = data
    @offset = offset
  end

  def as_json
    @data['postdata'][@offset]
  end
end
Then just stuff in one of these objects for each request you're intending to make:
q = Queue.new
data['postdata'].each_index do |i|
  q << BatchWrapper.new(data, i)
end
Then you can spin through the queue in a worker:
# Close the queue first so pop returns nil once it's drained; otherwise
# pop blocks forever waiting for more work (Queue#close needs Ruby 2.3+).
q.close

Thread.new do
  while (wrapper = q.pop)
    # Send it...
    make_request(wrapper.to_json)
  end
end
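To get the limited pool from the question (say 10 at a time), start that worker N times and join them all. Here's a minimal self-contained sketch, assuming Ruby 2.3+ for Queue#close; send_request is a hypothetical callable standing in for the real HTTP call so the pooling logic can be exercised without a live endpoint:

```ruby
require 'json'

# Thin wrapper capturing the shared structure plus one payload's offset.
class BatchWrapper
  def initialize(data, offset)
    @data = data
    @offset = offset
  end

  def as_json
    @data['postdata'][@offset]
  end
end

# Drains the queue with pool_size worker threads, invoking send_request
# (any callable taking url and body) once per payload.
def post_in_parallel(data, send_request, pool_size: 10)
  q = Queue.new
  data['postdata'].each_index { |i| q << BatchWrapper.new(data, i) }
  q.close # pop returns nil once the queue is drained, so workers exit

  pool_size.times.map {
    Thread.new do
      while (wrapper = q.pop)
        send_request.call(data['url'], wrapper.as_json.to_json)
      end
    end
  }.each(&:join)
end
```

In production, send_request would wrap Faraday or Net::HTTP; injecting it keeps the pooling code testable.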
The wrapper approach allows you to compose exactly what data is shared from that main object and what's request-specific, plus do any necessary restructuring of the data itself. The as_json method returns what the to_json method ends up encoding (note that hook-up is an ActiveSupport convention; with the plain stdlib json library you'd call wrapper.as_json.to_json yourself), so you have complete control there.