Suppose we have an API POST endpoint that returns a response like this:
    {
        "data": [1,2,3],
        "total_rows": 20
    }
This means we received only part of the data: the first page, with 3 entries. The total count is 20, so the remaining pages have to be requested by passing an offset in the request. The total count (and hence the set of offsets) is only known once the first call has completed, while the remaining calls do not depend on each other and can run concurrently.
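To make the offset arithmetic concrete, here is a minimal sketch in plain PHP. The helper name remainingOffsets is hypothetical, and it assumes zero-based offsets and a fixed page size equal to the number of items on the first page.

```php
<?php
/**
 * Yield the offsets of every page after the first one.
 * Hypothetical helper; assumes zero-based offsets and a fixed page size.
 */
function remainingOffsets(int $perPage, int $totalRows): Generator
{
    if ($perPage <= 0) {
        return; // an empty first page would otherwise loop forever
    }
    // Strict "<": an offset equal to the total would request an empty page.
    for ($offset = $perPage; $offset < $totalRows; $offset += $perPage) {
        yield $offset;
    }
}

// First page held 3 items out of 20, so six more requests are needed.
$offsets = iterator_to_array(remainingOffsets(3, 20), false);
print_r($offsets); // offsets 3, 6, 9, 12, 15, 18
```

Each yielded offset corresponds to one follow-up request that can be sent independently of the others.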
This is PHP, and Guzzle 6 with its Promises/A+ implementation is used for the task. Guzzle also offers the EachPromise class, which takes an iterable of promises and a config hash that controls how they are processed. From the PhpDoc block of that class's constructor:
Configuration hash can include the following key value pairs:
- fulfilled: (callable) Invoked when a promise fulfills. The function is invoked with three arguments: the fulfillment value, the index position from the iterable list of the promise, and the aggregate promise that manages all of the promises. The aggregate promise may be resolved from within the callback to short-circuit the promise.
- rejected: (callable) Invoked when a promise is rejected. The function is invoked with three arguments: the rejection reason, the index position from the iterable list of the promise, and the aggregate promise that manages all of the promises. The aggregate promise may be resolved from within the callback to short-circuit the promise.
- concurrency: (integer) Pass this configuration option to limit the allowed number of outstanding concurrently executing promises, creating a capped pool of promises. There is no limit by default.

    // First call: fetch page one to learn the page size and the total count.
    $paginatedResult = $this->client->sendAsync($this->createRequest($requestBody))
        ->then(function (ResponseInterface $response) use ($requestBody) {
            return $this->deserializeToPaginatedResult(
                $response,
                $requestBody->paginatedResultClass()
            );
        })
        ->wait();

    // Lazily yield one request promise per remaining page.
    $pageGenerator = function () use ($paginatedResult, $requestBody) {
        $perPageCount = count($paginatedResult->getItems());
        $totalItems = $paginatedResult->getTotalCount();
        // Strict "<": an offset equal to the total would request an empty page.
        for ($currentOffset = $perPageCount; $currentOffset < $totalItems; $currentOffset += $perPageCount) {
            $newRequestBody = clone $requestBody;
            $newRequestBody->setOffset($currentOffset);
            yield $this->client->sendAsync($this->createRequest($newRequestBody));
        }
    };

    $aggregatedResult = (new EachPromise(
        $pageGenerator(),
        [
            'concurrency' => 4,
            'fulfilled' => function ($promiseResult, $promiseIndex, $promiseAggregate) use ($requestBody) {
                $paginatedResult = $this->deserializeToPaginatedResult(
                    $promiseResult,
                    $requestBody->paginatedResultClass()
                );
                return $paginatedResult->getItems();
            },
        ]
    ))->promise()
        ->then(
            function ($promisedAggregatedResult) {
                var_dump($promisedAggregatedResult);
            }
        )
        ->wait();

    var_dump($aggregatedResult);

In the config hash, the fulfilled callback receives 3 arguments, as the documentation states. The $promiseResult can be handled properly, and $paginatedResult->getItems() does return the array of items in the requested page, but I cannot aggregate those items. $aggregatedResult is null, and $promisedAggregatedResult inside the last then() fulfillment callback is also null.
How should one properly use Guzzle's EachPromise (and its helper functions each and each_limit) to aggregate the results of all the promises passed to it?
As per EachPromise class description:
Represents a promise that iterates over many promises and invokes side-effect functions in the process.
So the callables in the config hash are just side-effect functions, which may or may not short-circuit the aggregate promise by resolving it. That is exactly why the value returned by the fulfilled function neither affected $promiseAggregate nor ended up as the $promisedAggregatedResult argument of the last then() fulfillment callable.
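This behaviour can be checked in isolation. The sketch below assumes guzzlehttp/promises is installed via Composer (the autoload path is an assumption) and uses already-fulfilled promises in place of HTTP calls. The callback runs for every promise, but whatever it returns is discarded.

```php
<?php
// Assumption: guzzlehttp/promises is installed and autoloadable from here.
require __DIR__ . '/vendor/autoload.php';

use GuzzleHttp\Promise\EachPromise;
use GuzzleHttp\Promise\FulfilledPromise;

// Stand-ins for two page requests that have already completed.
$promises = [
    new FulfilledPromise([1, 2, 3]),
    new FulfilledPromise([4, 5, 6]),
];

$seen = [];
$result = (new EachPromise($promises, [
    'fulfilled' => function ($value, $idx) use (&$seen) {
        $seen[] = $idx;   // the side effect is kept
        return $value;    // the return value is ignored by EachPromise
    },
]))->promise()->wait();

// The aggregate promise resolves to null, not to the returned values.
var_dump($result);
var_dump($seen);
```

Only the side effect on $seen survives, which is the observation the by-reference approach builds on.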
The hint on proper usage can be found in the \GuzzleHttp\Promise\all and \GuzzleHttp\Promise\some functions. The key idea is to capture an external aggregate by reference in the side-effect fulfilled/rejected callbacks. If the aggregate promise is never resolved inside those side-effect functions, it resolves to null, which is then passed to the next then() fulfillment callback. There, again, one captures the external aggregate by reference, just to return it as the promise's fulfillment value.
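
    // Assumed imports at the top of the file:
    //   use GuzzleHttp\Promise\PromiseInterface;
    //   use Psr\Http\Message\ResponseInterface;
    //   use function GuzzleHttp\Promise\each_limit;

    // First call: fetch page one to learn the page size and the total count.
    $paginatedResult = $this->client->sendAsync($this->createRequest($requestBody))
        ->then(function (ResponseInterface $response) use ($requestBody) {
            return $this->deserializeToPaginatedResult(
                $response,
                $requestBody->paginatedResultClass()
            );
        })
        ->wait();

    // Lazily yield one request promise per remaining page.
    $pageGenerator = function () use ($paginatedResult, $requestBody) {
        $perPageCount = count($paginatedResult->getItems());
        $totalItems = $paginatedResult->getTotalCount();
        // Strict "<": an offset equal to the total would request an empty page.
        for ($currentOffset = $perPageCount; $currentOffset < $totalItems; $currentOffset += $perPageCount) {
            $newRequestBody = clone $requestBody;
            $newRequestBody->setOffset($currentOffset);
            yield $this->client->sendAsync($this->createRequest($newRequestBody));
        }
    };

    // External aggregate, keyed by page position; slot 0 is the first page.
    $pages = [$paginatedResult->getItems()];

    return each_limit(
        $pageGenerator(),
        4,
        function ($promiseResult, $idx) use (&$pages, $requestBody) {
            $paginatedResult = $this->deserializeToPaginatedResult(
                $promiseResult,
                $requestBody->paginatedResultClass()
            );
            // Generator keys start at 0, so shift by one past the first page.
            $pages[$idx + 1] = $paginatedResult->getItems();
        },
        function ($reason, $idx, PromiseInterface $aggregate) {
            // Fail the whole aggregate as soon as any page fails.
            $aggregate->reject($reason);
        }
    )->then(function () use (&$pages) {
        // Pages complete in arbitrary order; ksort() restores page order.
        // (ksort() on an array_merge()d list would be a no-op.)
        ksort($pages);
        return array_merge(...$pages);
    });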
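
The by-reference pattern can also be tried without any HTTP. This is a reduced sketch, assuming guzzlehttp/promises is installed via Composer (the autoload path is an assumption). The promises are resolved by hand in a scrambled order to imitate responses arriving out of sequence, and each page is stored under its index so that ksort() can restore page order, as \GuzzleHttp\Promise\all does with its results.

```php
<?php
// Assumption: guzzlehttp/promises is installed and autoloadable from here.
require __DIR__ . '/vendor/autoload.php';

use GuzzleHttp\Promise\EachPromise;
use GuzzleHttp\Promise\Promise;

// Three pending promises standing in for three page requests.
$pending = [new Promise(), new Promise(), new Promise()];

$pages = []; // external aggregate, captured by reference below

$aggregate = (new EachPromise($pending, [
    'fulfilled' => function ($value, $idx) use (&$pages) {
        $pages[$idx] = $value; // key by position, not by arrival order
    },
    'rejected' => function ($reason, $idx, $aggregate) {
        $aggregate->reject($reason); // fail fast on the first error
    },
]))->promise()->then(function () use (&$pages) {
    ksort($pages);                    // restore page order
    return array_merge([], ...$pages); // flatten pages into one list
});

// Settle the "requests" out of order.
$pending[2]->resolve([7, 8, 9]);
$pending[0]->resolve([1, 2, 3]);
$pending[1]->resolve([4, 5, 6]);

$result = $aggregate->wait();
print_r($result);
```

Keying by index costs nothing extra and keeps the merged list independent of the order in which the responses happen to arrive.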