Rxjs 入门

Rxjs 入门
0

Rxjs 入门

什么是 Observable

一个 observable 代表一个流,或者是可以随时间推移到达的数据源。你可以从任何东西创建一个 observable, 但是在 RxJS 里最常用的方式是从事件来构建。这些事件可以是鼠标移动、按钮点击、输入文本甚或是路由的改变。最简单地创建一个 observable 的方式是用它的创建函数。举个例子,我们可以用 fromEvent 辅助函数来创建一个鼠标点击事件的 observable:

// import the fromEvent operator
import { fromEvent } from 'rxjs';

// grab button reference
const button = document.getElementById('myButton');

// create an observable of button clicks
const myObservable = fromEvent(button, 'click');

这样我们就有了一个 observable, 但是它不会做任何事情。这是因为 observables 是冷的,没有激活动作(就像链接事件监听),直到有。。。

Subscription

Subscription 是让一切动起来的原因。你可以想象它像一个水龙头,你有水流准备被打开(observable),但是需要一个人去打开水龙头。对于 observables,这个角色就是 subscriber

你可以使用 subscribe 方法来创建一个 subscription, 并为它提供一个名叫 observer 的函数(或者对象)。这样你就可以决定如何去**响应(式编程)**每个事件。让我们给之前的场景创建一个 subscription:

// import the fromEvent operator
import { fromEvent } from 'rxjs';

// grab button reference
const button = document.getElementById('myButton');

// create an observable of button clicks
const myObservable = fromEvent(button, 'click');

// for now, let's just log the event on each click
const subscription = myObservable.subscribe(event => console.log(event));

在上面的例子中,调用 myObservable.subscribe()将会:

  1. 给按钮上的点击事件设置一个监听器。
  2. 为每个点击事件,调用我们传入到 subscribe 方法中的函数(observer)。
  3. 返回的 subscription 对象中包含一个 unsubscribe 清理逻辑方法,就像去除事件监听器一样。

subscribe 方法还可以接受一个对象来处理错误或者完成的情况。你可能不会使用这种方式,但是需要了解这种方式:

// instead of a function, we will pass an object with next, error, and complete methods
const subscription = myObservable.subscribe({
  // on successful emissions
  next: event => console.log(event),
  // on errors
  error: error => console.log(error),
  // called once on completion
  complete: () => console.log('complete!')
});

注意每个 subscription 都将会分别创建一个执行上下文。这意味着第二次调用 subscribe 将会创建一个新的时间监听器:

// addEventListener called
const subscription = myObservable.subscribe(event => console.log(event));

// addEvent listener called again!
const secondSubscription = myObservable.subscribe(event => console.log(event));

// clean up with unsubscribe
subscription.unsubscribe();
secondSubscription.unsubscribe();

默认情况下 ,一个 subscritpion 会在 observableobserver 之间创建出一对一的单边会话。这就像你的老板(observable)叫(emitting)你(observer)合并一个 PR 一样。这是一个单播的形式。如果你想要一个会议的场景 —— 一个 observable, 许多 observers —— 你应当采用包括广播的 Subjects方式(无论是显式的还是隐式的)。这将会在后面讨论。

值得注意的是,当我们讨论一个 Observable 源向 observers 发送数据的时候,都是推送的模式。Observable 源并不关心 subscribers 对数据做了什么,它仅仅简单地推送了它。

在流上面处理事件很不错,但它仅仅在事件自身上有用,然而让 RxJS 成为 “事件的 lodash” 是。。。

Operators

Operators 提供了方法,用于操作源中的值,返回改变过值的 observable 。RxJS 中的许多 oprerators 看起来和 JavaScript 中 Array 的方法类似。举个例子,如果你想要从 observable 源中改变发出的值,你可以使用 map

// import the from operator
import { of } from 'rxjs';
/*
 *  'of' allows you to deliver values in a sequence
 *  In this case, it will emit 1,2,3,4,5 in order.
 */
const dataSource = of(1, 2, 3, 4, 5);

// subscribe to our source observable
const subscription = dataSource
  .pipe(
    // add 1 to each emitted value
    map(value => value + 1)
  )
  // log: 2, 3, 4, 5, 6
  .subscribe(value => console.log(value));

或者说你想要过滤出特别的值,你可以使用 filter

// import the from operator
import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

