본문 바로가기
RxSwift

RxSwift에 관하여(PublishSubject, ReplaySubject, AsyncSubject)

by iOS 개린이 2023. 8. 10.

PublishSubject

 

PublishSubject는 Observer가 subscribe한 시점 이후에 방출되는 아이템만 전달하는 Subject이다.

구독 시점 이후에만 방출되는 아이템을 전달하기 때문에 PublishSubject의 생성 시점과 구독 시점 사이에 방출된 아이템은 전달되지 않는다.

 

또한 PublishSubject는 생성 즉시 이벤트를 방출할 준비가 되기 때문에 새로운 Observer가 구독하기 전의 이벤트 요소는 유실될 수 있다. 만약 원본 Observable이나 Subject가 에러 이벤트로 종료된 경우, PublishSubject 는 이후 옵저버에게 해당 에러 이벤트만을 전달한다. 

 

 

 

PublishSubject의 사용예시

let publishSubject = PublishSubject<String>()

// 옵저버 1 구독
let subscription1 = publishSubject.subscribe { event in
    print("Observer 1: \(event)")
}

publishSubject.onNext("첫 번째 이벤트") // 옵저버 1에게 전달

// 옵저버 2 구독
let subscription2 = publishSubject.subscribe { event in
    print("Observer 2: \(event)")
}

publishSubject.onNext("두 번째 이벤트") // 옵저버 1, 2에게 전달

publishSubject.onCompleted() // 옵저버 1, 2에게 전달

//Observer1: next(첫 번째 이벤트)
//Observer1: next(두 번째 이벤트)
//Observer2: next(두 번째 이벤트)
//Observer1: next(completed)
//Observer2: next(completed)

 

위 코드에서 볼  수 있듯이 옵저버 1과 옵저버 2는 모두 구독한 이후에 방출하는 이벤트만 받을 수 있다.

우리가 'BehaviorSubject' 에서는 옵저버가 구독을 하면 즉시 최근에 방출했던 이벤트를 전달해주었었는데,

PublishSubject 는 이벤트가 방출되어야 해당 이벤트를 Observer에게 전달해준다.

 

PublishSubject의 동작 메커니즘은 이전에 BehaviorSubject의 코드를 뜯어보면서 학습했던 것과 동일하기 때문에 넘어가도록 한다.

 

PublishSubject의 유용성

동적 구독

PublishSubject 는 구독 시점 이후의 이벤트만 Observer에게 전달하기 때문에, 

필요없는 데이터 방출을 피하고, 필요한 시점에 이벤트를 방출받을 수 있다.

 

 

 

ReplaySubject

 

ReplaySubject는 구독시점과 관계없이 Observable이 방출한 모든 요소를 Observer에게 전달한다.

즉, 구독 시점 이전에 방출된 이벤트도 replay하여 받을 수 있다.

 

ReplaySubject는 replay 할 요소의 갯수나 시간 제한을 설정할 수 있는데, 이 갯수가 넘어가거나, 지정된 시간이 경과한 아이템은 버려진다. 이를 통해 필요한 만큼의 이벤트를 전달할 수 있다.

 

따라서 ReplaySubject는 늦게 구독한 Observer도 과거의 이벤트를 받아야 하는 경우나, 특정 범위 내의 이벤트만 전달하고 싶은 경우 유용하게 사용된다. 

 

 

ReplaySubject의 사용예시

let replaySubject = ReplaySubject<String>.create(bufferSize: 2)

// 아이템 방출
replaySubject.onNext("A")
replaySubject.onNext("B")
replaySubject.onNext("C")

// 첫 번째 옵저버 구독
replaySubject.subscribe { event in
    print("Observer 1: \(event)")
}

// 두 번째 아이템 방출
replaySubject.onNext("D")

// 두 번째 옵저버 구독
replaySubject.subscribe { event in
    print("Observer 2: \(event)")
}

// 결과 종료
replaySubject.onCompleted()

