본문 바로가기
RxSwift

RxSwift에 관하여(Filtering Observables)

by iOS 개린이 2023. 7. 27.

Filtering Observable

-필터링 연산자는 Observable에서 방출하는 요소를 조건에 따라 필터링하는데 사용되는 연산자들을 말한다.

'Filter', 'Skip', 'Take', 'Distinct', 'Debounce' 등이 있으며, 차례대로 어떻게 수행하는지 학습해보자. 

 

1. Filter

 

연산자 이름 그대로 필터링 기능을 수행한다.

원본 Observable이 방출하는 요소 중 특정 조건을 충족하는 요소만을 선택해서 새로운 Observable로 출력한다.

 

filter의 사용 예시

let numbers = Observable.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        
let filterObservable = numbers.filter { $0 % 2 == 0 }
        
filterObservable.subscribe(onNext: {
      print($0)
})
.dispose()
        
//2
//4
//6
//8
//10

 

filter 연산자는 인자로 주어지는 클로저를 통해 조건을 제공 받는다.

원본 Observable(numbers)이 방출하는 이벤트 요소(1, 2, 3 ...)가 조건에 부합하는지 여부를 판단하고, 

반환 값이 true인 경우만, 새로운 Observable에 포함되어 방출된다.

 

 

filter의 정의

public func filter(_ predicate: @escaping (Element) throws -> Bool)
        -> Observable<Element> {
        Filter(source: self.asObservable(), predicate: predicate)
    }

 

 

filter의 매개변수 'predicate' 는 이벤트의 요소를 매개변수로 받아들이고, 이 요소가 특정 조건을 만족하는지에 따라 Bool값을 반환한다. 

 

함수의 바디에서는 'Filter' 클래스의 인스턴스를 생성하고 반환한다. 

Filter 클래스는 source(원본 Observable)와 predicate 클로저를 전달 받는다.

 

 

 

Filter 클래스의 정의

final private class Filter<Element>: Producer<Element> {
    typealias Predicate = (Element) throws -> Bool
    
    private let source: Observable<Element>
    private let predicate: Predicate
    
    init(source: Observable<Element>, predicate: @escaping Predicate) {
        self.source = source
        self.predicate = predicate
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = FilterSink(predicate: self.predicate, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

 

 

Create, Transform 연산자에서 보았던 내부 동작 메커니즘과 같네요.

source(원본 Observable)와 predicate(조건 클로저)를 저장하고,

'FilterSink' 객체를 생성하여 source(원본 Observable)를 subscribe하면서 전달해준다. 

이를 통해 원본 Observable이 방출하는 이벤트를 FilterSink가 받아 처리하는 로직을 수행하겠죠?

 

 

 

FilterSink 클래스의 정의

final private class FilterSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Predicate = (Element) throws -> Bool
    typealias Element = Observer.Element
    
    private let predicate: Predicate
    
    init(predicate: @escaping Predicate, observer: Observer, cancel: Cancelable) {
        self.predicate = predicate
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            do {
                let satisfies = try self.predicate(value)
                if satisfies {
                    self.forwardOn(.next(value))
                }
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .completed, .error:
            self.forwardOn(event)
            self.dispose()
        }
    }
}

 

'FilterSink'의 'on' 메서드를 보자.

 

1) case .next(let value):

원본 Observable에서 방출한 이벤트가 .next일 때, predicate(조건 클로저)를 통해 value(방출된 요소)가 조건에 부합하는지 여부를 판단하고, 그 결과를 'satisfies' 에 저장한다. 'satisfies' 가 'true' 일 때, .next 이벤트로 요소를 전달해준다.

 

predicate(조건 클로저)를 통해 value(방출된 요소)가 조건에 부합하는지 여부를 판단하는 과정에서 발생하는 error는 Observable의 .error 이벤트를 통해 전달하고 작업을 마무리 한다.

 

2) case .completed, .error:

원본 Observable에서 방출한 이벤트가 .completed거나 .error일 때, 그대로 다음 Observable 이벤트로 전달해준다.

 

 

 

2. Skip

 

'Skip'은 원본 Observable이 방출하는 요소 중 처음부터 n개의 요소를 생략하고, 나머지 요소만 방출하는 연산자이다.  

 

skip의 사용 예시

let numbers = Observable.from([1, 2, 3, 4, 5, 6])
let skipped = numbers.skip(3)

