I'm trying to parallelize some code from here using ipyparallel. In short, I can make functions that work fine outside of apply_sync(), but I can't seem to get them to work inside it (I swear I had this working earlier, but I can't find a version of the code that isn't broken). A simple example:
    def test3(fname='7_1197_.txt'):
        import subprocess
        command = 'touch data/sentiment/' + fname + '.test'
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        while True:
            out = child.stdout.read(1)
            if out == '' and child.poll() != None:
                return

    test3()  # this works, creates a file with the .test extension
    results = view.map_sync(test3, speeches)  # this doesn't work. No files created.
Here's a short version of the function I'm actually going to use. It works fine on its own. Inside apply_sync() it spins up java processes according to htop, but it doesn't seem to get anything back from those processes.
    def test2(fname='7_1197_.txt'):
        import subprocess
        settings = ' -mx5g edu.stanford.nlp.sentiment.SentimentPipeline'
        inputFile = ' -file data/sentiment/' + fname
        command = 'java ' + settings + inputFile
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        results = []
        while True:
            out = child.stdout.read(1)
            if out == '' and child.poll() != None:
                return ''.join(results)
            if out != '':
                results.extend(out)

    test2()  # Works fine, produces output
    results = view.map_sync(test2, speeches)  # Doesn't work: the results are empty strings.
I tried a version where I return the command variable. The commands sent to Popen are fine, and they work when pasted manually into the command line. I thought maybe it was just an issue with piping, but changing the command to redirect the output to files with ' > ' + fname + '.out' doesn't work inside the apply_sync() call either (no output files are produced).

How should I be doing this so I get the stdout from the system calls back?
I see two potential gotchas: one for the blocking, one for the missing files. For the missing files, you should make sure that your engines and your local session are in the same working directory, or make sure to use absolute paths. A quick way to synchronize paths locally and remotely:

    client[:].apply_sync(os.chdir, os.getcwd())

That says: get the local cwd, then call os.chdir everywhere, so that we all share the same working directory. A quick shortcut for this, if you are in an IPython session, is:

    %px cd {os.getcwd()}
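To confirm it worked, you can ask every engine to report its working directory (this assumes client is your ipyparallel Client instance):

    import os
    # each engine should now report the same path as the local session
    print(client[:].apply_sync(os.getcwd))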
As for the blocking, my first thought is: are you perhaps using Python 3 when running in parallel? If so, child.stdout.read returns bytes, not text. In Python 2, str is bytes, so out == '' will work, but in Python 3 the condition out == '' will never be true, because b'' != u'', and your function will never return.
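A minimal bytes-safe rewrite of the loop from your test3 (the name test3_py3 is just for illustration; only the comparison changes):

    def test3_py3(fname='7_1197_.txt'):
        import subprocess
        command = 'touch data/sentiment/' + fname + '.test'
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        while True:
            out = child.stdout.read(1)
            # compare against b'' (or just use `if not out`) so the check
            # also holds on Python 3, where read() returns bytes
            if out == b'' and child.poll() is not None:
                return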
Some more useful bits of info:

- stdout.read(N) will read up to that number of bytes, returning fewer if the output ends sooner. This is useful because read(1) will loop many times, even if the output is all waiting to be read.
- stdout.read() will only return an empty bytestring if output is finished, so you only need to check that, not child.poll(), before returning. (This is true as long as you haven't set NOWAIT on the FD, which is some advanced usage.)

So here are a couple of implementations of your function, with different goals. The first one appears to accomplish your current goal using Popen.communicate, which is the simplest choice if you don't actually want to do anything with partial output and/or have nothing to do in the function while you are waiting for output:
    def simple(fname='7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE)
        # if we aren't doing anything with partial outputs,
        # child.communicate() does all of our waiting/capturing for us:
        out, err = child.communicate()
        return out
(It might be useful to capture stderr as well, with stderr=subprocess.PIPE, or to merge stderr into stdout with stderr=subprocess.STDOUT.)
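For example, here is a sketch of a variant (the name simple_with_stderr is hypothetical) that keeps the two streams separate:

    def simple_with_stderr(fname='7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.PIPE)
        # communicate() drains both pipes to completion, which avoids the
        # deadlock you can hit when one full pipe blocks the child while
        # you are reading the other
        out, err = child.communicate()
        return out, err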
Here's another example, merging stderr into stdout and reading in chunks:
    def chunked(fname='7_1197_.txt'):
        import subprocess
        command = 'echo "{0}" && touch data/sentiment/{0}.test'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 )
        chunks = []
        while True:
            chunk = child.stdout.read(80)  # read roughly one line at a time
            if chunk:
                chunks.append(chunk)
                continue
            else:
                # read will only return an empty bytestring when output is finished
                break
        return b''.join(chunks)
Note that we can use the if not chunk condition to determine when output is finished, rather than if chunk == '', since empty bytestrings are falsy. If we aren't doing anything with the partial output, there's really no reason to use this instead of the simpler .communicate() version above.
Finally, here's a version you can use with IPython that redisplays the output instead of capturing and returning it, which we can use to show partial output in the client:
    def chunked_redisplayed(fname='7_1197_.txt'):
        import sys, subprocess
        command = 'for i in {{1..20}}; do echo "{0}"; sleep 0.25; done'.format(fname)
        child = subprocess.Popen(command, shell=True,
                                 stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT,
                                 )
        while True:
            chunk = child.stdout.read(80)  # read roughly one line at a time
            if chunk:
                sys.stdout.write(chunk.decode('utf8', 'replace'))
                continue
            else:
                # read will only return an empty bytestring when output is finished
                break
In the client, if you use map_async instead of map_sync, you can check result.stdout, which is a list of the stdout streams so far, so you can check on the progress:
    amr = view.map_async(chunked_redisplayed, speeches)
    amr.stdout  # list of stdout text, updated in the background as output is produced
    amr.wait_interactive()  # waits and shows progress
    amr.get()  # waits for and returns the actual result