我们正在使用Rx来监控Silverlight应用程序中的活动,以便我们可以在一段时间不活动后向用户显示消息.
我们将事件(鼠标移动等)转换为可观察对象,然后将可观察对象合并在一起以创建单个(allActivity)可观察对象.然后,我们使用timepan限制allActivity observable,并且当系统在一段时间内处于非活动状态时,会预订某些通知.
如何在订阅之后向此添加新的observable/sequence(以便订阅选择此选项而不取消订阅和重新订阅).
例如,将几个序列合并在一起,节流,订阅.现在为已订阅的observable添加一个额外的序列.
示例代码:
private IObservableallActivity; public void CreateActivityObservables(UIElement uiElement) { // Create IObservables of event types we are interested in and project them as DateTimes // These are our observables sequences that can push data to subscribers/ observers // NB: These are like IQueryables in the sense that they do not iterate over the sequence just provide an IObservable type var mouseMoveActivity = Observable.FromEventPattern (h => uiElement.MouseMove += h, h => uiElement.MouseMove -= h) .Select(o => DateTime.Now); var mouseLeftButtonActivity = Observable.FromEventPattern (h => uiElement.MouseLeftButtonDown += h, h => uiElement.MouseLeftButtonDown -= h) .Select(o => DateTime.Now); var mouseRightButtonActivity = Observable.FromEventPattern (h => uiElement.MouseRightButtonDown += h, h => uiElement.MouseRightButtonDown -= h) .Select(o => DateTime.Now); var mouseWheelActivity = Observable.FromEventPattern (h => uiElement.MouseWheel += h, h => uiElement.MouseWheel -= h) .Select(o => DateTime.Now); var keyboardActivity = Observable.FromEventPattern (h => uiElement.KeyDown += h, h => uiElement.KeyDown -= h) .Select(o => DateTime.Now); var streetViewContainer = HtmlPage.Document.GetElementById("streetViewContainer"); var mouseMoveHandler = new EventHandler (this.Moo); bool b = streetViewContainer.AttachEvent("mousemove", mouseMoveHandler); var browserActivity = Observable.FromEventPattern (h => this.MyMouseMove += h, h => this.MyMouseMove -= h).Select(o => DateTime.Now); // Merge the IObservables together into one stream/ sequence this.allActivity = mouseMoveActivity.Merge(mouseLeftButtonActivity) .Merge(mouseRightButtonActivity) .Merge(mouseWheelActivity) .Merge(keyboardActivity) .Merge(browserActivity); } public IDisposable Subscribe(TimeSpan timeSpan, Action timeoutAction) { IObservable timeoutNotification = this.allActivity.Merge (IdleTimeoutService.GetDateTimeNowObservable()) .Throttle(timeSpan) .ObserveOn(Scheduler.ThreadPool); return timeoutNotification.Subscribe(timeoutAction); }
小智.. 17
Merge有一个重载,它接受一个IObservable
var xss = new Subject>(); xss.Merge().Subscribe(x => Console.WriteLine(x)); xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x)); xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x)); xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x)); ...
Gideon Engel.. 5
最简单的方法是使用中间主题代替Merge
调用.
SubjectallActivities = new Subject (); var activitySubscriptions = new CompositeDisposable(); activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities)); activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities)); //etc ... //subscribe to activities allActivities.Throttle(timeSpan) .Subscribe(timeoutAction); //later add another activitySubscriptions.Add(newActivity.Subscribe(allActivities));
Subject
如果接收到任何OnError或OnCompleted ,该类将停止从订阅的任何可观察对象传递OnNext(以及进一步的OnError和OnCompleted)事件.
此方法与您的示例之间的主要区别在于,它在创建主题时订阅所有事件,而不是在订阅合并的observable时订阅.由于您示例中的所有可观察对象都很热,因此差异不应太明显.
Merge有一个重载,它接受一个IObservable
var xss = new Subject>(); xss.Merge().Subscribe(x => Console.WriteLine(x)); xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x)); xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x)); xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x)); ...
最简单的方法是使用中间主题代替Merge
调用.
SubjectallActivities = new Subject (); var activitySubscriptions = new CompositeDisposable(); activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities)); activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities)); //etc ... //subscribe to activities allActivities.Throttle(timeSpan) .Subscribe(timeoutAction); //later add another activitySubscriptions.Add(newActivity.Subscribe(allActivities));
Subject
如果接收到任何OnError或OnCompleted ,该类将停止从订阅的任何可观察对象传递OnNext(以及进一步的OnError和OnCompleted)事件.
此方法与您的示例之间的主要区别在于,它在创建主题时订阅所有事件,而不是在订阅合并的observable时订阅.由于您示例中的所有可观察对象都很热,因此差异不应太明显.