我今天遇到了一个有趣的问题.我正在开发一个我们有文件上传的应用程序,我们想要实现一个进度条.该应用程序使用React/Redux/Redux-Observable编写.我想发送上传进度的动作.这是我为实现它所做的:
withProgress(method, url, body = {}, headers = {}) { const progressSubscriber = Subscriber.create(); return { Subscriber: progressSubscriber, Request: this.ajax({ url, method, body, headers, progressSubscriber }), }; }
我有一个类用于制作我的所有ajax请求.使用传入的参数this.ajax
调用Observable.ajax
.
export const blobStorageUploadEpic = (action$) => { return action$.ofType(a.BLOB_STORAGE_UPLOAD) .mergeMap(({ payload }) => { const { url, valetKey, blobId, blobData, contentType } = payload; const { Subscriber, Request } = RxAjax.withProgress('PUT', `${url}?${valetKey}`, blobData, { 'x-ms-blob-type': 'BlockBlob', 'Content-Type': contentType, }); const requestObservable = Request .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } })) .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err })); return Observable.fromSubscriber(Subscriber) .map((e) => ({ percentage: (e.loaded / e.total) * 100 })) .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} })) .merge(requestObservable); }); };
这是我的史诗.我收到了订阅者,我写了一个自定义静态方法Observable
来接收用户.然后我将其与Request
(这是一个Observable
)合并.
Observable.fromSubscriber = function fromSubscriber(externalSubscriber) { return Observable.create((subscriber) => { externalSubscriber.next = (val) => subscriber.next(val); externalSubscriber.error = (err) => subscriber.error(err); externalSubscriber.complete = () => subscriber.complete(); }); };
最后,这是我写的自定义静态方法Observable
.我写这个有两个原因.1.至于其他人有类似的问题处理一个例子(我花了很多时间试图找出如何使一个Observable
从Subscriber
写我自己前)和2.要问这是否是实现这一目标的最佳途径.rxjs
很深,我认为现有的方法可以做到这一点,但我找不到它.
这基本上Subject
就是为了什么,以下应该也适用:
export const blobStorageUploadEpic = (action$) => { return action$.ofType(a.BLOB_STORAGE_UPLOAD) .mergeMap(({ payload }) => { const { url, valetKey, blobId, blobData, contentType } = payload; const progressSubscriber = new Rx.Subject(); const request = Rx.Observable.ajax({ method: 'PUT', url: `${url}?${valetKey}`, body: blobData, headers: { 'x-ms-blob-type': 'BlockBlob', 'Content-Type': contentType, }, progressSubscriber }); const requestObservable = request .map(() => ({ type: a.BLOB_STORAGE_UPLOAD_SUCCESS, payload: { blobId } })) .catch((err) => Observable.of({ type: a.BLOB_STORAGE_UPLOAD_FAILURE, err })); return progressSubscriber .map((e) => ({ percentage: (e.loaded / e.total) * 100 })) .map((percentage) => ({ type: a.BLOB_STORAGE_UPLOAD_PROGRESS, payload: { percentage} })) .merge(requestObservable); }); };
这是一个更通用的例子(live @jsfiddle):
let data = ""; for (let c = 0; c < 100000; ++c) { data += "" + Math.random(); } const progressSubscriber = new Rx.Subject(); const request = Rx.Observable.ajax({ method: 'POST', url: "/echo/json/", body: JSON.stringify({ data }), progressSubscriber }); progressSubscriber .merge(request) .subscribe(console.log);