등장 배경
RxSwift는 비동기 프로그래밍과 데이터의 흐름을 관리하기 위해 설계되었다.
앱에서 데이터와 상호작용은 대부분 비동기적으로 발생하며, 이는 앱의 상태를 추적하고 관리하는 것을 어렵게 만든다.
예를 들어, 서버에서 데이터를 가져와야 하는 경우, 데이터를 받아서 UI를 리프레쉬 해야 한다. 이 과정에서 동시에 여러 작업이 비동기적으로 수행되는 경우가 많다. 이 때문에 작업 간의 의존성이나 상태 등을 관리하는 것이 까다롭다.
이러한 복잡성을 관리하는 전통적인 방법 중 하나는 콜백 함수를 사용하는 것인데, 콜백 함수는 여러 비동기 작업이 서로 연결되어 있을 때, 가독성이나 에러처리의 어려움 등 여러 문제가 발생할 수 있다. 이를 해결하기 위해, RxSwift와 같은 반응형 프로그래밍 라이브러리가 등장하게 되었다.
RxSwift는 Observable이라는 개념을 도입하여 데이터의 흐름을 추상화한다.
Observable은 시간에 따라 변화하는 값의 시퀸스를 나타내며, Observer가 구독할 수 있다.
이를 통해 비동기 작업을 쉽게 조합하고, 에러를 일관되게 처리하며, 코드를 보다 명확하고 읽기 쉽게 만들 수 있다.
따라서 RxSwift는 비동기 프로그래밍의 복잡성을 줄이고, 상태를 쉽게 관리하기 위해 등장했다.
RxSwift
RxSwift는 Swift를 위한 반응형 프로그래밍 라이브러리다.
반응형 프로그래밍은 데이터 흐름과 변화를 중심으로 하는 프로그래밍 패러다임으로, 이벤트와 데이터 변화를 쉽게 다루기 위해 등장했다.
Observable
Observable은 RxSwift의 핵심 개념으로 데이터 흐름을 표현하는 기본 단위이다.
Observable을 이해하기 전에 먼저 '스트림(Stream)' 이라는 개념을 이해해야 한다. 스트림은 시간에 따라 변화하는 값의 연속을 말한다. 예를 들어, 사용자가 버튼을 여러 번 누르는 경우, 각 누름 이벤트를 시간에 따라 나열한 것을 이벤트 스트림이라 할 수 있다.
Observable은 이런 스트림을 '생산' 하는 역할을 한다.
Observable이 생성되면, 특정 이벤트(데이터 변경 등)가 발생할 때마다 새로운 데이터 항목을 방출한다. 이렇게 방출한 항목들이 모여 시간에 따른 데이터 스트림을 형성하는 것이다.
방출한 항목을 받기 위해서는 'Observer'가 'Observable' 객체에 'Subscribe(구독)'을 해야 한다.
Observable은 주로 4가지 유형의 이벤트를 방출한다.
- Next: 새로운 데이터 항목이 방출할 때 발생한다. 이 이벤트를 통해 Observable은 새로운 데이터를 전달한다.
- Error: 에러가 발생했을 때 발생한다. 이 이벤트를 통해 Observable은 에러를 전달하며, 이후 아무런 이벤트도 방출하지 않는다.
- Completed: Observable이 모든 데이터를 내보내고, 더 이상 방출할 데이터가 없을 때 발생한다. 이 이벤트는 Observable의 종료를 나타낸다.
- Disposed: Observable의 수명 주기가 종료되었을음 나타낸다.
이런 방식으로, Observable은 시간에 따라 변화하는 데이터를 표현한다.
Observable의 정의
public class Observable<Element> : ObservableType { ... }
Observable 클래스는 ObservableType 프로토콜을 준수하며, 제네릭 타입 'Element'를 받는다. 이 'Element'는 Observable이 생성하는 이벤트의 데이터 타입을 나타낸다.
Observable의 생성 연산자
1. just
'just' 메서드는 단 한개의 항목만 방출하는 Observable Sequence를 생성한다.
'just'는 'ObservableType' 프로토콜의 extension에 정의되어 있는 메서드로 'ObservableType' 프로토콜을 준수하는 타입은 모두 'just'를 사용할 수 있다.
사용 예시
let singleElementObservable = Observable.just(1)
Observable 타입의 'singleElementObservable'를 생성했으며, 'just' 메서드를 통해 1을 방출한다.
'just'는 두 가지 형태를 가지고 있는데, 두 번째 형태는 'scheduler'에 대해서 학습해야 하기 때문에 첫 번째 형태의 'just'만 깊게 파볼 것이다.
첫 번째 형태의 just
public static func just(_ element: Element) -> Observable<Element> {
Just(element: element)
}
이 메서드는 입력으로 받은 'element' 라는 값 하나를 방출하는 Observable을 생성한다.
'Just'라는 클래스의 생성자를 호출하고 반환한다.
'Just' 클래스의 정의
final private class Just<Element>: Producer<Element> {
private let element: Element
init(element: Element) {
self.element = element
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.next(self.element))
observer.on(.completed)
return Disposables.create()
}
}
'Just' 클래스는 'element' 라는 프로퍼티를 가지고 있는데, 초기화되면서 'just' 메서드의 인자를 통해 받아온 항목을 저장한다.
오버라이드 메서드 'subscribe'를 보자.
Observable의 정의에서 "방출된 항목을 받기 위해서는 Observer가 Observable에 구독을 해야한다" 라고 했다.
따라서 'subscribe' 메서드의 역할은 생성된 Observable에서 방출된 항목을 Observer에게 전달하는 것이다.
'subscribe' 메서드의 바디 부분을 보면
'observer.on(.next(self.element))' 는 'Observable'이 'just' 의 인자로 받은 항목을 Observer에게 전달하고 있는 것이다. 전달하는 데이터는 'self.element' 라고 되어 있는데 위에서 설명했듯이 'Just' 클래스의 초기화 메서드에서 전달받은 인자를 저장해두었다.
'observer.on(.completed)' 는 'Observable'이 더 이상 방출할 데이터가 없음을 'Observer' 에게 알리고 있다.
'Disposables.create()' 이 메서드의 반환 값은 'Disposable' 이며, 이에 대한 설명은 나중에 할 것입니다. 간단하게 설명하면 Observerble의 'subscribe'를 취소할 때, 사용하는 객체이다.
just를 통한 Observable 생성 정리
1) just 메서드의 인자로 방출할 항목 1이 전달된다.
2) just 메서드 내부적으로 'Just' 클래스를 생성하면서 init 메서드의 인자로 방출할 항목 1을 전달한다.
3) Just 클래스는 init 메서드에서 전달받은 1을 내부 'element' 프로퍼티에 저장한다.
4) 'singleElementObservable'이라는 Observable 생성 완료.
생성된 Observable의 이벤트를 받아보고 싶다면?
let singleElementObservable = Observable.just(1)
singleElementObservable.subscribe { event in
switch event {
case .next(let element):
print("방출된 항목: \(element)")
case .error(let error):
print("에러발생 : \(error)")
case .completed:
print("작업 완료")
}
}
"방출된 항목: 1"
"작업 완료"
Observable이 방출하는 이벤트를 받고 싶다면, 해당 이벤트를 받고 싶은 Observer를 생성하고, Observable에 subscribe를 해야 한다.
1) 해당 Observable을 subscribe 한다. 여기서 사용되는 subscribe 메서드는 'ObservableType' 프로토콜의 extension에 구현되어 있는 메서드이다. (아래서 자세하게 설명)
2) 'ObservableType' 프로토콜의 'subscribe' 메서드 내부에서
"self.asObservable().subscribe" 즉, Just 클래스의 subscribe 메서드를 호출한다.
Just 클래스의 subscribe 메서드는 'element' 프로퍼티에 저장해둔 항목을
.next 메서드를 통해 Observer에게 전달하고,
.completed 메서드를 통해 Observer에게 작업이 끝났음을 전달하고,
Disposable을 생성하여 반환하면서 메서드가 종료된다.
ObservableType 프로토콜의 Subscibe 메서드
extension ObservableType {
/**
Subscribes an event handler to an observable sequence.
- parameter on: Action to invoke for each event in the observable sequence.
- returns: Subscription object used to unsubscribe from the observable sequence.
*/
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
Observable이 방출하는 이벤트를 받기 위해서는 Observer를 생성하고, subscribe 해줘야 한다고 했다. 하지만 작업을 보면 "Observer를 생성한 적이 없는데, 어떻게 subscribe 메서드를 사용하는 것이지? Observer를 만들어주고, 해당 Observable에 구독을 해야하는거 아닌가?" 라는 의문이 들 수 있다.
생성한 'singleElementObservable' Observable은 'Observable<Int>' 타입이다.(엄밀히 말하면 'Just' 클래스 타입이며, 해당 클래스가 'ObservableType' 프로토콜을 준수하고 있음. )
따라서 해당 Observable에 'subscribe' 메서드를 사용하면 먼저 'ObservableType' 프로토콜의 'subscribe' 메서드가 호출된다.
다음 'ObservableType' 프로토콜의 subscribe 메서드 바디 부분을 보면, 'AnonymousObserver' 라는 Observer를 생성하여 'subscribe' 메서드의 인자로 전달하고 있다. (바디에서 사용되는 subscribe 메서드는 'Just' 클래스의 메서드다.)
따라서 Observer의 생성에 대한 의문이 풀린다. 'subscribe' 메서드는 자체적으로 Observer를 생성하여 넘겨주기 때문에 따로 Observer를 생성해줄 필요가 없는 것이다.
정리해보면,
1) 'just' 메서드를 사용하여 생성한 Observable에서 'subscribe' 를 사용하면 'ObservableType' 프로토콜의 'subscribe' 메서드가 호출된다.
2) 프로토콜의 'subscribe' 메서드 내에서 Observer를 생성하여 'Just' 클래스의 'subscribe' 메서드를 호출한다.
3) 'Just' 클래스의 'subscribe' 메서드는 '.next'와 '.completed' 이벤트를 방출하고 우리는 클로저를 통해 방출된 이벤트를 다룬다.
2. of
'just'는 단 1개의 항목만 방출할 수 있었지만, 'of'는 의 1개 이상의 항목을 방출할 수 있다.
이 때, 모든 인자를 모아서 한 번에 처리하는 것이 아니라 각 인자를 별도의 이벤트로 만들어 Observable이 순차적으로 방출할 수 있도록 한다. 이후 모든 요소가 방출된 후에는 .completed 이벤트를 방출하며 종료된다.
'of' 또한 'ObservableType' 프로토콜의 extension에 정의되어 있는 메서드로 'ObservableType' 프로토콜을 준수하는 타입은 모두 사용할 수 있다
사용 예시
let observable = Observable.of(1, 2, 3, 4, 5)
이 Observable은 인자로 받은 '1, 2, 3, 4, 5' 항목들을 각각의 이벤트로 방출한다.
'of' 연산자 정의
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
ObservableSequence(elements: elements, scheduler: scheduler)
}
1) elements: Element ...
Element ... 는 Swift의 가변 인자로 함수가 임의의 수의 인자를 받을 수 있도록 한다.
따라서 임의의 수의 'Element' 타입의 인자를 받을 수 있다.
2) scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance
scheduler는 옵셔널 파라미터로 호출하는 쪽에서 스케줄러를 지정하지 않으면 'CurrentThreadScheduler.instance'가 기본값으로 사용된다. scheduler는 Observable의 동작을 어떤 스레드에서 실행할 지 결정하는 역할을 한다. (스케줄러는 다음에 자세하게 다룰 것!)
3) ObservableSequence(elements: elements, scheduler: scheduler)
ObservableSequence 클래스의 인스턴스를 생성하고 반환한다.
ObservableSequence 클래스의 정의
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
fileprivate let elements: Sequence
fileprivate let scheduler: ImmediateSchedulerType
init(elements: Sequence, scheduler: ImmediateSchedulerType) {
self.elements = elements
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
ObservableSequence 클래스는 일련의 요소를 포함하고, 이를 순차적으로 방출하는 Observable이다.
init 메서드의 호출을 통해 방출할 아이템 목록을 self.elements 프로퍼티에 저장한다.
'run' 메서드는 해당 Observable이 실제로 실행되는 부분이다.
Observer와 Cancelable을 매개변수로 받아 'ObservableSequenceSink' 를 생성하고, 이 sink의 run 메서드를 호출하고, 반환된 Disposable을 저장한다. 그 후 sink와 subscription(둘 다 Disposable)을 반환한다.
이 코드를 보면 Observer에게 이벤트를 방출하는 코드는 없죠?
실제로 Observer에게 이벤트를 방출하는 코드는 'ObservableSequenceSink' 클래스의 run() 메서드에서 실행한다.
'ObservableSequenceSink' 클래스의 정의
final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
typealias Parent = ObservableSequence<Sequence>
private let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self.parent.scheduler.scheduleRecursive(self.parent.elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
먼저 'ObservableSequence' 의 run 메서드를 통해 'ObservableSequenceSink' 인스턴스가 생성되면 init 메서드에서는 부모 클래스인 'ObservableSequence' 인스턴스와 Observer, 그리고 Cancelable을 인자로 받는다. 그리고 run 메서드를 통해 실제 이벤트 발생 처리가 이루어진다.
1) run 메서드 내부에서는 부모 'ObservableSequence' 의 스케줄러를 통해 scheduleRecursive 메서드를 호출한다. 이 메서드는 ObservableSequence' 에서 저장해둔 'elements' 프로퍼티의 요소들을 순환하면서 작업을 수행한다.
2) 다음 'iterator'를 인자로 받아, 'iterator'가 다음 요소를 가지고 있는지 확인한다.
3) 만약 다음 요소가 있다면, .next 이벤트를 통해 Observer에게 요소를 전달한다. 그 후, recurse를 호출하여 다음 요소에 대해 같은 작업을 수행한다.
4) 'iterator' 에 더 이상 요소가 없다면, .cimpleted 이벤트를 전달하고, dispose 메서드를 호출하여 작업을 마무리한다.
따라서 'ObservableSequenceSink'의 run 메서드는 '방출할 항목' 즉, ObservableSequence 클래스에 있는 'elements'의 요소를 순환하며, 각 요소를 Observer에게 전달하고, 모든 요소를 순환한 후에는 .completed 이벤트를 전달하는 역할을 한다.
'ObservableSequence' 의 run 메서드가 호출되는 원리?
궁금한 점이 해당 Observable에 subscribe 메서드를 실행했을 경우, 'ObservableType' 프로토콜의 subscribe 메서드가 실행되는데, 해당 메서드 내부에는 'run' 메서드에 대한 코드가 전혀 없는데, 어떻게 호출되는거지?
'ObservableType' 프로토콜의 subscribe 메서드를 다시 보자.
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
self.asObservable()은 Observable을 생성하고, Observable에 있는 subscribe를 통해 Observer를 전달한다.
근데 여기서 생성하는 Observable은 정확하게 무슨 타입일까?
생성된 Observable의 실제 객체는 'ObservableSequence' 이며, 'ObservableSequence'는 'Producer' 클래스를 상속받고 있고, 'Producer' 클래스는 Observable 클래스를 상속받고 있기 때문에 Observable 타입으로 사용되는 것이다.
그럼 실제 객체의 'ObservableSequence' 에는 'subscribe' 메서드가 없었는데 어떻게 'run' 메서드가 실행된거죠?
'ObservableSequence' 인스턴스는 상속 관계에 의해 사실 'Producer' 타입이기도 하고, Observable<Element>타입이기도 하다. 따라서 'self.asObservable().subscribe()' 코드가 실행되면 실제 객체인 'ObservableSequence' 에 subscribe 메서드가 정의되어 있지 않다는 것을 확인하고, 부모 클래스인 'Producer' 에서 subscribe 메서드를 실행한다.
실제로 'Producer'의 subscribe 메서드를 살펴보면, run 메서드를 실행하는 것을 확인할 수 있다.
class Producer<Element>: Observable<Element> {
override init() {
super.init()
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
'Producer'의 subscribe가 실행하는 run 메서드가 바로 'ObservableSequence' 의 run 메서드인 것이다.
정리
1) of 연산자를 통해 Observable을 생성한다. 이 때 of의 인자로 받은 방출할 항목들은 모두 'ObservableSequence'의 'elements' 프로퍼티에 저장된다. 생성된 Observable은 실제로 'ObservableSequence' 타입.
2) 생성된 Observable에 대해 subscribe를 호출한다. 먼저 'ObservableType' 프로토콜의 subscribe 메서드가 실행되며, 이 메서드의 내부에서는 생성된 Observable의 subscribe 메서드를 호출한다. 하지만 'ObservableSequence' 에는 subscribe 메서드가 존재하지 않기 때문에 부모 클래스인 'Producer' 클래스의 subscribe 메서드가 실행된다.
3) 'Producer' 클래스의 subscribe 메서드는 내부적으로 'ObservableSequence' 의 run 메서드를 실행시킨다.
4) 'ObservableSequence' 의 run 메서드는 'ObservableSequenceSink' 객체를 생성하고, 이 객체의 run 메서드를 호출한다.
5) 'ObservableSequenceSink' 객체의 run 메서드가 바로 방출된 이벤트를 Observer에게 전달하는 역할이며, 'ObservableSequence'에 저장해두었던 방출할 항목 요소(elements)를 순환하면서 각 요소에 대한 이벤트를 Observer에게 방출한다.
3. from
'from' 연산자는 배열과 같은 Sequence 타입의 요소들을 하나씩 방출하는 Observable을 생성한다.
사용예시
let numbers: Observable<Int> = Observable.from([1, 2, 3])
numbers.subscribe(
onNext: { element in
print("Element:", element)
},
onError: { error in
print("Error:", error)
},
onCompleted: {
print("Completed")
},
onDisposed: {
print("Disposed")
}
).dispose()
Element: 1
Element: 2
Element: 3
Completed
Disposed
'from' 연산자를 통해 생성된 Observable은 각각의 배열안의 요소를 하나씩 방출하고, .completed와 .disposed를 방출한다.
'from' 연산자의 정의
public static func from(_ array: [Element], scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
ObservableSequence(elements: array, scheduler: scheduler)
}
'of' 연산자의 정의와 매우 비슷하죠?
파라미터로 받는 데이터가 [Element] 타입인 것을 제외하면 다른 부분은 모두 동일하다.
'from' 연산자의 방출할 항목(array)과 'of' 연산자의 방출할 항목(elements)이 동일한 Sequence 타입으로 내부적으로 돌아가는 코드도 모두 동일하며, 각 요소에 대한 이벤트를 방출한다.
4. create
Observable을 직접 생성하게 해주는 가장 기본적인 연산자이다. 이 연산자는 하나의 클로저를 인자로 받아, 이 클로저의 바디에서 이벤트(.next, completed, .error)를 방출하는 로직을 작성할 수 있다.
사용예시
let observable = Observable<Int>.create { observer in
observer.onNext(1)
observer.onNext(2)
observer.onNext(3)
observer.onCompleted()
return Disposables.create()
}
예시 코드를 보면 'Observable<Int>' 타입의 Observable을 생성하며, Observable이 subscribe 될 때, 1, 2, 3 을 순차적으로 방출한 후에 .completed 이벤트를 통해 작업을 완료한다. 이렇게 'create' 연산자를 이용한 Observable은 내가 어떤 이벤트를, 언제 방출할 것인지 제어할 수 있다.
create 연산자의 정의
public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
AnonymousObservable(subscribe)
}
'create' 메서드는 subscribe 라는 escaping 클로저를 매개변수로 받고 있다.
이 클로저는 AnyObserver<Element> 타입의 매개변수를 받고, Disposable 타입을 반환한다(예시 코드에서 Disposables.create()를 반환한 이유를 볼 수 있음).
Observable이 구독되면, 이 subscribe 클로저가 호출되는데, 이 때 Observer에게 이벤트를 방출하기 위해 AnyObsever<Element> 인스턴스가 제공된다. 따라서 클로저의 구현에서는 이 AnyObsever<Element>를 이용해 필요한 이벤트를 방출하게 된다.
메서드의 바디 부분을 보면 클로저를 인자로 받아 'AnonymousObservable' 인스턴스를 생성하고 반환한다. (AnonymousObservable이 주어진 subscribe 클로저를 이용해 이벤트를 방출하는 역할로 보임)
AnonymousObservable 클래스의 정의
final private class AnonymousObservable<Element>: Producer<Element> {
typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
let subscribeHandler: SubscribeHandler
init(_ subscribeHandler: @escaping SubscribeHandler) {
self.subscribeHandler = subscribeHandler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
AnonymousObservable은 'Producer' 클래스를 상속받는다.
'create' 메서드를 통해 전달받은 클로저를 'subscribeHandler' 프로퍼티에 저장한다.
AnonymousObservable의 'run' 메서드는 해당 옵저버블에 subscribe가 발생했을 때, Producer에 의해 호출되는 메서드로 'AnonymousObservableSink' 인스턴스를 생성하고, sink.run을 통해 실질적인 이벤트 방출을 처리한다.
AnonymousObservableSink 클래스의 정의(관련 로직만 첨부)
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = AnonymousObservable<Element>
override init(observer: Observer, cancel: Cancelable) {
super.init(observer: observer, cancel: cancel)
}
func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}
}
AnonymousObservable의 'run' 메서드에서 전달받은 observer와 cancel을 통해 init된다.
AnonymousObservableSink의 'run' 메서드를 보면, 수퍼 클래스인 AnonymousObservable의 'subscribeHandler' 클로저를 호출한다.
아까 위의 create 메서드의 정의에서 매개변수로 받는 클로저가 'AnyObserver<Element>' 타입을 요구하는 것을 보았죠?
따라서 클로저를 호출할 때 'AnyObserver(self)' 를 인자로 전달해준다. 여기서 self 는 'AnonymousObservableSink' 를 말하며, 이 인스턴스가 ObserverType 프로토콜을 준수하기 때문에 AnyObserver의 인자로 사용될 수 있다.
정리
1. create 연산자가 호출될 때, 사용자가 정의한 클로저를 'AnonymousObsevable' 클래스의 'subscribeHandler' 프로퍼티로 설정한다.
2. create 를 통해 생성된 Observable에 subscibe를 호출하면 'Producer' 클래스의 subscribe 메서드가 실행되고, 이 메서드에서 'AnonymousObsevable' 클래스의 run 메서드가 호출된다.(실제 옵저버블의 타입은 'Producer' 클래스가 아닌 'AnonymousObsevable' 클래스이기 때문)
3. 'AnonymousObsevable' 클래스의 run 메서드에서 'AnonymousObservableSink' 클래스의 인스턴스를 생성하여 run 메서드를 실행시킨다.('AnonymousObservableSink' 클래스의 run 메서드가 실질적으로 이벤트를 방출하는 역할)
4. 'AnonymousObservableSink' 클래스의 run 메서드에서 'AnonymousObsevable' 클래스의 'subscribeHandler' 프로퍼티에 저장되어 있던 클로저를 실행하면서 self(observer)를 인자로 전달한다.
5. 사용자가 정의한 이벤트 처리 클로저 실행.
5. empty
empty 연산자는 이름 그대로 아무런 요소도 방출하지 않고, .complted 이벤트를 방출하는 Observable을 생성해준다.
사용 예시
let disposeBag = DisposeBag()
let emptyObservable = Observable<Int>.empty()
emptyObservable
.subscribe { event in
print(event)
}
.disposed(by: disposeBag)
//completed
empty를 통해 생성된 Obsevable에 subscribe를 했을 때, empty 연산자는 아무런 항목도 방출하지 않고, completed 이벤트만 방출하기 때문에 completed만 출력된다.
empty 연산자의 정의
public static func empty() -> Observable<Element> {
EmptyProducer<Element>()
}
'EmptyProducer<Element>' 라는 'Observable<Element>' 타입의 인스턴스를 생성한다.
EmptyProducer 클래스의 정의
final private class EmptyProducer<Element>: Producer<Element> {
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.completed)
return Disposables.create()
}
}
'EmptyProducer' 클래스는 딱히 생성 연산자가 없는 것을 볼 수 있죠? subscribe 메서드를 확인해봅니다.
다른 연산자들과 같이 subscribe 메서드는 수퍼 클래스 Producer에 정의된 subscribe 메서드를 오버라이드 하고 있다. ObserverType 프로토콜을 준수하는 타입의 객체를 매개변수로 받아, Disposable 타입의 객체를 반환한다.
그리고 바디 부분을 보면, observer에게 아무런 이벤트도 방출하지 않고, 오직 completed 이벤트만을 전달하고, Disposable 객체를 생성하고, 반환한다.
그럼 empty는 언제 사용하는 것이죠?
empty 연산자는 주로 Observable의 완료를 알리고 싶을 때, 사용된다고 한다.
예를 들어, API를 호출했을 경우, 성공했지만 응답에 대한 데이터는 중요하지 않고 성공 여부만을 확인하고자 할 때, 유용하게 사용될 수 있다. (언제 사용해야 하는지는 실제 상황을 겪어봐야 정확하게 알겠네요..)
6. never
empty 연산자의 그림과 같아보이지만 약간의 차이가 있다.
empty 연산자는 .completed 이벤트를 방출해주었지만, never 연산자는 정말 아무런 이벤트도 방출하지 않는다. 이를 통해 무한히 기다리는 Observable을 생성할 수 있다.
사용 예시
let never = Observable<String>.never()
never.subscribe { event in
print("This will never be printed")
}
//아무것도 출력되지 않음.
never 연산자를 사용한 Observable은 아무런 이벤트도 방출하지 않기 때문에 결과는 아무것도 출력되지 않는다.
그럼 never 연산자는 언제 사용하는 것이죠?
never 연산자는 주로 테스트 환경에서 사용된다고 한다.
예를 들어, 비동기 작업이 끝나지 않았을 경우 해당 상황을 테스트하기 위해서 never 연산자를 사용할 수 있다. 비동기 작업이 끝나기를 기다리는 동안, 시스템이 어떻게 반응하는지를 확인하는 테스트를 작성하려는 경우에 유용하게 사용될 수 있다.
이렇게 never 연산자는 테스트 또는 디버깅과 같이 Observable이 어떤 이벤트도 방출하지 않는 상황을 시뮬레이션하고 싶을 때, 사용된다. 하지만 subscribe가 해제되기 전까지 리소스를 계속 점유하고 있기 때문에 적절하게 해제시켜주어야 한다.
7. interval
interval 정해진 간격으로 정수를 방출하는 Observable을 생성하는 연산자이다.
interval을 통해 생성된 Observable은 .completed 이벤트를 방출하지 않는다. 따라서 interval 을 사용할 때에는 메모리 누수를 방지하기 위해 불필요해졌을 경우 dispose를 통해 관리해주어야 한다.
사용 예시
let observable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.subscribe { event in
print(event)
}
//0
//1
//2
//3
...
코드를 실행하면, 첫 번째로 0이 출력되고, 1초마다 1씩 증가하는 정수가 출력된다. 이 때, 정수는 .next 이벤트로 방출되며, .completed 이벤트는 방출되지 않는다.
interval 연산자의 정의
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Timer(
dueTime: period,
period: period,
scheduler: scheduler
)
}
Timer 클래스를 생성하며, dueTime(첫 이벤트가 발생할 시간)과 period(이벤트의 발생 간격), 그리고 해당 타이머를 실행할 스케줄러(어떤 스레드에서 실행할 것인지)를 지정한다.
Timer 클래스의 정의
final private class Timer<Element: RxAbstractInteger>: Producer<Element> {
fileprivate let scheduler: SchedulerType
fileprivate let dueTime: RxTimeInterval
fileprivate let period: RxTimeInterval?
init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
self.scheduler = scheduler
self.dueTime = dueTime
self.period = period
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
if self.period != nil {
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
else {
let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
}
Timer 클래스는 ProducerElement>를 상속받는데, 여기서 Element는 'RxAbstractInteger' 프로토콜을 준수하는 타입이다. 'RxAbstractInteger' 은 Integer 타입의 추상화를 나타내는 프로토콜로, 주로 시간 간격이나 횟수를 나타내는 데 사용된다.
Timer 클래스의 run 메서드는 수퍼 클래스 Producer에서 정의한 run 메서드를 오버라이드한다.
Timer 클래스의 run 메서드는 실질적으로 이벤트를 방출하는 작업을 수행하지 않고, period 프로퍼티에 따라 각각 다른 Sink 객체를 생성하고, Sink 객체에서 run 메서드를 통해 이벤트를 방출한다.
1) period가 nil이 아니면, 즉 연속적인 이벤트를 발생시켜야 할 때, TimerSink를 생성하고 run 메서드를 호출한다. TimerSink는 주기적인 이벤트를 발생시키는 로직을 수행한다.
2) 반대로 period가 nil이면, 즉 한 번의 이벤트만 발생시켜야 할 때, TimerOneOffSink를 생성하고 run 메서드를 호출한다. TimerOneOffSink는 단일 이벤트를 발생시키는 로직을 수행한다.
하지만 interval 연산자를 사용할 때는 period에 nil을 할당할 수 없기 때문에 오직 TimerSink를 통한 run 메서드 로직이 실행된다!
TimerSink의 정의
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger {
typealias Parent = Timer<Observer.Element>
private let parent: Parent
private let lock = RecursiveLock()
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!) { state in
self.lock.performLocked {
self.forwardOn(.next(state))
return state &+ 1
}
}
}
}
TimerSink 클래스의 run 메서드를 살펴보자.
1) self.parent.scheduler.schedulePeriodic()
부모 클래스 'Timer' 의 scheduler를 통해 일정 주기(period)로 작업을 스케줄링하는 코드다.
작업은 self.parent.dueTime 후에 시작하고, 그 이후에는 self.parent.peiod! 간격으로 반복된다.
작업이 시작될 때, 초기값(state)은 0이며, Observer.Element 타입으로 캐스팅된다.
2) lock.performLocked
스레드의 동시 접근을 제어하고, race condition을 방지하기 위한 코드라고 하네요.
3) self.forwardOn(.next(state))
이 부분이 옵저버에게 해당 값(state)을 .next 이벤트를 통해 전달해주는 코드다.
4) return state &+ 1
다음으로 방출할 값을 계산한다. (&+ 연산자는 오버플로우를 허용하는 덧셈 연산자)
정리해보면, TimerSink 클래스는 이벤트 방출의 시작 시간과 간격을 정하고 값(state)을 더해가면서 Thread-safe하게 옵저버에게 이벤트를 전달해주는 역할이다.
Observer에 대한 의문점
코드를 뜯어보면서 생긴 의문점인데, RxSwift의 동작방식이 Observable가 방출하는 이벤트를 Observer가 Subscribe를 통해 받아 보는 것이죠? 근데 내부 동작방식을 보면, Observer가 두 번 생성되는 경우가 있다. 이에 대해 자세하게 알아보자.
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
subscribe를 통해 생성한 옵저버블을 구독하는 과정에서 ObservableType 프로토콜의 subscribe 메서드는 내부적으로 AnonymousObserver라는 옵저버를 생성하여 전달해준다. 이렇게 첫 번째 Observer가 생성된다.
다음은 Timer 클래스(Observable)에서 또 TimerSink 클래스(Observer)라는 Observer를 생성해주는데, 이렇게 두 번째 Observer가 생성된다. Observable이 방출하는 이벤트를 받는 Observer가 두 개씩이나 있다면, 도대체 어떤 Observer가 이벤트를 받는 것일까? 왜 두 개씩이나 Observer가 생성되어야 하는지 파악해보자.
RxSwift에서는 일반적으로 두 종류의 Observer가 사용된다고 한다.
첫 번째는 ObservableType의 subscribe 메서드에서 생성되는 Observer,
두 번째는 Observable이 구독될 때, 실제로 작업을 수행하는 Observer.
첫 번째 Observer는 외부에서 Observable에 구독하고자 할 때, 전달하는 클로저를 래핑한 것이다. 이 Observer는 Observable이 방출하는 이벤트를 받아서 외부로 전달하는 역할을 한다. 여기서 '외부로 전달한다' 는 것은 이 Observer가 받은 이벤트를 사용자가 작성한 클로저(on(e))를 통해 사용자에게 전달한다는 의미이다.
이게 무슨 뜻이냐면 ObservableType의 subscribe 메서드를 보면 @escaping 클로저 'on'을 매개변수로 받고 있는 것을 볼 수 있다. 따라서 우리가 생성한 옵저버블에 subscribe를 사용하면 트레일링 클로저를 통해 이벤트를 방출받았을 때 어떤 작업을 수행할 것인지 작성하고, 이 클로저를 내부적으로 옵저버블에게 전달해주고 있는 것이다.
두 번째 Observer를 보자. 예를 들어 'Timer' 클래스는 run 메서드에서 'TimerSink'를 생성했죠? TimerSink는 실질적으로 Observable인 Timer의 값(period, dueTime 등)을 통해 작업을 수행하고, 이 결과를 다시 첫 번째 Observer에게 전달하는 역할을 한다.
TimerSink 클래스의 run 메서드에서 self.forwardOn(.next(state)) 가 바로 첫 번째 Observer에게 이벤트와 값을 전달해주는 역할이다. 'on' 클로저의 매개변수로 .next와 state가 전달되며, 우리는 작성한 클로저를 통해 이 이벤트와 값을 처리해줄 수 있는 것이다.
정리하면, 첫 번째 생성되는 Observer가 이벤트 처리 클로저를 받아서 Observable 내부로 전달하고, 두 번째 Observer는 Observable이 방출하려는 이벤트 작업을 실제로 수행하면서, 결과를 다시 첫 번째 Observer를 통해 사용자에게 전달한다.
8. range
range 연산자는 특정 범위의 정수를 순서대로 방출하는 Observable을 생성해준다.
사용 예시
let observable = Observable.range(start: 1, count: 3)
observable.subscribe(onNext: { element in
print(element)
})
//1
//2
//3
start는 시작할 정수를 지정하고, count는 방출할 정수의 개수를 지정한다. 다음 onNext 클로저는 Observable이 정수를 하나씩 방출할 때마다 호출된다.
이렇게 range 연산자는 원하는 범위의 정수를 방출할 수 있는 Observable을 만들 수 있다. 이를 반복문을 대체하여 사용할 수 있고, 시퀸스의 각 요소에 대해 연산을 수행하고 결과를 방출하는 작업을 하는 경우에도 유용하다.
하지만 큰 범위의 정수를 방출하는 Observable을 생성하려는 경우 메모리 사용에 주의해야 한다. 모든 정수를 메모리에 미리 할당하지는 않지만, 범위 내의 모든 정수에 대해 동시에 연산을 수행하려는 경우, 메모리 사용량이 크게 증가할 수 있기 때문이다.(이 경우 별도의 스케줄러에서 연산을 수행할 수 있도록 하는 것을 추천)
range 연산자의 정의
public static func range(start: Element, count: Element, scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
RangeProducer<Element>(start: start, count: count, scheduler: scheduler)
}
range 연산자 역시 'RangeProducer' 인스턴스를 생성하여 반환한다.
이제 Observable의 생성 연산자가 어떻게 돌아가는지 익숙해졌죠?
Observable의 생성과 구독 프로세스에서 내부 코드가 어떻게 작동되는지 원리를 이해했기 때문에 내부 코드를 하나하나씩 정리하는 것은 이제 그만하고, 필요할 때마다 찾아보면 될 것 같다!
9. defer
defer 연산자는 Observable의 생성을 지연시키는 역할을 한다. 즉, Observable이 실제로 subscribe되기 전까지 Observable의 생성을 미룬다.
defer 연산자의 정의
public static func deferred(_ observableFactory: @escaping () throws -> Observable<Element>)
-> Observable<Element> {
Deferred(observableFactory: observableFactory)
}
defer 연산자는 'observableFactory' 라는 @escaping 클로저를 매개변수로 받고 있다.
따라서 이 연산자를 사용할 때, Observable<Element> 타입을 반환하는 클로저를 작성해주어야 한다.
'defer로 생성된 Observable'이 subscribe 될 때, '클로저(observableFactory)를 통해 생성된 Observable'이 호출된다. 또한 '클로저(observableFactory)를 통해 생성된 Observable' 의 이벤트가 방출될 때, 'defer로 생성된 Observable' 을 통해 우리에게 전달된다.
사용 예시
let disposeBag = DisposeBag()
let deferredObservable = Observable<String>.deferred {
return Observable.create { observer in
print("이벤트 방출 시작")
observer.onNext("1")
observer.onNext("2")
observer.onNext("3")
return Disposables.create()
}
}
deferredObservable
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
//이벤트 방출 시작
//1
//2
//3
defer 연산자를 통해 Observable을 생성하는데, 매개변수 'observableFactory' 에게 클로저를 생성하여 전달해주었고, 클로저는 Observable을 반환하고 있다. 그리고 'deferredObservable'에 subscribe를 실행했을 때, 우리가 작성한 클로저(Observable)이 이벤트를 방출한다.
해당 예시에서는 defer 연산자의 특성인 Observable의 생성지연에 대한 내용을 알 수 없다.
하지만 defer 연산자를 통해 생성된 Observable의 생성 시간과 subscribe된 시간을 찍어보면, 동일하게 나오는 것을 볼 수 있다. 반면에 다른 연산자들은 모두 Observable의 생성 시간과 subscribe된 시간이 다르게 찍힌다.
defer 연산자의 특성
-defer 연산자는 동적 생성이 가능하다. Observable을 생성하는 코드가 시간이 많이 걸릴 경우, 작업이 필요할 때까지 지연시키며, 리소스를 효율적으로 사용할 수 있다.
-defer 연산자는 조건에 따라 다른 Observable을 생성하여 반환할 수 있다. defer 내부의 클로저는 호출될 때마다 새로운 Observable을 생성하므로, 해당 클로저 안에서 조건부 로직을 이용하여 상황에 따라 다른 Observable을 반환할 수 있다.
-위 2번 째 특성과 비슷한데, defer 연산자는 Observable이 subscribe될 때마다 새로운 상태를 가진 Observable 생성할 수 있는데 이를 통해 관찰하는 대상의 상태가 시간에 따라 변경되는 것을 파악할 수 있다.
Reference
-https://reactivex.io/documentation/operators.html#creating
-https://okanghoon.medium.com/rxswift-1-rxswift-%EC%9E%85%EB%AC%B8-67bfdbd91969
-https://babbab2.tistory.com/188
'RxSwift' 카테고리의 다른 글
RxSwift에 관하여(Error Handling Operators) (0) | 2023.08.08 |
---|---|
RxSwift에 관하여(Combining Observables) (0) | 2023.08.08 |
RxSwift에 관하여(Disposable) (0) | 2023.07.31 |
RxSwift에 관하여(Filtering Observables) (0) | 2023.07.27 |
RxSwift에 관하여(Transforming Observables) (0) | 2023.07.23 |