Angular Observable & RxJS

Observable

Observable은 Application의 Publisher와 Subscriber 사이의 메시지 전달을 지원하는 기능을 제공합니다. Observable은 이벤트 처리, 비동기 프로그래밍 및 다중 값 처리와 관련된 다른 기술에 비해 상당한 이점을 제공합니다.

Observable은 선언적입니다. 즉, 여러분이 값을 게시하는 함수를 정의해도, Consumer가 구독 하기전까지 실행되지 않습니다. 그리고 구독된 Consumer는 함수가 완료되거나 구독을 취소 할 때까지 알림을 받습니다.

Observable은 컨텍스트에 따라 리터럴(Literal), 메시지(Message) 또는 이벤트(Event)등 여러 값을 전달할 수 있습니다. 값을 수신하기 위한 API는 값이 동기적 또는 비동기적으로 전달되는지 여부에 관계없이 동일합니다. 설정 및 해체 로직은 모두 Observable에 의해 처리되고, 응용 프로그램에서는 값을 소비하기 위해 구독하고, 완료되면 구독을 취소하는 것에 신경을 써야 합니다. Stream이 Keystroke인지, HTTP 응답인지, 인터벌 타이머인지 여부와 관계없이 값을 구독하고 멈추는 인터페이스는 동일합니다.

이러한 이점 때문에 Observable은 Angular 내에서 광범위하게 사용되며 앱 개발에도 권장됩니다.

기본 사용법 및 용어

Publisher는 subscriber 함수를 정의하는 Observable 인스턴스를 만듭니다. 이 함수는 Consumer가 subscribe()함수를 호출할 때 실행 됩니다. subscriber 함수는 게시될 값 또는 메시지를 얻거나 생성하는 방법을 정의합니다.

생성한 Observable 객체를 실행하고 알림 수신을 시작하려면 observer를 전달하여 subscribe() 메서드를 호출합니다. observer는 여러분이 수신한 알림을 처리하는 핸드러를 정의한 JavaScript 객체입니다. subscribe() 메서드는 Subscripotion 객체를 반환하는데, 이 객체에는 알림 수신을 중지하기 위해 호출하는 unsubscribe() 메서드가 있습니다.

다음은 Geolocation 업데이트 정보 제공에 Observable을 어떻게 사용할 수 있지 보여줌으로써 기본 사용 모델을 설명하는 예제입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// Consumer가 구독할때 Geolocation 변경사항을 수신하는 *Observable*을 생성합니다.
const locations = new Observable((observer) => {
// next와 error 콜백을 가져옵니다. 이것은 Consumer가 구독할때 넘겨줍니다.
const {next, error} = observer;
let watchId;
// 게시할 값을 제공하는 간단한 geolocation API를 확인
if ('geolocation' in navigator) {
watchId = navigator.geolocation.watchPosition(next, error);
} else {
error('Geolocation not available');
}
// Consumer가 구독을 중지하면 다음 구독을 위해 준비한 데이터를 정리합니다.
return {unsubscribe() { navigator.geolocation.clearWatch(watchId); }};
});
// 변경사항 수신을 시작하기 위해 subscribe()를 호출합니다.
const locationsSubscription = locations.subscribe({
next(position) { console.log('Current Position: ', position); },
error(msg) { console.log('Error Getting Location: ', msg); }
});
// 10초후에 수신을 중지합니다.
setTimeout(() => { locationsSubscription.unsubscribe(); }, 10000);

Observer 정의

Observable 알람을 수신하기 위한 핸들러는 Observer 인터페이스를 구현합니다. 이것은 Observable이 보낼수 있는 3가지 알람 타입을 처리할 수 있는 콜백 메서드가 정의된 객체입니다.

알람 종류 설명
next 필수. 전달된 각각의 값을 처리하는 핸들러. 실행 시작후 0번 또는 그 이상 호출됩니다.
error 선택사항. 오류 알람에 대한 처리 핸들러. 오류로 인해 Observable 인스턴스의 실행이 중지됩니다.
complete 선택사항. 실행 완료 알람에 대한 처리 핸들러. 지연된 값은 실행 완료 후에도 계속해서 next 핸들러로 전달될 수 있습니다.

Observable 객체는 이러한 핸들러의 조합을 정의할 수 있습니다. 만약 알람 종류에 대한 핸들러를 제공하지 않으면, observer는 해당 종류의 알람을 무시합니다.

구독(Subscribing)

Observable 인스턴스는 누군가가 구독할 때만 값을 게시 시작합니다. 여러분은 알람을 수신받을 observer 객체를 Observable 인스턴스의 subscribe() 메서드에 넘겨 구독을 시작합니다.

어떻게 구독(Subscribing)이 작동하는지 보여주기 위해 새로운 Observable을 만듭니다. 새로운 인스턴스를 생성하기 위해 사용할 수 있는 생성자가 있지만, 설명을 위해 Observable 클래스에서 자주 사용하는 간단한 Observable을 만드는 몇가지 정적 메서드를 사용할 수 있습니다.

  • Observable.of(…items)-인수로 제공된 값을 동기적으로 전달하는 Observable 인스턴스를 반환합니다.
  • Observable.from(iterable)-인수를 Observable 인스턴스로 변경합니다. 일반적으로 이 메서드는 Array를 Observable로 변환할 때 사용합니다.