skipped.subscribe(onNext: { print($0) })

//4
//5
//6

 

위 코드는 skip 연산자를 사용하여 'numbers(from Observable)' 가 방출하는 요소 중 3개까지는 생략하고, 그 이후의 요소만 방출하도록 한다.

 

 

skip의 정의

public func skip(_ count: Int)
        -> Observable<Element> {
        SkipCount(source: self.asObservable(), count: count)
    }

 

skip의 매개변수 'count' 는 몇 개의 요소까지 방출을 제한할 것인지를 나타내는 정수를 받고 있다.

그리고 'SkipCount' 클래스의 인스턴스를 생성하여 source(원본 Observable)와 count를 파라미터로 받는다.

 

다음 내부동작은 다른 연산자들과 동일하게 원본 Observable에서 방출하는 이벤트를 subscribe하고, skip 처리를 수행할 Observer 객체를 생성하여 전달하는 작업을 하기 때문에 설명은 생략하도록 한다.

 

이렇게 skip 연산자는 데이터 스트림에서 특정 시점 이후의 데이터만 처리하고 싶을 때 사용할 수 있다.

 

 

 

3. Take

 

'take'는 'skip' 연산자와 유사하지만 다른 기능을 수행한다.

'skip' 연산자는 n개의 요소를 제외한 나머지 요소에 대한 이벤트를 받았다면,

'take' 연산자는 n개의 요소에 대한 이벤트를 받고, 나머지 요소를 제외한다. 

즉, 원본 Observable이 방출하는 요소 중 처음부터 n개의 요소까지 이벤트를 받고 나머지 요소는 생략하는 연산자이다.

 

take의 사용 예시

let numbers = Observable.from([1, 2, 3, 4, 5, 6])
let takeObservable = numbers.take(3)

takeObservable.subscribe(onNext: { print($0) })

//1
//2
//3

 

take 연산자를 사용하여 'numbers(from Observable)' 가 방출하는 요소 중 3개까지만 받고, 그 이후의 요소는 방출을 생략하도록 한다.

 

take의 정의

public func take(_ count: Int)
        -> Observable<Element> {
        if count == 0 {
            return Observable.empty()
        }
        else {
            return TakeCount(source: self.asObservable(), count: count)
        }
    }

 

'count' 매개변수는 몇 개의 요소까지 방출을 받을 것인지 나타내는 정수를 받고 있다.

원본 Observable에서 처음부터 'count' 갯수의 이벤트만을 선택하여 새 Observable에 포함시키는 역할을 한다.

 

만약 count가 0일 경우, 아무런 이벤트도 방출하지 않아야 하기 때문에 Observable.empty()를 반환한다.

count가 0이 아닌 경우, 'TakeCount' 라는 클래스 인스턴스를 생성하여 source(원본 Observable)와 count를 파라미터로 받는다.

 

이렇게 take 연산자는 skip과 반대로 데이터 스트림에서 특정 시점까지의 데이터만 처리하고 싶을 때 사용할 수 있다.

 

 

 

4. DistinctUntilChanged

 

distinctUntilChanged 연산자는 Observable이 방출하는 항목 중에서 연속적으로 중복되는 항목을 제거한다. 

따라서 distinctUntilChanged 연산자를 적용한 Observable은 원본 Observable에서 방출되는 항목 중 연속적으로 중복되는 항목은 한 번씩만 방출한다.

 

distinctUntilChanged의 사용 예시

let observable = Observable.of(1, 1, 2, 3, 3, 3, 4, 4, 5)

let distinctObservable = observable.distinctUntilChanged()
distinctObservable.subscribe(onNext: { print($0) })

//1
//2
//3
//4
//5

 

 

'distinctUntilChanged()' 메서드를 적용하여 같은 요소가 연속해서 방출되는 경우, 그 중 첫 번째 요소만 방출하고, 이후에 나오는 요소들은 제외한다.

 

distinctUntilChanged의 정의

extension ObservableType where Element: Equatable {

    /**
     Returns an observable sequence that contains only distinct contiguous elements according to equality operator.

     - seealso: [distinct operator on reactivex.io](http://reactivex.io/documentation/operators/distinct.html)

     - returns: An observable sequence only containing the distinct contiguous elements, based on equality operator, from the source sequence.
     */
    public func distinctUntilChanged()
        -> Observable<Element> {
        self.distinctUntilChanged({ $0 }, comparer: { ($0 == $1) })
    }
}

 

