Amphp: Run many async loops with the same connection (eventstore client)


I'm using the eventstore client, which is built on amphp. I need to reuse the same connection in several parts of my application.

So I created a connection provider:

public function getConnection(): EventStoreConnection
{
    if ($this->connection) {
        return $this->connection;
    }
    $this->connection = $this->createConnection();
    wait($this->connection->connectAsync());

    return $this->connection;
}

And then I use this connection in many places:

\Amp\Loop::run(function () use ($eventStoreEvents, $streamName) {
    $connection = $this->connectionProvider->getConnection();

    // Creation of an event stream
    yield $connection->appendToStreamAsync($streamName, ExpectedVersion::ANY, $eventStoreEvents);
    // sleep(10); // This sleep has no effect: the code continues as if nothing happened
});

\Amp\Loop::run(function () use ($streamName, $aggregateFqcn, &$aggregateRoot) {

    $start = 0;
    $count = \Prooph\EventStore\Internal\Consts::MAX_READ_SIZE;

    $connection = $this->connectionProvider->getConnection();

    do {
        $events = [];
        /** @var StreamEventsSlice $streamEventsSlice */
        $streamEventsSlice = yield $connection
            ->readStreamEventsForwardAsync(
                $streamName,
                $start,
                $count,
                true
            );

        if (!$streamEventsSlice->status()->equals(SliceReadStatus::success())) {
            dump($streamEventsSlice); // Event stream does not exist
            // Error here: the event stream doesn't exist at this point.
            throw new RuntimeException('Impossible to generate the aggregate');
        }
    } while (! $streamEventsSlice->isEndOfStream());
});

The problem: it seems that the second loop starts before the first request has finished. Uncommenting the sleep has no effect!

But the event stream does eventually get created, with the related events in it, so the first request did work.

If I open a connection, close it, and then open a new one, it works. But that is slow because of the handshake overhead on every new connection.

I tried a similar example with Amphp's WebSocket library and it worked. Do you see anything wrong?

Here is my WebSocket test that worked:

$connection = \Amp\Promise\wait(connect('ws://localhost:8080'));
Amp\Loop::run(function () use ($connection) {
   /** @var Connection $connection */
   yield $connection->send("Hello...");
   sleep(10); // This sleep works!
});

Amp\Loop::run(function () use ($connection) {
   /** @var Connection $connection */
   yield $connection->send("... World !");
});

$connection->close();

Solution

  • The prooph eventstore library is based on amphp but doesn't follow all of its principles: you can't wait for the connection to be ready. It gets even worse if you try to use it at scale, so don't try to wait for the promise to complete.

    As an alternative, you can set up a promise to be resolved later and check whether the connection is null. That is what the library actually does internally before it processes further steps.

    For my part, I decided to stop using this library. But as an alternative, you can use the library based on the HTTP client, which is also from the prooph team.
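To make the "check if the connection is null, otherwise keep the work for later" idea more concrete, here is a minimal sketch in plain PHP. It is not the prooph or Amp API: the class DeferredConnection and its methods whenReady() and markConnected() are hypothetical names made up for illustration, and a stdClass stands in for the real connection. In an actual Amp application the queued work would more likely resolve a promise than call a plain callback.

```php
<?php
declare(strict_types=1);

// Hypothetical illustration only: none of these names come from prooph or Amp.
final class DeferredConnection
{
    /** @var object|null The live connection, or null while still connecting. */
    private $connection = null;

    /** @var callable[] Operations waiting for the connection to be ready. */
    private $pending = [];

    /** Run $operation now if connected, otherwise queue it for later. */
    public function whenReady(callable $operation): void
    {
        if ($this->connection === null) {
            $this->pending[] = $operation;
            return;
        }
        $operation($this->connection);
    }

    /** To be called once the handshake has really finished. */
    public function markConnected(object $connection): void
    {
        $this->connection = $connection;

        // Flush the queue in the order the operations were registered.
        $pending = $this->pending;
        $this->pending = [];
        foreach ($pending as $operation) {
            $operation($connection);
        }
    }
}

$log = [];
$deferred = new DeferredConnection();

// Both operations are registered before the connection exists, so they are queued.
$deferred->whenReady(function (object $c) use (&$log) { $log[] = 'append'; });
$deferred->whenReady(function (object $c) use (&$log) { $log[] = 'read'; });

// stdClass is a stand-in for the real connection object.
$deferred->markConnected(new stdClass());

// Registered after the connection is ready, so it runs immediately.
$deferred->whenReady(function (object $c) use (&$log) { $log[] = 'late'; });

echo implode(', ', $log), PHP_EOL; // prints: append, read, late
```

The point of the sketch is only the ordering guarantee: nothing touches the connection until it is marked as ready, and the read is never attempted before the append that precedes it.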