RxJs介绍

RxJs介绍

十一月 14, 2019

概述

我们先来看看官网介绍:
RxJS 是一个库,它通过使用 observable 序列来编写异步和基于事件的程序。它提供了一个核心类型 Observable,附属类型 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 启发的操作符 (map、filter、reduce、every, 等等),这些数组操作符可以把异步事件作为集合来处理。

1
可以把 RxJS 当做是用来处理事件的 Lodash 。

ReactiveX 结合了 观察者模式、迭代器模式 和 使用集合的函数式编程,以满足以一种理想方式来管理事件序列所需要的一切。

这里提到了两种设计模式,观察者模式和迭代器模式。

观察者模式

我们先来看一段代码,来了解下这个模式:

// 被观察者对象
class Subject {
    constructor() {
        this.observerCollection = []
    }

    // 注册观察者
    registerObserver(observer) {
        this.observerCollection.push(observer)
    }

    // 取消订阅
    unRegisterObserver(observer) {
        let index = this.observerCollection.indexOf(observer)
        this.observerCollection.splice(index, 1)
    }

    // 发送通知
    notifyObservers() {
        this.observerCollection.forEach(observer => observer.notify());
    }
}

class Observer {
    constructor(name) {
        this.name = name
    }
    notify() {
        console.log(`${this.name} has been notified.`)
    }
}

let subject = new Subject(); // 创建主题对象

let observer1 = new Observer('x'); // 创建观察者A - 'semlinker'
let observer2 = new Observer('y'); // 创建观察者B - 'lolo'

subject.registerObserver(observer1); // 注册观察者A
subject.registerObserver(observer2); // 注册观察者B

subject.notifyObservers(); // 通知观察者

subject.unRegisterObserver(observer2); // 移除观察者A

subject.notifyObservers(); // 验证是否成功移除

从上面我们可以看出来,观察者模式实际也是一种订阅模式,定义了一对多的关系,让多个观察者对象同时监听某一个对象,这个对象状态发生变化时就会通知所有的观察者对象。

观察者模式的应用

<button id="btn">确认</button>

function clickHandler(event) {
    console.log('用户已点击确认按钮!');
}
document.getElementById("btn").addEventListener('click', clickHandler);

迭代器模式(ES6)

let arr = ['a', 'b', 'c'];
let iter = arr[Symbol.iterator]();
console.log(iter.next())
console.log(iter.next())
console.log(iter.next())
console.log(iter.next())

一个迭代器对象 ,知道如何每次访问集合中的一项, 并记录它的当前在序列中所在的位置。在 JavaScript 中迭代器是一个对象,它提供了一个 next() 方法,返回序列中的下一项。这个方法返回包含 done 和 value 两个属性的对象。对象的取值如下:

  • 在最后一个元素前: {done: false, value: elementValue }
  • 在最后一个元素后: { done: true, value: undefined }

Observer(观察者)

观察者是由Observable发送的值的消费者。观察者只是一组回调函数的集合,每个回调函数对应一种Observable发送的通知类型:next,error和complete。下面是一个典型的观察者对象:

var observer = {
  next: x => console.log('Observer got a next value: ' + x),
  error: err => console.error('Observer got an error: ' + err),
  complete: () => console.log('Observer got a complete notification'),
};

要使用观察者,需要把它提供给Observable的subscribe方法:
observable.subscribe(observer);
而取消订阅则是使用unsubscribe方法

自定义实现

class DataSource {
    constructor() {
        let i = 0;
        this._id = setInterval(() => this.emit(i++), 200); // 创建定时器
    }

    emit(n) {
        const limit = 10;  // 设置数据上限值
        if (this.ondata) {
            this.ondata(n);
        }
        if (n === limit) {
            if (this.oncomplete) {
                this.oncomplete();
            }
            this.destroy();
        }
    }

    destroy() { // 清除定时器
        clearInterval(this._id);
    }
}

function myObservable(observer) {
    let datasource = new DataSource(); // 创建数据源
    datasource.ondata = (e) => observer.next(e); // 处理数据流
    datasource.onerror = (err) => observer.error(err); // 处理异常
    datasource.oncomplete = () => observer.complete(); // 处理数据流终止
    return () => { // 返回一个函数用于,销毁数据源
        datasource.destroy();
    };
}

const unsub = myObservable({
    next(x) { console.log(x); },
    error(err) { console.error(err); },
    complete() { console.log('done') }
});

// setTimeout(unsub, 500);

上面的myObservable实际上就是一个被观察者,它接收一个Observer对象,返回一个unsubscribe函数,用于取消订阅。但是它有很多缺陷:

  • 传入的Observer对象可以不实现所有规定的方法
  • 在complete或者error触发后再调用next方法是没用的
  • 调用unsubscribe方法后,任何方法不能再被调用了
  • complete和error触发后,unsubscribe也会自动调用
  • 当出现异常后,unsubscribe也会自动调用以保证资源不会被浪费

下面我们来修改下上述代码:

class DataSource {
    constructor() {
        let i = 0;
        this._id = setInterval(() => this.emit(i++), 200); // 创建定时器
    }

