Disposable
Observable은 비동기 이벤트의 스트림을 생성하고, 전달하는 역할을 하며, 이 때 방출되는 이벤트는 Observer가 받아서 처리한다. 받은 이벤트를 처리한 후에는 더 이상 Observer가 Observable을 관찰하고 있을 필요가 없게 된다. 하지만 Observer를 해제하지 않았을 경우, 필요하지 않은 리소스를 계속 사용하고 있게 된다. 따라서 필요하지 않은 리소스를 적절하게 해제하는 것이 중요하고, 이를 위해 'Disposable' 이 사용된다.
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable { ... }
위 코드와 같이 ObservableType 프로토콜의 extension에 정의된 'subscribe' 메서드들은 모두 'Disposable' 타입을 반환하고 있다. 'subscribe' 메서드는 Observable의 스트림에 대한 구독을 시작하며, 주어진 이벤트를 처리하고, 'Disposable' 객체를 반환한다.
우리는 방출되는 이벤트를 처리하면, 필요하지 않은 리소스는 해제해주어야 한다고 했죠?
'Disposable' 객체를 통해 구독의 수명을 관리하며, 언제든지 취소할 수 있기 때문에 'subscribe' 메서드는 'Disposable' 객체를 반환하고 있는 것이다.
Disposable 정의
public protocol Disposable {
/// Dispose resource.
func dispose()
}
'Disposable' 프로토콜은 하나의 메서드 'dispose()' 를 요구하고 있다.
이 dispose() 메서드가 실제로 관련 리소스를 해제하는 작업을 수행하는 것이다.
따라서 'Disposable' 프로토콜을 준수하고 있는 객체들은 모두 'dispose()' 메서드를 정의하고 있고, 이 'dispose()' 메서드의 정의 내용에는 관련 리소스를 해제하는 작업이 포함되어 있다.
Disposable 예시 코드
지금부터 해당 예시 코드의 내부동작을 보면서, 어떻게 리소스를 해제시키는지 살펴보자.
let observable = Observable<String>.just("1")
observable.subscribe { event in
print(event)
}
.dispose()
just 연산자를 통해 생성한 Observable에 subscribe 메서드를 사용했을 경우, 다음과 같은 작업이 수행된다.
public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
let observer = AnonymousObserver { e in
on(e)
}
return self.asObservable().subscribe(observer)
}
1) self.asObservable() 을 통해 'self' 를 전달한다.(asObservable() 메서드를 따라가보면, self를 반환하고 있는 것을 볼 수 있음)
2) self(just 연산자를 통해 생성된 Observable)의 'subscribe' 메서드를 호출하며, 'AnonymousObserver' 객체의 인스턴스를 인자로 전달한다.
3) 따라서 just 연산자를 통해 생성된 Observable의 subscribe 메서드는 'Disposable' 객체를 반환한다는 것을 알 수 있다.
Just(just 연산자를 통해 생성된 Observable)
final private class Just<Element>: Producer<Element> {
private let element: Element
init(element: Element) {
self.element = element
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
observer.on(.next(self.element))
observer.on(.completed)
return Disposables.create()
}
}
just 연산자를 통해 생성된 Observable의 subscribe 메서드는
1) 인자로 받은 observer(AnonymousObserver 객체의 인스턴스)의 on 메서드를 호출하여 .next 이벤트와 .completed 이벤트를 전달한다.
2) Disposables.create() 를 호출하여 Disposable을 반환한다.
Disposable 객체를 생성한 Disposables.create()
public struct Disposables {
private init() {}
}
먼저 Disposables는 구조체로 선언되어 있는 것을 볼 수 있다.
extension Disposables {
/**
Creates a disposable that does nothing on disposal.
*/
static public func create() -> Disposable { NopDisposable.noOp }
}
Disposables 구조체의 create 메서드는 오버로드 형태로 여러가지가 정의되어 있다.
하지만 Just 클래스의 subscribe 메서드에서 사용한 Disposables.create() 의 정의는 위 코드와 같다.
create() 메서드는 'Disposable' 타입인 'NopDisposable.noOp' 을 반환하고 있다.
NopDisposable의 정의
private struct NopDisposable : Disposable {
fileprivate static let noOp: Disposable = NopDisposable()
private init() {
}
/// Does nothing.
public func dispose() {
}
}
NopDisposable은 'Disposable' 프로토콜을 준수하는 구조체로, 이름에서 알 수 있듯이 'No Operation Disposable' 을 의미한다. 따라서 Disposables.create() 메서드를 호출하여 생성된 NopDisposable의 'dispose' 메서드는 아무런 동작을 수행하지 않는다. NopDisposable은 이렇게 이벤트를 방출한 후에 따로 취소할 작업이 없는 경우에 사용된다.
우리가 사용한 just 연산자는 한 번의 이벤트만 방출하는 Observable이다.
따라서 이 Observable에 subscribe 메서드를 호출하면, 정해진 한 번의 이벤트를 방출한 후에는 더 이상 할 일이 없다. 즉, 이벤트를 취소하거나 중지하는 작업이 필요 없다.
NopDisposable은 아무것도 하지 않는 Disposable이며, dispose 메서드를 호출해도 아무런 효과가 없다. 그래서 just Observable은 한 번의 이벤트 방출 후에 더 이상 할 작업이 없기 때문에 NopDisposable을 반환한다.
Disposable이 어떻게 취소 작업을 하는지 보고 싶었는데, 사용예시를 just 연산자로 드는 덕분에 'NopDisposable' 에 대해서 이해하게 되었다. 다른 예시를 하나 더 보면서 Disposable의 취소 작업을 알아보자.
interval을 이용한 사용예시
let intervalObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.subscribe { event in
switch event {
case .next(let value):
print("\(value)")
case .error(let error):
print("에러: \(error)")
case .completed:
print("작업 완료")
}
}
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
intervalObservable.dispose()
}
//0
//1
//2
//3
//4
//dispose됨.
interval 연산자는 1초마다 정수를 방출하는 Observable을 생성한다.
그리고 'DispatchQueue.main.asyncAfter(deadline: .now() + 5)' 메서드를 통해 5초 후에 해당 Observable을 dispose 한다. dispose() 메서드가 호출되면, Observable의 동작이 취소되고, 더 이상 이벤트를 방출하지 않는다.(출력된 결과에서도 볼 수 있음)
interval 메서드의 정의
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Timer(
dueTime: period,
period: period,
scheduler: scheduler
)
}
interval 메서드는 'Timer' 라는 클래스 타입 Observable을 생성하여 반환한다.
interval 메서드를 통해 생성한 Observable(Timer)을 subscribe 하면,
Timer 클래스에는 'subscribe' 가 정의되어 있지 않아, 'Timer' 가 상속받고 있는 'Producer' 클래스의 subscribe 메서드가 호출된다.
Producer 클래스의 subscribe 정의
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
'Producer' 클래스의 subscribe 메서드는 'Disposable' 타입을 반환하고 있다.
그리고 이 Disposable은 리소스를 해제하는데 사용된다.
'CurrentThreadScheduler.isScheduleRequired'
이 코드는 현재 스레드에서 작업을 싱핼해야 할지, 아니면 스케줄링을 거쳐야 하는지를 판단하는 로직이다.
false라면, 현재 스레드에서 바로 작업을 실행해도 되고, true면 현재 스레드에서 작업을 실행하면 안되고, 작업을 스케줄링해야 한다는 의미. (우리는 Disposable이 어떻게 작동되는지만 알면 되기 때문에 깊게 이해할 필요는 없다.)
'let disposer = SinkDisposer()'
이 코드는 'SinkDisposer' 객체를 생성한다.
'SinkDisposer' 객체는 'Disposable' 프로토콜을 준수하는 클래스로 'dispose' 메서드를 호출하면 관련된 모든 리소스를 해제하는 로직을 가지고 있다.
'let sinkAndSubscription = self.run(observer, cancel: disposer)'
'Producer' 클래스의 'run' 메서드를 호출하는데,
실제로 수행되는 것은 해당 Observable인 'Timer' 클래스에서 재정의 된 'run' 메서드가 호출된다.
잠시 'Timer' 클래스의 'run' 정의를 확인해보자.
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
if self.period != nil {
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
else {
let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
run 메서드는 'Observer(AnonymousObserver)' 객체와 'Cancelable(SinkDisposer)' 객체를 파라미터로 받고 있으며,
'(sink: Disposable, subscription: Disposable)' 형태의 튜플을 반환하고 있다.
여기서 sink는 'TimerSink' 객체를 의미하는데, Observable에서 발생하는 이벤트를 처리하고, 이를 Observer에게 전달하는 역할을 수행하고, subscription은 run 메서드가 반환하는 'Disposable' 객체이며, run 메서드는 주기적으로 이벤트를 발생시키는 작업을 수행하고, 이를 취소할 수 있는 기능을 제공한다. (자세한지는 모르겠음..)
이제 다시 'Producer'의 'subscribe' 메서드로 돌아가보자.
'disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)'
생성된 'SinkDisposer' 객체는 'setSinkAndSubscription' 메서드를 호출하며, sink(TimerSink 객체)와 subscription(TimerSink의 run 메서드를 통해 반환된 Disposable)을 받고 있다.
SinkDisposer의 정의
private final class SinkDisposer: Cancelable {
private enum DisposeState: Int32 {
case disposed = 1
case sinkAndSubscriptionSet = 2
}
private let state = AtomicInt(0)
private var sink: Disposable?
private var subscription: Disposable?
var isDisposed: Bool {
isFlagSet(self.state, DisposeState.disposed.rawValue)
}
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self.sink = sink
self.subscription = subscription
let previousState = fetchOr(self.state, DisposeState.sinkAndSubscriptionSet.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self.sink = nil
self.subscription = nil
}
}
func dispose() {
let previousState = fetchOr(self.state, DisposeState.disposed.rawValue)
if (previousState & DisposeState.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = self.sink else {
rxFatalError("Sink not set")
}
guard let subscription = self.subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
self.sink = nil
self.subscription = nil
}
}
}
먼저 'SinkDisposer' 객체는 'Disposable' 프로토콜을 준수하는 클래스이다.
'setSinkAndSubscription' 메서드를 보면, 두 Disposable 객체(sink, subscription)를 SinkDisposer 내부 프로퍼티에 저장한다. 그리고 두 객체가 설정되었다는 상태 값을 저장한다.(fetchOr(self.state, DisposeState.sinkAndSubscriptionSet.rawValue))
if (previousState & DisposeState.disposed.rawValue) != 0
이 부분은 'SinkDisposer' 가 dispose 된 상태를 확인하는 코드인데, 만약 dispose() 메서드가 호출되면 상태가 설정되고, 두 Disposable 객체(sink, subscription)의 dispose() 메서드를 호출하고, nil로 할당하여 참조를 해제한다.
이 코드가 필요한 이유는 SinkDisposer에 dispose()가 호출된 상태에서 두 Disposable 객체(sink, subscription)가 설정되는 경우를 대비하기 위해서다.
따라서 'setSinkAndSubscription' 메서드에서는 SinkDisposer 객체에 두 Disposable 객체(sink, subscription)를 저장하고, 만약 inkDisposer에 dispose()가 호출된 상태라면 즉시 설정된 두 Disposable 객체(sink, subscription)의 dispose 메서드를 호출하고, nil을 할당하여 참조를 해제한다.
다시 예시 코드를 보자.
intervalObservable.dispose()
생성된 Observable에 dispose() 메서드를 호출하면,
바로 위 'SinkDisposer' 객체의 'dispose' 메서드가 호출되는 것이다.
SinkDispose 객체의 dispose 메서드는 객체의 상태를 확인하고, 'sink' 와 'subscription' 이 설정되어 있다면, 두 객체의 dispose 메서드를 호출하고, nil을 할당하여 참조를 해제한다. 따라서 SinkDispose 객체는 Observable을 구독하고, 이벤트를 처리하는 동안 생성된 리소스를 안전하게 처리할 수 있게 해준다.
'sink' 와 'subscription' 객체의 dispose() 메서드는 자신의 상태를 dispose 상태로 설정하고, 작업을 중지하는 역할을 수행한다. 실제로 'sink' 의 dispose() 메서드를 호출하면, 'TimerSink(sink)' 의 수퍼클래스인 'Sink' 의 dispose() 가 호출되는데, 내부 동작을 확인해보면 'fetchOr' 메서드를 통해 상태를 설정(객체가 dispose되었음)하고, 이 상태 코드를 통해 작업을 중지할 수 있다.
Sink의 dispose 메서드
func dispose() {
fetchOr(self.disposed, 1)
self.cancel.dispose()
}
fetchOr을 통해 상태를 설정하고, cancel(SinkDispose)의 dispose() 메서드를 호출하는 것을 볼 수 있다.
(SinkDispose는 이미 disposed 상태이기 때문에 dispose() 메서드는 중복 호출되지 않음)
dispose(리소스 해제) 작업 정리
1. interval 연산자는 'Producer' 를 상속받는 'Timer' 객체(Observable)를 생성한다.
여기서 Observable에 subscribe를 호출하면, Producer의 subscribe 가 호출되며,
Producer의 subscribe 메서드는 'SinkDisposer' 객체를 생성하고, Timer의 run 메서드를 호출한다.
'SinkDisposer' 는 작업이 모두 마무리 되면 관련 리소스들을 제거하는 역할을 담당하는 Disposable이다.
2. Timer의 run 메서드는 실제 이벤트를 처리하는 객체인 'TimerSink' 를 생성하고,
TimerSink 의 run 메서드를 통해 실제로 이벤트를 발생시키는 subscription을 생성한다.
이 sink와 subscription은 모두 'SinkDisposer' 에 저장된다.
3. Observable에 dispose() 메서드가 호출되면, SinkDisposer의 dispose() 메서드가 호출된다.
SinkDisposer의 dispose() 메서드는 sink와 subscription에 대해 각각 dispose 메서드를 호출하며,
마지막으로 nil을 할당(참조 해제)하여 메모리에서 해제될 수 있도록 한다.
4. sink와 subscription에 대해 각각 dispose 메서드가 불릴 때,
sink는 자신의 상태를 dispose 상태로 변경하고, cancel 객체의 dispose 메서드를 호출한다.
subscription은 스케줄러에 등록한 타이머 작업을 중지한다.
따라서 dispose는 Observable이 생성, 구독되고 이벤트를 처리하는 과정에서 생성되는 모든 리소스에 nil을 할당하고,
수행중인 작업을 중지하는 역할을 수행한다.
DisposeBag
Observable을 구독하고 이벤트를 처리한 후, 관련 리소스를 모두 해제해주어야 하고, 이 작업은 dispose() 를 통해 수행되는 것을 학습했다. 근데 모든 작업에 대해 dispose() 를 일일이 달아주는 것은 귀찮고, 코드가 지저분해 질 수 있다.
이를 간단하게 수행하기 위해 'DisposeBag'이 수행된다.
DisposeBag은 작업을 처리한 후 생성되는 Disposable을 담는 배열이다.
따라서 생성되는 Disposable을 DisposeBag에 담아두었다가 한 번에 dispose() 처리를 할 수 있다.
Disposable을 DisposeBag에 담는 메서드
extension Disposable {
/// Adds `self` to `bag`
///
/// - parameter bag: `DisposeBag` to add `self` to.
public func disposed(by bag: DisposeBag) {
bag.insert(self)
}
}
Disposable 프로토콜을 준수하고 있는 객체는 모두 disposed 메서드를 수행할 수 있다.
그럼 DisposeBag에 담긴 Disposable 객체들을 언제 dispose() 해야 하는가?
public final class DisposeBag: DisposeBase {
deinit {
self.dispose()
}
}
DisposeBag의 정의를 찾아보면, 자신이 deinit 될 때, dispose() 메서드를 호출하는 것을 볼 수 있다.
따라서 DisposeBag을 따로 dispose() 처리해 줄 필요는 없다.
하지만 자신이 원하는 순간에 DisposeBag을 비워주고 싶다면, 해당 순간에 직접 dispose() 메서드를 호출해도 되며,
주의할 점은 DisposeBag이 deinit 될 때, dispose가 호출되기 때문에 지역변수로 두면 해당 구간이 종료되면 메모리를 해제해버릴 수 있다. 따라서 DisposeBag은 보통 전역 변수로 두기도 한다.
DisposeBag의 dispose 메서드
public final class DisposeBag: DisposeBase {
private var disposables = [Disposable]()
private var isDisposed = false
/// This is internal on purpose, take a look at `CompositeDisposable` instead.
private func dispose() {
let oldDisposables = self._dispose()
for disposable in oldDisposables {
disposable.dispose()
}
}
}
자신에게 저장되어 있던 Disposable 객체를 하나씩 dispose() 하는 것을 볼 수 있다.
DisposeBag의 사용예시
import RxSwift
class MyViewController: UIViewController {
let disposeBag = DisposeBag()
override func viewDidLoad() {
super.viewDidLoad()
// 첫번째 Observable을 구독
Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
.subscribe(onNext: { value in
print("첫번째 Observable: \(value)")
})
.disposed(by: disposeBag)
// 두번째 Observable을 구독
Observable<String>.of("가", "나", "다")
.subscribe(onNext: { value in
print("두번째 Observable: \(value)")
})
.disposed(by: disposeBag)
// 세번째 Observable을 구독
Observable<Int>.from([1, 2, 3])
.subscribe(onNext: { value in
print("세번째 Observable: \(value)")
})
.disposed(by: disposeBag)
}
}
각 Observable을 subscribe 하고, 반환되는 Disposble 객체에 'disposed(by bag: disposeBag)' 를 호출하여 DisposeBag에 담는다. 그리고 MyViewController가 메모리에서 해제될 때, DisposeBag도 deinit 되면서 자동으로 메모리에서 해제한다.
Reference
https://babbab2.tistory.com/186
'RxSwift' 카테고리의 다른 글
RxSwift에 관하여(Error Handling Operators) (0) | 2023.08.08 |
---|---|
RxSwift에 관하여(Combining Observables) (0) | 2023.08.08 |
RxSwift에 관하여(Filtering Observables) (0) | 2023.07.27 |
RxSwift에 관하여(Transforming Observables) (0) | 2023.07.23 |
RxSwift에 관하여(Observable과 Creating Observables) (0) | 2023.07.14 |