一些原始方法
// Each line below is an independent alternative (they all declare `obs`).
// Empty: completes immediately without emitting any value.
IObservable<int> obs = Observable.Empty<int>();
// Return: emits the single value 0, then completes.
IObservable<int> obs = Observable.Return(0);
// Throw: terminates immediately by signalling the given exception.
IObservable<int> obs = Observable.Throw<int>(new Exception());
// Never: emits nothing and never terminates. The type argument has to match the
// declared IObservable<int>; an IObservable<string> is not assignable to it.
IObservable<int> obs = Observable.Never<int>();
简单的流
// Each line below is an independent alternative (they all declare `obs`).
// Interval: emits an incrementing long once per period (here, every second).
IObservable<long> obs = Observable.Interval(TimeSpan.FromSeconds(1));
// Timer: emits a single value at the given absolute due time (one hour from now).
IObservable<long> obs = Observable.Timer(DateTimeOffset.Now + TimeSpan.FromHours(1));
// Repeat: emits the value 1 over and over, without end.
IObservable<int> obs = Observable.Repeat(1);
// Range(start, count): emits `count` consecutive integers from `start` (here just 0).
IObservable<int> obs = Observable.Range(0, 1);
从异步数据
// From an Action or a Func: Start invokes the delegate asynchronously and
// publishes its result (here 1) through the returned observable.
Observable.Start(() => 1);
// From a Task: ToObservable() exposes the task's outcome as an observable.
// (The "..." is a placeholder for the work the task performs.)
Task.Factory.StartNew(...).ToObservable();
// From the Begin/End asynchronous pattern (APM).
// sampleFunc is a stand-in for the real operation; it ignores its inputs and returns 1.0.
// NOTE(review): delegate BeginInvoke/EndInvoke is documented as unsupported on
// .NET Core / .NET 5+ — confirm the target framework before reusing this snippet.
Func<int,int,double> sampleFunc = (a,b) => 1d;
// FromAsyncPattern turns the Begin/End pair into a function that, given the
// two int arguments, returns an observable of the double result.
Func<int, int, IObservable<double>> funcObs =
Observable.FromAsyncPattern<int, int, double>(sampleFunc.BeginInvoke,
sampleFunc.EndInvoke);
// Calling the wrapper with concrete arguments yields the observable.
IObservable<double> obs = funcObs(1, 0);
从事件
// The .NET event that the observable below is built from.
public event EventHandler<EventArgs> AnEvent;
// Wrap the event as an observable: the first lambda attaches the handler h to
// AnEvent, the second lambda detaches it again.
// NOTE(review): IEvent<T> with this FromEvent<TEventArgs> shape looks like the
// early Rx 1.x API; later releases appear to use FromEventPattern and
// EventPattern<T> instead — confirm against the Rx version in use.
IObservable<IEvent<EventArgs>> fromEventObs =
Observable.FromEvent<EventArgs>(h => this.AnEvent += h, h=>this.AnEvent -= h);
从已有的集合
// Any existing enumerable can serve as the source; an empty int array is used here.
IEnumerable<int> ie = new int[0];
// ToObservable() exposes the enumerable's elements as an observable sequence.
observable = ie.ToObservable();
通过Generate()
通过Create()
-->冷流
// Cold stream: the subscribe delegate passed to Create pushes 1, 2 and then
// completion to the observer it is handed.
IObservable<int> observable = Observable.Create<int>(observer =>
{
    observer.OnNext(1);
    observer.OnNext(2);
    observer.OnCompleted();

    // Nothing was acquired during subscription, so unsubscribing is a no-op.
    Action unsubscribe = () => { };
    return unsubscribe;
});
-->热流
// Observers currently subscribed to the hot stream; onNext(int) pushes each
// value to every observer in this list.
List<IObserver<int>> _subscribed = new List<IObserver<int>>();

// Builds the hot stream and stores it in `observable`.
// Subscribing only registers the observer in _subscribed; no values are pushed
// at subscription time. The returned action is the unsubscribe logic and
// removes the observer from the list again.
private void CreateHot()
{
    observable = Observable.Create<int>(o =>
    {
        _subscribed.Add(o);
        // The bool returned by Remove is intentionally discarded.
        return () => { _subscribed.Remove(o); };
    });
}
// Pushes `val` to every observer that is currently registered in _subscribed.
private void onNext(int val)
{
    // Iterate over a snapshot rather than the live list: an observer may
    // unsubscribe from inside its OnNext handler, which removes it from
    // _subscribed, and modifying a List<T> while enumerating it throws
    // InvalidOperationException.
    foreach (var o in _subscribed.ToArray())
    {
        o.OnNext(val);
    }
}