Suppose we have a slow function that produces data and another slow function that processes it, as follows:
# some slow function
"""
    prime(i; delay = 2)

Simulate an expensive data-producing step: sleep for `delay` seconds, print a
`processed <i>` line, and return `i` unchanged. `delay` defaults to the original
hard-coded 2 seconds; pass `delay = 0` to skip the wait (e.g. in tests).
"""
function prime(i; delay = 2)
    sleep(delay)
    println("processed $i")
    return i
end
"""
    slow_process(x; delay = 2)

Simulate an expensive consumer step: sleep for `delay` seconds, then print a
`slow processed <x>` line. Returns `nothing`. `delay` defaults to the original
hard-coded 2 seconds.
"""
function slow_process(x; delay = 2)
    sleep(delay)
    println("slow processed $x")
    return nothing
end
"""
    each(rng)

Return a `Task` that yields `prime(item)` for every `item` of `rng`, one at a
time, via the pre-1.0 `produce`-based Task iteration. Each `prime` call is
timed with `@time`. Production is sequential: nothing is computed ahead.
"""
function each(rng)
    # Body of the producer Task: each `produce` hands one value to the
    # iterating consumer and suspends until the next value is requested.
    producer = () -> begin
        for item in rng
            @time value = prime(item)
            produce(value)
        end
    end
    return Task(producer)
end
# Drain the producer task, processing each value, and time the whole pipeline.
@time let stream = each(1000:1002)
    for item in stream
        slow_process(item)
    end
end
Output:
% julia test-task.jl
processed 1000
2.063938 seconds (37.84 k allocations: 1.605 MB)
slow processed 1000
processed 1001
2.003115 seconds (17 allocations: 800 bytes)
slow processed 1001
processed 1002
2.001798 seconds (17 allocations: 800 bytes)
slow processed 1002
12.166475 seconds (88.08 k allocations: 3.640 MB)
Is there a way to fetch and cache the data in a parallel thread using `@async`, and feed it to the `slow_process`
function?
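Edit: I updated the example to clarify the problem. Ideally, the example should take about 2 + 6 = 8 seconds instead of 12: 2 seconds to produce the first item, then 3 × 2 seconds of processing, with the remaining items produced in the background during that time.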
Edit 2: This is my attempt using `@sync` and `@async`, but I got the error `ERROR (unhandled task failure): no process with id 2 exists`:
"""
    @swap(x, y)

Exchange the values of the assignable expressions `x` and `y` in the caller's
scope, going through a hygienic temporary that holds the old value of `x`.
"""
macro swap(x, y)
    saved = gensym("saved")
    return Expr(:block,
        :(local $saved = $(esc(x))),
        :($(esc(x)) = $(esc(y))),
        :($(esc(y)) = $saved))
end
# some slow function
"""
    prime(i; delay = 2)

Simulate an expensive data-producing step: sleep for `delay` seconds, print a
`processed <i>` line, and return `i` unchanged. `delay` defaults to the original
hard-coded 2 seconds; pass `delay = 0` to skip the wait (e.g. in tests).
"""
function prime(i; delay = 2)
    sleep(delay)
    println("processed $i")
    return i
end
"""
    slow_process(x; delay = 2)

Simulate an expensive consumer step: sleep for `delay` seconds, then print a
`slow processed <x>` line. Returns `nothing`. `delay` defaults to the original
hard-coded 2 seconds.
"""
function slow_process(x; delay = 2)
    sleep(delay)
    println("slow processed $x")
    return nothing
end
"""
    each(rng)

Attempted prefetching producer: return a `Task` (pre-1.0 `produce`-based
iteration) that yields `prime(x)` for each `x` in `rng`. Each `prime` call runs
on worker process 2 through `@async remotecall_fetch`, so the next value is
being computed while the current one is consumed.

Requires `length(rng) > 1` and an existing worker with id 2 (e.g. start Julia
with `-p 2`). `prime` must also be defined on that worker (e.g. via
`@everywhere`). NOTE(review): the `prime` defined just above is not
`@everywhere` — confirm before running.
"""
function each(rng)
    # The first iteration reads rng[i+1], so at least two elements are needed.
    @assert length(rng) > 1
    rng = collect(rng)
    a = b = nothing
    function _iter()
        for i ∈ 1:length(rng)
            if a === nothing
                # First iteration: start the tasks for elements 1 and 2.
                a = @async remotecall_fetch(prime, 2, rng[i])
                b = @async remotecall_fetch(prime, 2, rng[i+1])
            else
                # Start the task for element i+1 (if any), then swap so `a` is
                # the task for element i, which was started one iteration ago.
                if i < length(rng)
                    a = @async remotecall_fetch(prime, 2, rng[i+1])
                end
                @swap(a,b)
            end
            # Block until the current task finishes and take its result.
            # `@sync d = a` waited for nothing (no `@async` inside that block)
            # and handed the Task object itself to the consumer. In the pre-1.0
            # releases this code targets, `wait` on a Task returns its result;
            # on Julia 1.x use `fetch(a)` instead.
            d = wait(a)
            produce(d)
        end
    end
    return Task(_iter)
end
# Drain the producer task, processing each value, and time the whole pipeline.
@time let stream = each(1000:1002)
    for item in stream
        slow_process(item)
    end
end
OK, I have a working solution, shown below:
"""
    @swap(x, y)

Exchange the values of the assignable expressions `x` and `y` in the caller's
scope, going through a hygienic temporary that holds the old value of `x`.
"""
macro swap(x, y)
    saved = gensym("saved")
    return Expr(:block,
        :(local $saved = $(esc(x))),
        :($(esc(x)) = $(esc(y))),
        :($(esc(y)) = $saved))
end
# some slow function
# Simulated expensive producer, defined on every process (so worker calls via
# `remotecall` can find it): sleep for `delay` seconds, print a `prime <i>` line,
# and return `i` unchanged. `delay` defaults to the original 2 seconds.
# (Plain comments instead of a docstring: documenting an `@everywhere` call
# is not supported.)
@everywhere function prime(i; delay = 2)
    sleep(delay)
    println("prime $i")
    return i
end
"""
    slow_process(x; delay = 2)

Simulate an expensive consumer step: sleep for `delay` seconds, then print a
`slow_process <x>` line. Returns `nothing`. `delay` defaults to the original
hard-coded 2 seconds.
"""
function slow_process(x; delay = 2)
    sleep(delay)
    println("slow_process $x")
    return nothing
end
"""
    each(rng; pid = 2)

Return a `Task` (pre-1.0 `produce`-based iteration) that yields `prime(x)` for
every `x` in `rng`, in order. Each `prime` call is started on worker `pid` with
`remotecall`. The call for the next element is started before blocking on the
current one, so the worker computes one value ahead while the consumer handles
the current value.

`prime` must be defined on worker `pid` (e.g. via `@everywhere`), and the worker
must exist (e.g. start Julia with `-p 2` for the default `pid = 2`). Empty and
single-element inputs are accepted; the previous version required at least two
elements.
"""
function each(rng; pid = 2)
    items = collect(rng)
    n = length(items)
    function _iter()
        n == 0 && return nothing
        pending = remotecall(prime, pid, items[1])
        for i in 1:n
            # Start the next element before blocking on the current one, so the
            # worker keeps computing while the consumer processes this value.
            upcoming = i < n ? remotecall(prime, pid, items[i + 1]) : nothing
            produce(fetch(pending))
            pending = upcoming
        end
        return nothing
    end
    return Task(_iter)
end
# Drain the prefetching producer task, processing each value, and time the
# whole pipeline.
@time let stream = each(1000:1002)
    for item in stream
        slow_process(item)
    end
end
Running it with two worker processes (`-p 2`) gives:
% julia -p 2 test-task.jl
8.354102 seconds (148.00 k allocations: 6.204 MB)