// 세 번째 옵저버 구독
replaySubject.subscribe { event in
    print("Observer 3: \(event)")
}

//Observer 1: B
//Observer 1: C
//Observer 1: D

//Observer 2: C
//Observer 2: D

//Observer 1: completed
//Observer 2: completed

//Observer 3: C
//Observer 3: D
//Observer 3: completed

 

ReplaySubject 를 생성하면서 buffer의 크기를 2로 제한해두었다. 

따라서 구독하는 옵저버들은 모두 최근에 방출된 2개의 요소만을 전달받고 있다.

 

여기서 신기한 것은 ReplaySubject가 .completed를 통해 이벤트를 종료했는데,

이 시점 이후에 구독한 Observer3이 최근 데이터를 전달받는 것이다.

 

ReplaySubject에 대해 좀 더 알아보자.

 

ReplaySubject의 create 메서드 정의

public static func create(bufferSize: Int) -> ReplaySubject<Element> {
        if bufferSize == 1 {
            return ReplayOne()
        }
        else {
            return ReplayMany(bufferSize: bufferSize)
        }
    }

 

각 객체의 네이밍을 통해 어떤 작업을 수행하는지 감을 잡을 수 있다.

먼저 전달된 'bufferSize' 가 1인지 아닌지 여부를 체크하고, 생성할 객체를 선택한다.

우리는 2를 전달했기 때문에 'ReplayMany' 객체에 대해서 알아보자.

 

 

ReplayMany의 정의

private final class ReplayMany<Element> : ReplayManyBase<Element> {
    private let bufferSize: Int
    
    init(bufferSize: Int) {
        self.bufferSize = bufferSize
        
        super.init(queueSize: bufferSize)
    }
    
    override func trim() {
        while self.queue.count > self.bufferSize {
            _ = self.queue.dequeue()
        }
    }
}

 

ReplayMany 클래스는 전달받은 'bufferSize' 를 내부 프로퍼티에 저장하고, 

수퍼클래스인 'ReplayManyBase' 의 init 메서드를 호출한다. 

 

trim() 메서드는 큐의 크기가 지정된 'bufferSize' 보다 클 경우 오래된 이벤트를 제거한다.

이를 통해 메모리 사용량을 제어하며, bufferSize 를 초과하는 이벤트가 저장되지 않도록 관리한다.

 

 

ReplayManyBase의 정의

private class ReplayManyBase<Element>: ReplayBufferBase<Element> {
    fileprivate var queue: Queue<Element>
    
    init(queueSize: Int) {
        self.queue = Queue(capacity: queueSize + 1)
    }
    
    override func addValueToBuffer(_ value: Element) {
        self.queue.enqueue(value)
    }

    override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        for item in self.queue {
            observer.on(.next(item))
        }
    }

    override func synchronized_dispose() {
        super.synchronized_dispose()
        self.queue = Queue(capacity: 0)
    }
}

 

ReplayMany 클래스에서 호출한 'super.init(queueSize: bufferSize)' 를 통해 ReplayManyBase의 초기화 메서드가 실행된다.

 

ReplayManyBase 초기화 메서드에서는 'self.queue = Queue(capacity: queueSize + 1)' 를 실행한다.

일단 이렇게 queue를 설정해주는 것으로 create 메서드의 동작은 마친다.

 

여기서 queue란 ReplaySubject가 방출한 값들을 저장하는 데 사용되는 객체다.(실제로 요소를 저장하는 곳은 내부 프로퍼티 storage) 이 queue를 통해 Observer가 구독을 시작할 때, 해당 queue에 저장된 이벤트를 replay 해준다.

 

또한 'Queue' 구조체는 내부적으로 요소를 담을 저장공간을 만들고, 

enqueue(), dequeue() 등의 메서드를 통해 새로운 요소를 추가하고, 저장공간에서 요소를 제거하는 작업을 수행하면서 정해진 크기를 조절한다. 

 

 

 