다음은 수신받은 메시지를 console에 기록하는 간단한 observer를 이용하여 Observable을 생성하고 구독하는 예제입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 값 3개를 발생시키는 간단한 observable 생성
const myObservable = Observable.of(1, 2, 3);
// observer 객체 생성
const myObserver = {
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification'),
};
// observer 객체를 이용해 실행시킴
myObservable.subscribe(myObserver);
// Logs:
// Observer got a next value: 1
// Observer got a next value: 2
// Observer got a next value: 3
// Observer got a complete notification

또는 subscribe() 메서드는 next, errorcomplete 핸들러에 대한 콜백 함수를 인수에서 정의하는 것을 허용합니다. 예를 들어, 다음 subscribe() 호출은 사전 정의된 observer를 지정하여 호출하는 것과 똑같습니다.

1
2
3
4
5
myObservable.subscribe(
x => console.log('Observer got a next value: ' + x),
err => console.error('Observer got an error: ' + err),
() => console.log('Observer got a complete notification')
);

위의 두가지 방법 모두 next는 필수사항이고, errorcomplete는 선택사항입니다.

next 메서드는 컨텍스트에 따라 메시지 문자열, 이벤트 객체, 숫자값 또는 구조체를 수신 받을수 있습니다. 일반적으로 우리는 Stream에 의해 Observable로 게시된 데이터를 참조합니다. 모든 타입은 Observable로 표시될 수 있으며, 해당 값은 Stream으로 게시됩니다.

Observable 생성

Observable 생성자를 사용하여 모든 타입의 Observable Stream을 만듭니다. 생성자는 Observablesubscribe() 메서드가 호출될 때 실행시킬 subscriber 함수를 인수로 받습니다. subscriber 함수는 observer 객체를 받고, observernext 메서드에 값을 게시할 수 있습니다.

예를 들어 Observable.of(1, 2, 3)에 해당하는 Observable 객체를 만들려면 다음과 같이할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 이 함수는 subscribe()가 호출될 때 실행됩니다.
function sequenceSubscriber(observer) {
// 순차적으로 1, 2, 그리고 3 을 전달하 후 완료(complete) 합니다.
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
// 값이 순차적으로 전달되기 때문에 unsubscribe 함수는 아무것도 할 필요가 없습니다.
return {unsubscribe() {}};
}
// 위의 시퀀스를 전달할 새로운 Observable을 생성합니다.
const sequence = new Observable(sequenceSubscriber);
// Observable을 실행 시키고 각 알람의 결과를 출력합니다.
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// Logs:
// 1
// 2
// 3
// Finished sequence

이 예를 좀 더 발전시키기 위해, 우리는 이벤트를 게시하는 Observable을 생성할 수 있습니다. 이 예제에서 subscriber함수가 인라인으로 정의 됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
function fromEvent(target, eventName) {
return new Observable((observer) => {
const handler = (e) => observer.next(e);
// target에 Event 핸들러를 붙입니다.
target.addEventListener(eventName, handler);
return () => {
// target에서 이벤트 핸들러를 제거합니다.
target.removeEventListener(eventName, handler);
};
});
}

이제 이 함수를 keydown 이벤트를 게시하는 Observable을 생성하는데 사용할 수 있습니다.

1
2
3
4
5
6
7
8
9
const ESC_KEY = 27;
const nameInput = document.getElementById('name') as HTMLInputElement;
const subscription = fromEvent(nameInput, 'keydown')
.subscribe((e: KeyboardEvent) => {
if (e.keyCode === ESC_KEY) {
nameInput.value = '';
}
});

여러 Comsumer에게 동시에 보내기(Multicasting)

일반적인 Observable 생성은, 구독하는 각각 observer에 대해 독립적으로 실행 됩니다. observer가 구독할 때 Observable은 이벤트 핸들러를 연결하고 그 observer에 값을 전달합니다. 두 번째 observer가 구독하면 Observable은 새로운 이벤트 핸들러에 연결하고 별도의 실행으로 두 번째 observer에 값을 전달합니다.

Document 객체에 대한 클릭을 Observable하는 경우와 같이 각 subscriber가 독립된 실행으로 시작하는 대신에 이미 값이 발행되고 있다고 할지라도 각 구독자가 같은 값을 갖기를 원할 수도 있습니다.

Multicasting은 단일 실행으로 여러 구독자 목록에 브로드 캐스트하는 기능입니다. Multicasting observable을 활용하면 Document에 여러개의 수신자(Listener)를 등록할 필요가 없고, 첫 번째 수신자(Listener)를 재사용하여 각 구독자에 값을 보냅니다.

Observable을 생성할 때 Observable을 어떻게 사용 할지, 그리고 값을 Multicast 할지 말지를 결정합니다.

아래의 예제는 1부터 3까지 카운트 하며, 각 숫자가 발행된 후 1초가 지연됩니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
function sequenceSubscriber(observer) {
const seq = [1, 2, 3];
let timeoutId;
// 배열 끝에 도달 할 때까지 초당 하나의 값을 게시하는 숫자 배열을 따라 실행합니다.
function doSequence(arr, idx) {
timeoutId = setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(arr, idx++);
}
}, 1000);
}
doSequence(seq, 0);
// Unsubscribe 할때 timeout을 취소 합니다.
return {unsubscribe() {
clearTimeout(timeoutId);
}};
}
// 위에 정의한 시퀀스를 전달하는 Observable을 새로 생성합니다.
const sequence = new Observable(sequenceSubscriber);
sequence.subscribe({
next(num) { console.log(num); },
complete() { console.log('Finished sequence'); }
});
// 출력 결과:
// (at 1 second): 1
// (at 2 seconds): 2
// (at 3 seconds): 3
// (at 3 seconds): Finished sequence