'distinctUntilChanged' 연산자는 'ObservableType' 프로토콜에서 'Element'가 'Equatable' 프로토콜을 준수하는 경우에만 사용할 수 있다. 'Equatable' 프로토콜은 동일한 유형의 두 인스턴스가 비교가능한지 여부를 결정하는 방법을 정의한다.  따라서 distinctUntilChanged 연산자가 연속된 중복 항목을 검증하기 위해서는 이전 항목과 현재 항목이 같은지 여부를 판단할 수 있어야 하기 때문에 'Equatable' 프로토콜의 준수 여부가 필요한 것이다.

 

'distinctUntilChanged()' 메서드 내부에서 호출하고 있는 distinctUntilChanged()에 대해 알아보자.

public func distinctUntilChanged<K>(_ keySelector: @escaping (Element) throws -> K, comparer: @escaping (K, K) throws -> Bool)
        -> Observable<Element> {
            return DistinctUntilChanged(source: self.asObservable(), selector: keySelector, comparer: comparer)
    }

 

매개변수 'keySelector' 는 각 요소로부터 key를 계산하는 함수이다. key는 해당 요소가 서로 다른지 여부를 결정하는데 사용된다. 위의 사용예시에서는 '{ $0 }' 을 인자로 전달했는데, 이는 각 요소 그 자체를 키로 사용한다는 뜻이다.

 

매개변수 'comparer' 는 두 키 값이 같은지를 비교하는 함수이다. 여기서 키는 첫 번째 클로저(keySelector)에 의해 반환된 값이다. 위의 사용예시에서는 '{ $0 == $1 }' 을 인자로 전달했는데, '$0' 과 '$1' 은 각각 현재 요소와 이전 요소를 나타내고, 두 키 값이 같으면 true를, 다르면 false를 반환한다.

 

바디에서는 'DistinctUntilChanged' 객체를 생성하여 반환하고 있다. 이 객체에 대해 알아보자.

 

DistinctUntilChanged 클래스의 정의

final private class DistinctUntilChanged<Element, Key>: Producer<Element> {
    typealias KeySelector = (Element) throws -> Key
    typealias EqualityComparer = (Key, Key) throws -> Bool
    
    private let source: Observable<Element>
    fileprivate let selector: KeySelector
    fileprivate let comparer: EqualityComparer
    