ReplaySubject의 create 메서드 동작 여기까지 정리

1. create 메서드 호출

create(...) 메서드가 호출되면, 전달된 'bufferSize' 가 1인지 아닌지에 따라 서로 다른 객체를 생성한다.

우리는 2를 전달했기 때문에 'ReplayMany' 객체를 생성한다.

 

2. ReplayMany 초기화

ReplayMany는 내부 프로퍼티에 bufferSize를 저장하고,

수퍼클래스의 초기화 메서드를 호출하면서 bufferSize를 전달한다.

 

3. ReplayManyBase 초기화

ReplayManyBase 는 'Queue(...)' 구조체를 생성하면서,

bufferSize 크기를 참고한 'queue' 를 생성한다.

 

4. Queue 초기화

'Queue' 구조체는 내부적으로 'bufferSize' 에 맞는 저장공간을 준비하고, 

요소가 들어오면 저장하고, 초과하면 삭제하면서 정해진 크기를 조절한다.

 

 

여기까지는 ReplaySubject의 create() 메서드 동작 방식이었고, 

ReplaySubject 가 onNext 이벤트를 방출할 때, 내부적으로는 어떤 동작이 수행되는지 알아보자.

 

ReplaySubject의 onNext 이벤트

지난시간 Subject에서 onNext 이벤트를 호출했을 경우, ObserverType을 준수하는 타입은 내부적으로 on 메서드의 정의를 가지고 있기 때문에 해당 on 메서드가 호출되는 내부동작을 볼 수 있었다. 

 

그럼 ReplaySubject에 정의되어 있는 on 메서드를 찾아봐야 할까?

아니다. 우리는 ReplaySubject의 create() 를 통해 'ReplayMany' 객체를 생성했다.

따라서 ReplayMany 의 on 메서드가 정의되는 부분을 봐야한다. 

 

하지만 ReplayMany 객체에는 on 메서드가 정의되어 있지 않다.

'ReplayMany'는 'ReplayManyBase' 를 상속받고 있고,

'ReplayManyBase' 는 'ReplayBufferBase' 를 상속받고 있고,

'ReplayBufferBase' 는 'ReplaySubject' 를 상속받고 있다. 

 

이런 부모관계를 가지고 있기 때문에 'ReplayMany' 의 on 메서드를 호출하면,

차례대로 올라가면서 호출할 on 메서드의 정의를 찾고,

'ReplayBufferBase' 객체에서 on 메서드를 찾을 수 있다. 그리고 이를 호출한다.

 

override func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        dispatch(self.synchronized_on(event), event)
    }

 

위 코드는 'ReplayBufferBase' 객체의 on 메서드의 정의이다. Subject의 동작방식이 대부분 비슷하다고 했죠?

따라서 dispatch() 를 통해 각 옵저버에게 동일한 이벤트 스트림을 전달하는 동작을 수행한다.

 

 

ReplayBufferBase의 synchronized_on 메서드

 

func synchronized_on(_ event: Event<Element>) -> Observers {
        self.lock.lock(); defer { self.lock.unlock() }
        if self.isDisposed {
            return Observers()
        }
        
        if self.isStopped {
            return Observers()
        }
        
        switch event {
        case .next(let element):
            self.addValueToBuffer(element)
            self.trim()
            return self.observers
        case .error, .completed:
            self.stoppedEvent = event
            self.trim()
            let observers = self.observers
            self.observers.removeAll()
            return observers
        }
    }

 

 

 

나머지 부분은 지난시간에 학습했던 코드로 다른 스레드와의 충돌을 방지하거나, 객체가 이벤트를 방출할 수 있는 상태인지 체크하는 코드고, switch event { ... } 부분을 집중해보자.

 

 

next 이벤트일 경우 addValueTobuffer의 동작

override func addValueToBuffer(_ value: Element) {
        self.queue.enqueue(value)
    }

 

'addValueToBuffer' 는 'ReplayManyBase' 객체에 정의되어 있는 메서드로, 

