当前位置: 代码迷 >> 综合 >> RxJS 应用场景1——简化轮询逻辑(将琐碎的异步逻辑抽象为一个输入输出流)
  详细解决方案

RxJS 应用场景1——简化轮询逻辑(将琐碎的异步逻辑抽象为一个输入输出流)

热度:35   发布时间:2024-01-21 08:47:14.0

RxJS 应用场景1——简化轮询逻辑
从实际应用场景中抽出一段逻辑,作为说明。逻辑如下:
调用生成接口,拿到key,5秒后调用进度条接口轮询进度(数据太多),UI渲染进度,当进100时停止轮询。
所以要抽象出一个方法demoX,负责调用生成接口,并轮询进度条接口,最后返回接口响应信息(进度或,接口报错信息)。
整理一下这个方法需要实现的功能:

  1. 收集第一个接口和第二个接口的报错信息并返回
  2. 收集这个方法中出现的错误信息并返回
  3. 调用第一个接口拿到key 延迟5秒后轮询进度接口1秒1次
  4. 假如轮询时接口没有返回,而1秒又有新的轮询产生,丢掉之前的轮询
  5. 当轮询结果返回,方法就返回
  6. 方法中出现报错停止轮询,当进度为100时停止轮询
  7. 方法返回的是一个可观察对象,有新值时会通知订阅者

很显然,原生Promise 不够强大,这个方法要返回一个可观察者对象,它可以发出N个值,至到取消订阅。
demoX这个方法收集了所有报错信息及轮询结果给订阅者,由订阅者来决定后续操作,这些不是我们关心的。

step 1
封装一个_http 方法用来发起http请求:

 import { of, race } from "rxjs";import { delay } from "rxjs/operators";class _http {static post(url: string, data: any) {return race(of({ code: 408, msg: "请求超时!" }).pipe(delay(50000)),ajax.post(baseURL + url, data, {"Content-Type": "application/json;charset=UTF-8",token:xxxxxxxxx}));}}

这是一个简单的封装,封装了请求头,及超时处理,还可以做统一的错误处理,及完成回调等。如下所示:

import { ajax } from "rxjs/ajax";import { of, race } from "rxjs";import { delay, catchError, finalize } from "rxjs/operators";class _http {static post(url: string, data: any) {return race(of({ code: 408, msg: "请求超时!" }).pipe(delay(50000)),ajax.post(baseURL + url, data, {"Content-Type": "application/json;charset=UTF-8",token}).pipe(catchError(({message}:any)=> of({code:'err',msg:message})),finalize(()=>console.log('complete'))));}}

只用简单的封装就可以,

 

demoX(this: any, param: any) {return _http("/api/create", { // step 2 _http 请求create 接口,拿到完整响应,residentialIdList: param}).pipe(map(({ response: rs }: any) => {// step 3 通过map 操作符处理一下响应数据,const { code, msg } = rs;if (code !== 0) {throw Error(msg);  // 对非 0 code 码处理,抛出错误信息,最后由catchError 操作符进行集中处理}return rs.data;}),delay(5000), // 延迟5秒后调用进度接口mergeMap(key => { // step 4 将接口数据(拿到的key)合并到进度请求逻辑中 return interval(1000).pipe( // step5 每1秒产生一个值,switchMap(() => // step 6 抛掉interval发出的值,将流切到内部的请求_http("/api/progress", {// step 7 请求进度接口key: key,type: 2}).pipe(map(({ response: rs }: any) => { // step 8 处理进度接口响应const { code, msg } = rs;if (code !== 0) {   // 对非 0 code值 抛错,由后续catchError 操作符集中处理,throw Error(msg);}return rs;}))));}),catchError(({ message }: any) => { //step 9 集中处理错误信息,返回一个值,包含各种错误信息,可能是接口报错,或是语法报错,或是逻辑报错return of({ code: "err", msg: message });}),takeWhile( // step 10 只有要有错误抛出或当进度为100时,要停止轮询进度接口,并取消相关订阅({ data, code }: any) => code !== 0 || data < 100 || data > this.percent),distinct((o: any) => o.data) // step 11 假如当前值和上次发出的值相同,则不发出新的值,订阅者不会收到);}

现在demoX 封装完闭,下面来调用它:

this.demoX(param).subscribe((rs: any) => {const { code, msg } = rs;if (code !== 0) {alert(msg);return;}this.percent = rs.data;})

这个例子可以看出来,RxJS 可以将琐碎的异步逻辑抽像为一个输入输出流,可以通过各种操作符对流进行加工,最后输出,简洁、高效。
需要注意的是一个输入输出流内部理论上可以包含无数的输入输出流,什么时候把流合到内部,或把内部流切到外部这个根据实际需要调用相应的操作符来操作,具体参见RxJS 官方文档。