두번 구독하면 매초 값이 게시되는 두개의 분리된 Stream이 생성되고, 아래와 같이 보여집니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// 구독을 시작합니다. 1초후 값이 게시 됩니다.
sequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// 0.5초 후에 다시 구독합니다.
setTimeout(() => {
sequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 500);
// 출력:
// (at 1 second): 1st subscribe: 1
// (at 1.5 seconds): 2nd subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2.5 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3.5 seconds): 2nd subscribe: 3
// (at 3.5 seconds): 2nd sequence finished

Observable을 Multicasting 으로 변경하면 다음과 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
function multicastSequenceSubscriber() {
const seq = [1, 2, 3];
// 각 observer를 추적합니다.(모든 활성 구독마다 하나씩).
const observers = [];
// 하나의 value set만 생성되고 각 subscriber로 Multicast 되기 때문에 하나의 timeoutId만 있습니다.
let timeoutId;
// subscribe() 함수가 invoke 될때 실행 되는 subscriber 함수를 리턴합니다.
return (observer) => {
observers.push(observer);
// 첫 번째 구독인 경우 시퀀스를 시작합니다.
if (observers.length === 1) {
timeoutId = doSequence({
next(val) {
// observer를 순회하면서 모든 구독에 알람을 줍니다.
observers.forEach(obs => obs.next(val));
},
complete() {
// 모든 complete 콜백에 알람을 줍니다.
observers.forEach(obs => obs.complete());
}
}, seq, 0);
}
return {
unsubscribe() {
// observer 배열에서 제거합니다. 그리고 더이상 알람을 받지 않습니다.
observers.splice(observers.indexOf(observer), 1);
// Listener가 없을 경우 정리합니다.
if (observers.length === 0) {
clearTimeout(timeoutId);
}
}
};
};
}
// 배열을 순회하면서 배열의 끝에 다다를 때까지 1초에 하나의 값을 게시합니다.
function doSequence(observer, arr, idx) {
return setTimeout(() => {
observer.next(arr[idx]);
if (idx === arr.length - 1) {
observer.complete();
} else {
doSequence(observer, arr, idx++);
}
}, 1000);
}
// 위의 시퀀스를 전달하는 새로운 Observable을 생성합니다.
const multicastSequence = new Observable(multicastSequenceSubscriber);
// 구독을 시작하고 1초후에 게시를 시작합니다.
multicastSequence.subscribe({
next(num) { console.log('1st subscribe: ' + num); },
complete() { console.log('1st sequence finished.'); }
});
// 1.5초 후에 구독을 다시 시작 합니다. (첫 값은 받지 못합니다.)
setTimeout(() => {
multicastSequence.subscribe({
next(num) { console.log('2nd subscribe: ' + num); },
complete() { console.log('2nd sequence finished.'); }
});
}, 1500);
// 출력:
// (at 1 second): 1st subscribe: 1
// (at 2 seconds): 1st subscribe: 2
// (at 2 seconds): 2nd subscribe: 2
// (at 3 seconds): 1st subscribe: 3
// (at 3 seconds): 1st sequence finished
// (at 3 seconds): 2nd subscribe: 3
// (at 3 seconds): 2nd sequence finished

Multicast Observable은 조금더 많은 설정을 필요로 하지만, 어떤 어플리케이션에서는 이러한 사용법이 훨씬 더 유용할 수 있습니다. 나중에 Multicasting 프로세스를 간소화하는 도구를 살펴볼 예정이며, 또한 임의의 Observable을 가져와 Multicasting 할 수도 있습니다.

오류 처리

Observable은 값을 동기적으로 생성하기 때문에 오류를 잡는데(catch) try/catch가 효율적이지 않습니다. 대신에 observererror 콜백을 지정하여 오류를 처리할 수 있습니다. 오류가 발생하면 Observable이 구독을 정리하고 값 생성을 중지합니다. Observable은 값을 생성하거나 (next 콜백을 호출할 수 있습니다.) complete 또는error 콜백을 호출하여 완료할 수 있습니다.

1
2
3
4
myObservable.subscribe({
next(num) { console.log('Next num: ' + num)},
error(err) { console.log('Received an errror: ' + err)}
});

오류 처리(특히 오류 복구)에 대해서는 다음 섹션에서 자세히 설명합니다.

RxJS 라이브러리

반응형 프로그래밍(Reactive programming)은 데이터 스트림 및 변경 전파와 관련된 비동기 프로그래밍 패러다임입니다. RxJS (Reactive Extensions for JavaScript)는 비동기 또는 콜백 기반 코드를 보다 쉽게 작성할 수 있도록 Observable을 이용한 반응형 프로그래밍 라이브러리 입니다.

RxJS는 Observable 타입 구현을 제공합니다. 이 구현은 Observable이 언어(Language)의 일부 또는 브라우저가 지원할 때까지 필요합니다. 이 라이브러리는 Observable 객체를 생성하고 작업하기 위한 유틸리티 함수도 제공합니다. 이 유틸리티 함수는 다음과 같은 경우에 사용할 수 있습니다.

  • 기존 코드를 Observable의 비동기 작업으로 변환
  • 스트림의 값 순회
  • 값을 다른 타입으로 매핑
  • 스트림 필터링
  • 여러 스트림 작성

Observable 생성 함수

RxJS는 새로운 Observable을 생성하는데 사용할 수 있는 몇가지 함수를 제공합니다. 이러한 함수는 Event, Timer , Promise등으로 부터 Observable 객체를 만드는 프로세스를 단순화 할 수 있습니다.

Promise로 부터 Observable 생성

1
2
3
4
5
6
7
8
9
10
import { fromPromise } from 'rxjs';
// Promise에서 Observable을 만듭니다.
const data = fromPromise(fetch('/api/endpoint'));
// 구독하여 비동기적으로 결과를 수신 시작
data.subscribe({
next(response) { console.log(response); },
error(err) { console.error('Error: ' + err); },
complete() { console.log('Completed'); }
});

Counter로 부터 Observable 생성

1
2
3
4
5
6
7
import { interval } from 'rxjs';
// 주기적으로 값을 게시하는 Observable 생성
const secondsCounter = interval(1000);
// 구독하여 값의 게시를 시작
secondsCounter.subscribe(n =>
console.log(`It's been ${n} seconds since subscribing!`));

이벤트(Event)로 부터 Observable 생성

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { fromEvent } from 'rxjs';
const el = document.getElementById('my-element');
// 마우스 이동을 게시하는 Observable 생성
const mouseMoves = fromEvent(el, 'mousemove');
// 구독하여 마우스 이동 이벤트를 수신 시작
const subscription = mouseMoves.subscribe((evt: MouseEvent) => {
// 마우스 움직임 좌표를 로깅
console.log(`Coords: ${evt.clientX} X ${evt.clientY}`);
// 마우스가 화면의 왼쪽 상단에 있을때 구독을 중지하여 마우스 이동정보 수신을 중지한다.
if (evt.clientX < 40 && evt.clientY < 40) {
subscription.unsubscribe();
}
});

AJAX 요청을 생성하는 Observable 생성

1
2
3
4
5
6
import { ajax } from 'rxjs/ajax';
// AJAX 요청을 생성하는 Observable 생성
const apiData = ajax('/api/data');
// 구독하여 요청을 생성
apiData.subscribe(res => console.log(res.status, res.response));

연산자(Operator)

연산자는 컬렉션의 정교한 조작을 가능하게하는 Observable의 기반이 되는 함수입니다. 예를 들어, RxJS는 map(), filter(), concat()flatMap()과 같은 연산자를 정의합니다. 연산자는 설정 옵션을 사용하고 원본 Observable을 사용하는 함수를 반환합니다.

반환된 함수를 실행할 때, 연산자는 원본 Observable에서 생성된 값을 주시하여, 변환하고 변환된 값의 새로운 Observable 값을 반환합니다. 다음은 간단한 예입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
import { map } from 'rxjs/operators';
const nums = of(1, 2, 3);
const squareValues = map((val: number) => val * val);
const squaredNums = squareValues(nums);
squaredNums.subscribe(x => console.log(x));
// Logs
// 1
// 4
// 9

pipe를 사용하여 연산자를 함께 연결할 수 있습니다. pipe를 사용하면 여러 함수를 단일 함수로 결합할 수 있습니다. pipe() 함수는 결합하려는 함수를 인수로 받고, 실행될 때 작성된 순서대로 함수를 실행하는 새로운 함수를 반환합니다.

Observable에 적용되는 연산자 집합은 레시피입니다. 즉, 원하는 값을 생성하기 위한 지침 집합입니다. 레시피 자체적으로는 아무 것도하지 않습니다. 레시피를 통해 결과를 얻으려면 subscribe()를 호출 해야합니다.

아래 예제를 보겠습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { filter, map } from 'rxjs/operators';
const nums = of(1, 2, 3, 4, 5);
// Observable을 받는 함수를 생성합니다.
const squareOddVals = pipe(
filter(n => n % 2),
map(n => n * n)
);
// filter와 map 함수를 실행시키는 Observable을 생성합니다.
const squareOdd = squareOddVals(nums);
// 구독하여 결합된 함수를 실행합니다.
squareOdd.subscribe(x => console.log(x));

pipe() 함수는 RxJS Observable의 메서드이기도 하므로 아래와 같이 짧은 표현식을 사용하여 동일한 연산을 정의할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
import { filter, map } from 'rxjs/operators';
const squareOdd = of(1, 2, 3, 4, 5)
.pipe(
filter(n => n % 2 !== 0),
map(n => n * n)
);
// 구독하여 값을 받습니다.
squareOdd.subscribe(x => console.log(x));

일반 연산자

RxJS는 많은 연산자(150개 이상)를 제공하지만 몇몇 연산자만 자주 사용됩니다. 다음은 일반 연산자 목록입니다. 사용 예는 RxJS 문서의 RxJS 5 연산자 사용 예제를 참조하십시오.

Angular app의 경우 체인(chaining)을 연결하는 대신 연산자를 파이프(pipe)와 결합하는 것이 더 좋습니다. 하지만 체인(chaining)은 많은 RxJS 예제에서 사용됩니다.

영역 연산자들
생성 from , fromPromise , fromEvent , of
결합 combineLatest , concat , merge , startWith , withLatestFrom , zip
필터링 debounceTime , distinctUntilChanged , filter , take , takeUntil
변환 bufferTime , concatMap , map , mergeMap , scan , switchMap
유틸리티 tap
멀티캐스팅 share

오류 처리

RxJS는 subscribe()서 사용할 수 있는 error() 핸들러 외에도 알려진 오류를 처리 할 수있는 catchError 연산자도 제공합니다.

예를 들어, API 요청을 만들고 서버의 응답에 매핑하는 Observable이 있다고 가정해 보겠습니다. 만약 서버에서 오류를 반환하거나 값이 없다면 오류가 발생합니다. 하지만 여러분이 이 오류를 잡아서 Default 값을 제공한다면, Stream은 오류 대신 계속 진행될 것입니다.

아래는 catchError 연산자를 사용하는 예제입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { ajax } from 'rxjs/ajax';
import { map, catchError } from 'rxjs/operators';
// API에서 "response"를 반환합니다. 하지만 오류가 발생하면 빈 배열을 반환합니다.
const apiData = ajax('/api/data').pipe(
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});

