简介
可观察对象(Observable)支持在应用中的发布者和订阅者之间传递消息。
可观察对象(Observable)可以发送多个任意类型的值 —— 字面量、消息、事件。
基本用法和词汇
Observable 用于在发送方和接收方之间传输消息。在创建 Observable 对象时,需要传入一个函数作为构造函数的参数,这个函数叫订阅者函数,这个函数也就是生产者向消费者推送消息的地方。在被消费者 subscribe(订阅)之前,订阅者函数不会被执行,直到subscribe()函数被调用,该函数返回一个 subscription 对象,里面有一个unsubscribe()函数,消费者可以随时拒绝消息的接收!
好了,我们看如下案例:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>Observable对象使用</h2>
<div>
<button (click)="getData()" >Click</button>
</div>
<h2>计数</h2>
<div>
<button (click)="getRxjsIntervalData()" >Click</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
getData() {
const myObservable = new Observable((observer) => {
let uname = 'hresh';
observer.next(uname);
});
let oData = myObservable.subscribe((data) => {
console.log(data);
});
}
getRxjsIntervalData() {
let count = 0;
//开始每秒计数
const myObservable = new Observable((observer) => {
setInterval(() => {
count++;
observer.next(count);
}, 1000);
});
let oData = myObservable.subscribe((data) => {
console.log(data);
});
setTimeout(() => {
console.log('取消计数操作');
oData.unsubscribe(); /*3s后取消数据显示*/
}, 3000);
}
}
页面测试效果:
地理位置的获取
该案例的功能是获取地理位置,根据官方文档的案例进行扩展,不然直接看官方案例可能会存在疑惑。
在 HTML 规范中,增加了获取用户地理信息的 API,这样使得可以基于用户位置开发互联网应用,即基于位置服务 。
获取当前地信息
navigator.geolocation.getCurrentPosition(successCallback,errorCallback)
重复获取当前地理信息
navigator.geolocation.watchPosition(successCallback,errorCallback)
使用
当获取地理信息成功后,会调用 successCallback,并返回一个包含位置信息的对象 position。
- 经度 : coords.longitude
-
纬度 : coords.latitude
-
准确度 : coords.accuracy
-
海拔 : coords.altitude
-
海拔准确度 : coords.altitudeAcuracy
-
行进方向 : coords.heading
-
地面速度 : coords.speed
-
请求的时间: new Date(position.timestamp)
当获取地理信息失败后,会调用 errorCallback,返回错误信息 error。
observable.component.ts
文件内容如下:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>获取地理位置案例</h2>
<div>
<input type = "button" (click)="getLocationUpdate()" value = "Watch Update"/>
<input type = "button" (click) = "stopWatch()" value = "Stop Watch"/>
</div>
'
})
export class ObservableComponent implements OnInit {
watchID: number;
geoLoc: any;
public num = 0;
constructor() { }
ngOnInit(): void {
// this.geolocationTest();
}
getLocationUpdate() {
let index = this.num;
function showLocation(position) {
let latitude = position.coords.latitude;
let longitude = position.coords.longitude;
alert('index:' + index + '---Latitude : ' + latitude + '----Longitude: ' + longitude);
}
function errorHandler(err) {
if (err.code === 1) {
alert('Error: Access is denied!');
} else if (err.code === 2) {
// tslint:disable-next-line: quotemark
alert("Error: Position is unavailable!");
}
}
if (navigator.geolocation) {
// timeout at 60000 milliseconds (60 seconds)
const options = {
enableHighAccuracy: false, // 位置是否精确获取
timeout: 5000, // 获取位置允许的最长时间
maximumAge: 1000 // 多久更新获取一次位置
};
this.geoLoc = navigator.geolocation;
this.watchID = this.geoLoc.watchPosition(showLocation, errorHandler, options);
console.log(this.watchID);
this.num += 1;
} else {
alert('Sorry, browser does not support geolocation!');
}
}
stopWatch() {
this.geoLoc.clearWatch(this.watchID);
this.num = 0;
}
}
运行项目后,在 IE 浏览器中打开,谷歌浏览器暂不支持地理位置获取。点击页面中的第一个按钮,效果如下:
Geolocation.watchPosition()
用于注册监听器,在设备的地理位置发生改变的时候自动被调用。该方法会返回一个 ID,从图中可以看到控制台输出的 watchID 值为1。在页面上停留一段时间,可以看到以下现象:
该监听器会一直监听设备的地理位置,一旦有所变动就会自动调用。如要取消监听可以通过 Geolocation.clearWatch()
传入该 ID 实现取消的目的,点击第二个按钮即可。
接下来我们使用可观察对象来实现地理位置的监听。
基于Observable的位置监听
作为发布者,你创建一个 Observable 的实例,其中定义了一个订阅者(subscriber)函数。 当有消费者调用 subscribe() 方法时,这个函数就会执行。 订阅者函数用于定义“如何获取或生成那些要发布的值或消息”。
要执行所创建的可观察对象,并开始从中接收通知,你就要调用它的 subscribe() 方法,并传入一个观察者(observer)。 这是一个 JavaScript 对象,它定义了你收到的这些消息的处理器(handler)。 subscribe() 调用会返回一个 Subscription 对象,该对象具有一个 unsubscribe() 方法。 当调用该方法时,你就会停止接收通知。
observable.component.ts
文件内容如下:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>Observable基本用法</h2>
<div>
<button (click)="geolocationTest()">获取当前地信息</button>
</div>
'
})
export class ObservableComponent implements OnInit {
public num = 0;
constructor() { }
ngOnInit(): void {
// this.geolocationTest();
}
// 创建一个Observable,它将开始监听地理位置更新
// 消费者订阅时
geolocationTest() {
const locations = new Observable((observer) => {
let watchId: number;
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition((position) => {
observer.next(position);
}, (error: PositionError) => {
observer.error(error);
});
} else {
observer.error('Geolocation not available');
}
// 当使用者取消订阅时,请清理数据以备下次订阅。
return {
unsubscribe() {
navigator.geolocation.clearWatch(watchId);
}
};
});
let index = this.num;
// 调用subscription()开始监听更新。
const locationsSubscription = locations.subscribe({
next(position) {
console.log(index);
console.log('Current Position: ', position);
},
error(msg) {
console.log('Error Getting Location: ', msg);
}
});
this.num += 1;
// 10秒后停止监听位置信息
setTimeout(() => {
locationsSubscription.unsubscribe();
}, 10000);
}
}
打开页面后,点击按钮,观察控制台输出情况。
我们定义的事件方法监听了 10s,即不再对地理位置的更新进行监听,所以很长时间后控制台只输出了一条信息。
我们对上述的代码进行修改,查看页面显示效果。首先注释 setTimeout 方法,页面显示内容为:
从上图可以看出,该监听器一直在运行着,所以在不间断的输出位置更新信息。
但是当 setTimeout 方法有效时,无论是否存在 clearWatch 方法,该监听器都不会一直执行下去。
定义观察者
用于接收可观察对象通知的处理器要实现 Observer 接口。这个对象定义了一些回调函数来处理可观察对象可能会发来的三种通知
通知类型 | 说明 |
---|---|
next | 必要。用来处理每个送达值。在开始执行后可能执行零次或多次。 |
error | 可选。用来处理错误通知。错误会中断这个可观察对象实例的执行过程。 |
complete | 可选。用来处理执行完毕(complete)通知。当执行完毕后,这些值就会继续传给下一个处理器。 |
Observer
对象其实就是个包含 next、error、complete 方法的对象。比如说:
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
关于 Observer
对象有一些比较重要的原则:
- 传入的
Observer
对象可以不实现所有规定的方法 (next、error、complete 方法) - 在
complete
或者error
触发之后再调用next
方法是没用的 - 调用
unsubscribe
方法后,任何方法都不能再被调用了 complete
和error
触发后,unsubscribe
也会自动调用- 当
next
、complete
和error
出现异常时,unsubscribe
也会自动调用以保证资源不会浪费 next
、complete
和error
是可选的。按需处理即可,不必全部处理
订阅
只有当有人订阅 Observable 的实例时,它才会开始发布值。订阅时要先调用该实例的 subscribe()
方法,并把一个观察者对象传给它,用来接收通知。
为了展示订阅的原理,我们需要创建新的可观察对象。它有一个构造函数可以用来创建新实例,但是为了更简明,也可以使用
Observable
上定义的一些静态方法来创建一些常用的简单可观察对象:
of(...items)
—— 返回一个Observable
实例,它用同步的方式把参数中提供的这些值发送出来。from(iterable)
—— 把它的参数转换成一个Observable
实例。 该方法通常用于把一个数组转换成一个(发送多个值的)可观察对象。
下面的例子会创建并订阅一个简单的可观察对象,它的观察者会把接收到的消息记录到控制台中:
import { Component, OnInit } from '@angular/core';
import { Observable,of } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用of来创建可观察对象</h2>
<div>
<button (click)="getData()">Click here</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
getData() {
// Create simple observable that emits three values
const myObservable = of(1, 2, 3);
// Create observer object
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// Execute with the observer object
myObservable.subscribe(myObserver);
}
}
启动该项目,打开页面并点击按钮,出现这样的结果:
观察者除了单独定义之外,还可以直接同 subscribe() 方法 一起使用,下面的代码和刚才的等价:
myObservable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);
创建可观察对象
在上面提到可以使用 Observable
上定义的一些静态方法来创建一些常用的简单可观察对象,比如 of() 和 from() 方法。接下来我们使用 Observable
构造函数可以创建任何类型的可观察流。 当执行可观察对象的 subscribe()
方法时,这个构造函数就会把它接收到的参数作为订阅函数来运行。 订阅函数会接收一个 Observer
对象,并把值发布给观察者的 next()
方法。
比如,要创建一个与前面的 of(1, 2, 3)
等价的可观察对象,你可以这样做:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用Observable构造函数来创建可观察对象</h2>
<div>
<button (click)="getData2()">Click here</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
getData2() {
const myObservable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
return { unsubscribe() { } };
});
myObservable.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
}
}
我们都知道键盘事件分为三种,包括 keydown,keypress 和 keyup,在我们之前学习普通事件的时候,在 input 标签中使用过这三个事件,如下述代码:
<input type="text" (keydown)="keydown()" (keypress)="keypress()" (keyup)="keyup()" />
关于这三个事件的定义如下:
keydown
:按下键盘键keypress
:紧接着keydown
事件触发(只有按下字符键时触发)keyup
:释放键盘键
顺序为:keydown -> keypress ->keyup
在上述代码可以看出,如果像捕捉到不同的事件需要单独进行定义。接下来我们将对上述代码进行修改,创建一个用来发布事件的可观察对象。在这个例子中,订阅函数是用内联方式定义的。
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>发布键盘事件的可观察对象</h2>
<div>
<input type="text" class="name" id="name" placeholder="please input data" (keydown)="inputFunc()">
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
myObservable.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
}
fromEvent(target, eventName) {
return new Observable((observer) => {
const handler = (e) => observer.next(e);
// Add the event handler to the target
target.addEventListener(eventName, handler);
return () => {
// Detach the event handler from the target
target.removeEventListener(eventName, handler);
};
});
}
inputFunc() {
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = this.fromEvent(nameInput, 'keydown')
.subscribe((e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
});
const subscription2 = this.fromEvent(nameInput, 'keyup')
.subscribe(() => {
console.log('keyup释放键盘:' + nameInput.value);
});
}
}
页面测试效果如下:
从结果可以看出,你可以在 inputFunc 方法中创建可发布 keydown
事件的可观察对象以及 keyup
事件的可观察对象。
多播
典型的可观察对象会为每一个观察者创建一次新的、独立的执行。 当观察者进行订阅时,该可观察对象会连上一个事件处理器,并且向那个观察者发送一些值。当第二个观察者订阅时,这个可观察对象就会连上一个新的事件处理器,并独立执行一次,把这些值发送给第二个可观察对象。
有时候,不应该对每一个订阅者都独立执行一次,你可能会希望每次订阅都得到同一批值 —— 即使是那些你已经发送过的。这在某些情况下有用,比如用来发送 document
上的点击事件的可观察对象。
多播用来让可观察对象在一次执行中同时广播给多个订阅者。借助支持多播的可观察对象,你不必注册多个监听器,而是复用第一个(next
)监听器,并且把值发送给各个订阅者。
来看一个从 1 到 3 进行计数的例子, 如果你订阅了两次,就会有两个独立的流 。
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用Observable构造函数来创建可观察对象</h2>
<div>
<button (click)="getData2()">Click here</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
getData2() {
const myObservable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
return { unsubscribe() { } };
});
myObservable.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('Finished sequence'); }
});
myObservable.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('Finished sequence'); }
});
}
}
页面结果如下:
可以发现,这两个观察者之间互不干扰,即生产者先将值发送给 observer1,再将值发送给 observer2。
修改上述代码使其支持多播,代码如下:
import { Component, OnInit } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-observable',
templateUrl: '
<h2>使用Observable构造函数来创建可观察对象</h2>
<div>
<button (click)="getData2()">Click here</button>
</div>
'
})
export class ObservableComponent implements OnInit {
constructor() { }
ngOnInit(): void {
}
sequenceSubscriber(observer) {
const seq = [1, 2, 3];
let timeoutId;
const mydate = this.date;
// Will run through an array of numbers, emitting one value
// per second until it gets to the end of the array.
function doSequence(arr, idx) {
timeoutId = setTimeout(() => {
const dd = new Date();
// console.log(dd.toLocaleTimeString());
observer.next(dd.toLocaleTimeString() + '----:' + arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(arr, ++idx);
}
}, 1000);
}
doSequence(seq, 0);
// Unsubscribe should clear the timeout to stop execution
return {
unsubscribe() {
clearTimeout(timeoutId);
}
};
}
getNum() {
const sequence = new Observable(this.sequenceSubscriber);
// Subscribe starts the clock, and will emit after 1 second
sequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// After 1/2 second, subscribe again.
// setTimeout(() => {
// sequence.subscribe({
// next(num) { console.log('2nd subscribe: ' + num); },
// complete() { console.log('2nd sequence finished.'); }
// });
// }, 500);
sequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}
页面测试结果为:
问题记录
angular中clearTimeout不生效的问题
原因:安装了@types/node 所致
解决办法:
使用 window.setTimeout 和 window.clearTimeout。
总结
本文内容参考官方文档总结的,官方文档毕竟有深度,所以对于刚接触 Observable 对象的新人来说理解起来比较困难,所以如果各位想快速入门的话,可以参看 Angular7入门辅助教程(五)——Observable(可观察对象) ,该文通俗易懂,希望对大家有所帮助。回到本文,本文只是想按照官方文档理解一遍,对于其中的案例进行扩展,希望能帮助你更好的理解。
参考文献
本文作者为hresh,转载请注明。