    init(source: Observable<Element>, selector: @escaping KeySelector, comparer: @escaping EqualityComparer) {
        self.source = source
        self.selector = selector
        self.comparer = comparer
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = DistinctUntilChangedSink(parent: self, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

 

'source' 는 원본 Observable,

'selector' 는 원본 Observable에서 방출하는 요소를 받아 Key를 생성하는 클로저,

'comparer' 는 이전 key와 현재 key를 비교하여 bool 타입을 반환하는 클로저이다.

 

'DistinctUntilChanged' 의 'run' 메서드를 보자.

'DistinctUntilChangedSink' 객체를 생성하며, 원본 Observable을 subscribe하여 이벤트를 받을 수 있도록 한다.

 

DistinctUntilChangedSink 클래스의 정의

final private class DistinctUntilChangedSink<Observer: ObserverType, Key>: Sink<Observer>, ObserverType {
    typealias Element = Observer.Element 
    
    private let parent: DistinctUntilChanged<Element, Key>
    private var currentKey: Key?
    
    init(parent: DistinctUntilChanged<Element, Key>, observer: Observer, cancel: Cancelable) {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }
    
    func on(_ event: Event<Element>) {
        switch event {
        case .next(let value):
            do {
                let key = try self.parent.selector(value)
                var areEqual = false
                if let currentKey = self.currentKey {
                    areEqual = try self.parent.comparer(currentKey, key)
                }
                
                if areEqual {
                    return
                }
                
                self.currentKey = key
                
                self.forwardOn(event)
            }
            catch let error {
                self.forwardOn(.error(error))
                self.dispose()
            }
        case .error, .completed:
            self.forwardOn(event)
            self.dispose()
        }
    }
}

 

'DistinctUntilChangedSink' 클래스의 'currentKey' 프로퍼티는 최근에 처리한 요소의 키를 저장한다. 이를 통해 새로운 요소가 들어왔을 때, 비교하여 중복을 판단할 수 있다. 다음은 'DistinctUntilChangedSink' 클래스의 'on' 메서드를 보자.

 

case .next(let value):

원본 Observable의 .next 이벤트를 통해 새로운 요소가 들어오면,

먼저 'parent' 의 'selector' 클로저를 사용하여, 새로운 키를 생성하여 저장한다.

다음 'currentKey(이전 키)'가 존재하는지 여부를 판단하고, 있다면 비교를 시작한다. 

만약 'key(새로운 키)', 'currentKey(이전 키)'가 동일하다면, 즉, 연속된 중복 값이라면 아무런 작업도 수행하지 않고 함수를 종료한다. 반면, 두 키가 동일하지 않다면, 'currentKey' 를 해당 값으로 업데이트 하고, 옵저버에게 해당 값을 전달한다.

 

'parent'의 'selector' 클로저를 사용하여 새로운 값을 생성하거나,

'parent'의 'comparer' 클로저를 사용하여 두 키를 비교하는 과정에서 error가 발생하면, 그대로 Observable에게 error를 전달한다.

 

.next 이벤트 외에 .completed 이벤트나 .error 이벤트 또한 그대로 Observable에게 전달한다. 

 

 

distinctUntilChanged 연산자의 내부동작 정리

1. distinctUntilChanged 연산자를 호출하면 'DistinctUntilChanged' 클래스의 인스턴스가 생성되고,

이 클래스는 'source(원본 Observable)', 'selector(요소의 키를 생성하는 클로저)' 그리고 'comparer(키를 비교하는 클로저)'를 전달받는다.

 

2. 'DistinctUntilChanged' 클래스의 'run' 메서드는 'DistinctUntilChangedSink(옵저버)' 클래스의 인스턴스를 생성하고,  이 옵저버가 원본 Observable의 이벤트를 받을 수 있도록 subscribe한다.

 

3. 원본 Observable이 이벤트를 방출하면, 'DistinctUntilChangedSink' 의 'on' 메서드가 호출된다.

만약 방출된 이벤트가 .next라면 'selector' 클로저를 통해 새로운 키를 생성하고, 'comparer' 클로저를 사용하여 이전 키(currentKey 프로퍼티)와 비교한다.

 

4. 'comparer' 클로저의 결과(두 키의 비교결과)가 true라면 아무런 작업도 수행하지 않고 함수가 종료되고,

false라면 Observer에게 해당 값을 전달하고, 'currentKey'를 새로운 키로 업데이트 한다.

 

 

distinctUntilChanged 연산자의 사용 상황

1. 사용자가 텍스트 필드에 입력하는 값이 이전과 달라졌을 경우에만, UI를 업데이트 하는 등의 동작을 수행해야 할 때, 유용하게 사용된다.

 

2. 데이터 모델의 특정 속성이 변경되었을 때만 동작을 수행해야 할 경우, 유용하게 사용된다.

 

이렇게 'distinctUntilChanged' 연산자는 변경되는 이벤트를 감지하거나, 중복된 이벤트를 무시하고 싶을 경우, 유용하게 사용된다. 하지만 이전 값과 현재 값을 비교해야 하는 메커니즘에서 가장 최근의 값을 저장해두어야 하기 때문에 큰 데이터를 다룰 때에는 메모리 사용량에 영향을 미칠 수 있다. 

 

 

 

5. Debounce

 

'debounce' 연산자는 특정 시간을 지정하여, 이 시간이 끝난 시점에 가장 최근의 값을 방출한다.

 

예를 들어, 사용자가 문자를 입력할 때마다 이벤트가 생성되는 상황에서, 사용자가 'Apple' 을 검색하려고 한다.

그럼 'A', 'Ap', 'App', 'Appl', 'Apple' 이라는 다섯 번의 검색을 서버에 요청하겠죠? 

이렇게 사용자가 문자를 입력을 하는 동안 검색 요청을 보내는 것은 비효율적일 수 있다. 

 

여기서 'debounce' 를 사용하여 특정 시간을 1초라고 설정해보자.

그러면 사용자가 'Apple' 을 입력하고 1초 동안 다른 추가 입력이 없다면, 서버에 'Apple(가장 최근의 값)'이라는 한 번의 검색을 요청한다.

 

 

debounce의 정의

public func debounce(_ dueTime: RxTimeInterval, scheduler: SchedulerType)
        -> Observable<Element> {
            return Debounce(source: self.asObservable(), dueTime: dueTime, scheduler: scheduler)
    }

 

매개변수 'dueTime' 은 각 요소에 대한 지연 시간을 설정받는다. 각 요소가 방출된 후, 이 dueTime 동안 다른 요소가 방출되지 않으면, 마지막 요소가 방출된다.

 

매개변수 'scheduler'는 타이머를 실행하는 스케줄러이다. 코드가 메인 스레드에서 실행되어야 하는지, 아니면 새로운 스레드를 생성해서 실행되어야 하는지 등을 결정한다.

 

 

Debounce 클래스의 정의

final private class Debounce<Element>: Producer<Element> {
    fileprivate let source: Observable<Element>
    fileprivate let dueTime: RxTimeInterval
    fileprivate let scheduler: SchedulerType

    init(source: Observable<Element>, dueTime: RxTimeInterval, scheduler: SchedulerType) {
        self.source = source
        self.dueTime = dueTime
        self.scheduler = scheduler
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = DebounceSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
    
}

 

'source' 는 원본 Observable을 저장하고,

'dueTime'은 각 요소가 방출된 후에 다른 요소가 방출되지 않아야 하는 시간을 저장하고,

'scheduler' 는 이 작업이 어떤 스레드에서 작업되어야 하는지를 저장한다.

 

'run' 메서드에서는 'DebounceSink' 객체를 생성하고, 'run' 메서드를 실행한다.

 

 

DebounceSink 클래스의 'run' 메서드 

func run() -> Disposable {
        let subscription = self.parent.source.subscribe(self)

        return Disposables.create(subscription, cancellable)
    }

 

'Debounce'의 'run' 메서드에서 호출하는 'DebounceSink'의 'run' 메서드 정의이다.

 

'DebounceSink'의 'run' 메서드는 수퍼 클래스(Debounce)에 저장되어 있는 원본 Observable을 subscribe한다.

이 때 전달되는 'self' 는 'DebounceSink(옵저버)' 자체이다.

 

이를 통해 DebounceSink는 원본 Observable에서 방출하는 이벤트를 받을 수 있다.

 

DebounceSink 클래스의 'on' 메서드 

func on(_ event: Event<Element>) {
        self.synchronizedOn(event)
    }

 

원본 Observable에서 이벤트를 방출하면, 'DebounceSink' 객체의 'on' 메서드가 호출된다.

내부에서는 'self.synchronizedOn(event)' 를 호출하고 있다.

 

 

DebounceSink 클래스의 'synchronizedOn' 메서드 

'synchronizedOn' 메서드의 로직을 이해하기 위해서 먼저 'DebounceSink' 에 정의되어 있는 프로퍼티가 무엇이 있는지, 어떤 역할을 수행하는지 알아야 한다.

 

 

1) parent

private let parent: ParentType

'DebounceSink' 클래스를 생성하는 'Debounce' 객체를 말한다.

 

 

2) id

private var id = 0 as UInt64

이벤트에 순차적으로 부여되는 고유한 식별자로, 새로운 .next 이벤트가 발생할 때마다 'id'가 증가하게 된다.

이를 통해 각 이벤트는 자신만의 고유한 'id' 를 가지게 되며, 이를 통해 이벤트를 구분할 수 있다.

 

'debounce' 연산자는 주어진 시간(dueTime)동안 새로운 이벤트가 발생하지 않을 경우, 가장 최근에 발생한 이벤트를 방출한다고 했죠? 

따라서 최신 이벤트인지 판단할 수 있도록 각 이벤트에 'id' 를 부여하는 것입니다.

 

 

3) value

private var value: Element?

 

이 프로퍼티는 .next 이벤트에서 방출된 가장 최신의 요소를 저장한다. 

이를 통해 .completed 이벤트가 발생했을 때, 최신의 요소를 전달하는데 사용된다.

 

 

4) cancellable