실패한 Observable 재시도

catchError 연산자가 간단한 복구 경로를 제공하는 경우, retry 연산자를 사용하여 실패한 요청을 다시 시도할 수 있습니다.

catchError 연산자 앞에 retry연산자를 사용합니다. 이는 Observable 원본 소스를 다시 구독하여 오류를 유발한 모든 동작 시퀀스를 다시 실행할 수 있습니다. 그리고 만약 HTTP 요청을 포함하고 있다면 HTTP 요청을 다시 보냅니다.

아래 예제는 이전 예제를 수정하여 오류를 발생하기 전에 요청을 다시 시도합니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { ajax } from 'rxjs/ajax';
import { map, retry, catchError } from 'rxjs/operators';
const apiData = ajax('/api/data').pipe(
retry(3), // 실패하기 전에 최대 3번 다시 시도합니다.
map(res => {
if (!res.response) {
throw new Error('Value expected!');
}
return res.response;
}),
catchError(err => of([]))
);
apiData.subscribe({
next(x) { console.log('data: ', x); },
error(err) { console.log('errors already caught... will not run'); }
});

인증 요청은 사용자 작업에 의해서만 시작되어야 하므로 다시 시도하지 마십시오. 사용자가 시작하지 않은 반복 로그인 요청으로 사용자 계정이 잠길수 있습니다.

