I'm very new to both scalaz-stream and specifically scalaz.stream.tcp. I'm trying to do a very simple server for my own educational purposes. I parse the requests into commands, execute them to produce responses, and write the responses back to the client. The part I am having issues with is that I want to log each received command to stdout.
Here is my inner Process that I am passing to tcp.server:
def inner: Process[tcp.Connection, Unit] = {
val requests: Process[Connection, String] = tcp.reads(1024) pipe text.utf8Decode
val cmds: Process[Connection, Command] = requests.map(parseRequest)
val header: Process[Task, ByteVector] = Process("HEADER\n").pipe(text.utf8Encode)
val loggedCmds: Process[Connection, Command] = cmds.map { cmd =>
println(cmd.toString)
cmd
}
val results: Process[Connection, Process[Task, ByteVector]] = loggedCmds.map(_.execute)
val processedRequests: Process[Connection, Unit] = results.flatMap(result => tcp.writes(tcp.lift(header ++ result)))
processedRequests
}
(I am not in the habit of specifying the types everywhere; I just did that to try to get a handle on things. I plan to remove those.)
The above code actually compiles and runs correctly, but I do not feel it is very clean or idiomatic. Specifically I am unhappy with the loggedCmds part. I wanted to use io.stdOutLines, either through .observer or using writer.logged/mapW/drainW, but no matter what I tried I could not seem to get the types to line up correctly. I was always getting type conflicts between Task and Connection. tcp.lift seems to help with an input stream, but it does not seem to work for a Sink. Is there a cleaner/better way to do the loggedCmds part (FWIW: I'm open to corrections or improvements to any of the above code).
I should note that if I just have the results go to stdout via io.stdOutLines I do not have an issue ("through" seems to work in that case, which I have seen in examples), it's just when I want to send the stream to io.stdOutLines and also continue using the stream to respond to the client.
Figured it out on my own (finally). Using ".toChannel" I was able to do it:
val cmdFormatter = process1.id[Command].map(_.toString)
val cmdPrinter = io.stdOutLines.pipeIn(cmdFormatter)
...
val cmds: Process[Connection, Command] = requests.map(parseRequest) through
cmdPrinter.toChannel