아까 create() 메서드를 통해 ReplayManyBase가 초기화되면서 Queue 객체의 인스턴스인 queue를 저장하는 것을 볼 수 있었다. 그리고 .enqueue() 메서드를 호출한다.

 

Queue의 enqueue() 메서드

mutating func enqueue(_ element: T) {
        if count == storage.count {
            resizeTo(Swift.max(storage.count, 1) * resizeFactor)
        }
        
        storage[pushNextIndex] = element
        pushNextIndex += 1
        innerCount += 1
        
        if pushNextIndex >= storage.count {
            pushNextIndex -= storage.count
        }
    }

 

strage: 큐에 저장된 요소들을 담고 있는 배열.

innerCount: 현재 큐에 저장된 요소의 수를 나타냄. == count

pushNextIndex: 다음 요소가 추가될 위치의 인덱스를 나타냄.

 

먼저, enqueue는 현재 큐의 크기(count)와 storage의 크기가 같은지 확인한다. 

같다면 이미 가득 찼다는 의미이므로, 'resizeTo' 메서드를 호출하여 큐의 크기를 늘린다.

 

다음은 'strage[pushNextIndex]' 위치에 방출받은 요소를 저장한다.

그리고 pushNextIndex와 innerCount를 1씩 더하여 업데이트한다.

 

'if pushNextIndex >= storage.count { ... }' 코드를 통해

'pushNextIndex' 가 'strage.count' 보다 크거나 같아지면 pushNextIndex에서 strage.coun를 빼준다.

이는 인덱스가 배열의 범위를 벗어나지 않도록 하는 것으로 추가할 인덱스 위치가 배열의 끝에 도달하면 다시 처음으로 돌아가 요소를 추가할 수 있도록 해준다.

 

따라서 'enqueue' 는 replay 할 요소를 저장하고, 저장공간의 크기를 조절하는 역할을 수행한다.

 

 

next 이벤트일 경우 trim()의 동작

override func trim() {
        while self.queue.count > self.bufferSize {
            _ = self.queue.dequeue()
        }
    }

 

queue.count는 현재 큐에 저장된 요소의 수를 나타낸다.

따라서 큐에 저장된 요소의 수가 'bufferSize' 보다 커졌을 경우, dequeue() 메서드를 통해 큐의 크기를 조절하는 역할을 수행한다. 반복문을 통해 현재 큐에 저장된 요소가 저장해야 하는 요소보다 작아질 수 있도록 크기를 줄인다.

 

Queue의 dequeue() 메서드

mutating func dequeue() -> T? {
        if self.count == 0 {
            return nil
        }

        defer {
            let downsizeLimit = storage.count / (resizeFactor * resizeFactor)
            if count < downsizeLimit && downsizeLimit >= initialCapacity {
                resizeTo(storage.count / resizeFactor)
            }
        }

        return dequeueElementOnly()
    }

 

먼저 현재 큐에 있는 요소가 0이라면 크기 조절이 필요없기 때문에  그대로 종료한다.

defer는 메서드가 종료될 때 실행되는데, 현재 큐의 저장공간 크기를 줄일 수 있는지를 체크하고, 조건이 만족되면 큐의 크기를 줄이는 동작을 수행한다. 

 

우리는 요소를 삭제하는 동작을 보고 싶은 거죠?

그 동작은 바로 마지막에 반환되는 dequeueElementOnly() 에서 수행한다.

 

dequeueElementOnly 메서드 동작

private mutating func dequeueElementOnly() -> T {
        precondition(count > 0)
        
        let index = dequeueIndex

        defer {
            storage[index] = nil
            innerCount -= 1
        }

        return storage[index]!
    }

 

먼저 precondition을 통해 큐에 원소가 존재해야 메서드가 호출될 수 있다는 전제조건을 체크한다. 

 

dequeueIndex은 계산 프로퍼티로 큐에서 다음에 제거할 요소의 인덱스를 계산하여 반환하는 역할을 수행한다.