let cancellable = SerialDisposable()

 

현재 진행중인 disposable 작업을 관리하는 객체로, 새로운 이벤트가 발생하면 이전의 disposable 작업은 취소(dispose)되고, 새로운 disposable 작업이 대체된다. 이를 통해 최신 이벤트만을 전달하는 로직을 구현할 수 있다. 

 

이제 'synchronizedOn' 메서드를 파악해보자.

func synchronized_on(_ event: Event<Element>) {
        switch event {
        case .next(let element):
            self.id = self.id &+ 1
            let currentId = self.id
            self.value = element


            let scheduler = self.parent.scheduler
            let dueTime = self.parent.dueTime

            let d = SingleAssignmentDisposable()
            self.cancellable.disposable = d
            d.setDisposable(scheduler.scheduleRelative(currentId, dueTime: dueTime, action: self.propagate))
        case .error:
            self.value = nil
            self.forwardOn(event)
            self.dispose()
        case .completed:
            if let value = self.value {
                self.value = nil
                self.forwardOn(.next(value))
            }
            self.forwardOn(.completed)
            self.dispose()
        }
    }

 

1. case .next(let element):

원본 Observable에서 .next 이벤트를 방출했을 때, 처리하는 로직이다.

 

1) 'id' 를 1 증가시킴으로써 각 이벤트마다 고유한 식별자를 부여하고, 증가된 'id' 를 'currentId'에 저장한다.

 