const dataSource = from(1, 2, 3, 4, 5);

// subscribe to our source observable
const subscription = dataSource
  .pipe(
    // only accept values 2 or greater
    filter(value => value >= 2)
  )
  // log: 2, 3, 4, 5
  .subscribe(value => console.log(value));

在实践中,如果你需要解决一个问题,那么一般情况下都会有一个 operator 来解决。在你开始 RxJS 的旅程的时候,会有大量的 operators ,你可以从其中一小部分开始了解。假以时日,当复杂的场景来临时,你会欣赏这个充满弹性的 operator 库的。

在上面的例子中你可能注意到,operators 是存在在。。。

Pipe

pipe 函数是 observable 数据源通过 operators 的装配线。就像工厂中的原始材料在变成成品之前,需要通过一系列步骤一样,源数据可以通过 pipe —— 一系列 operators 来操作,过滤或者变型数据来满足你的需求。使用 pipe 函数在 observable 链上组合了5个以上的 operators 是很常见的:

// observable of values from a text box, pipe chains operators together
inputValue
  .pipe(
    // wait for a 200ms pause
    debounceTime(200),
    // if the value is the same, ignore
    distinctUntilChanged(),
    // if an updated value comes through while request is still active cancel previous request and 'switch' to new observable
    switchMap(searchTerm => typeaheadApi.search(term))
  )
  // create a subscription
  .subscribe(results => {
    // update the dom
  });

但是怎么知道在你的程序中需要哪个 operator 呢?

Operators 可以分为几个常见的类型

寻找正确的 operator 的第一步是找到相关的分类。想要从源中过滤数据?找一找 filtering 相关的 operators。想要追踪 bug ,或者在你的 observable 流中进行 debug?utility operators 将会很有作用。operator 的分类包括。。。

创建 operators

这些 operators 让你可以从任何事物创建 observable。从通用到特殊的用途,你可以随意将任意东西变成流。

举个例子,假如我们创建了滚动条,让用户可以混动查看文章。我们可以将滚动事件通过 fromEvent 操作符变成流:

fromEvent(scrollContainerElement, 'scroll')
  .pipe(
    // we will discuss cleanup strategies like this in future article
    takeUntil(userLeavesArticle)
  )
  .subscribe(event => {
    // calculate and update DOM
  });

常用的创建 opratros 包括 offromfromEvent

组合 operators

组合 operators 允许连接来自多个 observables 的信息。发出的值的顺序,时间或者结构是这些 operators 主要的差别。

举个例子,我们可以组合订阅来自多个数据源的更新来进行计算:

// give me the last emitted value from each source, whenever either source emits
combineLatest(sourceOne, sourceTwo).subscribe(
  ([latestValueFromSourceOne, latestValueFromSourceTwo]) => {
    // perform calculation
  }
);

常用的组合操作符有:combineLatestconcatmergestartWithwithLatestFrom

错误处理 operators

错误处理 operators 在错误发生的时候,提供了优雅有效的错误处理不执行重试。

举个例子,我们可以使用 carchError 来安全地处理网络请求失败:

source
  .pipe(
    mergeMap(value => {
      return makeRequest(value).pipe(
        catchError(handleErrorByReturningObservable)
      );
    })
  )
  .subscribe(value => {
    // take action
  });

常用的错误处理 operatorscatchError

过滤 operators

过滤 operators 提供了选择(或拒绝)来自 observable 源的数据,并重新构建值的流的技术。

举个例子,我们可以使用 take operator 来获取从源中最先发出的5个值:

source.pipe(take(5)).subscribe(value => {
  // take action
});

最常用的过滤 operatorsdebounceTimedistinctUntilChangedfiltertake, 和 takeUntil

多播 operators

在 RxJS 中,observables 是冷的,并且默认是单播的(每个 subscriber 只有一个源)。这些 operators 可以让 observable 变热,或者成为多播的,允许多个 subscribers 共享副作用。

举个例子,我们想要最后一个 subscribers 进行共享,并从源中获取到最后发出的值:

const source = data.pipe(shareReplay());

const firstSubscriber = source.subscribe(value => {
  // perform some action
});

// sometime later...

// second subscriber gets last emitted value on subscription, shares execution context with 'firstSubscriber'
const secondSubscriber = source.subscribe(value => {
  // perform some action
});

