Scheduler
-Scheduler는 반응형 프로그래밍에서 Observable 연산자의 작업을 수행하는 스레드를 제어할 수 있게 해준다.
여러 스레드에서 동시에 작업을 수행할 수 있으므로 성능을 향상시킬 수 있다.
-기본적으로 Observable은 subscribe 메서드가 호출된 스레드에서 작동된다. 따라서 별도의 지시가 없으면 모든 작업은 동일한 스레드에서 이루어진다.
-특정 Observable 연산자는 Scheduler를 파라미터로 받아 해당 Scheduler에서 작업을 수행하도록 할 수 있다.
이를 통해 해당 연산자가 동작하는 스레드를 통제할 수 있다.
-'subscribeOn' 연산자는 Observable이 작동하기 시작하는 스레드를 지정한다. 연산자 체인의 어느 위치에서 호출되든 상관없이 첫 작업부터 지정한 스레드에서 실행된다.
-'ObserveOn' 연산자는 Observable이 옵저버에게 알림을 보내는 스레드를 지정한다. 연산자 체인의 중간에서 여러 번 호출될 수 있으며, 호출된 위치 아래에서의 알림 전달 스레드를 변경한다.
-'subscribeOn' 과 'ObserveOn' 를 사용하면 연산자 체인의 각 부분에서 작동할 스레드를 세밀하게 제어할 수 있다. 이를 통해 특정 작업은 백그라운드에서 수행하고, 특정 알림은 메인 스레드에서 수신하는 등 복잡한 동작 흐름을 만들 수 있다.
'subscribeOn'과 'ObserveOn' 사용예시
let backgroundScheduler = ConcurrentDispatchQueueScheduler(qos: .background)
let disposeBag = DisposeBag()
Observable<Int>.create { observer in
print("작업 스레드: \(Thread.current)")
observer.onNext(42)
observer.onCompleted()
return Disposables.create()
}
.subscribe(on: backgroundScheduler) //작업 스레드 지정
.observe(on: MainScheduler.instance) //메인 스레드에서 알림 수신
.subscribe(onNext: { value in
print("결과 스레드: \(Thread.current), 값: \(value)")
})
.disposed(by: disposeBag)
//작업 스레드: <NSThread: 0x600003564000>{number = 6, name = (null)}
//결과 스레드: <_NSMainThread: 0x600003538400>{number = 1, name = main}, 값: 42
위 코드를 보면, Observable이 'backgroundScheduler' 에서 작업을 시작하게 하고, 결과는 메인 스레드에서 받아 처리하는 것을 알 수 있다.
'subscribeOn' 은 Observable이 작동하기 시작하는 작업의 스레드를 지정해준다고 했다. 따라서 Observable이 생성되고, 구독한 Observer에게 이벤트를 방출하는 작업까지의 부분을 담당한다.
'ObserveOn' 의 경우는 Observable이 방출한 이벤트를 Observer가 받았을 때 작업의 스레드를 지정해준다. 따라서 'subscribe' 의 클로저 작업을 원하는 지정한 스레드에서 처리하는 것을 볼 수 있다.
이제 'subscribeOn' 과 'observeOn' 은 각각 내부적으로 어떻게 동작하는지 파악해보자.
subscribeOn의 정의
@available(*, deprecated, renamed: "subscribe(on:)")
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
-> Observable<Element> {
subscribe(on: scheduler)
}
먼저 'subscribeOn' 메서드는 deprecated 상태고, 'subscribe(on:)' 메서드로 변경되어 사용한다.
public func subscribe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
SubscribeOn(source: self, scheduler: scheduler)
}
메서드의 바디를 보면, 'SubscribeOn' 객체의 인스턴스를 생성하고 있다.
매개변수 'source' 에는 self(원본 ObservableType) 을 전달하며,
매개변수 'scheduler' 는 구독과 구독 해제 로직을 수행할 스케줄러를 나타낸다.
따라서 원본 시퀸스의 구독과 구독 해제 로직을 지정된 스케줄러에서 실행되도록 한다.
'subscribeOn' 의 정의에 나와있는 주석을 읽어보면, subscribeOn의 역할에 대해 나와있다.
1. 'Wraps the source sequence in order to run its subscription and unsubscription logic on the specified scheduler.'
'subscribeOn' 은 원본 시퀸스를 감싸 구독과 구독 해제의 로직을 지정된 스케줄러에서 실행한다.
2. This only performs the side-effects of subscription and unsubscription on the specified scheduler
'subscribeOn' 메서드는 오직 구독과 구독 해제의 'side-effects' 만 지정된 스케줄러에서 실행한다.
3. In order to invoke observer callbacks on a `scheduler`, use `observeOn`
Observer의 콜백 메서드를 특정 스케줄러에서 실행하려면 'observeOn' 을 사용해라!
여기서 헷갈릴 수 있는 점!
'subscribeOn' 메서드는 구독과 구독 해제의 로직을 지정된 스케줄러에서 한다고 나와있다.
의문점이 드는 부분은 'subscribeOn' 메서드의 구독 로직을 수행한다는 것이 'subscribe' 메서드의 작업을 말하는 것인가?
사용 예시에서도 보았듯이, subscribeOn 메서드가 스케줄하는 작업은 Observable이 생성되는 시점에서 Observer 에게 방출할 이벤트를 정하는 로직이다. 그리고 이 로직은 Observable에 'subscribe' 발생했을 때, 해당 Observable 내부에서 실행되는 작업이기 때문에 정의에 나와있는 구독 로직을 수행한다고 하는 것으로 보인다.(내부 동작을 알고 있으면 이해할 수 있음.)
SubscribeOn 정의
final private class SubscribeOn<Ob: ObservableType>: Producer<Ob.Element> {
let source: Ob
let scheduler: ImmediateSchedulerType
init(source: Ob, scheduler: ImmediateSchedulerType) {
self.source = source
self.scheduler = scheduler
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Ob.Element {
let sink = SubscribeOnSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
전달받은 'source(원본 옵저버블)' 와 'scheduler' 를 내부 프로퍼티에 저장하고,
원본 Observable에 'subscribe' 메서드가 호출되면, run 메서드가 호출된다.
run 메서드에서는 'SubscribeOnSink' 객체를 생성하고, run() 메서드를 호출한다.
SubscribeOnSink 정의
final private class SubscribeOnSink<Ob: ObservableType, Observer: ObserverType>: Sink<Observer>, ObserverType where Ob.Element == Observer.Element {
typealias Element = Observer.Element
typealias Parent = SubscribeOn<Ob>
let parent: Parent
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
self.forwardOn(event)
if event.isStopEvent {
self.dispose()
}
}
func run() -> Disposable {
let disposeEverything = SerialDisposable()
let cancelSchedule = SingleAssignmentDisposable()
disposeEverything.disposable = cancelSchedule
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
cancelSchedule.setDisposable(disposeSchedule)
return disposeEverything
}
}
'SubscribeOnSink' 의 run 메서드를 보자.
'SerialDisposable', 'SingleAssignmentDisposable' 의 인스턴스를 생성한다. (각 객체가 무슨 역할을 수행하는지 정확하게는 모르겠지만, Disposable 인 것을 보아 작업에 사용되는 리소스를 해제해주는 역할인 것 같음)
'let disposeSchedule = self.parent.scheduler.schedule(()) ...'
주목해야 할 코드는 이 부분이다.
해당 코드는 'subscribeOn' 연산자를 사용하면서 할당한 스케줄러를 통해 'schedule' 메서드를 호출한다.
사용 예시에서는 'ConcurrentDispatchQueueScheduler' 를 사용했으니, 해당 객체의 schedule 메서드를 보자.
ConcurrentDispatchQueueScheduler의 schedule 메서드
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
self.configuration.schedule(state, action: action)
}
매개변수 'state' 는 이름에 걸맞게 작업을 실행할 때, 필요한 상태를 나타내는 것으로 보인다. 하지만 예제에서는 필요하지 않기 때문에 빈 튜플 ()을 전달하고 있다. 또한 매개변수 'action' 은 스케줄러에서 실행할 클로저를 전달받고 있다.
그리고 'configuration' 의 'schedule' 메서드를 호출한다.
여기서 configuration의 정의는 다음과 같다.
let configuration: DispatchQueueConfiguration
DispatchQueueConfiguration 의 schedule 메서드
struct DispatchQueueConfiguration {
let queue: DispatchQueue
let leeway: DispatchTimeInterval
}
extension DispatchQueueConfiguration {
func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
let cancel = SingleAssignmentDisposable()
self.queue.async {
if cancel.isDisposed {
return
}
cancel.setDisposable(action(state))
}
return cancel
}
...
}
'DispatchQueueConfiguration' 의 정의와
'DispatchQueueConfiguration' 의 schedule 메서드를 가져왔다.
schedule 메서드를 보면, 'action' 작업을 해당 큐(DispatchQueue)의 비동기 작업으로 스케줄링한다.
그리고 이제 다시 'SubscribeOnSink' 의 run 메서드에서 해당 작업을 수행하는 코드를 보자.
let disposeSchedule = self.parent.scheduler.schedule(()) { _ -> Disposable in
let subscription = self.parent.source.subscribe(self)
disposeEverything.disposable = ScheduledDisposable(scheduler: self.parent.scheduler, disposable: subscription)
return Disposables.create()
}
scheduler 메서드의 트레일링 클로저 부분은 'action' 에 전달되는 클로저이며,
내부에서는 parent.source(원본 Observable)에게 'subscribe' 작업을 실행한다.
'subscribeOn' 의 'ConcurrentDispatchQueueScheduler' 작업 스케줄링은 이렇게 수행된다!
subscribeOn 내부 동작 정리
주기: Observable생성 -> subscribeOn을 통한 새로운 Observable -> subscribe 메서드 호출
1. 'SubscribeOn' 의 생성
'subscribeOn' 메서드를 통해 원본 Observable을 저장하고 있는 'SubscribeOn' 객체를 생성한다.
2. subscribe 호출
subscribe 메서드는 'SubscribeOn' 객체의 run 메서드를 호출하게 된다.
3. 'SubscribeOn' 객체의 run 메서드
'SubscribeOn' 객체의 run 메서드는
'SubscribeOnSink' 객체의 생성과 동시에 run 메서드를 호출한다.
4. SubscribeOnSink의 run 메서드
'SubscribeOnSink' 의 run 메서드 내에서는 작업이 마무리되면 사용한 리소스를 정리해 줄 몇몇 객체를 생성하고,
사용할 스케줄러의 schedule 메서드를 통해 작업을 스케줄링 한다.
여기서 작업이란 원본 Observable에 subscribe 하는 것을 말하며,
subscribe 메서드는 원본 Observable에서 Observer에게 방출할 이벤트 작업을 수행한다.
5. ConcurrentDispatchQueueScheduler의 scheduler 메서드
해당 메서드는 매개변수로 받은 state(작업을 위해 필요한 상태, 여기선 안쓰임)와 action(스케줄링 할 작업)을
DispatchQueueConfiguration의 scheduler 메서드를 호출하면서 전달해준다.
6. DispatchQueueConfiguration의 scheduler 메서드
해당 메서드는 action(스케줄링 할 작업)을 DispatchQueue의 async 메서드를 사용하여 비동기 작업을 수행한다.
observeOn의 정의
public func observe(on scheduler: ImmediateSchedulerType)
-> Observable<Element> {
guard let serialScheduler = scheduler as? SerialDispatchQueueScheduler else {
return ObserveOn(source: self.asObservable(), scheduler: scheduler)
}
return ObserveOnSerialDispatchQueue(source: self.asObservable(),
scheduler: serialScheduler)
}
'subscribeOn' 과 동일하게 'observeOn' 은 scheduler를 인자로 받고, Observable<Element> 타입을 반환하는 것을 볼 수 있다.
메서드 내부에서는 받아온 scheduler가 'SerialDispatchQueueScheduler' 타입인지를 확인한다.
맞다면 ObserveOn 객체의 인스턴스를 생성하여 반환하고,
아니라면 ObserveOnSerialDispatchQueue 객체의 인스턴스를 생성하여 반환한다.
먼저 'SerialDispatchQueueScheduler' 앞에 'Serial' 이 들어가는 것으로 보아, 작업을 순차적으로 실행하는 것을 도와주는 객체인 것 같다. 그리고 사용 예시에서 'observeOn' 에 전달한 스케줄러는 'MainScheduler' 였다.
'MainScheduler' 의 특성 상 메인 스레드에서 작업을 순차적으로 실행하기 때문에 'MainScheduler' 클래스는 'SerialDispatchQueueScheduler' 를 상속받고 있는 것을 볼 수 있다.
observeOn의 주석 설명
1. Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
observeOn 연산자는 원본 시퀸스를 감싸며, 지정된 스케줄러에서 옵저버의 콜백을 실행한다.
2. This only invokes observer callbacks on a `scheduler`. In case the subscription and/or unsubscription actions have side-effects that require to be run on a scheduler, use `subscribeOn
이 연산자는 오로지 옵저버의 콜백을 지정된 스케줄러에서 호출한다.
만약 구독 및 구독 취소 작업에 스케줄러에서 실행해야 할 side-effects가 있는 경우 'subscribeOn' 을 사용해라.
ObserveOn 클래스의 정의
final private class ObserveOn<Element>: Producer<Element> {
let scheduler: ImmediateSchedulerType
let source: Observable<Element>
init(source: Observable<Element>, scheduler: ImmediateSchedulerType) {
self.scheduler = scheduler
self.source = source
#if TRACE_RESOURCES
_ = Resources.incrementTotal()
#endif
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObserveOnSink(scheduler: self.scheduler, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
#if TRACE_RESOURCES
deinit {
_ = Resources.decrementTotal()
}
#endif
}
내부적으로 원본 Observable과 scheduler를 저장하고 있으며,
구독 작업이 실행되면 run 메서드가 호출되면서, 'ObserveOnSink' 를 생성한다.
그리고 원본 Observable을 subscribe 하면서 생성한 'ObserveOnSink' 를 전달한다.
이제 어떤 일이 일어나냐면, 원본 Observable에서 subscribe 메서드의 작업을 수행하면서,
전달된 observer의 on 메서드를 호출하고, 'ObserveOnSink'가 상속받고 있는
'ObserverBase' 의 on 메서드는 'ObserveOnSink' 의 'onCore' 메서드를 호출하게 된다.
ObserveOnSink 클래스의 onCore 메서드
override func onCore(_ event: Event<Element>) {
let shouldStart = self.lock.performLocked { () -> Bool in
self.queue.enqueue(event)
switch self.state {
case .stopped:
self.state = .running
return true
case .running:
return false
}
}
if shouldStart {
self.scheduleDisposable.disposable = self.scheduler.scheduleRecursive((), action: self.run)
}
}
onCore 메서드에서는 먼저 'queue.enqueue(event)' 를 통해 받은 이벤트 작업을 큐에 추가해둔다.
그리고 'state' 를 통해 상태를 체크하는데 .stopped 상태라면 큐를 처리하기 위해 .running 상태로 변경하고, 'true' 를 반환한다. .running 상태라면 이미 큐 처리를 수행 중이기 때문에 false를 반환한다.
반환된 타입(shouldStart)을 확인하고, 큐를 처리하기 위한 상태라면, scheduler의 'schedulerRecursive()' 메서드를 호출한다. 이 때, ObserveOnSink의 run 메서드가 전달된다.
ObserveOnSink의 run 메서드는 Observable이 전달한 이벤트를 저장하고 있는 'queue' 에서 이벤트가 있는 지를 체크하고, Observer에게 이를 전달해주는 역할을 수행한다. 그리고 이 작업은 'schedulerRecursive' 를 통해 지정한 스케줄러에서 수행되도록 한다.
ObserveOnSink의 run 메서드
func run(_ state: (), _ recurse: (()) -> Void) {
let (nextEvent, observer) = self.lock.performLocked { () -> (Event<Element>?, Observer) in
if !self.queue.isEmpty {
return (self.queue.dequeue(), self.observer)
}
else {
self.state = .stopped
return (nil, self.observer)
}
}
if let nextEvent = nextEvent, !self.cancel.isDisposed {
observer.on(nextEvent)
if nextEvent.isStopEvent {
self.dispose()
}
}
else {
return
}
let shouldContinue = self.shouldContinue_synchronized()
if shouldContinue {
recurse(())
}
}
ObserveOnSink의 run 메서드는 먼저 큐가 비어있는 지를 체크한다.
비어있지 않다면(처리해야 할 작업이 존재), 'dequeue()' 메서드를 통해 이벤트를 꺼내고, 반환한다.
반면에 큐가 비어있다면, 상태를 .stopped 로 설정하고, nil 이벤트를 반환한다.
다음은 꺼낸 이벤트가 존재하는지를 확인하고, 취소되지 않았다면,
observe의 on 메서드를 호출하여 이벤트를 처리한다. 만약 이벤트가 종료 이벤트(.completed 또는 .errer)인 경우, 리소스를 정리한다.
'shouldContinue_synchronized()' 메서드는 큐에 더 처리할 이벤트가 있는지 확인하는 작업이다.
func shouldContinue_synchronized() -> Bool {
self.lock.performLocked {
let isEmpty = self.queue.isEmpty
if isEmpty { self.state = .stopped }
return !isEmpty
}
}
큐가 비어있지 않았을 경우(아직 처리해야 할 작업이 남아있는 상황), true를 반환하고,
큐가 비어있는 경우, 상태를 .stopped로 변경하고 false를 반환한다.
마지막으로 'shouldContinue_synchronized' 를 통해 반환되는 타입을 보고,
큐에 처리할 작업이 남아있다면, 'recurse' 를 호출하여 다음 반복을 스케줄링 한다.(이 부분은 정확하지 않음.)
따라서 run 메서드는 큐에서 이벤트를 하나씩 꺼내어 처리하고, 더 이상 처리할 이벤트가 없을 때까지 스케줄링을 진행한다.
그리고 이 작업은 우리가 지정한 'MainScheduler' 에서 수행된다.
observerOn 정리
1. observeOn 메서드 호출
observeOn 메서드는 전달받은 'scheduler' 의 타입을 체크하는데,
SerialDispatchQueueScheduler 타입이라면 'ObserveOn' 객체의 인스턴스를 생성하고 반환한다.
해당 타입이 아니라면 'ObserveOnSerialDispatchQueue' 객체의 인스턴스를 생성하고 반환한다.
2. ObserveOn의 run 메서드
observeOn을 통해 생성된 'ObserveOn' 객체에 subscribe 가 호출되면,
'ObserveOn' 은 run 메서드를 호출한다.
run 메서드에서는 'ObserveOnSink' 객체를 생성하며,
저장해둔 원본 Observable의 subscribe 메서드를 통해 자신(ObserveOnSink)을 전달한다.
이로써 원본 Observable에서 방출하는 이벤트는 'ObserveOnSink' 의 'onCore()' 메서드에서 처리할 수 있게 된다.
3. ObserveOnSink 의 onCore()
해당 메서드에서는 원본 Observable에서 방출한 이벤트를 큐에 저장해두고, state의 상태를 업데이트 한다.
그리고 작업을 처리해야 하는 상태라면, 'schedulerRecursive()'를 통해 우리가 지정한 스케줄러(Scheduler)에서 작업을 처리할 수 있도록 한다.
이 과정에서 받은 이벤트를 처리하는 작업인 'ObserveOnSink' 의 run 메서드가 전달된다.
4. ObserveOnSink의 run()
해당 메서드는 큐가 비어있는지를 체크하고, 처리해야 할 작업이 남아있다면 큐에서 이벤트를 꺼낸다.
작업이 없다면 그대로 메서드는 종료된다.
그리고 꺼낸 이벤트가 수행 가능한 상태인지를 체크하고, 가능한 상태라면 옵저버에게 이벤트를 전달해준다.
이 때, 전달된 이벤트가 .completed 거나 .error 라면 리소스를 정리하는 dispose() 를 호출한다.
마지막으로 'shouldContinue_synchronized' 메서드를 통해 더 이상 처리할 이벤트가 있는지 확인하면서 스케줄링을 진행한다.
Scheduler의 종류
1. MainScheduler
메인 스레드에서 작업을 스케줄한다.
일반적으로 UI 작업을 수행할 때, 사용된다.
2. CurrentThreadScheduler
현재 스레드에서 작업을 스케줄한다.
이는 기본적인(default) Scheduler로 중첩된 작업을 지원한다.
3. SerialDispatchQueueScheduler
특정 DispatchQueue에서 수행해야하는 작업을 Serial하게 스케줄한다.
MainScheduler도 이를 상속받고 있으며, 코드의 실행 순서를 제어할 필요가 있을 때 사용된다.
4. ConcurrentDispatchQueueScheduler
특정 DispatchQueue에서 작업을 동시적으로 스케줄한다.
병렬 작업을 수행할 때 사용된다.
5. OperationQueueScheduler
특정 NSOperationQueue 에서 작업을 스케줄한다.
큐의 동작을 세밀하게 제어할 수 있어 복잡한 작업을 수행할 때 사용된다.
Reference
-https://reactivex.io/documentation/scheduler.html
'RxSwift' 카테고리의 다른 글
RxSwift에 관하여(PublishSubject, ReplaySubject, AsyncSubject) (0) | 2023.08.10 |
---|---|
RxSwift에 관하여(Subject) (0) | 2023.08.09 |
RxSwift에 관하여(Error Handling Operators) (0) | 2023.08.08 |
RxSwift에 관하여(Combining Observables) (0) | 2023.08.08 |
RxSwift에 관하여(Disposable) (0) | 2023.07.31 |