What's an elegant way of composing an Rx observable which would resemble ReplaySubject
, but only emit the accumulated sequence once and for the first subscriber only (when that subscriber is connected)? After the 1st subscription, it should act just as a regular Subject
.
This for a .NET project, but I'd equally appreciate JavaScript/RxJS answers.
I did google for potential solutions, and I'm about to roll out my own, similar to how I approached DistinctSubject
, eventually.
I modified slightly the implementation found in a similar question, and changed the name of the class from ReplayOnceSubject
to ReplayFirstSubscriberOnlySubject
:
public class ReplayFirstSubscriberOnlySubject<T> : ISubject<T>
{
private readonly object _locker = new object();
private ISubject<T> _subject = new ReplaySubject<T>();
public void OnNext(T value) { lock (_locker) _subject.OnNext(value); }
public void OnError(Exception error) { lock (_locker) _subject.OnError(error); }
public void OnCompleted() { lock (_locker) _subject.OnCompleted(); }
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null) throw new ArgumentNullException(nameof(observer));
lock (_locker)
{
if (_subject is ReplaySubject<T> replaySubject)
{
var subject = new Subject<T>();
var subscription = subject.Subscribe(observer);
// Now replay the buffered notifications
replaySubject.Subscribe(subject).Dispose();
replaySubject.Dispose();
_subject = subject;
return subscription;
}
else
return _subject.Subscribe(observer);
}
}
}
This is probably not the most efficient solution, since two different lock
s are acquired on every operation (the _locker
and the internal _gate
), but it
shouldn't be very bad either.