We're currrently using the ReactiveSecurityContextHolder which gets our correct Auth details and is used along the Flux stream.
Now we want to de-couple stuff. The first iteration is that we use a Sinks as an intermediate 'event-hub'. So from an endpoint we produce some item to the Sinks.Many.
A listener is consuming events from this Sinks and doing the heavy work. Now in this consumer I'd like to use the context which is available on the producing site. I know one can do deferContextual
to pass on the current context to another Flux. But is it possible to pass the context to the resulting Flux from the Sinks?
Thanks in advance.
Alex
There is currently no API that exposes that on arbitrary Sinks
. The challenge with Sinks
is that a lot of them are multicasting to multiple Subscriber
s, and the Context
is defined on each Subscriber
.
There is a hack though: Sinks.Many<T>
is Scannable
, and most concrete implementations should expose their current collection of subscribers through the Stream<Scannable> inners()
method. In the case of a unicast sink, scan(Attr.ACTUAL)
would also work.
Two big caveats:
Scannable
s, which doesn't allow access to Context
directlyScannable
, it is replaced in the stream by the Scannable#NOT_SCANNABLE
constantMost if not all of reactor-core
CoreSubscribers
are Scannable
, but if you connect a custom subscriber which isn't Scannable, even though it has a Context
, you won't be able to see it.
Multicast Sinks in reactor-core tend to wrap downstream subscribers in their own inner Scannable inner tracker, which would make this approach work.
Unicast Sinks are a bit different as they directly attach to the downstream Subscriber
. So if it is a CoreSubscriber
but somehow not a Scannable
, you won't be able to see it as a CoreSubscriber
and access its Context
.
To sum up the approach:
sink.inners()
to get a Stream<Scannable>
CoreSubscriber
(that's the part where things can go wrong)CoreSubscriber
and call currentContext()
Context
s you got to extract the relevant key-value pair(s)