sockets unix asynchronous ocaml lwt

Redirecting Lwt channels


I have an SSH service running on a Unix socket and a local TCP server, and I want the TCP server's traffic redirected to the channels of the Unix socket.

Basically when I do:

$ ssh root@localhost -p 2000

Then my local TCP server gets the request and pipes it to the Unix socket, and the TCP client (ssh in this case) gets the reply from the Unix socket. Relevant code:

  let running_tunnel debug (tcp_ic, tcp_oc) () =
    Lwt_io.with_connection a_unix_addr begin fun (mux_ic, mux_oc) ->
      let%lwt _ = some_call with_an_arg
      and _ = (* Some setup code *) Lwt.return_unit in
      let rec forever () =
        Lwt_io.read_line tcp_ic >>= fun opening_message ->
        Lwt_io.write_from_string_exactly
          mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
        Lwt_io.read_line mux_ic >>= fun reply ->
        Lwt_io.printl reply >>= fun () ->
        Lwt_io.write_line tcp_oc reply >>= fun () ->
        forever ()
      in
      forever ()
    end

And this sort of works. It gets "stuck" when I invoke ssh on the command line, but I know some data is getting through because the other side's SSH header is correct: SSH-2.0-OpenSSH_6.7. My side also prints out more of the initial SSH handshake, i.e. I see this printed:

??^?W񦛦\zJ?~??curve25519-sha256@libssh.org,ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group14-sha1ssh-rsa,ssh-dss>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com>aes128-ctr,aes192-ctr,aes256-ctr,chacha20-poly1305@openssh.com?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1?umac-64-etm@openssh.com,umac-128-etm@openssh.com,hmac-sha2-256-etm@openssh.com,hmac-sha2-512-etm@openssh.com,hmac-sha1-etm@openssh.com,umac-64@openssh.com,umac-128@openssh.com,hmac-sha2-256,hmac-sha2-512,hmac-sha1none,zlib@openssh.comnone,zlib@openssh.co 

etc., which seems correct. I figured the hang was because I am using Lwt_io.read_line, so I tried this instead:

    let rec forever () =
      Lwt_io.read tcp_ic >>= fun opening_message ->
      Lwt_io.write_from_string_exactly
        mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
      Lwt_io.read mux_ic >>= fun reply ->
      Lwt_io.printl reply >>= fun () ->
      Lwt_io.write tcp_oc reply >>= fun () ->
      forever ()
    in
    forever ()

This actually worked worse: it didn't even print out the initial handshake. I've also tried the dedicated {write,read}_into... functions with limited success. Running under strace/dtruss I see end results like:

read(0x6, "SSH-2.0-OpenSSH_6.9\r\n\0", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.9\n\0", 0x14)      = 20 0
read(0x7, "\0", 0x1000)      = -1 Err#35
write(0x7, "SSH-2.0-OpenSSH_6.9\0", 0x13)        = 19 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x7, "SSH-2.0-OpenSSH_6.7\r\n\0", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.7\n\0", 0x14)      = 20 0
read(0x6, "\0", 0x1000)      = -1 Err#35
write(0x6, "SSH-2.0-OpenSSH_6.7\n\0", 0x14)      = 20 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x6, "\0", 0x1000)      = 1968 0
read(0x6, "\0", 0x1000)      = -1 Err#35
^C

Where 6.9 is my local machine's ssh and 6.7 is the remote machine behind the Unix socket. One thing that does seem odd to me is how the \r is dropped, which changes the read/write count by 1. I'm unsure if this is the crucial difference.

Ideally I'd like some kind of abstraction from Lwt that would say: whenever there is data available on this readable channel (the TCP socket), write it directly to the writable channel (the Unix socket), and vice versa.


Solution

  • The variant with read_line didn't work because the data stream is binary, and read_line is for textual, line-based input. The second variant, with the Lwt_io.read function, didn't work because this function reads all input up to the end unless you specify the optional count parameter. That means control passes to the write only after EOF on the reader side. Using Lwt_io.read with some count, e.g. Lwt_io.read ~count:1024 mux_ic, would not be a bad idea. Also, don't forget to check the return value if you expect your stream to be finite. The read_into function should be used with care because, unlike read, it doesn't guarantee that it will read the exact amount of data you requested; in other words, there will be short reads. The same is true for write_into. The _exactly versions of these functions don't suffer from this problem, so it is better to use them instead.
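    A minimal sketch of such a chunked loop, assuming the channel names from the question (tcp_ic, tcp_oc, mux_ic, mux_oc):

    ```ocaml
    open Lwt.Infix

    (* Forward fixed-size chunks instead of lines; an empty string from
       Lwt_io.read ~count signals end of input. *)
    let rec pump (tcp_ic, tcp_oc) (mux_ic, mux_oc) =
      Lwt_io.read ~count:1024 tcp_ic >>= fun chunk ->
      if String.length chunk = 0 then Lwt.return_unit (* EOF *)
      else
        Lwt_io.write_from_string_exactly
          mux_oc chunk 0 (String.length chunk) >>= fun () ->
        Lwt_io.read ~count:1024 mux_ic >>= fun reply ->
        Lwt_io.write_from_string_exactly
          tcp_oc reply 0 (String.length reply) >>= fun () ->
        pump (tcp_ic, tcp_oc) (mux_ic, mux_oc)
    ```

    Note that this still locks the two directions together in one loop, a limitation discussed further below.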

    There is one other thing you should consider. Lwt_io provides an interface for buffered input and output. That means all functions in this module write to and read from some internal buffer, instead of interacting directly with the operating system via a device descriptor. So when you pipe data from one buffered source to another, you will get unexpected delays at both ends, and you should anticipate them using flushes. Otherwise they may introduce race conditions when you have a two-way interaction.
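    For instance, a sketch of forcing a forwarded chunk out of the Lwt_io buffer before waiting for the reply (the names are placeholders taken from the question):

    ```ocaml
    open Lwt.Infix

    let forward_chunk mux_oc chunk =
      Lwt_io.write_from_string_exactly
        mux_oc chunk 0 (String.length chunk) >>= fun () ->
      (* Push the data past Lwt_io's internal buffer to the socket now,
         instead of waiting for the buffer to fill up. *)
      Lwt_io.flush mux_oc
    ```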

    Moreover, although buffered I/O simplifies things a lot, it comes with a price. In fact, when you use Lwt_io you get several unnecessary layers of buffering, and you also allocate lots of unnecessary data, trashing your memory with garbage. The problem is that Lwt_io has its own internal buffer, which it doesn't expose to a casual user, so every function that returns or consumes data needs to perform an extra copy to or from that internal buffer. For example, using Lwt_io.{read,write} will do the following:

    1. copy data from kernel into internal buffer
    2. allocate a string
    3. copy data from internal buffer to the allocated string
    4. (now the write part) copy data from the string into internal buffer
    5. copy data from internal buffer to kernel.
    6. (somewhere and sometime later, inside the GC) copy the allocated string from the minor heap to the major heap (if the string was small enough to fit into the minor heap), or copy it from one location to another (if the compaction algorithm decides to move it and the string is still alive, which is quite possible if the producer outruns the consumer and the lifetime of the read data becomes quite long).

    It looks like we can get rid of the copies in 2, 3, 4 and 6: we can use our own buffer, copy data from the kernel into it, and then copy the data from this buffer back to the kernel. We could even get rid of the copies in 1 and 5 by using the splice and tee system calls, which copy data directly between kernel buffers without involving user space at all. But in that case we would lose the ability to examine the data, and usually that is exactly what we want to keep.

    So, let's try to remove all copies except the ones to and from kernel space. We could use a low-level interface to the internal buffer of Lwt_io, like direct_access and the newly added block function, but this requires knowledge of Lwt_io internals and is non-trivial, though still doable. Instead, we will use a simpler approach based on the Lwt_unix library, which interacts directly with the kernel without any intermediate buffers, leaving the buffering to us.

    open Lwt.Infix
    
    let bufsiz = 32768
    
    (* Keep writing until all [len] bytes starting at [pos] are out;
       Lwt_bytes.write may perform short writes. *)
    let rec write_all oc buf pos len =
      if len = 0 then Lwt.return_unit
      else
        Lwt_bytes.write oc buf pos len >>= fun n ->
        write_all oc buf (pos + n) (len - n)
    
    let echo ic oc =
      let buf = Lwt_bytes.create bufsiz in
      let rec loop () =
        Lwt_bytes.read ic buf 0 bufsiz >>= function
        | 0 -> Lwt.return_unit
        | n -> write_all oc buf 0 n >>= loop in
      loop ()
    

    This implements a simple and fast duplication of data that copies with about the same speed as the cat program. There is still some room for improvement, though. For example, one should add error handling for robustness (in particular for EINTR). Also, this function implements a synchronous copy, where the input and output are tightly locked, and sometimes that is not acceptable. Consider the following example: the input is a UDP socket that may easily outrun the consumer, and data will be dropped, even if on average the producer is slower than the consumer. To handle this you need to split the reader and writer into two separate threads that communicate via some elastic queue.
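    One way to sketch such a split with plain Lwt is a bounded Lwt_stream between a reader thread and a writer thread. The queue size and chunk size below are illustrative, and this version uses Lwt_io channels, so it reintroduces some of the copying discussed above:

    ```ocaml
    open Lwt.Infix

    (* The reader pushes chunks into a bounded queue; the writer drains it.
       The queue absorbs bursts of up to 16 chunks before the reader
       blocks on push. *)
    let elastic_echo ic oc =
      let stream, push = Lwt_stream.create_bounded 16 in
      let reader =
        let buf = Bytes.create 4096 in
        let rec loop () =
          Lwt_io.read_into ic buf 0 4096 >>= function
          | 0 -> push#close; Lwt.return_unit
          | n -> push#push (Bytes.sub_string buf 0 n) >>= loop
        in
        loop ()
      in
      let writer = Lwt_stream.iter_s (Lwt_io.write oc) stream in
      Lwt.join [reader; writer]
    ```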

    Lwt is quite a low-level library that doesn't, and shouldn't, solve these things for you. It provides mechanisms that can be used to build a solution for each particular case. There are libraries that do provide solutions for some common patterns; 0MQ and nanomsg are good examples.

    Update

    I might be too much of a low-level guy, and maybe I dig too deep. If you're really looking for a high-level approach, then you should use Lwt_stream; in that case you can code the Node equivalent of foo.pipe(bar).pipe(foo) as

    let echo ic oc = Lwt_io.(write_chars oc (read_chars ic))
    

    Of course this will be much slower, but whether that matters depends on your task.

    And yes, to perform a two-way redirection you should just run two threads, like this: echo ic oc <&> echo oc ic for the version with file descriptors, which are both readable and writable. If you're using Lwt_io channels, which are unidirectional like pipes, then you get two endpoints for each side. Let's name them fi and fo for the frontend input and output respectively, and bi and bo for the backend part. Then you need to connect them like this: echo fi bo <&> echo bi fo, using the second version of echo with streams.
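    Applied to the question's tunnel, and assuming the stream-based echo together with the channel names and a_unix_addr from the question, the wiring could look like:

    ```ocaml
    open Lwt.Infix

    let echo ic oc = Lwt_io.(write_chars oc (read_chars ic))

    (* tcp_ic/tcp_oc are the frontend (fi/fo),
       mux_ic/mux_oc the backend (bi/bo). *)
    let running_tunnel (tcp_ic, tcp_oc) =
      Lwt_io.with_connection a_unix_addr begin fun (mux_ic, mux_oc) ->
        echo tcp_ic mux_oc <&> echo mux_ic tcp_oc
      end
    ```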

    Performance cost

    Usually high-level abstractions come with a performance cost. In our particular case, the first version of echo has a throughput of more than 1 Gb per second, while the version with streams averages around 5 MB/s. Depending on your setup this may or may not be a problem: it is more than enough for a regular ssh session, but may have an impact on scp in local networks.