multithreadinggtkclient-servernim-lang

How to have a message from a thread-channel trigger an update in owlkettle?


After doing some research in general in owlkettle (https://github.com/can-lehmann/owlkettle) for a "client-server" architecture (see here ) I tried to figure out how to specifically write one for an owlkettle app.

The idea is to have 2 threads:

  1. A "client" thread running a GTK application built by owlkettle
  2. A "server" thread that does whatever heavy computing is necessary.

For them to communicate you need 2 channels:

  1. one for messages from client => server
  2. one for messages from client <= server

Now the problem becomes that a message from the server does not and can not trigger an update in owlkettle It gets stuck in the channel until owlkettle itself decides to trigger an update (e.g. when clicking a button) during which it reads the message. There is no convenient way or hook to say "New server message arrived, update the UI with the new data".

This is obvious in the below example. When clicking the button it sends a message to the server (via channel 1) which sends a response (via channel 2).

You do not immediately see that update in the UI. only when you click the button, because button-clicks trigger general UI updates on their own.

        import owlkettle, owlkettle/[playground, adw]
        import std/[options, os]
         
        var counter: int = 1
         
        type ChannelHub = ref object
          serverChannel: Channel[string]
          clientChannel: Channel[string]
         
        proc sendToServer(hub: ChannelHub, msg: string): bool =
          echo "send client => server: ", msg
          hub.clientChannel.trySend(msg)
         
        proc sendToClient(hub: ChannelHub, msg: string): bool =
          echo "send client <= server: ", msg
          hub.serverChannel.trySend(msg)
         
        proc readClientMsg(hub: ChannelHub): Option[string] =
          let response: tuple[dataAvailable: bool, msg: string] = hub.clientChannel.tryRecv()
         
          return if response.dataAvailable:
              echo "read client => server: ", response.repr
              some(response.msg)
            else:
              none(string)
         
        proc readServerMsg(hub: ChannelHub): Option[string] =
          let response: tuple[dataAvailable: bool, msg: string] = hub.serverChannel.tryRecv()
         
          return if response.dataAvailable:
              echo "read client <= server: ", response.repr
              some(response.msg)
            else:
              none(string)
         
        proc setupServer(channels: ChannelHub): Thread[ChannelHub] =
          proc serverLoop(hub: ChannelHub) =
            while true:
              let msg = hub.readClientMsg()
              if msg.isSome():
                discard hub.sendToClient("Received Message " & $counter)
         
         
                counter.inc
              sleep(0) # Reduces stress on CPU when idle, increase when higher latency is acceptable for even better idle efficiency
         
          createThread(result, serverLoop, channels)
         
        viewable App:
          hub: ChannelHub
          backendMsg: string = ""
         
        method view(app: AppState): Widget =
          let msg: Option[string] = app.hub.readServerMsg()
          if msg.isSome():
            app.backendMsg = msg.get()
           
          result = gui:
            Window:
              defaultSize = (500, 150)
              title = "Client Server Example"
             
              Box:
                orient = OrientY
                margin = 12
                spacing = 6
               
                Button {.hAlign: AlignCenter, vAlign: AlignCenter.}:
                  Label(text = "Click me")
                 
                  proc clicked() =
                    discard app.hub.sendToServer("Frontend message!")
                   
         
                Label(text = "Message sent by Backend: ")
                Label(text = app.backendMsg)
         
        proc setupClient(channels: ChannelHub): Thread[ChannelHub] =
          proc startOwlkettle(hub: ChannelHub) =
            adw.brew(gui(App(hub = hub)))
         
          createThread(result, startOwlkettle, channels)
         
        proc main() =
          var serverToClientChannel: Channel[string]
          var clientToServerChannel: Channel[string]
          serverToClientChannel.open()
          clientToServerChannel.open()
         
          let hub = ChannelHub(serverChannel: serverToClientChannel, clientChannel: clientToServerChannel)
         
          let client = setupClient(hub)
          let server = setupServer(hub)
          joinThreads(server, client)
         
        main()

How do I get this to trigger updates in the frontend when the server sends a message?


Solution

  • The solution for this is g_idle_add_full proc (or alternatively g_timeout_add_full). What these do is register a function with GTK that gets called whenever the GTK main thread is idle (or whenever the GTK thread is idle and a timeout expires in the case of g_timeout_add_full).

    Both of these are in fact available and wrapped by owlkettle! Just use the addGlobalTimeout or addGlobalIdleTask procs (don't forget to add a "sleep" in the global idle task to not fully block the CPU with it).

    Use this to check if there are any messages in the channel that receives messages from the server. If there are, trigger an update in owlkettle.

    You can register that proc on startup in the afterBuild hook of the App Widget.

    That would look like this:

    type ListenerData = object
      hub: ChannelHub[string, string]
      app: Viewable
    
    proc addServerListener(app: Viewable, hub: ChannelHub[string, string], priority: int = 200) =
      ## Adds a callback function to the GTK app that checks every 5 ms whether the 
      ## server sent a new message. Triggers a UI update if that is the case.
      proc listener(): bool =    
        if hub.hasServerMsg():
          discard app.redraw()
        
        const KEEP_LISTENER_ACTIVE = true
        return KEEP_LISTENER_ACTIVE
    
      const MESSAGE_CHECK_INTERVAL_MS = 5
      discard addGlobalTimeout(MESSAGE_CHECK_INTERVAL_MS, listener)
    
    viewable App:
      hub: ChannelHub[string, string]
      backendMsg: string = ""
    
      hooks:
        afterBuild:
          addServerListener(state, state.hub)
    

    Here the complete example (with also a bit of clean-up). Note though, the owlkettle bits were moved into the main thread, there is no need to have it outside the main thread:

    
    import owlkettle, owlkettle/[widgetutils, adw]
    import std/[options, os]
    
    var counter: int = 0
    
    ## Communication
    type ChannelHub[SMsg, CMsg] = ref object
      serverChannel: Channel[SMsg]
      clientChannel: Channel[CMsg]
    
    proc new[SMsg, CMsg](t: typedesc[ChannelHub[SMsg, CMsg]]): ChannelHub[SMsg, CMsg] =
      result = ChannelHub[SMsg, CMsg]()
      result.serverChannel.open()
      result.clientChannel.open()
      
    proc destroy[SMsg, CMsg](hub: ChannelHub[SMsg, CMsg]) =
      hub.serverChannel.close()
      hub.clientChannel.close()
    
    proc sendToServer[SMsg, CMsg](hub: ChannelHub[SMsg, CMsg], msg: string): bool =
      echo "send client => server: ", msg
      hub.clientChannel.trySend(msg)
      
    proc sendToClient[SMsg, CMsg](hub: ChannelHub[SMsg, CMsg], msg: string): bool =
      echo "send client <= server: ", msg
      hub.serverChannel.trySend(msg)
    
    proc readClientMsg[SMsg, CMsg](hub: ChannelHub[SMsg, CMsg]): Option[string] =
      let response: tuple[dataAvailable: bool, msg: string] = hub.clientChannel.tryRecv()
      
      result = if response.dataAvailable:
          echo "read client => server: ", response.repr
          some(response.msg)
        else:
          none(string)
    
    proc readServerMsg[SMsg, CMsg](hub: ChannelHub[SMsg, CMsg]): Option[string] =
      let response: tuple[dataAvailable: bool, msg: string] = hub.serverChannel.tryRecv()
    
      result = if response.dataAvailable:
          echo "read client <= server: ", response.repr
          some(response.msg)
        else:
          none(string)
      
    proc hasServerMsg[SMsg, CMsg](hub: ChannelHub[SMsg, CMsg]): bool =
      hub.serverChannel.peek() > 0
    
    
    
    ## Server
    proc setupServer(channels: ChannelHub[string, string]): Thread[ChannelHub[string, string]] =
      proc serverLoop(hub: ChannelHub[string, string]) =
        while true:
          let msg = hub.readClientMsg()
          if msg.isSome():
            discard hub.sendToClient("Received Message " & $counter)
    
            counter.inc
          sleep(0) # Reduces stress on CPU when idle, increase when higher latency is acceptable for even better idle efficiency
      
      createThread(result, serverLoop, channels)
    
    
    
    ## Client
    type ListenerData = object
      hub: ChannelHub[string, string]
      app: Viewable
    
    proc addServerListener(app: Viewable, hub: ChannelHub[string, string], priority: int = 200) =
      ## Adds a callback function to the GTK app that checks every 5 ms whether the 
      ## server sent a new message. Triggers a UI update if that is the case.
      proc listener(): bool =    
        if hub.hasServerMsg():
          discard app.redraw()
        
        const KEEP_LISTENER_ACTIVE = true
        return KEEP_LISTENER_ACTIVE
    
      const MESSAGE_CHECK_INTERVAL_MS = 5
      discard addGlobalTimeout(MESSAGE_CHECK_INTERVAL_MS, listener)
    
    viewable App:
      hub: ChannelHub[string, string]
      backendMsg: string = ""
    
      hooks:
        afterBuild:
          addServerListener(state, state.hub)
    
    method view(app: AppState): Widget =
      let msg: Option[string] = app.hub.readServerMsg()
      if msg.isSome():
        app.backendMsg = msg.get()
        
      result = gui:
        Window:
          defaultSize = (500, 150)
          title = "Client Server Example"
          
          Box:
            orient = OrientY
            margin = 12
            spacing = 6
            
            Button {.hAlign: AlignCenter, vAlign: AlignCenter.}:
              Label(text = "Click me")
              
              proc clicked() =
                discard app.hub.sendToServer("Frontend message!")
                
    
            Label(text = "Message sent by Backend: ")
            Label(text = app.backendMsg)
    
    proc setupClient(hub: ChannelHub) =
      adw.brew(gui(App(hub = hub)))
    
    
    
    
    ## Main
    proc main() =
      let hub = new(ChannelHub[string, string])
      let server = setupServer(hub)
      setupClient(hub)
      joinThread(server)
    
      hub.destroy()
      
    main()