当前位置:  开发笔记 > 编程语言 > 正文

在rxjs中实现fromSubscriber

如何解决《在rxjs中实现fromSubscriber》经验,为你挑选了1个好方法。

我今天遇到了一个有趣的问题.我正在开发一个我们有文件上传的应用程序,我们想要实现一个进度条.该应用程序使用React/Redux/Redux-Observable编写.我想发送上传进度的动作.这是我为实现它所做的:

withProgress(method, url, body = {}, headers = {}) {
    const progressSubscriber = Subscriber.create();

    return {
        Subscriber: progressSubscriber,
        Request:    this.ajax({ url, method, body, headers, progressSubscriber }),
    };
}

我有一个类用于制作我的所有ajax请求.使用传入的参数this.ajax调用Observable.ajax.

export const blobStorageUploadEpic = (action$) => {
    return action$.ofType(a.BLOB_STORAGE_UPLOAD)
    .mergeMap(({ payload }) => {
        const { url, valetKey, blobId, blobData, contentType } = payload;

        const { Subscriber, Request } = RxAjax.withProgress('PUT', `${url}?${valetKey}`, blobData, {
            'x-ms-blob-type': 'BlockBlob',
            'Content-Type':   contentType,
        });

        const requestObservable = Request
        .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
        .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));

        return Observable.fromSubscriber(Subscriber)
        .map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
        .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
        .merge(requestObservable);
    });
};

这是我的史诗.我收到了订阅者,我写了一个自定义静态方法Observable来接收用户.然后我将其与Request(这是一个Observable)合并.

Observable.fromSubscriber = function fromSubscriber(externalSubscriber) {
    return Observable.create((subscriber) => {
        externalSubscriber.next =     (val) => subscriber.next(val);
        externalSubscriber.error =    (err) => subscriber.error(err);
        externalSubscriber.complete = () => subscriber.complete();
    });
};

最后,这是我写的自定义静态方法Observable.我写这个有两个原因.1.至于其他人有类似的问题处理一个例子(我花了很多时间试图找出如何使一个ObservableSubscriber写我自己前)和2.要问这是否是实现这一目标的最佳途径.rxjs很深,我认为现有的方法可以做到这一点,但我找不到它.



1> olsn..:

这基本上Subject就是为了什么,以下应该也适用:

export const blobStorageUploadEpic = (action$) => {
    return action$.ofType(a.BLOB_STORAGE_UPLOAD)
    .mergeMap(({ payload }) => {
        const { url, valetKey, blobId, blobData, contentType } = payload;

        const progressSubscriber = new Rx.Subject();
        const request = Rx.Observable.ajax({
            method: 'PUT',
            url: `${url}?${valetKey}`,
            body: blobData,
            headers: {
                'x-ms-blob-type': 'BlockBlob',
                'Content-Type':   contentType,
            },
            progressSubscriber
        });

        const requestObservable = request
            .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } }))
            .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err }));

        return progressSubscriber
            .map((e) => ({ percentage: (e.loaded / e.total) * 100 }))
            .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} }))
            .merge(requestObservable);
    });
};

这是一个更通用的例子(live @jsfiddle):

let data = "";
for (let c = 0; c < 100000; ++c) {
    data += "" + Math.random();
}

const progressSubscriber = new Rx.Subject();
const request = Rx.Observable.ajax({
  method: 'POST',
  url: "/echo/json/",
  body: JSON.stringify({ data }),
  progressSubscriber
});

progressSubscriber
  .merge(request)
  .subscribe(console.log);

推荐阅读
kikokikolove
这个屌丝很懒,什么也没留下!
DevBox开发工具箱 | 专业的在线开发工具网站    京公网安备 11010802040832号  |  京ICP备19059560号-6
Copyright © 1998 - 2020 DevBox.CN. All Rights Reserved devBox.cn 开发工具箱 版权所有