I'm launching a process using ProcessBuilder like so:
val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()
I'd like to do 2 things with the stdout of the process:
As far as I can tell, in order to do both of these things I'll need to "split" the InputStream I get from proc.inputStream
so that every line is mirrored to 2 other InputStreams: one that can be used to log to a file, and another to parse and monitor the status of the process.
One option would be to have a thread which reads from the InputStream fires an event with each line read to "subscribers", and I think this should work fine, but I was hoping to come up with a more generic "Tee" type functionality that would expose InputStreams to be consumed by whatever wanted to. Basically something like this:
val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()
val originalInputStream = proc.inputStream
val tee = Tee(originalInputStream)
// Every line read from originalInputStream would be
// mirrored to all branches (not necessarily every line
// from the beginning of the originalInputStream, but
// since the start of the lifetime of the created branch)
val branchOne: InputStream = tee.addBranch()
val branchTwo: InputStream = tee.addBranch()
I took a shot at a Tee
class, but I'm not sure what to do in the addBranch
method:
class Tee(inputStream: InputStream) {
val reader = BufferedReader(InputStreamReader(inputStream))
val branches = mutableListOf<OutputStream>()
fun readLine() {
val line = reader.readLine()
branches.forEach {
it.write(line.toByteArray())
}
}
fun addBranch(): InputStream {
// What to do here? Need to create an OutputStream
// which readLine can write to, but return an InputStream
// which will be updated with each future write to that
// OutputStream
}
}
EDIT: The implementation of Tee
I ended up with was as follows:
/**
* Reads from the given [InputStream] and mirrors the read
* data to all of the created 'branches' off of it.
* All branches will 'receive' all data from the original
* [InputStream] starting at the the point of
* the branch's creation.
* NOTE: This class will not read from the given [InputStream]
* automatically, its [read] must be invoked
* to read the data from the original stream and write it to
* the branches
*/
class Tee(inputStream: InputStream) {
val reader = BufferedReader(InputStreamReader(inputStream))
var branches = CopyOnWriteArrayList<OutputStream>()
fun read() {
val c = reader.read()
branches.forEach {
// Recreate the carriage return so that readLine on the
// branched InputStreams works
it.write(c)
}
}
fun addBranch(): InputStream {
val outputStream = PipedOutputStream()
branches.add(outputStream)
return PipedInputStream(outputStream)
}
}
Take a look at the org.apache.commons.io.output.TeeInputStream from Apache Commons then you don't need to bother writing your own.
val pb = ProcessBuilder("/path/to/process")
pb.redirectErrorStream(true)
val proc = pb.start()
val original = proc.inputStream
val out = new PipedOutputStream()
val in = new PipedInputStream()
out.connect(in)
val tee = new TeeInputStream(in, out)
Then just read from tee
instead of original
, and any bytes read will be also written to out. By using the Piped streams, the data written to out will be made available to be read via in
and so now you can have two threads reading from in
and tee
independently. One thread writing to logs, and one thread monitoring lines.