Li Mei

个人技术博客

嗨,我是李梅,一名前端开发者。


主要是记录个人对技术的理解和开发过程中遇到的问题,欢迎了解更多。

RxJS:四种 Subject 的用法和区别

在 RxJS 中有四种 Subject 分别是:Subject,BehaviorSubject,AsyncSubject,ReplaySubject;这四种 Subject 都是特殊的 Observable。

在介绍它们之前,我们先来看下这四种 Subject 与普通 Observable 的区别:

Observable Subject BehaviorSubject AsyncSubject ReplaySubject
Cold Hot Hot Hot Hot
数据生成者 数据生成者和消费者 数据生成者和消费者 数据生成者和消费者 数据生成者和消费者
单播 多播 多播 多播 多播
每次从头开始把值发给观察者 将值多播给已注册监听该 Subject 的观察者们 把最后一个值(当前值)发送给观察者(需要初始值) 执行的最后一个值发送给观察者相当于last() 可以把之前的值发送给观察者(错过的值)

Cold vs Hot

在文章【RxJS:Cold vs Hot Observables】里详细介绍了 Cold Observables 与 Hot Observables 的区别,除了 Observable 之外,这四种 Subject 都是 Hot Observables。

数据生产者 vs 数据消费者

  • 数据生产者是指 Observable(可观察对象),产生数据的一方。

  • 数据消费者是指 Observers(观察者),接收数据的一方。

普通的 Observable 只是数据生产者,发送数据。而 Subject,BehaviorSubject,AsyncSubject 和 ReplaySubject 即是生产者又是消费者。比如在 Angular项目中,我们经常用 Subject 在多个 component 中共享数据:

// messageService.ts
private shareData: Subject<string> = new Subject<string>();

sendData(value: string) {
    this.shareData.next(value);
}
getData(): Observable<string> {
    return this.shareData.asObservable();
}

// RxjsSubjectBComponent
// send data from RxjsSubjectBComponent
  this.messageService.sendData("this message from subject b");
// RxjsSubjectCComponent
// get data in RxjsSubjectCComponent
this.subscription = this.messageService.getData()
    .subscribe(val => {
        this.message = val;
        console.log(val);
    });

在这两个 component 中通过定义在 messageService 的 Subject 共享数据,通过 next() 方法使得 Subject 是数据生产者(可观察对象),通过 asObservable() 使得 Subject 是数据消费者(Observers)。

单播 vs 多播

在上面的代码例子中,不仅是在 component c 可以订阅拿到数据,在其他 component 中也可以订阅拿到数据。比如在 RxjsSubjectAComponent 中定义如下代码:

// RxjsSubjectAComponent
// get data in RxjsSubjectAComponent
this.subscription = this.messageService.getData()
    .subscribe(val => {
        console.log("this is in componet A and " + val);
    })

从 RxjsSubjectBComponent 发出值,在 RxjsSubjectCComponent 和 RxjsSubjectAComponent 能拿到同样的值,这个就是多播。

普通的 Observables 是 Cold Observables 并且是单播的,其他四种 Subject 是 Hot Observables 并且是多播的。

单播的意思是,每个普通的 Observables 实例都只能被一个观察者订阅,当它被其他观察者订阅的时候会产生一个新的实例。也就是普通 Observables 被不同的观察者订阅的时候,会有多个实例,不管观察者是从何时开始订阅,每个实例都是从头开始把值发给对应的观察者。

BehaviorSubject

Subject 其中的一个变体就是 BehaviorSubject,它有一个“当前值”的概念。它保存了发送给消费者的最新值,当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”,在定义一个 BehaviorSubject 时需要有初始值。

在 messageService 定义一个 BehaviorSubject,初始值是1,updateBehaviorSubject 方法把值更新为2,代码如下:

//messageService.ts
private behaviorSubject: BehaviorSubject<number> = new BehaviorSubject<number>(1);
currentBehaviorSubject = this.behaviorSubject.asObservable();
updateBehaviorSubject() {
    this.behaviorSubject.next(2);
}

并在 RxjsSubjectBComponent 中调用执行 updateBehaviorSubject 方法,代码如下:

//RxjsSubjectBComponent
this.messageService.updateBehaviorSubject();

在 RxjsSubjectAComponent 中订阅 behaviorSubject 的值,代码如下:

let subscription1 = this.messageService.currentBehaviorSubject
    .subscribe(val => {
        console.log("this is in RxjsSubjectAComponent and Current behavior subject value is  " + val)
    });

this.subscription.add(subscription1);

在 RxjsSubjectCComponent 中,等5秒以后在订阅 behaviorSubject 的值,代码如下:

