
Thruway manage subscriptions


I am trying to set up a websocket server with Thruway that can manage multiple groups, something like a chat application where each client may subscribe to one or several rooms at the same time and broadcast messages to an entire room. I managed to do this with an ancient version of Ratchet, but since it doesn't run very smoothly, I want to switch to Thruway. Sadly, I can't find anything about managing groups. So far I have the following as the websocket manager, and the clients use the current version of Autobahn|JS (18.x).

Does anyone have any clue if it is possible to manage subscription groups with something like the following?

<?php

require_once __DIR__.'/../vendor/autoload.php';

use Thruway\Peer\Router;
use Thruway\Transport\RatchetTransportProvider;

$router = new Router();
$router->addTransportProvider(new RatchetTransportProvider("0.0.0.0", 9090));

$router->start();

Solution

  • With Thruway, things work a little differently than with the old Ratchet. First of all, Thruway is not a WAMP server; it is just a router. It has no server instance like the old Ratchet's, which let you wrap up all your server-side functionality in one place. It only receives message packets and routes them to other sessions in the same realm, depending on their subscriptions. If you have ever used socket.io, the realm idea is similar to separate connections: you can limit your sessions or connections to a single namespace, or split functionality across different socket instances such as administration, visitors, etc.

    On the client side with Autobahn (latest version), once you subscribe to a topic and then publish to that topic, Thruway automatically finds the topic's subscribers and delivers the message to them within the same realm. In the old Ratchet you had to handle this manually: keep an array of available channels, add users to a channel when they subscribed, and broadcast a message by iterating over the users in the topic. This was really painful.
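
    To make the contrast concrete, here is a minimal sketch of the kind of manual channel bookkeeping described above, which a router takes off your hands. It is plain PHP with no Ratchet or Thruway dependency; the class name `ChannelRegistry`, its methods, and the topic names are hypothetical and only illustrate the idea.

    ```php
    <?php
    // Hypothetical sketch: per-topic bookkeeping done by hand.
    // None of these names come from Thruway or Ratchet.

    final class ChannelRegistry
    {
        /** @var array topic => [sessionId => deliver callback] */
        private $channels = [];

        public function subscribe(string $topic, int $sessionId, callable $deliver): void
        {
            $this->channels[$topic][$sessionId] = $deliver;
        }

        public function unsubscribe(string $topic, int $sessionId): void
        {
            unset($this->channels[$topic][$sessionId]);
        }

        /** Deliver $message to every subscriber of $topic; returns the number of deliveries. */
        public function publish(string $topic, string $message): int
        {
            $delivered = 0;
            foreach ($this->channels[$topic] ?? [] as $deliver) {
                $deliver($message);
                $delivered++;
            }
            return $delivered;
        }
    }

    $registry = new ChannelRegistry();
    $received = [];   // sessionId => list of messages that session got

    // Session 1 joins room.a; session 2 joins both rooms.
    $registry->subscribe('room.a', 1, function ($m) use (&$received) { $received[1][] = $m; });
    $registry->subscribe('room.a', 2, function ($m) use (&$received) { $received[2][] = $m; });
    $registry->subscribe('room.b', 2, function ($m) use (&$received) { $received[2][] = $m; });

    $countA = $registry->publish('room.a', 'hello a');
    $countB = $registry->publish('room.b', 'hello b');
    ```

    With a WAMP router, each chat room simply becomes a topic, and the subscribe and publish calls made by the clients replace all of this.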

    If you want to serve RPC calls on the server side and keep some of your logic off the client side, you can use a class called an internal client on the server side. Conceptually, an internal client is just another session that connects to your Thruway router and handles some functions internally without exposing them to other clients. It receives message packets, does its work, and returns the result to the client connection that requested it. It took me a while to understand how it works, but once I figured out the idea behind it, it made more sense.

    Here is a little bit of code to explain it better.

    In your router instance you will need to add a module (note that the examples in the voxys/thruway package are a little confusing about the internal client).

    server.php

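    require __DIR__ . "/../bootstrap.php";   // assumed to set up autoloading and $output
    require __DIR__ . '/InternalClient.php';
    
    use Thruway\Peer\Router;
    use Thruway\Transport\RatchetTransportProvider;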
    
    $port = 8080;
    $output->writeln([
        sprintf('Starting Sockets Service on Port [%s]', $port),
    ]);
    $router = new Router();
    
    $router->registerModule(new RatchetTransportProvider("127.0.0.1", $port));   // use 0.0.0.0 if you want to expose outside world
    
    // common realm ( realm1 )
    $router->registerModule(
        new InternalClient()    // attach the internal client to the router
    );
    
    // administration realm (administration)
    // $router->registerModule(new \AdminClient());
    
    $router->start();
    

    This initializes the Thruway router and attaches the InternalClient instance to it. In InternalClient.php you can now access the actual router as well as the currently connected clients. In the example they provide, the router is not part of the instance, so you are stuck with only the session ID property of new connections.

    InternalClient.php

    <?php
    
    use Thruway\Module\RouterModuleInterface;
    use Thruway\Peer\Client;
    use Thruway\Peer\Router;
    use Thruway\Peer\RouterInterface;
    use Thruway\Logging\Logger;
    use React\EventLoop\LoopInterface;
    
    class InternalClient extends Client implements RouterModuleInterface
    {
        protected $_router;
    
        /**
         * Constructor
         */
        public function __construct()
        {
            parent::__construct("realm1");
        }
    
        /**
         * @param RouterInterface $router
         * @param LoopInterface $loop
         */
        public function initModule(RouterInterface $router, LoopInterface $loop)
        {
            $this->_router = $router;
    
            $this->setLoop($loop);
    
            $this->_router->addInternalClient($this);
        }
    
        /**
         * @param \Thruway\ClientSession $session
         * @param \Thruway\Transport\TransportInterface $transport
         */
        public function onSessionStart($session, $transport)
        {
            // TODO: now that the session has started, setup the stuff
    
            echo "--------------- Hello from InternalClient ------------\n";
            $session->register('com.example.getphpversion', [$this, 'getPhpVersion']);
    
            $session->subscribe('wamp.metaevent.session.on_join',  [$this, 'onSessionJoin']);
            $session->subscribe('wamp.metaevent.session.on_leave', [$this, 'onSessionLeave']);
        }
    
        /**
         * Handle on new session joined.
         * This is where session is initially created and client is connected to socket server
         *
         * @param array $args
         * @param array $kwArgs
         * @param array $options
         * @return void
         */
        public function onSessionJoin($args, $kwArgs, $options) {
            $sessionId = $args[0] ?? null; // first positional argument, or null when none was sent
            $connectedClientSession = $this->_router->getSessionBySessionId($sessionId);
            Logger::debug($this, 'Client '. $sessionId. ' connected');
        }
    
        /**
         * Handle on session left.
         *
         * @param array $args
         * @param array $kwArgs
         * @param array $options
         * @return void
         */
        public function onSessionLeave($args, $kwArgs, $options) {
    
            $sessionId = $args[0] ?? null; // first positional argument, or null when none was sent
    
            Logger::debug($this, 'Client '. $sessionId. ' left');
    
            // Below won't work because once this event is triggered, client session is already ended
            // and cleared from router. If you need to access closed session, you may need to implement
            // a cache service such as Redis to access data manually.
            //$connectedClientSession = $this->_router->getSessionBySessionId($sessionId); 
        }
    
        /**
         * RPC handler.
         * Runs internally when another client calls the registered procedure.
         * It must be public, because it is invoked as a callable from outside this class.
         */
        public function getPhpVersion() {
    
            // You can emit or broadcast another message in this case
            $this->emitMessage('com.example.commonTopic', 'phpVersion', array('msg'=> phpversion()));
    
            $this->broadcastMessage('com.example.anotherTopic', 'phpVersionRequested', array('msg'=> phpversion()));
    
            // and return result of your rpc call back to requester
            return [phpversion()];
        }
    
        /**
         * @return Router
         */
        public function getRouter()
        {
            return $this->_router;
        }
    
    
        /**
         * Publish without excluding the sender.
         *
         * @param string $topic
         * @param string $eventName
         * @param mixed $msg
         */
        protected function broadcastMessage($topic, $eventName, $msg)
        {
            $this->emitMessage($topic, $eventName, $msg, false);
        }
    
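        /**
         * Publish to a topic, excluding the sender by default.
         *
         * @param string $topic
         * @param string $eventName
         * @param mixed $msg
         * @param bool $exclude value passed as the exclude_me publish option
         */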
        protected function emitMessage($topic, $eventName, $msg, $exclude = true)
        {
            $this->session->publish($topic, array($eventName), array('data' => $msg), array('exclude_me' => $exclude));
        }
    
    }
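
    The comment in onSessionLeave suggests caching session data yourself, because the router has already dropped the session by the time the leave event fires. A minimal sketch of that idea follows, using an in-memory array as a stand-in for a service such as Redis. The `SessionCache` class, its method names, and the stored fields are hypothetical and not part of Thruway.

    ```php
    <?php
    // Hypothetical in-memory stand-in for the cache service mentioned in onSessionLeave().
    // A real deployment might back this with Redis; nothing here is Thruway API.

    final class SessionCache
    {
        /** @var array session ID => data captured at join time */
        private $sessions = [];

        /** Call from the on_join handler, while the session is still available. */
        public function remember(int $sessionId, array $data): void
        {
            $this->sessions[$sessionId] = $data;
        }

        /** Call from the on_leave handler: returns the stored data and drops the entry. */
        public function forget(int $sessionId): ?array
        {
            $data = $this->sessions[$sessionId] ?? null;
            unset($this->sessions[$sessionId]);
            return $data;
        }
    }

    $cache = new SessionCache();
    $cache->remember(42, ['authid' => 'alice', 'rooms' => ['room.a']]);

    $left  = $cache->forget(42);   // data is still readable after the session is gone
    $again = $cache->forget(42);   // a second call finds nothing and returns null
    ```

    An instance of such a cache could be held as a property of the internal client, filled in onSessionJoin and read in onSessionLeave.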
    

    A few things to note about the example code above:

    - To receive messages on a topic, a client must be subscribed to that topic.
    - The internal client can publish to any topic in the same realm without subscribing to it.
    - The broadcast/emit functions are not part of Thruway itself; they are something I came up with to make publishing a little easier on my end. emitMessage sends the message to everyone subscribed to the topic except the sender; broadcastMessage does not exclude the sender.
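
    The difference between the two helpers comes down to the exclude_me publish option. The sketch below models that rule in plain PHP; the `recipients` function and the session IDs are hypothetical and only mimic what a router decides, they are not Thruway code.

    ```php
    <?php
    // Hypothetical model of the exclude_me publish option; not Thruway code.

    /**
     * @param int[] $subscribers session IDs subscribed to the topic
     * @param int   $publisher   session ID of the publishing session
     * @param bool  $excludeMe   value of the exclude_me option
     * @return int[] session IDs that receive the event
     */
    function recipients(array $subscribers, int $publisher, bool $excludeMe): array
    {
        if (!$excludeMe) {
            return array_values($subscribers);
        }
        // Drop the publisher from the recipient list.
        return array_values(array_filter($subscribers, function (int $id) use ($publisher): bool {
            return $id !== $publisher;
        }));
    }

    $subscribers = [10, 20, 30];

    $emit      = recipients($subscribers, 20, true);    // like emitMessage(): sender left out
    $broadcast = recipients($subscribers, 20, false);   // like broadcastMessage(): sender included
    ```

    Note that the sender only shows up in the second list because it is itself subscribed to the topic; a publisher without a subscription receives nothing either way.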

    I hope this information helps a little in understanding the concept.