常用的多播 operatorshareReplay

转换 operators

通过 operator 链来转换通过其中的值是一个常见的任务。这些 operators 提供了转换的技术,几乎可以解决你遇到的所有情况。

举个例子,我们可能想要随着时间的推移,从源中累加状态对象,就像 Redux

source
  .pipe(
    scan((accumulatedState, currentState) => {
      return { ...accumulatedState, ...currentState };
    })
  )
  .subscribe();

最常用的转换 operators 有: concatMapmapmergeMapscan, 和 switchMap

Operators 共有的行为

不同分类的 operators 通常会有一些共有的行为。识别这些通用的行为,你可以试试 ‘choose your own operator’ tree’

例如,大量的 operators 可以组合成。。。

展平 operators

operators 管理内部 observable 中的 subscription ,并将这些值发送到一个 observable 源中。一个常用的例子是在 observable 汇总操作 HTTP 请求或者基于 promise 的 API,但这仅仅是冰山一角:

fromEvent(button, 'click')
  .pipe(
    mergeMap(value => {
      // this 'inner' subscription is managed by mergeMap, with response value emitted to observer
      return makeHttpRequest(value);
    })
  )
  .subscribe(response => {
    // do something
  });

我们可以将展平 operators 划分出常见的行为,如。。。

switch operators

就像灯的开关一样, switch operators 将会关闭(unsubscribe)当前的 observable 并且从源开启一个新的 observable 。 Switch operators 适用于你不想同时获得多个热的 observable 时的情况。

inputValueChanges
  // only the last value is important, if new value comes through cancel previous request / observable
  .pipe(
    // make GET request for data
    switchMap(requestObservable)
  )
  .subscribe();

Switch operators 包括 swirchAllswitchMap 以及 switchMapTo

concat operators

如图 ATM 机一样,下一笔交易不能在上一笔交易完成之前进行。在 observable 中,让同一时间只有一个 subscription 会按顺序,在上一个完成之后发生。在执行顺序很重要的时候,这个操作符就很有用了:

concat(
  firstObservable,
  // will begin when 'firstObservable` completes
  secondObservable,
  // will begin when 'secondObservable` completes
  thirdObservable
).subscribe(values => {
  // take action
});


Concat operators 包括 concatconcatAllconcatMap, 和 concatMapTo

merge operators

如同合并车道一样,merge operators 支持将多个热的 observables 以先到先得的方式并入一个车道。Merge operators 用于当你想从多个源触发事件的时候:

merge(firstObservable, secondObservable)
  // any emissions from first or second observable as they occur
  .pipe(mergeMap(saveActivity))
  .subscribe();

常用的 Merge operators 包括 mergemergeMapmergeMapTomergeAll

Oprators 其他的类似之处

还有一些 operators 做了些类似的操作,但更灵活的操作。比如说,当达到特定条件的时候 unsubscribing ,可以使用:

  1. take 取得 x 个值
  2. takeLast 只去最后一个值
  3. takeWhile 提供一个表达式来取某些值
  4. takeUntile 当需要 source 保持激活直到另外一个 source 触发。

尽管 RxJS 中的 operators 不计其数,但是这些常用的行为和模式可以让我们快速地学习 RxJS。

对我们的影响

当你对通过 Observables 来进行编程越来越熟悉,你可以通过 observable 流来将所有异步行为组织起来。这是一个简单并且灵活地处理复杂行为的方式。

举个例子,假设我们想要在用户回答问题之后,发出保存用户活动的请求。我们一开始的实现可以用 mergeMap operator,它会在每个事件之上发出保存请求:

const formEvents = fromEvent(formField, 'click');
const subscription = formEvents
  .pipe(
    map(convertToAppropriateValue),
    mergeMap(saveRequest)
  )
  .subscribe();

之后,我们需要确认保存的顺序。用于上面操作符的知识,我们可以简单地将 mergeMap 操作符替换成 concatMap 操作符就好了:

const formEvents = fromEvent(formField, 'click');
const subscription = formEvents
  .pipe(
    map(convertToAppropriateValue),
    // now the next request won't begin until the previous completes
    concatMap(saveRequest)
  )
  .subscribe();

这个小小的改变将我们的事件请求变成了有序地请求,但这只是强大功能的一点点应用而已!