DisposePrevious<TSource>(IObservable<TSource>)
|
|
DoWithPrevious<TSource>(IObservable<TSource>, Action<TSource>)
|
|
EnableIf<T>(IObservable<T>, IObservable<bool>)
|
|
EnableIf<T>(IObservable<T>, IObservable<bool>, IObservable<T>)
|
|
ObserveLatestOn<T>(IObservable<T>, IScheduler)
|
https://www.zerobugbuild.com/?p=192
This is a problem that comes up when the UI dispatcher can’t keep up with inbound activity.
the example I saw on a project was a price stream that had to be displayed on the UI that could get very busy.
If more than one new price arrives on background threads in between dispatcher time slices, there is no point displaying anything but the most recent price – in fact we want the UI to “catch up” by dropping all but the most recent undisplayed price.
So the operation is like an ObserveOn that drops all but the most recent events. Here’s a picture of what’s happening – notice how price 2 is dropped and how prices are only published during the dispatcher time slice:
The key idea here is that we keep track of the notification to be pushed on to the target scheduler in pendingNotification – and whenever an event is received, we swap the pendingNotification for the new notification. We ensure the new notification will be scheduled for dispatch on the target scheduler – but we may not need to do this…
If the previousNotification is null we know that either (a) there was no previous notification as this is the first one or (b) the previousNotification was already dispatched. How to we know this? Because in the scheduler action that does the dispatch we swap the pendingNotification for null! So if previousNotification is null, we know we must schedule a new dispatch action.
This approach keeps the checks, locks and scheduled actions to a minimum.
Notes and credits:
I’ve gone round the houses a few times on this implementation – my own attempts to improve it to use CAS rather than lock ran into bugs, so the code below is largely due to Lee Campbell, and edited for RX 2.0 support by Wilka Hudson. For an interesting discussion on this approach see this thread on the official RX forum.
|
RetryWithBackOff<T>(IObservable<T>, Func<Exception, int, TimeSpan?>)
|
|
RetryWithDelay<T>(IObservable<T>, TimeSpan)
|
|
RetryWithDelay<T>(IObservable<T>, TimeSpan, IScheduler)
|
|
SelectAsync<T>(IObservable<T>, Func<T, Task>, IScheduler)
|
|
SelectAsync<T, T1>(IObservable<T>, Func<T, CancellationToken, Task<T1>>)
|
|
SelectAsync<T, T1>(IObservable<T>, Func<T, Task<T1>>)
|
|
SelectAsync<T, T1>(IObservable<T>, Func<T, Task<T1>>, IScheduler)
|
|
SelectSafeOrDefault<TIn, TOut>(IObservable<TIn>, Func<TIn, TOut>)
|
|
SelectSafeOrDefault<TIn, TOut>(IObservable<TIn>, Func<TIn, TOut>, Action<Exception>)
|
|
SelectSafe<TIn, TOut>(IObservable<TIn>, Func<TIn, TOut>, Func<TIn, Exception, TOut>)
|
|
SelectSafe<TIn, TOut, TException>(IObservable<TIn>, Func<TIn, TOut>, Func<TIn, TException, TOut>)
|
|
SelectSafe<TIn, TOut, TException>(IObservable<TIn>, Func<TIn, TOut>, Func<TException, TOut>)
|
|
SelectTo<T, Y>(IObservable<T>, Func<Y>)
|
|
SkipUntil<TSource>(IObservable<TSource>, Func<TSource, bool>)
|
|
StartWithDefault<T>(IObservable<T>)
|
|
SubscribeAsync<T>(IObservable<T>, Func<Task>)
|
|
SubscribeAsync<T>(IObservable<T>, Func<Task>, Action<Exception>)
|
|
SubscribeAsync<T>(IObservable<T>, Func<T, CancellationToken, Task>)
|
|
SubscribeAsync<T>(IObservable<T>, Func<T, CancellationToken, Task>, Action<Exception>)
|
|
SubscribeAsync<T>(IObservable<T>, Func<T, Task>)
|
|
SubscribeAsync<T>(IObservable<T>, Func<T, Task>, Action<Exception>)
|
|
SubscribeSafe<T>(IObservable<T>, Action, Action<Exception>)
|
|
SubscribeSafe<T>(IObservable<T>, Action<T>, Action<Exception>)
|
|
SubscribeSafe<T>(IObservable<T>, Action<T>, Action<Exception>, Action)
|
|
SubscribeSafe<T>(IObservable<T>, Func<Task>, Action<Exception>)
|
|
SubscribeSafe<T>(IObservable<T>, Func<T, Task>, Action<Exception>)
|
|
SubscribeToErrors<T>(IObservable<T>, Action<Exception>)
|
|
Subscribe<T>(IObservable<T>, Action)
|
|
Subscribe<T>(IObservable<T>, Action, Action<Exception>)
|
|
SwitchIfNotDefault<TIn, TOut>(IObservable<TIn>, Func<TIn, IObservable<TOut>>)
|
|
SwitchIfNotDefault<TIn, TOut>(IObservable<TIn>, Func<TIn, IObservable<TOut>>, Func<IObservable<TOut>>)
|
|
SwitchIf<TIn, TOut>(IObservable<TIn>, Predicate<TIn>, Func<TIn, IObservable<TOut>>)
|
|
SwitchIf<TIn, TOut>(IObservable<TIn>, Predicate<TIn>, Func<TIn, IObservable<TOut>>, Func<TIn, IObservable<TOut>>)
|
|
SwitchLatestAsync<TSource, TResult>(IObservable<TSource>, Func<TSource, CancellationToken, Task<TResult>>)
|
|
Synchronize<T>(IObservable<T>, NamedLock)
|
|
ToProperty<TSourceProperty>(IObservable<TSourceProperty>)
|
|
ToProperty<TSource, TSourceProperty>(IObservable<TSourceProperty>, out ObservableAsPropertyHelper<TSourceProperty>, TSource, Expression<Func<TSource, TSourceProperty>>, IScheduler)
|
|
ToProperty<TSource, TSourceProperty>(IObservable<TSourceProperty>, TSource, Expression<Func<TSource, TSourceProperty>>, IScheduler)
|
|
ToUnit<T>(IObservable<T>)
|
|
WithExpirationTime<T>(IObservable<T>, TimeSpan, Func<T, T>)
|
Allows to attach TTL to each value of the stream. If next value is not produced in expected time slice, fallback value is sent into stream as replacement until next value is propagated.
|
WithPrevious<TSource>(IObservable<TSource>)
|
|
WithPrevious<TSource, TResult>(IObservable<TSource>, Func<TSource, TSource, TResult>)
|
|