Observable에 대한 명명 규칙

Angular 응용 프로그램은 대부분 TypeScript로 작성되기 때문에 일반적으로 변수가 Observable 일때 알 수 있습니다. 비록 Angular 프레임워크가 Observable에 대한 명명 규칙을 강제하지 않더라도 여러분은 종종 이름 끝에 “$”가 붙은 변수를 볼수 있습니다.

이러한 방법은 코드를 훑어 보며 Observable 값을 찾을때 유용할 수 있습니다. 또한, Observable에서 가장 최근의 값을 저장하는 프로퍼티를 원할 때 “$”의 유무와 상관없이 동일한 이름을 사용하는 것이 편리할 수 있습니다.

예를 들어 아래와 같습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import { Component } from '@angular/core';
import { Observable } from 'rxjs';
@Component({
selector: 'app-stopwatch',
templateUrl: './stopwatch.component.html'
})
export class StopwatchComponent {
stopwatchValue: number;
stopwatchValue$: Observable<number>;
start() {
this.stopwatchValue$.subscribe(num =>
this.stopwatchValue = num
);
}
}

Angular에서 Observable

Angular는 Observable을 다양한 공통의 비동기 작업을 처리하기 위한 인터페이스로 사용합니다. 예를 들어:

  • EventEmitter 클래스는 Observable를 상속받습니다.
  • HTTP 모듈은 Observable을 사용하여 AJAX 요청 및 응답을 처리합니다.
  • Router 및 Form 모듈은 Observable을 사용하여 사용자 입력 이벤트를 수신하고 응답합니다.

Event Emitter

Angular는 @Output() 데코레이터를 통해 Component의 값을 게시할 때 사용되는 EventEmitter 클래스를 제공합니다. EventEmitterObservable을 상속하고 emit() 메서드를 추가하여 임의의 값을 방출할 수 있습니다. emit() 메서드를 호출하면 방출된 값을 구독하고 있는 observernext() 메서드로 전달합니다.

EventEmitter 문서에서 좋은 예제들을 찾을 수 있습니다. 다음은 open 및 close 이벤트를 수신하는 예제 Component 입니다.

1
<zippy (open)="onOpen($event)" (close)="onClose($event)"></zippy>

아래는 Component에 대한 정의입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component({
selector: 'zippy',
template: `
<div class="zippy">
<div (click)="toggle()">Toggle</div>
<div [hidden]="!visible">
<ng-content></ng-content>
</div>
</div>`})
export class ZippyComponent {
visible = true;
@Output() open = new EventEmitter<any>();
@Output() close = new EventEmitter<any>();
toggle() {
this.visible = !this.visible;
if (this.visible) {
this.open.emit(null);
} else {
this.close.emit(null);
}
}
}

HTTP

