Subject의 정의
Subject는 Observable과 Observer의 역할을 동시에 수행할 수 있는 객체이다.
이를 통해 데이터 스트림을 생성하고 관리할 수 있으며, 다른 Observable로부터 데이터를 받아 그대로 전달하거나 새로운 데이터를 생성할 수도 있다.
Subject의 역할
1. Observer 역할
Subject는 하나 이상의 Observable에 subscribe 할 수 있으며, Observer로서 데이터를 받을 수 있다.
또한 Subject는 에러를 처리하고, 필요한 경우 처리된 에러를 다음 Observer에 전달할 수 있다.
2. Observable 역할
Subject는 Observable처럼 데이터를 방출할 수 있으며, 이를 통해 하나 이상의 Observer에 데이터를 전달한다.
여러 Observer가 하나의 Subject에 구독할 수 있으며 이 때, 동일한 데이터 스트림을 공유할 수 있다.
3. 구독 없는 Observable
일반적으로 Observable은 subscribe 할 때까지 이벤트를 방출하지 않는다.
하지만 Subject는 구독 여부와 상관없이 이벤트를 방출할 수 있고, 새로운 Observer가 중간에 들어와도 이전에 방출된 데이터를 받을 수 없다. 이를 통해 여러 Observer가 동일한 데이터 스트림을 공유할 수 있다.
Subject와 Observable의 차이
Subject는 Observable과 Observer의 역할을 모두 할 수 있는 Observable이다.
따라서 Observable이나 Subject 모두 subscribe를 할 수 있다.
다만 subscribe의 차이가 있다면 Subject는 multicast 방식이기 때문에 여러 개의 Observer를 subscribe 할 수 있다.
반면에 Observable은 unicast 방식이기 때문에 Observer 하나만을 subscribe 할 수 있다.
Subject와 Observable의 차이를 보여주는 예시
먼저, 일반 Observable을 사용하는 경우를 살펴보자.
let randomNumGenerator1 = Observable<Int>.create{ observer in
observer.onNext(Int.random(in: 0 ..< 100))
return Disposables.create()
}
randomNumGenerator1.subscribe(onNext: { (element) in
print("observer 1 : \(element)")
})
randomNumGenerator1.subscribe(onNext: { (element) in
print("observer 2 : \(element)")
})
//observer 1 : 54
//observer 2 : 69
'randomNumGenerator1' 옵저버블은 .next 이벤트를 통해 옵저버에게 0부터 99까지의 정수 중 하나를 랜덤으로 뽑아 전달한다. 그리고 이를 subscribe하는 2개의 옵저버는 각각 다른 정수를 받게 된다.
이는 두 개의 옵저버가 모두 같은 Observable을 구독하고 있지만, 각각 독립된 스트림을 받기 때문이다.
따라서 동일한 Observable 구독을 통해 생성된 두 개의 Observer라고 해도 Observable이 각각 이벤트를 방출하기 때문에 서로 다른 스트림을 가지게 된다.
Subject의 경우
let randomNumGenerator2 = BehaviorSubject(value: 0)
randomNumGenerator2.onNext(Int.random(in: 0..<100))
randomNumGenerator2.subscribe(onNext: { (element) in
print("observer subject 1 : \(element)")
})
randomNumGenerator2.subscribe(onNext: { (element) in
print("observer subject 2 : \(element)")
})
//observer subject 1 : 92
//observer subject 2 : 92
Observable을 구독했을 경우와는 다르게 Subject를 구독한 Observer들은 모두 동일한 값을 받는다.
이것이 바로 여러 Observer가 동일한 스트림을 받는 것이고, Subject의 multicast 방식이 이를 뜻한다.
Subject의 종류
Subject의 종류는 BehaviorSubject, ReplaySubject, PublishSubject, AsyncSubject 등이 있다.
이에 대해 하나씩 알아보자.
BehaviorSubject
BehaviorSubject 는 초기화할 때 초기값을 제공해야 하며, 이후에 'onNext' 로 방출된 가장 최근의 값을 Observer에게 전달한다. 따라서 새로운 Observer가 구독하면 가장 최근에 방출한 요소부터 시작하게 된다.
BehaviorSubject의 사용예시
let behaviorSubject = BehaviorSubject(value: "초기값") //초기값 지정
let observer1 = behaviorSubject.subscribe { print("Observer 1: \($0)") }
behaviorSubject.onNext("두 번째 값")
let observer2 = behaviorSubject.subscribe { print("Observer 2: \($0)") }
behaviorSubject.onError(MyError.someError)
let observer3 = behaviorSubject.subscribe { print("Observer 3: \($0)") }
//Observer 1: 초기값
//Observer 1: 두 번째 값
//Observer 2: 두 번째 값
//Observer 1: error(MyError.someError)
//Observer 2: error(MyError.someError)
//Observer 3: error(MyError.someError)
이렇게 BehaviorSubject는 초기값을 받아 가장 처음 이벤트 요소로 방출하며,
가장 최근 이벤트는 저장해두었다가 중간에 subscribe를 하는 새로운 Observer에게는 최근 이벤트를 방출한다.
BehaviorSubject의 메커니즘
BehaviorSubject가 구독한 옵저버들에게 동일한 스트림의 이벤트를 방출하고, 최신 이벤트를 저장해두었다가 방출하는 이 메커니즘에 대해서 알아보자.
BehaviorSubject의 정의
public final class BehaviorSubject<Element>
: Observable<Element>
, SubjectType
, ObserverType
, SynchronizedUnsubscribeType
, Cancelable { ... }
BehaviorSubject는 Observable<Element> 을 상속받고 있는데, 이는 'BehaviorSubject' 가 Observable의 모든 기능을 가지고 있음을 의미하며, 다른 Observer 들이 구독할 수 있도록 한다.
또한 BehaviorSubject는 ObserverType 프로토콜을 준수하고 있는데, 이는 BehaviorSubject 가 Observer로 작동하고, 다른 Observable의 이벤트를 받을 수 있음을 의미한다.
BehaviorSubject의 onNext() 메서드
/// Convenience API extensions to provide alternate next, error, completed events
extension ObserverType {
/// Convenience method equivalent to `on(.next(element: Element))`
///
/// - parameter element: Next element to send to observer(s)
public func onNext(_ element: Element) {
self.on(.next(element))
}
/// Convenience method equivalent to `on(.completed)`
public func onCompleted() {
self.on(.completed)
}
/// Convenience method equivalent to `on(.error(Swift.Error))`
/// - parameter error: Swift.Error to send to observer(s)
public func onError(_ error: Swift.Error) {
self.on(.error(error))
}
}
우리가 'BehaviorSubject' 의 사용예시에서 onNext() 메서드를 통해 옵저버들에게 요소를 방출하는 것을 볼 수 있었다.
그래서 찾아보니 위와 같이 'ObserverType' 프로토콜의 extension 메서드에 정의되어 있었다.
'onNext' 는 다음 이벤트를 통해 방출할 요소를 받고, self.on(.next(element)) 을 통해 .next 이벤트와 함께 요소를 전달한다. self.on() 메서드가 어떤 작업을 수행하는지 찾아보자.
self.on() 메서드의 정의
public protocol ObserverType {
/// The type of elements in sequence that observer can observe.
associatedtype Element
/// Notify observer about sequence event.
///
/// - parameter event: Event that occurred.
func on(_ event: Event<Element>)
}
on 메서드는 'ObserverType' 프로토콜의 메서드로
'ObserverType' 프로토콜을 준수하는 타입은 모두 이 on 메서드를 정의해야 한다.
따라서 BehaviorSubject의 내부에도 on 메서드를 정의해둔 부분이 있을 것임.
BehaviorSubject의 on() 메서드
/// Notifies all subscribed observers about next event.
///
/// - parameter event: Event to send to the observers.
public func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
dispatch(self.synchronized_on(event), event)
}
위 코드가 'BehaviorSubject' 가 OvserverType 프로토콜을 채택하면서 정의하는 on 메서드이다.
#if Debug ~ #endif 부분은 디버그 빌드에서만 실행되는 코드이기 때문에 우리는 dispatch(...) 부분만 확인하면 된다.
먼저 dispatch() 함수의 역할은 'BehaviorSubject' 에 구독된 모든 Observer에게 'BehaviorSubject' 에서 발생한 이벤트를 전달하는 것이다.
dispatch() 함수는 두 개의 매개변수를 받고 있는데,
두 번째 매개변수는 방출된 event를 전달하고 있고, 첫 번째 매개변수를 확인해보자.
synchronized_on 메서드의 정의
func synchronized_on(_ event: Event<Element>) -> Observers {
self.lock.lock(); defer { self.lock.unlock() }
if self.stoppedEvent != nil || self.isDisposed {
return Observers()
}
switch event {
case .next(let element):
self.element = element
case .error, .completed:
self.stoppedEvent = event
}
return self.observers
}
-self.lock.lock()
stnchronized_on 함수가 동시에 여러 스레드에서 호출되는 경우에도 안전하게 작동하도록 해주는 부분.(잘 모르니 넘어가도댐!)
-if self.stoppedEvent != nil || self.isDisposed { ... }
이 부분은 객체가 더 이상 이벤트를 처리할 필요가 없는 상태인지를 확인하는 것이다.
-switch event { ... }
.next 이벤트가 들어오면 방출된 요소를 self.element에 저장하는데, self.element는 'BehaviorSubject' 가 초기화될 때, 입력되는 초기값이 저장되어 있다.(해당 프로퍼티가 최신 요소를 저장하고 있음)
그리고 .error나 .completed 이벤트가 들어오면 self.stoppedEvent에 해당 이벤트를 저장한다.
-return self.observers
이는 현재 구독하고 있는 모든 옵저버를 반환하는 것이다.
이를 'dispatch' 함수에서 받아 각 옵저버에게 이벤트를 전달함.
따라서 'BehaviorSubject' 가 각 Observer에게 동일한 이벤트를 방출하는 과정에서
synchronized_on 함수는 'BehaviorSubject' 의 상태를 관리하고, 현재 구독중인 모든 Observer를 반환하는 역할을 수행한다. 새로운 .next 이벤트가 방출되면 항목을 저장하고, .error나 .completed 이벤트를 받으면 중지 상태를 저장한다.
그리고 이미 중지되어야 하는 상태를 체크하고, 이벤트를 처리할지, 말지를 결정한다.
다시 'dispatch' 함수는 'synchronized_on' 함수가 반환하는 옵저버들과 방출된 이벤트를 가지고
각 옵저버들에게 방출된 이벤트를 전달하는 역할을 수행한다. (이를 위해 정의를 찾아봤는데 몽말인지 몰라서 생략..)
BehaviorSubject의 onNext 동작 메커니즘 정리
1. onNext 호출
'BehaviorSubject' 가 onNext 이벤트를 방출한다.
이 메서드는 .next 이벤트와 방출 요소를 'on' 메서드에 전달한다.
2. on 메서드 처리
on 메서드는 'dispatch' 함수를 호출하면서,
'synchronized_on(event)' 와 'event' 를 매개변수로 전달한다.
3. synchronized_on 메서드 처리
'synchronized_on' 메서드는 현재 객체의 상태를 확인하고,
이벤트를 방출할 수 없는 상태라면, 빈 옵저버 목록인 'Observers()' 를 반환하여 'dispatch' 단계에서 아무런 동작도 수행하지 않게 된다.
반면에 이벤트를 방출할 수 있는 상태라면, 방출된 이벤트가 어떤 이벤트인지 확인한다.
.next 이벤트라면 현재 요소를 업데이트하고, .error, .completed 이벤트라면 종료 이벤트 상태를 설정한다.
그리고 현재 구독중인 observer 목록을 반환한다.
4. dispatch 함수 처리
'synchronized_on' 에서 반환된 observer 목록과 이벤트를 받고,
각 observer에 해당 이벤트를 전달한다. 이를 통해 모든 옵저버는 동일한 스트림의 이벤트를 받을 수 있게된다.
그럼 BehaviorSubject 를 구독하는 각 Obsever는 어떻게 저장되는 것일까?
먼저 해당 객체에 subscribe 메서드가 호출되면, 다음과 같은 작업이 실행된다.
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self.lock.performLocked { self.synchronized_subscribe(observer) }
}
먼저 여러 스레드에서 동시에 구독 작업을 호출하려고 하면 문제가 발생할 수 있기 때문에 동시 접근을 제어하기 위해 락 작업을 한다. 그리고 'self.synchronized_subscribe(observer)' 에서 BehaviorSubject를 구독한 observer 를 observer 목록에 저장한다.
synchronized_subscribe 정의
func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if self.isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
if let stoppedEvent = self.stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
let key = self.observers.insert(observer.on)
observer.on(.next(self.element))
return SubscriptionDisposable(owner: self, key: key)
}
이 메서드에서는 2번의 상태 확인 후,
Observer 목록에 해당 Observer를 insert 하고,
해당 Observer에게 가장 최신 요소를 방출한다.
isDisposed의 상태확인을 통해 이미 'BehaviorSubject' 가 해제된 상태인지,
stoppedEvent 상태확인을 통해 'BehaviorSubject' 가 에러 또는 완료된 이벤트인지를 확인하고
일치하는 경우, 리소스를 해제하는 작업을 수행한다.
let key = self.observers.insert(observer.on)
observer.on(.next(self.element))
바로 이 부분이 Observer 목록에 observer를 저장하고, observer에게 최신 요소를 방출하는 로직이다.
저장하는 부분을 보면 observer가 아닌 observer.on 을 전달하고 있는데, 이것은 'BehaviorSubject' 에서 이벤트를 방출하면 그 이벤트가 observer.on 으로 전달되도록 하는 것이다. 따라서 BehaviorSubject 가 나중에 이벤트를 방출할 때마다 해당 이벤트는 목록에 있는 observer.on 에게 전달된다.
Subject의 유용성
1. 멀티 캐스팅
하나의 Observable 시퀸스를 여러 Observer에게 동시에 전달할 수 있다.
이를 통해 데이터를 한 번만 처리하면서 여러 Observer에게 전달할 수 있어서 효율성이 높아질 수 있다.
2. 상태 유지
오늘 배운 BehaviorSubject 처럼 특별한 Subject는 최근에 발생한 이벤트나 특정 수의 이벤트를 저장하고, 나중에 subscribe 하는 Observer에게 전달할 수 있다. 이를 통해 새로운 구독자도 이전의 이벤트를 받을 수 있으므로 상태 유지에 유용하다.
3. 에러 처리
Subject를 통해 에러를 중아에서 한 번만 처리하고, 여러 Observer에게 동일한 에러처리 로직을 적용할 수 있다.
정리
오늘 배운 것을 정리해보면 이렇다.
1. Subject의 정의와 역할
2. Subject와 Observable의 차이
3. BehaviorSubject의 정의와 사용예시
4. BehaviorSubject의 동작 메커니즘
5. BehaviorSubject가 구독하는 Observer를 목록에 저장하는 방법.
6. Subject의 유용성
BehaviorSubject의 동작 메커니즘 부분은 BehaviorSubject만의 동작이 아니라
Subject의 각 종류에 따라 조금씩 다르겠지만, Subject가 기본적으로 동작하는 방식이다.
모든 정보가 정확한지는 모르겠지만, 코드를 뜯어보면서 Subject에 대해서 더 깊게 이해할 수 있었다.
Reference
-https://sujinnaljin.medium.com/rxswift-subject-99b401e5d2e5
-https://reactivex.io/documentation/subject.html
'RxSwift' 카테고리의 다른 글
RxSwift에 관하여(Scheduler) (0) | 2023.08.11 |
---|---|
RxSwift에 관하여(PublishSubject, ReplaySubject, AsyncSubject) (0) | 2023.08.10 |
RxSwift에 관하여(Error Handling Operators) (0) | 2023.08.08 |
RxSwift에 관하여(Combining Observables) (0) | 2023.08.08 |
RxSwift에 관하여(Disposable) (0) | 2023.07.31 |