Use when writing, editing, or reviewing C# code that uses System.Reactive (Rx.NET); working with `IObservable`, `IObserver`, `Subject`, `BehaviorSubject`, `ReplaySubject`, or `AsyncSubject`; composing operators like `Select`, `Where`, `Merge`, `Switch`, `Publish`, `RefCount`, `Catch`, `Retry`, `ObserveOn`, or `SubscribeOn`; managing subscription lifecycle with `CompositeDisposable`, `SerialDisposable`, or `DisposeWith`; testing time-dependent reactive code with `TestScheduler` from `Microsoft.Reactive.Testing`; designing hot vs cold observables or enforcing the Rx serial-notification contract. Triggers on phrases like "observable stream", "reactive pipeline", "Rx.NET", "subject leak", "OnNext on wrong thread", "test scheduler", or edits touching files that import `System.Reactive`. Do NOT use for plain `IEnumerable` LINQ, `IAsyncEnumerable`, or TPL `Task`-based async.
Patterns, anti-patterns, and decision guidance for System.Reactive (Rx.NET) in C#. Covers interface design, subject usage, subscription lifecycle, error handling, and testing.
Expose IObservable<T> as a property; never inherit from it.
Classes should not implement IObservable<T> directly. Expose a property instead, enabling
consumers to compose operators before subscribing.
public class SensorReadings
{
    private readonly Subject<double> _readings = new();

    // Wrap with AsObservable() to prevent downcasting to Subject
    public IObservable<double> Readings => _readings.AsObservable();

    internal void Report(double value) => _readings.OnNext(value);
}
Key rules:
- Call AsObservable() on subjects before exposing them. This prevents consumers from
  downcasting to the subject and calling OnNext/OnError/OnCompleted.
- Never return null where an IObservable<T> is expected. Return Observable.Empty<T>() or
  Observable.Never<T>() instead.
- Do not implement IObservable<T> or IObserver<T> yourself. Use Observable.Create or
  subjects.

Subjects are "mutable variables of the Rx world" (Erik Meijer). Use them only to generate a hot
observable imperatively from a local source with no direct external observable to adapt.
Use Observable.Create when:
- … IDisposable

Use a Subject when:
If the source is an existing observable and you want hot behavior, use Publish/RefCount instead
of piping through a subject.
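As a minimal sketch of the Observable.Create route, the adapter below wraps a callback-based source without a subject. `TemperatureProbe`, its `Sampled` event, and the `Samples()` extension are hypothetical names invented for this illustration; only `Observable.Create` and `Disposable.Create` come from System.Reactive.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

// Hypothetical callback-based source, invented for this example.
public sealed class TemperatureProbe
{
    public event Action<double> Sampled = delegate { };

    public void Emit(double value) => Sampled(value);
}

public static class TemperatureProbeExtensions
{
    // Cold adapter: every Subscribe attaches its own handler, and the
    // returned IDisposable detaches that handler again.
    public static IObservable<double> Samples(this TemperatureProbe probe) =>
        Observable.Create<double>(observer =>
        {
            Action<double> handler = observer.OnNext;
            probe.Sampled += handler;
            return Disposable.Create(() => probe.Sampled -= handler);
        });
}
```

Because the adapter is cold, callers who want a single shared handler can append Publish().RefCount() to the result rather than routing values through a subject.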
| Type | Replays | Seed | Use when |
|---|---|---|---|
| Subject<T> | None | No | Fire-and-forget events; no history needed |
| BehaviorSubject<T> | Last (1) | Yes | Property-change semantics; always has current value |
| ReplaySubject<T> | N or all | No | Late subscribers need historical values |
| AsyncSubject<T> | Last | No | Single async result (like Task<T>) |
BehaviorSubject<T> requires a seed value and immediately pushes it to new subscribers. Prefer it
for "current state" scenarios.
ReplaySubject<T> can replay a bounded window (new ReplaySubject<T>(bufferSize)) or a time
window. Always bound the buffer to avoid unbounded memory growth.
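The difference for late subscribers can be sketched as follows. The class and variable names are illustrative only, and the buffer size of 2 is an arbitrary choice for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class LateSubscriberDemo
{
    public static (List<int> behavior, List<int> replay) Run()
    {
        var behaviorSeen = new List<int>();
        var replaySeen = new List<int>();

        using var state = new BehaviorSubject<int>(0);  // seed value 0
        using var history = new ReplaySubject<int>(2);  // keeps at most 2 items

        foreach (var value in new[] { 1, 2, 3 })
        {
            state.OnNext(value);
            history.OnNext(value);
        }

        // Both subscribers arrive after all three values were pushed.
        using (state.Subscribe(behaviorSeen.Add)) { }
        using (history.Subscribe(replaySeen.Add)) { }

        return (behaviorSeen, replaySeen);
    }
}
```

Under these assumptions the BehaviorSubject subscriber should receive only the latest value (3), while the bounded ReplaySubject subscriber should receive the last two (2 and 3).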
Subjects are implementation details. Expose only IObservable<T> publicly:
private readonly BehaviorSubject<int> _count = new(0);
public IObservable<int> Count => _count.AsObservable();
Exposing .Value / .Current from a BehaviorSubject<T> publicly introduces imperatively-accessed
shared state, undermining the reactive model. Ben Lesh (RxJS lead): "using getValue() is a huge
code smell... you're doing something imperative in declarative paradigm."
Acceptable: Internal read-modify-write within the owning class (e.g.
_subject.OnNext(_subject.Value + 1)).
Avoid: Public .Value properties that let consumers pull state instead of subscribing.
Alternatives:
- Scan within the observable chain for stateful accumulation.

Cold: Each Subscribe triggers independent execution. Factory methods like Observable.Create,
Observable.Defer, Observable.Timer produce cold observables.
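To make "independent execution" concrete, this small sketch counts how often the factory inside Observable.Defer runs. `ColdDemo` and the `executions` counter are names made up for the example.

```csharp
using System;
using System.Reactive.Linq;

public static class ColdDemo
{
    public static int Run()
    {
        var executions = 0;

        IObservable<int> cold = Observable.Defer<int>(() =>
        {
            executions++;                  // runs once per Subscribe call
            return Observable.Return(42);
        });

        cold.Subscribe(_ => { });
        cold.Subscribe(_ => { });

        return executions;                 // one execution per subscriber
    }
}
```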
Hot: All subscribers share a single underlying source. Subjects are inherently hot. Use
Publish/RefCount to share a cold source:
IObservable<long> shared = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Publish()
    .RefCount();
Publish().RefCount() connects on first subscriber and disconnects when the last unsubscribes. Use
Publish().AutoConnect(n) if you need to wait for n subscribers before connecting.
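The connect and disconnect behavior of Publish().RefCount() can be observed by counting subscriptions to the underlying source. This is a sketch with invented names (`ShareDemo`, `connections`); a Subject stands in for whatever upstream source is being shared.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ShareDemo
{
    public static (int whileShared, int afterReconnect) Run()
    {
        var connections = 0;
        var source = new Subject<int>();

        IObservable<int> shared = Observable
            .Defer<int>(() => { connections++; return source; }) // counts upstream subscriptions
            .Publish()
            .RefCount();

        var first = shared.Subscribe(_ => { });   // first subscriber connects
        var second = shared.Subscribe(_ => { });  // reuses the existing connection
        var whileShared = connections;

        first.Dispose();
        second.Dispose();                         // last subscriber out disconnects

        using (shared.Subscribe(_ => { })) { }    // a new subscriber reconnects
        return (whileShared, connections);
    }
}
```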
Every Subscribe returns an IDisposable. Failing to dispose leaks subscriptions and can leak the
subscriber (prevented from GC by the observable's reference to it).
| Type | Behavior |
|---|---|
| CompositeDisposable | Groups multiple disposables; disposes all together. |
| SerialDisposable | Holds one at a time; setting a new one disposes previous. |
| SingleAssignmentDisposable | Assigned once; throws on second assignment. |
| MultipleAssignmentDisposable | Like Serial but does NOT dispose previous on reassignment. |
| CancellationDisposable | Bridges CancellationTokenSource to IDisposable. |
| RefCountDisposable | Prevents disposal until all dependents dispose. |
| BooleanDisposable | Tracks IsDisposed state. |
| Disposable.Empty | No-op; useful as default or from Observable.Create. |
| Disposable.Create(action) | Runs action on first Dispose call (idempotent). |
Collect subscriptions with CompositeDisposable and dispose in one call:
private readonly CompositeDisposable _subscriptions = new();

public void Initialize()
{
    source1.Subscribe(HandleItem1).DisposeWith(_subscriptions);
    source2.Subscribe(HandleItem2).DisposeWith(_subscriptions);
}

public void Dispose() => _subscriptions.Dispose();
DisposeWith is in the System.Reactive.Disposables.Fluent namespace (available since Rx.NET 6.1).
Use SerialDisposable when replacing one subscription with another:
private readonly SerialDisposable _current = new();

public void SwitchSource(IObservable<int> newSource)
{
    // Disposes previous subscription automatically
    _current.Disposable = newSource.Subscribe(Handle);
}
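The replacement semantics can be checked in isolation, without any observables. In this sketch `SerialDemo` and the log messages are illustrative; Disposable.Create stands in for real subscriptions.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;

public static class SerialDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var current = new SerialDisposable();

        current.Disposable = Disposable.Create(() => log.Add("first released"));

        // Assigning a replacement disposes the previous value.
        current.Disposable = Disposable.Create(() => log.Add("second released"));

        // Disposing the holder disposes whatever it currently holds.
        current.Dispose();

        // Anything assigned after disposal is disposed immediately.
        current.Disposable = Disposable.Create(() => log.Add("late released"));

        return log;
    }
}
```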
Only dispose subscriptions early if you need to unsubscribe before the observable completes. Finite
sequences (Observable.Return, .Take, etc.) clean up on completion.
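An OnError notification terminates the sequence: no further notifications follow it. If a
subscriber supplies no OnError handler, the exception is rethrown on whichever thread delivers
the notification, which may be a scheduler thread where nothing can catch it.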
Always provide an OnError handler in Subscribe.
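A minimal sketch of a subscription that handles all three notification kinds is shown below. `ErrorHandlerDemo` and the message strings are made up for the example, and Observable.Throw stands in for a failing source.

```csharp
using System;
using System.Reactive.Linq;

public static class ErrorHandlerDemo
{
    public static string Run()
    {
        var outcome = "none";

        Observable.Throw<int>(new InvalidOperationException("boom"))
            .Subscribe(
                value => outcome = $"value {value}",
                error => outcome = $"error: {error.Message}", // handled here, not rethrown
                () => outcome = "completed");

        return outcome;
    }
}
```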
| Operator | Behavior |
|---|---|
| Catch<TSource, TException>(handler) | Catches typed exception; returns fallback observable. |
| Catch(fallback1, fallback2, ...) | On error, moves to next sequence. |
| Retry() / Retry(count) | Resubscribes on error. Bound the count. |
| OnErrorResumeNext(next) | Continues with next on error OR completion. |
| Finally(action) | Runs on completion, error, or disposal. |
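One way these operators might be combined is a bounded Retry followed by a typed Catch. The sketch below uses invented names (`RecoveryDemo`, `flaky`, the fallback value -1), with Observable.Throw simulating a source that always times out.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class RecoveryDemo
{
    public static (List<int> values, int attempts) Run()
    {
        var attempts = 0;
        var values = new List<int>();

        // Simulated source that fails on every subscription.
        IObservable<int> flaky = Observable.Defer<int>(() =>
        {
            attempts++;
            return Observable.Throw<int>(new TimeoutException("no reply"));
        });

        flaky
            .Retry(3)                                                  // at most 3 subscriptions in total
            .Catch<int, TimeoutException>(_ => Observable.Return(-1))  // fallback for timeouts only
            .Subscribe(
                values.Add,
                error => { /* any other exception type would arrive here */ });

        return (values, attempts);
    }
}
```

Because the Catch is typed, an exception other than TimeoutException would bypass the fallback and reach the OnError handler.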
Prefer typed Catch<TSource, TException> over untyped variants to avoid swallowing unexpected