Angular의 HttpClient는 HTTP 메서드 호출에서 Observable을 반환합니다. 예를 들어 http.get('/api')Observable을 반환합니다. 이러한 방식은 Promise 기반의 API에 비해 몇가지 장점을 제공합니다.

  • Observables는 서버 응답을 변경하지 않습니다 (Promise.then() 호출을 통해 발생할 수 있음). 대신에 필요하다면 일련의 연산자를 이용하여 값을 변형할 수 있습니다.
  • unsubscribe() 메서드를 통해 HTTP 요청을 취소할 수 있습니다.
  • 요청에 대한 설정을 통해 진행 이벤트 업데이트를 가져올 수 있습니다.
  • 실패한 요청을 쉽게 재시도할 수 있습니다.

비동기 Pipe (Async Pipe)

AsyncPipeObservable 또는 Promise를 구독하고 방출된 최신 값을 반환합니다. 새 값이 방출(emit)되면 Pipe는 변경 사항을 검사할 Component를 표시합니다.

아래 예제는 time ObservableComponent의 뷰에 바인딩 합니다. Observable은 연속적으로 현재 시각을 뷰에 업데이트 합니다.

1
2
3
4
5
6
7
8
9
10
@Component({
selector: 'async-observable-pipe',
template: `<div><code>observable|async</code>:
Time: {{ time | async }}</div>`
})
export class AsyncObservablePipeComponent {
time = new Observable(observer =>
setInterval(() => observer.next(new Date().toString()), 1000)
);
}

Router

Router.events는 이벤트를 Observable로 제공합니다. RxJS의 filter() 연산자를 사용하여 관심있는 이벤트를 찾을수 있고 Navigation 프로세스의 이벤트 순서를 기반으로 의사 결정을 내리기 위해 구독할 수 있습니다. 예를 들어:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { Router, NavigationStart } from '@angular/router';
import { filter } from 'rxjs/operators';
@Component({
selector: 'app-routable',
templateUrl: './routable.component.html',
styleUrls: ['./routable.component.css']
})
export class Routable1Component implements OnInit {
navStart: Observable<NavigationStart>;
constructor(private router: Router) {
// NavigationStart만 게시하기 위한 새로운 Observable을 생성합니다.
this.navStart = router.events.pipe(
filter(evt => evt instanceof NavigationStart)
) as Observable<NavigationStart>;
}
ngOnInit() {
this.navStart.subscribe(evt => console.log('Navigation Started!'));
}
}

ActivatedRouteObservable을 사용하여 Route path 및 파라미터에 대한 정보를 가져 오는 Inject된 Router Service입니다. 예를 들어, ActivateRoute.url]에는 Route path를 보고하는 Observable을 포함하고 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import { ActivatedRoute } from '@angular/router';
@Component({
selector: 'app-routable',
templateUrl: './routable.component.html',
styleUrls: ['./routable.component.css']
})
export class Routable2Component implements OnInit {
constructor(private activatedRoute: ActivatedRoute) {}
ngOnInit() {
this.activatedRoute.url
.subscribe(url => console.log('The URL changed to: ' + url));
}
}

반응형 Form (Reactive form)

반응형 Form은 Observable을 사용하여 Form control 값을 모니터링하는 프로퍼티를 가지고 있습니다. FormControl 프로퍼티는 변경 이벤트를 발생시키는 valueChangesstatusChanges Observable을 포함합니다. Observable From control 프로퍼티를 구독하는 것은 Component 클래스 내에서 응용 프로그램 로직을 실행시키는 방법입니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import { FormGroup } from '@angular/forms';
@Component({
selector: 'my-component',
template: 'MyComponent Template'
})
export class MyComponent implements OnInit {
nameChangeLog: string[] = [];
heroForm: FormGroup;
ngOnInit() {
this.logNameChange();
}
logNameChange() {
const nameControl = this.heroForm.get('name');
nameControl.valueChanges.forEach(
(value: string) => this.nameChangeLog.push(value)
);
}
}

Observable의 실제 사용방법

다음은 Observable이 특히 유용한 도메인의 예입니다.

자동완성(Type-ahead suggestion)

Observable은 자동완성(Type-ahead)의 구현을 간단히할 수 있습니다. 일반적으로 자동완성(Type-ahead)은 일련의 별도 작업을 수행해야합니다.

  • 입력 데이터 수신합니다.
  • 값에서 공백을 제거하고, 최소 길이인지 확인합니다.
  • Debounce(모든 키 입력에 대해 API 요청을 보내지 않고 대신 키 입력이 중단될 때까지 대기).
  • 값이 동일하게 유지되면 요청을 보내지 않습니다 (예를 들어 문자를 빠르게 치고 백스 페이스 하는등).
  • 업데이트된 결과로 인해 결과가 무효화되는 경우 진행중인 AJAX 요청을 취소합니다.

이런 내용을 JavaScript로 전체 작성하면 상당히 복잡할 수 있습니다. 하지만 Observable를 사용하면 간단한 일련의 RxJS 연산자를 사용할 수 있습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import { fromEvent } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { map, filter, debounceTime, distinctUntilChanged, switchMap } from 'rxjs/operators';
const searchBox = document.getElementById('search-box');
const typeahead = fromEvent(searchBox, 'input').pipe(
map((e: KeyboardEvent) => e.target.value),
filter(text => text.length > 2),
debounceTime(10),
distinctUntilChanged(),
switchMap(() => ajax('/api/endpoint'))
);
typeahead.subscribe(data => {
// API로 부터 받은 데이터 처리
});

Exponential backoff