2) 원본 Observable에서 방출한 값(element)를 'value' 에 저장한다.(value는 최신 요소를 저장하고 있음.)

 

3) parent(Debounce 클래스)의 스케줄러와 지연 시간을 참조한다. 

 

4) 새로운 'SingleAssignmentDisposable' 객체를 생성한다. 이 객체는 나중에 예약된 작업을 취소할 수 있게 해주는 역할을 한다.

 

5) 생성한 'SingleAssignmentDisposable' 객체를 'cancellable' 에 할당한다. 'cancellable' 은 이전에 예약된 작업이 있다면 그 작업을 취소할 수 있게 해주는 역할을 한다.

 

6) 'SingleAssignmentDisposable' 객체의 'setDisposable' 메서드를 수행한다. 이 메서드에 전달되는 'scheduler.scheduleRelative' 메서드를 보면,

'currentId(현재 이벤트의 id)'와 'dueTime(지연할 시간)', 그리고 'action(실행할 작업)' 을 받고 있다.

이 로직은 'debounce' 연산자의 핵심인데, 간단하게 정리해보면 새로운 이벤트가 발생했을 때, 이 이벤트를 처리하는 작업을 예약하는 것이다. 예약하는 작업은 'self.propagate' 메서드로 수행하고, 이 메서드는 주어진 'currentId(현재 이벤트의 id)' 를 가진 이벤트를 처리한다. 

 

근데 이 예약된 작업은 바로 실행되는 것이 아니라 'dueTime' 만큼의 시간이 지난 후에 실행되어야 하죠?

그리고 'dueTime' 만큼의 시간 동안 또 다른 새로운 이벤트가 발생하면, 새로운 이벤트에 대한 새로운 작업이 예약되고, 이전에 예약된 작업은 취소되어야 한다.

 

따라서 'dueTime' 만큼의 시간 동안 이벤트가 발생하면, 새로운 이벤트를 예약하는 작업과 이전에 예약된 이벤트를 갈아치우는 작업을 수행하고, 'dueTime' 만큼의 시간 동안 아무 이벤트가 발생하지 않으면, 예약된 가장 최신의 이벤트가 수행되는 것이다.

 

 

2. case .completed:

원본 Observable이 .completed 이벤트를 방출했을 경우, 처리하는 로직이다.

 

1) 만약 마지막으로 방출된 값이 있다면(value가 nil이 아닐 경우),

.next 이벤트를 통해 value(가장 최신에 방출한 이벤트의 요소)를 옵저버에게 전달한다.

 

원본 Observable이 .completed 이벤트를 방출하면, 더 이상 새로운 이벤트가 발생하지 않겠죠? 

이 경우, 'dueTime' 을 기다리더라도 더 이상 새로운 이벤트는 방출되지 않기 때문에 아직 전달되지 않은 마지막 이벤트가 있다면 이를 옵저버에게 전달해주는 것이다. 

 

이 과정을 통해 원본 Observable이 완료되었을 때, 마지막으로 발생한 이벤트의 요소를 놓치지 않고 전달할 수 있다.

 

2) 완료 이벤트를 Observer에게 전달해준다.

 

 

'debounce' 연산자의 내부동작 정리

1. 'debounce' 연산자를 사용하면, 'Debounce' 객체를 생성하여 반환하고, 'Debounce' 객체는 구독되었을 때, 'run' 메서드를 실행한다.

 