setTimeout(() => {
    let subscription1 = this.messageService.currentBehaviorSubject
        .subscribe(val => {
             console.log("this is in RxjsSubjectCComponent and Current behavior subject value is  " + val)
        });

}, 5000);

运行效果:在 RxjsSubjectAComponent 中会拿到1和2两个值,在 RxjsSubjectCComponent 中会拿到2这个值。

this is in RxjsSubjectAComponent and Current behavior subject value is  1
this is in RxjsSubjectAComponent and Current behavior subject value is  2
this is in RxjsSubjectCComponent and Current behavior subject value is  2

如果是正常的 Hot Observable 在5秒之后,拿不到任何值,因为在五秒内已经把值1和2都发送完了,五秒以后观察者再订阅,没有任何值可以发送了。但是用 BehaviorSubject 保存了发送给消费者的最新值,当有新的观察者订阅时,会立即从 BehaviorSubject 那接收到“当前值”,所以 RxjsSubjectCComponent 的订阅者会拿到2这个值。

ReplaySubject

类似于 BehaviorSubject,可以发送旧值给新的订阅者,但是不仅是‘当前值’,还可以是之前的旧值。

在 messageService 定义一个 ReplaySubject,为新的订阅者缓冲3个值,sendReplaySubject 方法把1 2 3 4 5五个值发给订阅者,代码如下:

//messageService.ts
private replaySubject: ReplaySubject<number> = new ReplaySubject<number>(3);
sendReplaySubject() {
    this.replaySubject.next(1);
    this.replaySubject.next(2);
    this.replaySubject.next(3);
    this.replaySubject.next(4);
    this.replaySubject.next(5);
}

getReplaySubject(): Observable<any> {
    return this.replaySubject.asObservable();
}

在 RxjsSubjectBComponent 中,调用 sendReplaySubject 方法:

this.messageService.sendReplaySubject();

在 RxjsSubjectAComponent 中,订阅 replaySubject 的值,代码如下:

let subscription2 = this.messageService.getReplaySubject()
    .subscribe(val => {
        console.log("this is in RxjsSubjectAComponent and Current Replay subject value is  " + val)
    });

this.subscription.add(subscription2);

在 RxjsSubjectCComponent 中,等5秒以后在订阅 replaySubject 的值,代码如下:

setTimeout(() => {
    let subscription2 = this.messageService.getReplaySubject()
        .subscribe(val => {
            console.log("this is in RxjsSubjectCComponent and Current repaly subject value is  " + val)
        });
    this.subscription.add(subscription2);

}, 5000);

效果如下:

this is in RxjsSubjectAComponent and Current Replay subject value is  1
this is in RxjsSubjectAComponent and Current Replay subject value is  2
this is in RxjsSubjectAComponent and Current Replay subject value is  3
this is in RxjsSubjectAComponent and Current Replay subject value is  4
this is in RxjsSubjectAComponent and Current Replay subject value is  5

this is in RxjsSubjectCComponent and Current repaly subject value is  3
this is in RxjsSubjectCComponent and Current repaly subject value is  4
this is in RxjsSubjectCComponent and Current repaly subject value is  5

AsyncSubject

只有当 Observable 执行完成时(执行 complete()),它才会将执行的最后一个值发送给观察者。

在 messageService 定义一个 AsyncSubject,sendAsyncSubject 方法把10001 10002 10003 三个值发给订阅者,代码如下:

private asyncSubject: AsyncSubject<number> = new AsyncSubject<number>();

sendAsyncSubject() {
    this.asyncSubject.next(10001);
    this.asyncSubject.next(10002);
    this.asyncSubject.next(10003);
    this.asyncSubject.complete();
}
getAsyncSubject() {
    return this.asyncSubject.asObservable();
}

在 RxjsSubjectBComponent 中,调用 sendAsyncSubject 方法:

this.messageService.sendAsyncSubject();

在 RxjsSubjectAComponent 中,订阅 asyncSubject 的值,代码如下:

let subscription3 = this.messageService.getAsyncSubject()
    .subscribe(val => {
        console.log("this is in RxjsSubjectAComponent and Current Async subject value is  " + val)
    });

this.subscription.add(subscription3);

在 RxjsSubjectCComponent 中,等5秒以后在订阅 asyncSubject 的值,代码如下:

setTimeout(() => {
    let subscription3 = this.messageService.getAsyncSubject()
        .subscribe(val => {
            console.log("this is in RxjsSubjectCComponent and Current async subject value is  " + val)
        });
    this.subscription.add(subscription3);

}, 5000);

效果如下:

this is in RxjsSubjectAComponent and Current Async subject value is  10003
this is in RxjsSubjectCComponent and Current async subject value is  10003