대규모 서비스를 설계할 때, 적절한 타임아웃 설정이 중요합니다. 실제로 어떤 요청에 대해 부하 상태 등으로 타임아웃이 발생하게 되면 그 다음 요청에 대한 응답을 조금 더 긴 시간에 랜덤으로 처리하는 방법을 엑스포넨셜 백오프(exponential backoff)라고 합니다.

엑스포넨셜 백오프는 API 요청 실패후 재시도하고, 연속적인 실패가 발생할 때마다 재시도 간격을 늘립니다. 그리고 최대 재시도 횟수이후 요청이 실패한 것으로 간주하는 기술입니다. 이러한 내용을 Promise와 AJAX 호출을 추적하는 다른 메서드로 구현하는 것은 꽤 복잡 할 수 있습니다. 하지만 Observable로 구현하는 것은 매우 쉽습니다.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import { pipe, range, timer, zip } from 'rxjs';
import { ajax } from 'rxjs/ajax';
import { retryWhen, map, mergeMap } from 'rxjs/operators';
function backoff(maxTries, ms) {
return pipe(
retryWhen(attempts => range(1, maxTries)
.pipe(
zip(attempts, (i) => i),
map(i => i * i),
mergeMap(i => timer(i * ms))
)
)
);
}
ajax('/api/endpoint')
.pipe(backoff(3, 250))
.subscribe(data => handleData(data));
function handleData(data) {
// ...
}

다른 기술들과 Observable 비교

Observable을 이용하여 Promise 대신 값을 비동기로 전달할 수 있고, Event 핸들러 대신 사용할 수 있습니다. 그리고 Observable이 여러개의 값을 전달할 수 있기 때문에 Array를 만들고 처리할 수 있는 곳에 사용할 수 있습니다.

Observable는 이러한 각각의 상황에서 대체 기술과 다소 다르게 동작하지만 중요한 이점을 제공합니다. 다음은 차이점에 대한 자세한 비교 내용입니다.

Observable과 Promise 비교

Observable은 종종 Promise와 비교됩니다. 주요 차이점은 다음과 같습니다.

  • Observable은 선언적입니다. 구독이 될 때까지 실행을 하지 않습니다. 하지만 Promise는 생성즉시 실행이 됩니다. 따라서 Observable은 결과가 필요할 때마다 실행할 수 있는 레시피를 정의하는데 유용합니다.
  • Observable 다수의 값을 제공할 수 있습니다. 하지만 Promise는 하나만 제공합니다. Observable은 시간이 지남에 따라 여러 값을 얻는데 유용할 수 있습니다.
  • Observable는 체인과 구독을 구분합니다. 하지만 Promise에는 .then() 절만 있습니다. 이것은 Observable로 시스템의 다른 부분에서 사용할 수 있고, 작업이 바로 실행 되지 않는 복잡한 변환 레시피를 만드는데 유용하게 합니다.
  • Observablesubscribe()는 오류 처리를 담당합니다. Promise는 오류를 자식(Child) Promise로 넘깁니다. 따라서 Observable은 중앙 집중적이고 예측 가능한 오류 처리에 유용합니다.

생성과 구독

  • Observable은 Consumer가 구독할 때까지 실행되지 않습니다. subscribe()는 행위에 대한 정의를 한번 실행하고, 다시 호출할 수 있습니다. 각 구독은 고유한 계산이 존재 합니다. 그리고 재구독은 값의 재계산을 유발합니다.
1
2
3
4
5
6
// 게시 작업을 선언합니다.
new Observable((observer) => { subscriber_fn });
// 실행을 시작
observable.subscribe(() => {
// observer가 알람을 처리
});
  • Promise는 즉시 한번만 실행이 됩니다. 결과 계산은 Promise가 만들어지면 시작됩니다. 작업을 재시작 할 수 있는 방법이 없습니다. 모든 then 절(구독)은 동일한 계산을 공유합니다.
1
2
3
4
5
6
// 실행 시작
new Promise((resolve, reject) => { executer_fn });
// 반환값을 처리
promise.then((value) => {
// 여기에서 결과값을 처리
});

체인(Chaining)

  • Observable은 map과 같은 변환 함수와 구독(subscription)을 구분합니다. 구독(subscription)만 subscriber 함수를 활성화하여 값 계산을 시작합니다.

    1
    observable.map((v) => 2*v);
  • Promise는 마지막 .then 절(구독과 동일)과 중간 .then 절 (map과 동일)을 구별하지 않습니다.

    1
    promise.then((v) => 2*v);

취소(Cancellation)

  • Observable 구독을 취소할 수 있습니다. 구독 취소는 수신자(listener)가 더 이상 값을 받지 못하도록 하고 subscriber 함수에 작업 취소를 알립니다.

    1
    2
    const sub = obs.subscribe(...);
    sub.unsubscribe();
  • Promise는 취소가 가능하지 않습니다.

오류 처리 (Error handling)

  • Observable 실행 오류는 subscriber의 오류 처리기로 전달되며 subscriber는 자동으로 Observable에서 구독을 취소합니다.
1
2
3
obs.subscribe(() => {
throw Error('my error');
});
  • Promise는 자식 Promise에 오류를 넘깁니다.
1
2
3
promise.then(() => {
throw Error('my error');
});

Cheat sheet

