当前位置:  开发笔记 > 编程语言 > 正文

订阅后添加可观察序列

如何解决《订阅后添加可观察序列》经验,为你挑选了2个好方法。

我们正在使用Rx来监控Silverlight应用程序中的活动,以便我们可以在一段时间不活动后向用户显示消息.

我们将事件(鼠标移动等)转换为可观察对象,然后将可观察对象合并在一起以创建单个(allActivity)可观察对象.然后,我们使用timepan限制allActivity observable,并且当系统在一段时间内处于非活动状态时,会预订某些通知.

如何在订阅之后向此添加新的observable/sequence(以便订阅选择此选项而不取消订阅和重新订阅).

例如,将几个序列合并在一起,节流,订阅.现在为已订阅的observable添加一个额外的序列.

示例代码:

private IObservable allActivity;
public void CreateActivityObservables(UIElement uiElement)
{
    // Create IObservables of event types we are interested in and project them as DateTimes
    // These are our observables sequences that can push data to subscribers/ observers 
    // NB: These are like IQueryables in the sense that they do not iterate over the sequence just provide an IObservable type
    var mouseMoveActivity = Observable.FromEventPattern(h => uiElement.MouseMove += h, h => uiElement.MouseMove -= h)
                                      .Select(o => DateTime.Now);

    var mouseLeftButtonActivity = Observable.FromEventPattern(h => uiElement.MouseLeftButtonDown += h, h => uiElement.MouseLeftButtonDown -= h)
                                            .Select(o => DateTime.Now);

    var mouseRightButtonActivity = Observable.FromEventPattern(h => uiElement.MouseRightButtonDown += h, h => uiElement.MouseRightButtonDown -= h)
                                             .Select(o => DateTime.Now);

    var mouseWheelActivity = Observable.FromEventPattern(h => uiElement.MouseWheel += h, h => uiElement.MouseWheel -= h)
                                       .Select(o => DateTime.Now);

    var keyboardActivity = Observable.FromEventPattern(h => uiElement.KeyDown += h, h => uiElement.KeyDown -= h)
                                     .Select(o => DateTime.Now);

    var streetViewContainer = HtmlPage.Document.GetElementById("streetViewContainer");
        var mouseMoveHandler = new EventHandler(this.Moo);
        bool b = streetViewContainer.AttachEvent("mousemove", mouseMoveHandler);

    var browserActivity = Observable.FromEventPattern(h => this.MyMouseMove += h, h => this.MyMouseMove -= h).Select(o => DateTime.Now);

    // Merge the IObservables together into one stream/ sequence
    this.allActivity = mouseMoveActivity.Merge(mouseLeftButtonActivity)
                                        .Merge(mouseRightButtonActivity)
                                        .Merge(mouseWheelActivity)
                                        .Merge(keyboardActivity)
                                        .Merge(browserActivity);
}

public IDisposable Subscribe(TimeSpan timeSpan, Action timeoutAction)
{
    IObservable timeoutNotification = this.allActivity.Merge   (IdleTimeoutService.GetDateTimeNowObservable())
                                                                .Throttle(timeSpan)
                                                                    .ObserveOn(Scheduler.ThreadPool);

    return timeoutNotification.Subscribe(timeoutAction);
}

小智.. 17

Merge有一个重载,它接受一个I​​Observable >.将外部序列设为Subject >,并在想要向该组添加另一个源时调用OnNext.Merge运算符将接收源并订阅它:

var xss = new Subject>();
xss.Merge().Subscribe(x => Console.WriteLine(x));

xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x));
...


Gideon Engel.. 5

最简单的方法是使用中间主题代替Merge调用.

Subject allActivities = new Subject();
var activitySubscriptions = new CompositeDisposable();

activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities));
activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities));
//etc ...

//subscribe to activities
allActivities.Throttle(timeSpan)
             .Subscribe(timeoutAction);

//later add another
activitySubscriptions.Add(newActivity.Subscribe(allActivities));

Subject如果接收到任何OnError或OnCompleted ,该类将停止从订阅的任何可观察对象传递OnNext(以及进一步的OnError和OnCompleted)事件.

此方法与您的示例之间的主要区别在于,它在创建主题时订阅所有事件,而不是在订阅合并的observable时订阅.由于您示例中的所有可观察对象都很热,因此差异不应太明显.



1> 小智..:

Merge有一个重载,它接受一个I​​Observable >.将外部序列设为Subject >,并在想要向该组添加另一个源时调用OnNext.Merge运算符将接收源并订阅它:

var xss = new Subject>();
xss.Merge().Subscribe(x => Console.WriteLine(x));

xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.0)).Select(x => 23 + 8 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(0.8)).Select(x => 17 + 3 * (int)x));
xss.OnNext(Observable.Interval(TimeSpan.FromSeconds(1.3)).Select(x => 31 + 2 * (int)x));
...



2> Gideon Engel..:

最简单的方法是使用中间主题代替Merge调用.

Subject allActivities = new Subject();
var activitySubscriptions = new CompositeDisposable();

activitySubscriptions.Add(mouseMoveActivity.Subscribe(allActivities));
activitySubscriptions.Add(mouseLeftButtonActivity.Subscribe(allActivities));
//etc ...

//subscribe to activities
allActivities.Throttle(timeSpan)
             .Subscribe(timeoutAction);

//later add another
activitySubscriptions.Add(newActivity.Subscribe(allActivities));

Subject如果接收到任何OnError或OnCompleted ,该类将停止从订阅的任何可观察对象传递OnNext(以及进一步的OnError和OnCompleted)事件.

此方法与您的示例之间的主要区别在于,它在创建主题时订阅所有事件,而不是在订阅合并的observable时订阅.由于您示例中的所有可观察对象都很热,因此差异不应太明显.

推荐阅读
喜生-Da
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有