그리고 큐에 저장된 요소들을 담은 배열 'storage' 에서 해당 인덱스에 위치한 요소를 제거하고, innerCount 도 -1 로 업데이트한다.

 

따라서 trim() 를 호출하면 현재 큐의 크기가 우리가 설정한 큐의 크기(bufferSize) 보다 커질 경우, 초과된 부분을 제거하는 역할을 수행한다. 이 과정에서 큐의 크기를 조절할 수 있는지 여부를 판단하고, 제거할 요소를 선택하여 해당 요소를 제거하는 작업을 수행한다. 

 

next 이벤트일 경우, 정리

따라서 'ReplaySubject' 에서 next 이벤트가 방출되면, 

1. 'addValueToBuffer()' 를 통해 이벤트 요소를 저장 공간(stroage)에 저장하고, 관련 변수를 업데이트 한다.

 

2. 'trim()' 메서드는 현재 요소의 크기가 우리가 설정해둔 크기보다 클 경우, 초과된 부분을 제거하는 역할을 수행하며,

이를 통해 항상 설정한 크기 이하로 유지되도록 한다.

 

3. 그리고 옵저버들을 반환한다. 

 

 

Error나 Completed 이벤트일 경우

이 경우에는 이벤트를 중단시키는 상태 변수를 업데이트하고, 모든 Observer 들을 제거하여 반환한다.

근데 여기서도 trim() 을 호출하는데, 이 이유가 무엇일까? 

찾아봤는데.. 잘모르겠다. (지피티는 일관성이나 나중에 확장성을 위해 놔두었다고 하는데 정확한 이유는 모르겠음!)

 

 

 

ReplaySubject의 subscribe 정리

ReplaySubject를 생성하고, Next 이벤트를 방출할 때 내부적으로는 어떤 동작이 수행되는지를 알아보았다.

그럼 이제 Observer가 ReplaySubject를 구독할 때에는 어떤 작업이 수행되는지 찾아보자.

 

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self.lock.performLocked { self.synchronized_subscribe(observer) }
    }

 

여기도 subscribe 메서드를 호출하면 'ReplayBufferBase' 의 subscribe 메서드가 호출된다.

그리고 해당 메서드는 'synchronized_subscribe' 메서드를 통해 구독 작업을 수행한다.

 

 

synchronized_subscribe 메서드 동작

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if self.isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
     
        let anyObserver = observer.asObserver()
        
        self.replayBuffer(anyObserver)
        if let stoppedEvent = self.stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        else {
            let key = self.observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    }

 

먼저 해당 객체가 disposed 상태인지 확인하고, 이미 리소스가 해제되었다면 옵저버에게 에러를 전달하고, Disposable을 반환한다.

 

다음 전달받은 observer를 AnyObserver 타입으로 변환 후 self.replayBuffer() 메서드를 호출한다.

그리고 replayBuffer 메서드가 바로 옵저버에게 최근에 저장한 요소를 방출하는 역할을 수행한다.

 

우리가 사용예시에서 이미 객체가 .completed 이벤트를 통해 종료되었는데, 저장한 요소를 다시 한번 replay하고 종료 이벤트를 전달받은 것을 보았다. 그 해답이 여기서 나온다. 해당 객체의 이벤트 종료 상태를 확인하기 전에 replayBuffer 메서드를 통해 저장한 요소를 방출하기 때문이다.

 

replayBuffer 메서드 동작

override func replayBuffer<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element {
        for item in self.queue {
            observer.on(.next(item))
        }
    }

 

이렇게 replayBuffer 메서드에서 전달받은 옵저버에게 그 동안 저장해두었던 요소를 이벤트로 방출한다.

self.queue는 Sequence 프로토콜을 준수하고 있기 때문에 내부적으로 'makeIterator' 메서드를 정의하고 있다.

따라서 방출할 요소를 내부적으로 하나씩 전달하고 observer에게 .next 이벤트를 통해 방출한다.

 

 

