gostreamp2plibp2p

How to handle buffered Read-Write Stream(s) to peers in golang using libp2p?


I am following this tutorial:

https://github.com/libp2p/go-libp2p-examples/tree/master/chat-with-mdns

In a short form, it:

  1. configures a p2p host
  2. sets a default handler function for incoming connections (3. not necessary)
  3. and opens a stream to the connecting peers:

stream, err := host.NewStream(ctx, peer.ID, protocol.ID(cfg.ProtocolID))

Afterwards, there is a buffer stream/read-write variable created:

rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))

Now this stream is used to send and receive data between the peers. This is done using two goroutine functions that have rw as an input:

go writeData(rw) go readData(rw)

My problems are:

  1. I want to send data to my peers and need feedback from them: e.g. in rw there is a question and they need to answer yes/no. How can I transfer back this answer and process it (enable some interaction)?

  2. The data I want to send in rw is not always the same. Sometimes it's a string containing only a name, sometimes it's a string containing a whole block etc. How can I distinguish?

I thought about those solutions. But I am new to golang, so maybe you have a better one:

Thank you for any help to solve this!!


Solution

  • This is what readData does from your tuto:

    func readData(rw *bufio.ReadWriter) {
        for {
            str, err := rw.ReadString('\n')
            if err != nil {
                fmt.Println("Error reading from buffer")
                panic(err)
            }
    
            if str == "" {
                return
            }
            if str != "\n" {
                // Green console colour:    \x1b[32m
                // Reset console colour:    \x1b[0m
                fmt.Printf("\x1b[32m%s\x1b[0m> ", str)
            }
    
        }
    }
    

    It basically reads the stream until it finds a \n, which is a new line character and prints it to stdout.

    The writeData:

    func writeData(rw *bufio.ReadWriter) {
        stdReader := bufio.NewReader(os.Stdin)
    
        for {
            fmt.Print("> ")
            sendData, err := stdReader.ReadString('\n')
            if err != nil {
                fmt.Println("Error reading from stdin")
                panic(err)
            }
    
            _, err = rw.WriteString(fmt.Sprintf("%s\n", sendData))
            if err != nil {
                fmt.Println("Error writing to buffer")
                panic(err)
            }
            err = rw.Flush()
            if err != nil {
                fmt.Println("Error flushing buffer")
                panic(err)
            }
        }
    }
    

    It reads data from stdin, so you can type messages, and writes this to the rw and flushes it. This kind of enables a sort of tty chat. If it works correctly you should be able to start at least two peers and communicate through stdin.

    You shouldn't recreate new rw for new content. You can reuse the existing one until you close it. From the tuto's code, a new rw is created for each new peer.


    Now a tcp stream does not work as an http request with a request and a response corresponding to that request. So if you want to send something, and get the response to that specific question, you could send a message of this format:

    [8 bytes unique ID][content of the message]\n
    

    And when you receive it, you parse it, prepare the response and send it with the same format, so that you can match messages, creating a sort of request/response communication.

    You can do something like that:

    func sendMsg(rw *bufio.ReadWriter, id int64, content []byte) error {
            // allocate our slice of bytes with the correct size 4 + size of the message + 1
            msg := make([]byte, 4 + len(content) + 1)
    
            // write id 
            binary.LittleEndian.PutUint64(msg, uint64(id))
    
            // add content to msg
            copy(msg[13:], content)
    
            // add new line at the end
            msg[len(msg)-1] = '\n'
    
            // write msg to stream
            _, err = rw.Write(msg)
            if err != nil {
                fmt.Println("Error writing to buffer")
                return err
            }
            err = rw.Flush()
            if err != nil {
                fmt.Println("Error flushing buffer")
                return err
            }
            return nil
    }
    
    func readMsg(rw *bufio.ReadWriter) {
        for {
            // read bytes until new line
            msg, err := rw.ReadBytes('\n')
            if err != nil {
                fmt.Println("Error reading from buffer")
                continue
            }
    
            // get the id
            id := int64(binary.LittleEndian.Uint64(msg[0:8]))
    
            // get the content, last index is len(msg)-1 to remove the new line char
            content := string(msg[8:len(msg)-1])
    
            if content != "" {
                // we print [message ID] content
                fmt.Printf("[%d] %s", id, content)
            }
    
            // here you could parse your message
            // and prepare a response
            response, err := prepareResponse(content)
            if err != nil {
                fmt.Println("Err while preparing response: ", err)
                continue
            }
    
            if err := s.sendMsg(rw, id, response); err != nil {
                fmt.Println("Err while sending response: ", err)
                continue
            }
        }
    }
    

    Hope this helps.