아래 코드는 ObservablePromise를 이용하여 같은 종류의 연산을 정의하는 방법을 보여줍니다.

  • 생성 (Creation)

    • Observable

      1
      2
      3
      new Observable((observer) => {
      observer.next(123);
      });
    • Promise

      1
      2
      3
      new Promise((resolve, reject) => {
      resolve(123);
      });
  • 변환 (Transform)

    • Observable

      1
      obs.map((value) => value * 2 );
    • Promise

      1
      promise.then((value) => value * 2);
  • 구독 (Subscribe)

    • Observable

      1
      2
      3
      sub = obs.subscribe((value) => {
      console.log(value)
      });
    • Promise

      1
      2
      3
      promise.then((value) => {
      console.log(value);
      });
  • 구독 취소 (Unsubscribe)

    • Observable

      1
      sub.unsubscribe();
    • Promise : Promise의 해결(resolve)로 암묵적으로 처리

Observable과 Event API 비교

ObservableEvent API를 사용하는 Event 핸들러와 매우 유사합니다. 이 두 개술은 모두 알람에 대한 핸들러를 정의하고 시간이 지남에 따라 전달되는 여러 값을 처리하는데 사용합니다. Observable을 구독하는 것은 Event Listener를 추가하는 것과 같습니다. 한가지 주요한 다른점은 Observable은 이벤트가 핸들러에 전달되기 전에 이벤트를 변환하도록 구성할 수 있다는 것입니다.

Observable을 사용하여 Event 및 비동기 작업을 처리하면 HTTP 요청과 같은 컨텍스트에서 일관성이 향상됩니다.

다음은 ObservableEvent API를 사용하여 같은 종류의 연산을 정의하는 방법을 보여주는 코드 샘플입니다.

  • 생성과 취소 (Creation & cancellation)

    • Observable

      1
      2
      3
      4
      5
      6
      7
      // Setup
      let clicks$ = fromEvent(buttonEl, ‘click’);
      // Begin listening
      let subscription = clicks$
      .subscribe(e => console.log(‘Clicked’, e))
      // Stop listening
      subscription.unsubscribe();
    • Event API

      1
      2
      3
      4
      5
      6
      7
      function handler(e) {
      console.log(‘Clicked’, e);
      }
      // Setup & begin listening
      button.addEventListener(‘click’, handler);
      // Stop listening
      button.removeEventListener(‘click’, handler);
  • 구독 (Subscription)

    • Observable

      1
      2
      3
      observable.subscribe(() => {
      // notification handlers here
      });
    • Event API

      1
      2
      3
      element.addEventListener(eventName, (event) => {
      // notification handler here
      });
  • 구성 (Configuration)

    • Observable
      Keystroke를 Listen 하지만 입력 값을 나타내는 Stream을 제공합니다.

      1
      2
      3
      fromEvent(inputEl, 'keydown').pipe(
      map(e => e.target.value)
      );
    • Event API
      구성은 제공하지 않습니다.

      1
      2
      3
      element.addEventListener(eventName, (event) => {
      // 핸들러에 도착하기 전에 전달된 이벤트를 다른 값으로 변경할 수 없습니다.
      });

Observable과 Array 비교

Observable은 시간이 지남에 따라 값을 발생시킵니다. Array는 정적인 값 집합으로 만들어집니다. 어떤 의미에서, ObservableArray가 동기적인 곳에서 비동기적입니다. 다음 예에서 ➞는 비동기 값 전달을 의미합니다.

  • Given

    • Observable

      1
      2
      obs: ➞12357
      obsB: ➞'a''b''c'
    • Array

      1
      2
      arr: [1, 2, 3, 5, 7]
      arrB: ['a', 'b', 'c']
  • concat()

    • Observable

      1
      2
      obs.concat(obsB)
      12357'a''b''c'
    • Array

      1
      2
      arr.concat(arrB)
      [1,2,3,5,7,'a','b','c']
  • filter()

    • Observable

      1
      2
      obs.filter((v) => v>3)
      57
    • Array

      1
      2
      arr.filter((v) => v>3)
      [5, 7]
  • find()

    • Observable

      1
      2
      obs.find((v) => v>3)
      5
    • Array

      1
      2
      arr.find((v) => v>3)
      5
  • findIndex()

    • Observable

      1
      2
      obs.findIndex((v) => v>3)
      3
    • Array

      1
      2
      arr.findIndex((v) => v>3)
      3
  • forEach()

    • Observable

      1
      2
      3
      4
      5
      6
      7
      8
      obs.forEach((v) => {
      console.log(v);
      })
      1
      2
      3
      5
      7
    • Array

      1
      2
      3
      4
      5
      6
      7
      8
      arr.forEach((v) => {
      console.log(v);
      })
      1
      2
      3
      5
      7
  • map()

    • Observable

      1
      2
      obs.map((v) => -v)
      -1-2-3-5-7
    • Array

      1
      2
      arr.map((v) => -v)
      [-1, -2, -3, -5, -7]
  • reduce()

    • Observable

      1
      2
      obs.scan((s,v)=> s+v, 0)
      1361118
    • Array

      1
      2
      arr.reduce((s,v) => s+v, 0)
      18

이 내용은 나중에 참고하기 위해 Angular 6.0.4 기술 문서를 공부하며 정리한 내용입니다.
의역, 오역, 직역이 있을 수 있음을 알려드립니다.
This post is a translation of this original article [https://angular.io/guide/observables, https://angular.io/guide/rx-library, https://angular.io/guide/observables-in-angular, https://angular.io/guide/comparing-observables]

참고

공유하기