    emit(n) {
        const limit = 10;  // 设置数据上限值
        if (this.ondata) {
            this.ondata(n);
        }
        if (n === limit) {
            if (this.oncomplete) {
                this.oncomplete();
            }
            this.destroy();
        }
    }

    destroy() { // 清除定时器
        clearInterval(this._id);
    }
}

class SafeObserver {
    constructor(destination) {
        this.destination = destination;
    }

    next(value) {
        // 尚未取消订阅,且包含next方法
        if (!this.isUnsubscribed && this.destination.next) {
            try {
                this.destination.next(value);
            } catch (err) {
                // 出现异常时,取消订阅释放资源,再抛出异常
                this.unsubscribe();
                throw err;
            }
        }
    }

    error(err) {
        // 尚未取消订阅,且包含error方法
        if (!this.isUnsubscribed && this.destination.error) {
            try {
                this.destination.error(err);
            } catch (e2) {
                // 出现异常时,取消订阅释放资源,再抛出异常
                this.unsubscribe();
                throw e2;
            }
            this.unsubscribe();
        }
    }

    complete() {
        // 尚未取消订阅,且包含complete方法
        if (!this.isUnsubscribed && this.destination.complete) {
            try {
                this.destination.complete();
            } catch (err) {
                // 出现异常时,取消订阅释放资源,再抛出异常
                this.unsubscribe();
                throw err;
            }
            this.unsubscribe();
        }
    }

    unsubscribe() { // 用于取消订阅
        this.isUnsubscribed = true;
        if (this.unsub) {
            this.unsub();
        }
    }
}

function myObservable(observer) {
    const safeObserver = new SafeObserver(observer); // 创建SafeObserver对象
    const datasource = new DataSource(); // 创建数据源
    datasource.ondata = (e) => safeObserver.next(e);
    datasource.onerror = (err) => safeObserver.error(err);
    datasource.oncomplete = () => safeObserver.complete();

    safeObserver.unsub = () => { // 为SafeObserver对象添加unsub方法
        datasource.destroy();
    };
    // 绑定this上下文,并返回unsubscribe方法
    return safeObserver.unsubscribe.bind(safeObserver);
}


const unsub = myObservable({
    next(x) { console.log(x); },
    error(err) { console.error(err); },
    complete() { console.log('done') }
});

最后我们把subscribe加入进去:

/**
 * A contrived data source to use in our "observable"
 * NOTE: this will clearly never error
 */

class DataSource {
    constructor() {
        let i = 0;
        this._id = setInterval(() => this.emit(i++), 200);
    }

    emit(n) {
        const limit = 10;
        if (this.ondata) {
            this.ondata(n);
        }
        if (n === limit) {
            if (this.oncomplete) {
                this.oncomplete();
            }
            this.destroy();
        }
    }

    destroy() {
        clearInterval(this._id);
    }
}

/**
 * Safe Observer
 */
class SafeObserver {
    constructor(destination) {
        this.destination = destination;
    }

    next(value) {
        // only try to next if you're subscribed have a handler
        if (!this.isUnsubscribed && this.destination.next) {
            try {
                this.destination.next(value);
            } catch (err) {
                // if the provided handler errors, teardown resources, then throw
                this.unsubscribe();
                throw err;
            }
        }
    }

    error(err) {
        // only try to emit error if you're subscribed and have a handler
        if (!this.isUnsubscribed && this.destination.error) {
            try {
                this.destination.error(err);
            } catch (e2) {
                // if the provided handler errors, teardown resources, then throw
                this.unsubscribe();
                throw e2;
            }
            this.unsubscribe();
        }
    }

    complete() {
        // only try to emit completion if you're subscribed and have a handler
        if (!this.isUnsubscribed && this.destination.complete) {
            try {
                this.destination.complete();
            } catch (err) {
                // if the provided handler errors, teardown resources, then throw
                this.unsubscribe();
                throw err;
            }
            this.unsubscribe();
        }
    }

    unsubscribe() {
        this.isUnsubscribed = true;
        if (this.unsub) {
            this.unsub();
        }
    }
}

/**
 * Observable basic implementation
 */
class Observable {
    constructor(_subscribe) {
        this._subscribe = _subscribe;
    }

    subscribe(observer) {
        const safeObserver = new SafeObserver(observer);
        safeObserver.unsub = this._subscribe(safeObserver);
        return safeObserver.unsubscribe.bind(safeObserver);
    }
}

/**
 * map operator
 */
function map(source, project) {
    return new Observable((observer) => {
        const mapObserver = {
            next: (x) => observer.next(project(x)),
            error: (err) => observer.error(err),
            complete: () => observer.complete()
        };
        return source.subscribe(mapObserver);
    });
}

/**
 * our observable
 */
const myObservable = new Observable((observer) => {
    const datasource = new DataSource();
    datasource.ondata = (e) => observer.next(e);
    datasource.onerror = (err) => observer.error(err);
    datasource.oncomplete = () => observer.complete();

    return () => datasource.destroy();
});

const unsub = myObservable.subscribe({
    next(x) { console.log(x); },
    error(err) { console.error(err); },
    complete() { console.log('done') }
});