2. 'Debounce' 객체의 'run' 메서드는 'DebounceSink(옵저버)' 를 생성하고, 'run' 메서드를 호출한다.

 

3. 'DebounceSink' 의 run 메서드는 원본 Observable을 subscribe 한다. 이를 통해 방출되는 이벤트를 처리할 수 있다.

 

4. 원본 Observable에서 이벤트가 방출되면, 'DebounceSink' 의 'synchronized_on' 메서드가 호출되며, 방출되는 각 이벤트를 처리한다.

 

5. 원본 Observable에서 .next 이벤트가 방출되면, 'dueTime' 만큼의 시간 이후에 최신 이벤트를 처리하는 로직이 수행되도록 스케줄링 한다. 이 스케줄링 된 작업은 취소가 가능하며, 'dueTime' 시간 동안 새로운 이벤트가 발생하면 예약된 작업을 최신 작업으로 갈아치운다.

 

원본 Observable에서 .completed 이벤트가 방출되면 아직 전달되지 않은 마지막 이벤트 요소가 있다면, 옵저버에게 이를 전달하고 작업을 마무리 한다.

 

6. 'dueTime' 이후에 'propagate' 메서드가 실행되면, 'value(현재 저장된 요소)' 와 '이벤트 Id'를 확인한다.

만약 저장된 요소 값이 있고, 이 값의 'self.id' 와 'currentId(현재 이벤트 id)'가 일치하면 마지막 이벤트로 간주되어 옵저버에게 .next 이벤트를 통해 저장된 요소를 전달해준다. 

 

이렇게 'debounce' 연산자는 주어진 시간(dueTime) 동안 더 이상 새로운 이벤트가 발생하지 않았을 경우, 가장 최근에 방출된 이벤트만을 전달하므로, 자주 발생하는 이벤트를 제외하고, 최근의 이벤트만을 받기 위해 사용된다.

 

debounce 연산자의 사용예시 

import RxSwift
import RxCocoa

let searchTextField: UITextField = UITextField()

let disposeBag = DisposeBag()

searchTextField.rx.text.orEmpty
    .debounce(.milliseconds(500), scheduler: MainScheduler.instance)
    .distinctUntilChanged()
    .flatMapLatest { query in
        return self.performSearch(query: query)
    }
    .subscribe(onNext: { results in
        print("Search results: \(results)")
    })
    .disposed(by: disposeBag)

 

위 코드는 텍스트 필드를 통해 입력받은 검색어 쿼리를 처리하는 작업에 관한 내용을 담고 있다.

처음 보는 부분은 가볍게 넘어가고, 'debounce' 연산자가 이렇게 사용된다는 것만 파악해보자.

 

1) searchTextField.rx.text.orEmpty

텍스트 필드의 text 변화를 관찰하는 역할을 수행한다. 'text' 가 nil인 경우를 제외하기 위해 'orEmpty' 가 사용되었다.

 

2) .debounce(.milliseconds(500), scheduler: MainScheduler.instance)

입력받은 텍스트에 대한 변화를 0.5초간 기다린다. 

따라서 사용자가 텍스트를 입력하고, 0.5초 동안 추가 입력이 없을 때, 마지막 이벤트를 방출한다.

 

3) .distinctUntilChanged()

연속으로 중복되는 검색어에 대한 이벤트는 제외하여, 동일한 검색어에 대한 요청을 방지한다.

 

4) .flatMapLatest { query in ... }

새로운 검색 요청이 들어올 때마다 이전에 수행중이던 검색 작업을 취소하고, 새로운 검색을 수행한다. 

 

5) .subscribe(onNext: { results in })

검색 결과를 받아와 출력한다.

 

 

이렇게 'debounce' 연산자의 정의, 내부 동작 메커니즘, 사용 예시를 학습해보았다.

내부 동작 부분에서는 아직 배우지 않은 Scheduler나 Disposabel 등이 있기 때문에 정확한 정보인지 헷갈린다.

해당 개념을 모두 학습한 후 다시 'debounce' 연산자의 내부 동작을 확인해보아야 할 것 같다.

 

 

 

 

Reference

https://reactivex.io/documentation/operators.html

https://velog.io/@horeng2/Swift-RxSwift-Filtering-Operators2

https://ios-development.tistory.com/175