概念:可观察对象,一个可调用的未来值或事件的集合
根据基本用法,
Observable
可以执行同步或异步任务,并向observer
推送数据,要实现核心功能,只需要如下两个步骤:- 创建:作为发布者,
observable
需要设置一个可执行的publish
方法,其入参是observer
对象,该方法在构造实例的时候传入,在执行该方法的时候就可以调用observer对象的回调方法进行传值;
- 订阅:
publish
方法执行的时机是在observable
被subscribe
的时候,因此observable
是惰性推送值,且对于每个观察者来说是独立执行的;
class Observable {
constructor(publishFn) {
this.publish = publishFn;
}
subscribe(observer) {
this.publish(observer);
return observer;
}
}
export class Observable<T> implements Subscribable<T> {
constructor(subscribe?: (this: Observable<T>, subscriber: Subscriber<T>) => TeardownLogic) {
if (subscribe) {
this._subscribe = subscribe;
}
}
subscribe(
observerOrNext?: Partial<Observer<T>> | ((value: T) => void) | null,
error?: ((error: any) => void) | null,
complete?: (() => void) | null
): Subscription {
const subscriber = isSubscriber(observerOrNext) ? observerOrNext : new SafeSubscriber(observerOrNext, error, complete);
errorContext(() => {
const { operator, source } = this;
subscriber.add(
operator
? // We're dealing with a subscription in the
// operator chain to one of our lifted operators.
operator.call(subscriber, source)
: source
? // If `source` has a value, but `operator` does not, something that
// had intimate knowledge of our API, like our `Subject`, must have
// set it. We're going to just call `_subscribe` directly.
this._subscribe(subscriber)
: // In all other cases, we're likely wrapping a user-provided initializer
// function, so we need to catch errors and handle them appropriately.
this._trySubscribe(subscriber)
);
});
return subscriber;
}
}