ReplaySubject 정리

ReplaySubject는 과거에 발생한 이벤트를 저장하고, 새로운 Observer에게 이 이벤트를 전달하는 역할을 수행한다.

 

1. ReplaySubject가 create() 를 통해 생성되면, 내부적으로 설정한 bufferSize 를 저장하고, 작업을 준비한다.

 

2. 이벤트가 발생하면 'synchronized_on' 메서드를 통해 이벤트 유형에 따라 적절한 처리를 수행한다. 처리 과정에서 'enqueue' 및 'dequeue' 메서드를 통해 방출된 요소를 큐에 추가하고 제거한다. 

 

3. subscribe 메서드가 호출되면, 'synchronized_subscribe' 를 통해 observer에게 저장된 요소를 방출하거나, 에러 및 종료 이벤트를 전달한다.

 

 

 

AsyncSubject

 

AsyncSubject는 원본 Observable(or Subject)이 완료된 경우에만 마지막으로 방출된 요소를 Observer에게 전달한다.

만약 원본 Observable이 어떤 값을 방출하지 않고 완료되면, AsyncSubject 역시 값을 방출하지 않고 완료된다.

 

AsyncSubject 가 요소를 방출하기 위해서는 반드시 원본 Observable이 완료되어야 하며, 만약 중간에 에러로 종료된다면, AsyncSubject 는 값을 방출하지 않고 해당 에러를 전달한다.

 

 

AsyncSubject 사용예시

let asyncSubject = AsyncSubject<String>()

//옵저버 1 추가
asyncSubject.subscribe(onNext: { value in
    print("옵저버 1: \(value)")
}).disposed(by: disposeBag)

// 파일 다운로드가 시작되었음을 가정하고, 상태를 업데이트.
asyncSubject.onNext("Downloading...")
asyncSubject.onNext("50% completed")
asyncSubject.onNext("75% completed")

//옵저버 2 추가
asyncSubject.subscribe(onNext: { value in
    print("옵저버 2: \(value)")
}).disposed(by: disposeBag)

// 다운로드 완료 후 완료 이벤트를 전달합니다.
asyncSubject.onNext("Download completed")
asyncSubject.onCompleted()

//옵저버 3 추가
asyncSubject.subscribe(onNext: { value in
    print("옵저버 3: \(value)")
}).disposed(by: disposeBag)


// 옵저버 1: Download completed
// 옵저버 2: Download completed
// 옵저버 3: Download completed

 

 

옵저버 1과 2는 asyncSubject 가 .onCompleted() 이벤트가 방출되고 난 후, 가장 최근에 방출되었던 이벤트 요소를 전달받는다. 

 

 

그리고 asyncSubject가 이미 완료 이벤트를 방출한 후, asyncSubject 를 구독하는 옵저버 3에게도 동일하게 최근에 방출되었던 요소를 전달한다. 

 

 

 

AsyncSubject의 synchronized_on 메서드 동작

func synchronized_on(_ event: Event<Element>) -> (Observers, Event<Element>) {
        self.lock.lock(); defer { self.lock.unlock() }
        if self.isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            self.lastElement = element
            return (Observers(), .completed)
        case .error:
            self.stoppedEvent = event

            let observers = self.observers
            self.observers.removeAll()

            return (observers, event)
        case .completed:

            let observers = self.observers
            self.observers.removeAll()

            if let lastElement = self.lastElement {
                self.stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                self.stoppedEvent = event
                return (observers, .completed)
            }
        }
    }

 

이는 AsyncSubject 가 이벤트를 방출했을 때, on 메서드에서 호출하는 'synchronized_on' 메서드의 정의이다.

 

.next 이벤트의 경우 방출한 요소만을 저장하고, 반환 이벤트로 .completed를 내보낸다. 

