Tags: parallel-processing, async-await, julia

Julia: cache data in a parallel thread using @async


Suppose we have a slow function that produces data and another slow function that processes it, as follows:

# some slow function
"""
    prime(i; delay=2)

Simulate a slow data producer: sleep for `delay` seconds, print a progress
line, and return `i` unchanged. `delay` defaults to the 2 seconds used in
this example; pass `delay=0` to run it instantly (e.g. in tests).
"""
function prime(i; delay=2)
  sleep(delay)
  println("processed $i")
  return i
end

"""
    slow_process(x; delay=2)

Simulate a slow consumer: sleep for `delay` seconds (2 by default; pass
`delay=0` to skip the wait) and print a line reporting that `x` was
processed. Returns `nothing`.
"""
function slow_process(x; delay=2)
  sleep(delay)
  println("slow processed $x")
  return nothing
end

"""
    each(rng)

Return a `Task` that lazily calls `prime` on every element of `rng`, timing
each call with `@time` and handing each result to the consumer via
`produce`. Iterating it with `for x in each(rng)` alternates strictly
between producing one value and consuming it, so production and processing
run one after the other.
"""
function each(rng)
  return @task begin
    for element in rng
      result = @time prime(element)
      produce(result)
    end
  end
end

# Time the whole pipeline: feed every value yielded by `each` to
# `slow_process`.
@time foreach(slow_process, each(1000:1002))

Output:

% julia test-task.jl
processed 1000
  2.063938 seconds (37.84 k allocations: 1.605 MB)
slow processed 1000
processed 1001
  2.003115 seconds (17 allocations: 800 bytes)
slow processed 1001
processed 1002
  2.001798 seconds (17 allocations: 800 bytes)
slow processed 1002
 12.166475 seconds (88.08 k allocations: 3.640 MB)

Is there some way to fetch and cache the data in a parallel thread using @async and feed it to the slow_process function?

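Edit: I updated the example to clarify the problem. Ideally, the example should take 2+6 seconds instead of 12 seconds: 2 seconds to produce the first item, then 3 × 2 seconds of slow_process, with the production of each following item overlapped with the processing of the previous one.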

Edit 2: This is my attempt at using @sync and @async, but I get the error `ERROR (unhandled task failure): no process with id 2 exists`:

# Swap the values of two variables in the caller's scope.
# Both arguments are escaped so the assignments target the caller's own
# variables; the temporary is declared `local` so it does not leak.
# The expansion evaluates to the original value of `x`.
macro swap(x,y)
  lhs, rhs = esc(x), esc(y)
  return quote
    local saved = $lhs
    $lhs = $rhs
    $rhs = saved
  end
end

# some slow function
"""
    prime(i; delay=2)

Simulate a slow data producer: sleep for `delay` seconds, print a progress
line, and return `i` unchanged. `delay` defaults to the 2 seconds used in
this example; pass `delay=0` to run it instantly (e.g. in tests).
"""
function prime(i; delay=2)
  sleep(delay)
  println("processed $i")
  return i
end

"""
    slow_process(x; delay=2)

Simulate a slow consumer: sleep for `delay` seconds (2 by default; pass
`delay=0` to skip the wait) and print a line reporting that `x` was
processed. Returns `nothing`.
"""
function slow_process(x; delay=2)
  sleep(delay)
  println("slow processed $x")
  return nothing
end

# Question's attempted prefetching version of `each`; this is the code that
# fails with "no process with id 2 exists".
#
# Intent: keep up to two remote `prime` calls in flight (`a` = current item,
# `b` = next item) so the next value is computed while the consumer
# processes the current one. Returns a `Task` that `produce`s one value
# per element of `rng`.
#
# NOTE(review): problems visible in this version:
#   * `remotecall_fetch(prime, 2, ...)` targets process id 2, which only
#     exists when Julia is started with worker processes (e.g. `julia -p 2`).
#   * `prime` is not defined with `@everywhere`, so presumably a worker
#     process would not know it either; confirm.
#   * `@sync d = a` does not wait for anything useful: `d = a` launches no
#     tasks, and `d` becomes the Task `a` itself, not its result
#     (`fetch(a)` would be needed).
#   * `remotecall_fetch` already blocks until the result arrives. Wrapping
#     it in `@async` (with nothing waiting on the task) is presumably why
#     the remote error surfaces as an "unhandled task failure".
function each(rng)
  @assert length(rng) > 1
  rng = collect(rng)
  a = b = nothing
  function _iter()
    for i ∈ 1:length(rng)
      if a == nothing
        # First iteration: start the first two items.
        a = @async remotecall_fetch(prime, 2, rng[i])
        b = @async remotecall_fetch(prime, 2, rng[i+1])
      else
        # Later iterations: start the next item (if any) in `a`, then swap
        # so `a` holds the current item started on the previous iteration.
        if i < length(rng)
          a = @async remotecall_fetch(prime, 2, rng[i+1])
        end
        @swap(a,b)
      end
      @sync d = a
      produce(d)
    end
  end
  return Task(_iter)
end

# Time the whole pipeline: feed every value yielded by `each` to
# `slow_process`.
@time foreach(slow_process, each(1000:1002))

Solution

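  • OK, I have a working solution, shown below. It uses `remotecall`, which returns immediately, to start computing the next item on worker process 2 while the current item is being processed. It then uses `fetch` to wait for each result only when it is needed: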

    # Swap the values of two variables in the caller's scope.
    # Both arguments are escaped so the assignments target the caller's own
    # variables; the temporary is declared `local` so it does not leak.
    # The expansion evaluates to the original value of `x`.
    macro swap(x,y)
      lhs, rhs = esc(x), esc(y)
      return quote
        local saved = $lhs
        $lhs = $rhs
        $rhs = saved
      end
    end
    
    # some slow function
    # Simulated slow producer, defined on every process with `@everywhere`
    # so worker processes can run it via `remotecall`. Sleeps for `delay`
    # seconds (2 by default; `delay=0` makes it instant), prints a progress
    # line, and returns `i` unchanged.
    @everywhere function prime(i; delay=2)
      sleep(delay)
      println("prime $i")
      return i
    end
    
    # Simulated slow consumer: sleeps for `delay` seconds (2 by default;
    # `delay=0` skips the wait) and prints a line reporting that `x` was
    # processed. Returns `nothing`.
    function slow_process(x; delay=2)
      sleep(delay)
      println("slow_process $x")
      return nothing
    end
    
    # Return a `Task` that yields `prime(x)` for every `x` in `rng`, in order,
    # computing the values on worker process `pid` (2 by default).
    #
    # While the consumer handles one value, the next one is already being
    # computed remotely: `remotecall` returns a `Future` immediately, and
    # `fetch` blocks only when the value is actually needed. At most two
    # remote calls are outstanding at any time.
    #
    # Requires that worker `pid` exists (e.g. start Julia with `julia -p 2`)
    # and that `prime` is defined there (e.g. via `@everywhere`).
    # Empty and single-element ranges are allowed.
    function each(rng; pid=2)
      items = collect(rng)
      return @task begin
        if !isempty(items)
          # Start the first item up front so each iteration can launch the
          # *next* item before blocking on the current one.
          pending = remotecall(prime, pid, items[1])
          for k in 1:length(items)
            has_next = k < length(items)
            upcoming = has_next ? remotecall(prime, pid, items[k + 1]) : nothing
            produce(fetch(pending))
            pending = upcoming
          end
        end
      end
    end
    # Time the whole pipeline: feed every value yielded by `each` to
    # `slow_process`.
    @time foreach(slow_process, each(1000:1002))
    

    Running it with two worker processes (`julia -p 2`) gives:

    % julia -p 2 test-task.jl
    8.354102 seconds (148.00 k allocations: 6.204 MB)