반환 이벤트로 .completed 를 내보내는 이유는 AsyncSubject 특성 상 next 이벤트가 방출되어도 요소만을 저장하고, Observer에게 아무런 이벤트도 내보내지 않아야 하기 때문이다. 

 

.completed 이벤트를 내보내면, 각 Observer는 해당 이벤트를 단순하게 받기만 할 수 있다.

하지만 .next 이벤트를 내보내면, 각 Observer는 .next 이벤트에 해당하는 로직을 수행하면서, 요소를 방출할 수 밖에 없다.

따라서 AsyncSubject의 특성을 살리기 위해 .completed 이벤트를 방출하는 것을 볼 수 있다.

 

그리고 AsyncSubject의 완료 이벤트에 대한 로직은 'case .completed:' 내부에 존재한다.

따라서 .completed 이벤트인 경우, 먼저 마지막으로 방출된 요소가 있는지 확인하고,

요소가 있는 경우, 해당 값을 옵저버에게 전달한다. 

요소가 없는 경우, 그대로 완료 이벤트를 전달한다.

 

 

 

AsyncSubject의 synchronized_subscribe 메서드 동작

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self.stoppedEvent {
            switch stoppedEvent {
            case .next:
                observer.on(stoppedEvent)
                observer.on(.completed)
            case .completed:
                observer.on(stoppedEvent)
            case .error:
                observer.on(stoppedEvent)
            }
            return Disposables.create()
        }

        let key = self.observers.insert(observer.on)

        return SubscriptionDisposable(owner: self, key: key)
    }

 

옵저버가 AsyncSubject 를 subscribe 했을 경우, 내부에서 호출하는 'synchronized_subscribe' 메서드의 정의이다.

 

AsyncSubject 를 subscribe 한 Observer는 AsyncSubject 가 완료되었을 경우에만 마지막 요소를 받을 수 있었다.

그리고 'synchronized_on' 메서드의 .completed 케이스에서는 마지막 요소가 있으면 'stoppedEvent' 에 .next 이벤트를 저장해두었고, 마지막 요소가 없다면 .completed 이벤트를 저장하는 것을 볼 수 있었다.

 

따라서 'stoppedEvent' 가 .next 이벤트인 경우는 이미 'AsyncSubject' 가 완료 이벤트를 방출한 것이자, 전달할 마지막 요소가 존재한다는 의미이다. 

 

위 코드에서 .next 이벤트에 대한 처리를 보면, 마지막으로 전달할 요소를 .next 이벤트로 방출하고, .completed 이벤트를 방출하는 것을 볼 수 있다!

 

 

 

AsyncSubject의 유용성

1. 최종 결과 관찰

위 사용예시에서도 비슷한 상황을 가정했듯이, 파일 다운로드 작업과 같이 긴 작업에서 최종 완료 상태에만 관심이 있을 경우, AsyncSubject를 사용하면 좋다.

 

2. 최신 상태 관찰

최종 결과 관찰과 동일한 맥락으로, 특정 객체나 리소스의 상태 변경을 관찰할 때 최신 상태만 필요한 경우 유용하다.

 

 

AsyncSubject의 주의할 점과 한계점

1. 완료 이벤트 필요

AsyncSubject 는 .completed 이벤트가 방출되지 않으면, Observer에서 아무런 요소도 받지 못한다.

 

2. 한 번만 사용

AsyncSubject 는 한 번 .completed 되면 재사용할 수 없고, 완료 시점 이후 새로운 Observer가 구독을 해도 마지막 값만 받을 수 있다.

 

 

 

Reference

-https://reactivex.io/documentation/subject.html

 

 

 

'RxSwift' 카테고리의 다른 글

RxSwift에 관하여(Scheduler)  (0) 2023.08.11
RxSwift에 관하여(Subject)  (0) 2023.08.09
RxSwift에 관하여(Error Handling Operators)  (0) 2023.08.08
RxSwift에 관하여(Combining Observables)  (0) 2023.08.08
RxSwift에 관하여(Disposable)  (0) 2023.07.31