본문 바로가기
RxSwift

RxSwift에 관하여(Transforming Observables)

by iOS 개린이 2023. 7. 23.

지난 시간에는 Observable의 Creating 연산자에 대해 학습하며, 특정한 특성을 가지고 있는 Observable의 생성에 관해 알아보았다. 이번에는 Observable의 Tansforming 연산자에 대해 학습해보자.

 

 

Transforming 연산자

Observable이 방출하는 항목을 새로운 형태로 변환하거나, 새로운 Observable로 변환하는 등의 작업을 할 수 있도록 하는 연산자이다.

 

 

1. Map

 

map 연산자는 Observable이 방출하는 각 항목에 함수를 적용하여 그 결과를 새로운 Observable로 반환한다.

 

map의 사용 예시

let observable = Observable.from([1, 2, 3, 4, 5])

let newObservable = observable.map { $0 * $0 }
newObservable.subscribe(onNext: { print($0) })

//1, 4, 9, 16, 25를 각각 방출함

 

from 연산자는 인자로 받는 배열의 각 요소를 방출하는 연산자이다.

map 연산자를 통해, from으로 인해 생성된 Observable이 방출하는 각 항목이 자기 자신과 곱해지는 새로운 Observable이 생성된다. 

 

기존 Observable의 값을 변환하지 않은 상태에서, 데이터를 처리하고 새로운 Observable로 반환하기 때문에 Side-effect를 최소화할 수 있다.(함수형 프로그래밍)

 

 

map의 정의

extension ObservableType {

    /**
     Projects each element of an observable sequence into a new form.

     - seealso: [map operator on reactivex.io](http://reactivex.io/documentation/operators/map.html)

     - parameter transform: A transform function to apply to each source element.
     - returns: An observable sequence whose elements are the result of invoking the transform function on each element of source.

     */
     
    public func map<Result>(_ transform: @escaping (Element) throws -> Result)
        -> Observable<Result> {
        Map(source: self.asObservable(), transform: transform)
    }
}

 

map 연산자 또한 ObservableType 프로토콜의 extension 메소드로 정의되어 있다. 즉, 모든 Observable이 map 연산자를 사용할 수 있다는 것이다.

 

파라미터로 받는 'transform' 은 방출되는 element(요소)에 적용할 함수를 뜻하고,

반환타입 'Result' 는 각 element에 transform 함수를 적용한 결과를 뜻한다.

transform은 원본 Observable에서 방출하는 각 요소에 적용되며, 변환된 결과는 새로운 Observable에 포함된다.

 

바디 부분은 'Map' 객체를 생성하고 반환하고 있는데, 매개변수 'source'에 전달하는 'self.asObservable()' 은 현재 map 연산이 수행될 원본 Observable을 말한다. 이를 통해 원본 Observable이 방출하는 이벤트 요소에 대해 transform 연산을 수행하고, 결과를 새로운 Observable 인스턴스로 반환한다.  

 

 

Map 클래스의 정의

final private class Map<SourceType, ResultType>: Producer<ResultType> {
    typealias Transform = (SourceType) throws -> ResultType

    private let source: Observable<SourceType>

    private let transform: Transform

    init(source: Observable<SourceType>, transform: @escaping Transform) {
        self.source = source
        self.transform = transform
    }

    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType {
        let sink = MapSink(transform: self.transform, observer: observer, cancel: cancel)
        let subscription = self.source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

 

먼저 Map 클래스의 init 메서드를 보면, 인자로 받은 'source(원본 Observable의 인스턴스)' 와 'transform(map 연산자에 전달된 변환 함수)' 를 저장한다.

 

run 메서드는 'MapSink' 객체를 생성하고, transform(변환 함수), observer, cancel를 전달한다.

다음 self.source(원본 Observable)에 subscribe를 통해 MapSink를 전달하는데,

MapSink는 ObserverType 프로토콜을 준수하고 있기 때문에 Observer에 해당하며,

이를 통해 MapSink가 원본 Observable에서 방출하는 이벤트를 받을 수 있는 것이다.

 

map 연산자의 변환을 처리하고, 새로운 Observable을 생성하는 로직은 MapSink에서 수행하겠죠?

 

MapSink 클래스의 정의

final private class MapSink<SourceType, Observer: ObserverType>: Sink<Observer>, ObserverType {
    typealias Transform = (SourceType) throws -> ResultType

    typealias ResultType = Observer.Element 

    private let transform: Transform

    init(transform: @escaping Transform, observer: Observer, cancel: Cancelable) {
        self.transform = transform
        super.init(observer: observer, cancel: cancel)
    }

    func on(_ event: Event<SourceType>) {
        switch event {
        case .next(let element):
            do {
                let mappedElement = try self.transform(element)
                self.forwardOn(.next(mappedElement))
            }
            catch let e {
                self.forwardOn(.error(e))
                self.dispose()
            }
        case .error(let error):
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.completed)
            self.dispose()
        }
    }
}

 

map을 통해 새로운 Observable을 생성하고 subscribe 했을 때,

map은 'Map' 클래스 인스턴스를 호출하고, 'Map' 클래스의 'run' 메서드는 원본 Observable에 subscribe하여 'MapSink'를 전달한다. 이로 인해 원본 Observable에서 방출하는 이벤트를 'MapSink' 가 처리할 수 있다.

그리고 'MapSink' 의 'on' 메서드가 바로 원본 Observable에서 방출하는 이벤트를 받고, 각 항목에 대해 transform 함수를 적용하는 로직을 수행한다.

 

MapSink의 'on' 메서드

1) .next(let element)

element는 원본 Observable에서 전달된 요소의 값이다. 이 요소에 transform 함수를 적용하여 변환시키고, 변환된 요소는 .next(mappedElement) 이벤트를 통해 새로운 Observer에게 전달된다. 여기서 방출하는 .next(mappedElement) 이벤트는 map을 통해 새로 생성된 Observable에서 방출하는 이벤트이다. 하지만 만약 transform 함수에서 에러가 발생하면, .error(e) 이벤트를 통해 Observer에게 전달하고, dispose() 메소드를 호출한다. 

 

2) .error(let error)

원본 Observable에서 에러가 발생했을 경우 방출되는 이벤트이다. 이 case에서는 전달된 에러를 그대로 .error 이벤트를 통해 새로운 Observer에게 전달된다.

 

3) .completed

이 case 또한 원본 Observable에서 방출된 completed 이벤트를 그대로 새로운 Observer에게 전달한다.

 

 

map 연산자 내부 동작 정리

1. map 연산자를 통해 새로운 Observable을 생성한다. map 연산자는 'transform(변환 함수)' 를 인자로 받는데 Observable이 방출하는 각 항목에 어떤 작업을 수행할 것인지 작성한다. 

 

2. map 은 새로운 'Map' 객체를 생성하는데, 이 객체는 'source(원본 Observable)' 와 'transform(변환 함수)' 를 인자로 받아 저장한다.

 

3. 'Map' 객체의 run 메서드는 'MapSink' 객체를 생성한다. 'MapSink' 는 원본 Observable에서 방출하는 이벤트를 받고 처리하는 Observer이며, 'Map' 객체의 run 메서드 내부에서는 원본 Observable에 'MapSink'를 subscribe 한다. 

 

4. 원본 Observable이 방출하는 이벤트를 'MapSink' 에서 받고 처리한다. 이 과정은 'MapSink'의 'on' 메서드에서 수행하는데, .next 이벤트에 대해서는 transform 함수를 적용하여 새로운 요소를 생성하고, 결과를 새로운 Observable의 .next 이벤트를 통해 새로운 Observer(새로운 Observable을 subscribe하면서 생성되는 Observer)에게 전달한다. 그 외에 .error 이벤트나 .completed 이벤트는 그대로 새로운 Observer에게 전달한다.

 

 

 

 

2. FlatMap

 

flatMap 연산자는 원본 Observable이 방출하는 각 항목에 함수를 적용하여 새로운 Observable을 반환하는데, 이렇게 생성된 Observable들의 요소를 단일 Observable 스트림으로 결합한다.

 

 

flatMap의 사용 예시

let observable = Observable.of("Apple", "Banana", "Orange")

let result = observable.flatMap { word in
     return Observable.from(word.map { String($0) })
}

result.subscribe(onNext: { print($0) })

// "A"
// "p"
// "p"
// "l"
// "e"
// "B"
// "a"
// ...

 

of 연산자를 통해 생성된 Observable은 각 요소를 하나씩 방출한다.  

 

flatMap을 사용하면,

1. 원본 Observable에서 방출되는 'Apple', 'Banana', 'Orange' 요소 각각에 대한 Observable(예제에서는 from)이 생성된다.

 

2. from을 통해 생성된 각 Observable은 원본 Observable에서 방출하는 요소('Apple')에 map 연산자를 적용한 값('A', 'p', 'p', 'l', 'e')을 인자로 받는다.

 

3. from을 통해 생성된 Observable이 방출하는 이벤트 요소를 모두 단일 스트림으로 결합하여 방출한다.

 

여러 Observable들이 독립적으로 이벤트를 방출하면, 이 이벤트들은 시간적으로 분산되어 있다. 하지만 flatMap은 여러 Observable이 방출하는 이벤트들을 모두 하나의 Observable 스트림으로 결합한다.

 

그 결과, 각 Observable에서 방출되는 요소(A', 'p', 'p', 'l', 'e', 'B', 'a', 'n')가 순서대로 하나의 스트림에 들어가게 된다. 

이렇게 flatMap을 사용하면, 여러 Observable이 독립적으로 방출하는 이벤트들을 하나의 스트림에서 처리할 수 있다. 

 

 

flatMap의 정의

public func flatMap<Source: ObservableConvertibleType>(_ selector: @escaping (Element) throws -> Source)
        -> Observable<Source.Element> {
            return FlatMap(source: self.asObservable(), selector: selector)
    }

 

flatMap 메서드는 제네릭 타입 'Source' 를 사용하며, 이 Source는 'ObservableConvertibleType'을 준수해야 한다.

'ObservableConvertibleType' 은 어떤 타입이든 자신을 Observable로 변환할 수 있다는 것을 나타내는 프로토콜이다. 따라서 Source는 Observable로 변환될 수 있는 어떤 타입이라는 뜻이다.

 

매개변수로 받는 'selector' 는 변환함수를 의미한다. 원본 Observable이 방출하는 요소를 매개변수로 받아서 Source 타입을 반환한다. 따라서 selector는 방출되는 각 요소를 받아 처리하고, 새로운 Observable로 변환한다.

 

반환타입을 보면 Observable<Source.Element> 인데, 여기서 Source.Element는 새로운 Observable의 요소 타입을 의미한다. 그리고 flatMap이 반환하는 객체를 보면 'FlatMap' 클래스이며, 'self.asObservable(원본 Observable )' 과 'selector(변환 함수)' 를 인자로 받고 있다.  

 

 

FlatMap 클래스의 정의

final private class FlatMap<SourceElement, SourceSequence: ObservableConvertibleType>: Producer<SourceSequence.Element> {
    typealias Selector = (SourceElement) throws -> SourceSequence

    private let source: Observable<SourceElement>
    
    private let selector: Selector

    init(source: Observable<SourceElement>, selector: @escaping Selector) {
        self.source = source
        self.selector = selector
    }
    
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == SourceSequence.Element {
        let sink = FlatMapSink(selector: self.selector, observer: observer, cancel: cancel)
        let subscription = sink.run(self.source)
        return (sink: sink, subscription: subscription)
    }
}

 

전체적인 흐름은 'map' 연산자와 비슷하다.

'source' 는 원본 Observable을 저장하고 있고, 'selector' 는 변환 함수를 저장하고 있다.

'run' 메서드는 'FlatMapSink' 객체를 생성하고, 이 객체의 run 메서드를 호출하며 'source(원본 Observable)' 를 전달하고 있다.

 

 

FlatMapSink 클래스의 정의

private final class FlatMapSink<SourceElement, SourceSequence: ObservableConvertibleType, Observer: ObserverType> : MergeSink<SourceElement, SourceSequence, Observer> where Observer.Element == SourceSequence.Element {
    typealias Selector = (SourceElement) throws -> SourceSequence

    private let selector: Selector

    init(selector: @escaping Selector, observer: Observer, cancel: Cancelable) {
        self.selector = selector
        super.init(observer: observer, cancel: cancel)
    }

    override func performMap(_ element: SourceElement) throws -> SourceSequence {
        try self.selector(element)
    }
}

 

당연히 원본 Observable에서 방출되는 요소를 'FlatMapSink'에서 받아 변환함수를 적용하고, 나온 결과(새로운 Observable)를 모아서 단일 이벤트 스트림으로 결합하는 작업을 모두 수행할 줄 알았다.

 

하지만 이 모든 작업을 FlatMapSink 혼자 수행하지 않는다. FlatMapSink가 상속받고 있는 'MergeSink' 와 함께 해당 작업을 수행한다. 따라서 작동 메커니즘을 이해하기 위해서는 MergeSink도 함께 보아야 한다.(눈으로 보고 정리한 내용을 알려드림)

 

'FlatMap' 클래스의 run 메서드에서 'FlatMapSink'를 생성하고, run 메서드를 호출했었죠?

하지만 'FlatMapSink'의 정의를 보면, run 메서드에 대한 정의가 없기 때문에 'FlatMapSink' 가 상속받고 있는 'MergeSink' 라는 클래스에 정의되어 있는 'run' 메서드가 호출된다. 'MergeSink' 클래스의 run 메서드는 주어진 원본 Observable을 구독하는 작업을 수행한다.

 

flatMap을 통해 새롭게 생성된 Observable에 subscribe를 하면,

1) 원본 Observable에서 방출하는 이벤트를 'MergeSink' 의 'on' 메서드가 받아서 처리한다.

 

2) 원본 Observable에서 방출하는 이벤트가 .next 인 경우, 'MergeSink' 의 'nextElementArrived(element: SourceElement)' 메서드가 호출된다. 

 

3) 'nextElementArrived' 메서드는 원본 Observable로부터 .next 이벤트를 받을 때마다 호출되며, .next 이벤트를 통해 받아온 element(요소)를 'performMap' 메서드를 호출하여 처리한다. 이 때, performMap 메서드는 'FlatMapSink' 클래스에 override된 performMap이 호출된다. 

 

4) FlatMapSink의 'performMap' 메서드를 보면, 'selector(변환 함수)'를 element에 적용하고 있다. 즉, 원본 Observable에서 .next 이벤트를 통해 방출한 요소를 새로운 Observable로 만드는 작업을 수행하고 있는 것이다.

 

5) FlatMapSink의 performMap을 통해 생성된 새로운 Observable은 다시 'MergeSink'의 'subscribeInner' 메서드를 통해 subscribe 된다. 새로운 Observable에서 방출하는 이벤트를 받아 병합된 스트림에 추가한다.

 

간단하게 보면,

'FlatMapSink' 는 여러 Observable에서 방출되는 요소를 받아 'selector(변환함수)'를 적용하여 새로운 Observable을 생성하는 작업을 수행하고, 'MergeSink' 클래스에서는 새롭게 만들어진 Observable들을 받아서 단일 Observable 스트림으로 결합하는 작업을 수행한다.

 

 

flatMap 연산자 내부 동작 정리

1. flatMap 연산자를 사용하면, FlatMap 클래스의 객체가 생성된다. 이 때 FlatMap의 init 메서드는 '원본 Observable'과 '사용자가 제공한 변환 함수 selector' 를 인자로 받는다.

 

2. FlatMap 클래스의 run 메서드를 통해 'FlatMapSink' 객체를 생성하고, FlatMapSink의 run 메서드를 호출한다.(FlatMapSink의 run 메서드는 정의되어 있지 않아 수퍼 클래스인 'MergeSink'의 run 메서드가 호출됨)

 

3. 'MergeSink' 의 run 메서드는 원본 Observable을 subscribe 한다.

 

4. 원본 Observable에서 이벤트가 발생하면, 'MergeSink'의 on 메서드가 호출된다. 이 때, 방출되는 이벤트가 .next 라면 MergeSink의 'nextElementArrived' 메서드가 호출된다. 

 

5. 'nextElementArrived' 메서드는 FlatMapSink 에서 오버라이드 된 'performMap' 메서드를 호출한다. 이 메서드는 사용자가 제공한 변환 함수를 실행하여 원본 Observable에서 방출된 요소를 새로운 Observable로 변환한다.

 

6. 변환된 새 Observable은 MergeSink의 'subscribeInner' 메서드를 통해 구독되며, 이 메서드는 새로운 Observable이 방출하는 각 이벤트를 병합된 스트림에 추가한다. 이를 통해 여러 Observable이 방출하는 이벤트가 단일 스트림으로 결합되는 것이다. 

 

 

flatMap은 Observable이 방출하는 각 요소에 대해 비동기 작업을 수행하고, 모든 Observable의 결과를 하나의 스트림으로 결합하여 받고 싶은 상황에서 유용하다고 한다.

 

 

3. Scan

 

scan 연산자는 Swift의 reduce 함수와 유사하게 누적 값을 계산하는 역할을 한다. Observable을 통해 방출되는 첫 번째 항목에 함수를 적용하고 방출한다. 그 다음 이전에 방출된 항목과 새롭게 방출된 항목을 결합하여 방출하는 누산기 역할을 한다.

 

scan의 사용 예시

Observable.of(1, 2, 3, 4, 5)
    .scan(0, accumulator: +)
    .subscribe(onNext: {
        print($0)
    })

//1
//3
//6
//10
//15

 

of 연산자를 통해 방출하는 1, 2, 3, 4, 5를 scan 연산자를 통해 각 요소를 더하여 누적 값을 계산한다.

 

이제 scan 연산자의 정의에 대해서 알아볼 것인데, Swift의 reduce도 2가지 형태가 있듯이, scan 연산자에도 2가지 형태가 있다.

 

첫 번째 형태의 scan 연산자 정의

public func scan<A>(into seed: A, accumulator: @escaping (inout A, Element) throws -> Void)
        -> Observable<A> {
        Scan(source: self.asObservable(), seed: seed, accumulator: accumulator)
    }

 

첫 번째 형태의 scan 연산자는 'seed(초기값)'와 'accumulator(누산 함수)'를 매개변수로 받는다. 'accumulator' 의 매개변수를 inout 키워드로 표시함으로써 반환값이 필요없이 누적값을 저장할 수 있다.

 

바디 부분은 'Scan' 클래스 객체를 생성하고 있으며, 이 객체는 'source(원본 observable)', 'seed(초기값)', 'accumulator(누산 함수)' 를 전달 받고 있다. 

 

이렇게 첫 번째 형태의 scan 연산자는 'inout' 키워드를 매개변수에 사용하여, 함수가 누산 값을 반환하지 않고 직접 변경해준다.

 

첫 번째 형태 scan 연산자의 사용

let numbers = Observable.of(1, 2, 3, 4, 5)

let scanned = numbers.scan(into: 0) { (total, number) in
    total += number
}

scanned.subscribe(onNext: { print($0) })

// 1
// 3
// 6
// 10
// 15

 

 

우리가 매개변수 첫 번째 형태의 scan 연산자는 'accumulator' 함수의 매개변수에 'inout' 키워드를 사용하기 때문에 직접 누적 값을 설정해줄 수 있다고 했다. 클로저에서 매개변수 'total(inout 파라미터)' 은 누적 값을 저장하며, 'number'는 Observable에서 방출되는 요소를 저장한다.

 

그리고 함수 내부에서는 total(누적 값)을 직접 설정하여 총 결과를 받아볼 수 있다.

 

 

두 번째 형태의 scan 연산자 정의

public func scan<A>(_ seed: A, accumulator: @escaping (A, Element) throws -> A)
        -> Observable<A> {
        return Scan(source: self.asObservable(), seed: seed) { acc, element in
            let currentAcc = acc
            acc = try accumulator(currentAcc, element)
        }
    }

 

두 번째 형태의 scan 연산자 또한 'seed(초기값)'와 'accumulator(누산 함수)' 두 개의 매개변수를 받고 있다.

하지만 'accumulator' 의 정의를 보면, 첫 번째 형태와 다르게 'inout' 키워드를 사용하고 있지 않다. 따라서 누적 값과 현재 요소를 받아 새로운 누적 값을 계산하여 반환하는 로직을 수행한다.

 

바디에서도 동일하게 'Scan' 클래스를 생성하여 'source(원본 observable)', 'seed(초기값)', 'accumulator(누산 함수)' 를 전달 받고 있는데, 여기서는 'acc(누적 값)'와 'element(현재 요소)'를 받아서 그것을 기반으로 새로운 누적 값을 계산하고 반환한다. 

 

여기서 누적 값 'acc'는 이전 단계의 연산 결과이며, 처음에는 'seed(초기값)' 가 사용된다. 이후에는 각 단계의 연산 결과가 계속해서 업데이트되며 사용된다. 여기서 '매개변수인 acc에 어떻게 'accumulator'를 적용한 값을 저장할 수 있냐' 라는 의문을 품을 수 있는데 'Scan' 객체의 정의를 보면 누적 값에 대한 매개변수에 'inout' 키워드를 사용한 것을 볼 수 있다.

 

final private class Scan<Element, Accumulate>: Producer<Accumulate> {
    typealias Accumulator = (inout Accumulate, Element) throws -> Void
    
    private let source: Observable<Element>
    fileprivate let seed: Accumulate
    fileprivate let accumulator: Accumulator
    
    init(source: Observable<Element>, seed: Accumulate, accumulator: @escaping Accumulator) {
        self.source = source
        self.seed = seed
        self.accumulator = accumulator
    }

 

이렇게 scan 연산자는 누적 연산을 수행하면서 각 단계의 결과를 방출하는 Observable을 생성하는 역할을 수행하고, 우리는 이를 통해 누산 과정의 각 단계를 볼 수 있다.

 

두 번째 형태 scan 연산자의 사용

let numbers = Observable.of(1, 2, 3, 4, 5)

let scanned = numbers.scan(0) { (total, number) in
    total + number
}

scanned.subscribe(onNext: { print($0) })

// 1
// 3
// 6
// 10
// 15

 

첫 번째 형태와 다르게 우리가 누적 값을 직접 변경해주는 로직을 넣을 필요 없이, 수행할 작업만 작성하면 누적 값이 계속 'total' 로 들어오면서 총 누적 값의 결과를 받아볼 수 있다.

 

처음에 소개했던 scan 연산자의 사용예시는 바로 이 두 번째 형태의 scan 연산자를 사용한 예시이다.

 

 

어떤 형태의 scan 연산자를 사용하는 것이 좋을까?

두 가지 형태 모두 각각 다른 상황에서 사용할 수 있도록 설계되었기 때문에 상황에 맞게 사용하는 것이 좋다.

첫 번째 형태의 scan 연산자는 'inout' 매개변수를 사용하기 때문에 누적 값을 직접 변경할 수 있었다. 따라서 누적 값을 직접 조작하거나 타입의 상태를 변경해야 하는 경우에 사용하면 좋다.

 

 

 

4. Buffer

 

buffer 연산자는 Observable에서 발생하는 항목을 수집하고 버퍼링 한 다음, 해당 버퍼가 특정 조건을 만족하면 버퍼 내의 모든 항목을 배열로 방출하는 역할을 수행한다.

 

buffer의 사용 예시

let source = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)

source
    .buffer(timeSpan: .seconds(3), count: 3, scheduler: MainScheduler.instance)
    .subscribe(onNext: { print($0) })

//[0, 1, 2]
//[3, 4, 5]
//[6, 7, 8]
...

 

interval 연산자를 통해 생성된 Observable은 0부터 1초 간격으로 1씩 증가하는 정수를 방출한다.

이 Observable에 위 코드처럼 buffer를 사용하면, 방출되는 항목을 3개씩 묶어서 배열로 방출한다.

따라서 1초마다 방출되는 항목을 3개씩 또는 3초동안 묶어 배열로 방출해준다.

 

이렇게 buffer 연산자는 Observable의 항목들을 일정 기간 또는 특정 개수에 따라 모아서 배열로 방출한다.

 

buffer의 정의

public func buffer(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
        -> Observable<[Element]> {
        BufferTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
    }

 

매개변수 'timeSpan'은 버퍼가 수집하는 시간 간격을 설정한다. 주어진 시간이 경과하면, 버퍼는 현재 수집된 항목들을 배열로 방출하고, 새로운 버퍼를 생성하여 다음 시간 간격 동안 수집을 시작한다.

 

매개변수 'count'는 버퍼가 수집할 항목의 개수를 설정한다. 버퍼가 주어진 개수만큼 항목을 수집하면 해당 항목들을 배열로 방출하고, 새로운 버퍼를 생성하여 다음 수집을 시작한다.

 

buffer 연산자는 'Observable<[Element]>' 타입을 반환하는데, 시간 또는 개수에 따라 수집한 항목들을 담은 배열을 방출하는 Observable을 뜻한다. 따라서 'BufferTimeCount' 객체는 시간 또는 개수에 따라 수집한 항목들을 담은 배열을 방출하는 Observable을 생성하는 역할을 수행하겠죠?

 

buffer 연산자의 사용 상황

1. 일괄 처리 상황

일반적으로 데이터를 한 번에 많이 처리하는 것이 개별적으로 처리하는 것보다 효율적이다. 왜냐하면 매번 작업을 수행할 때마다 발생하는 오버헤드를 줄일 수 있기 때문이다. 즉, 많은 작업을 한 번에 수행하여, 발생하는 오버헤드를 한 번만 발생시키도록 하는 것이다. 이런 상황에서 buffer 연산자를 사용하면 데이터를 일괄적으로 처리할 수 있다.

 

예를 들어, 여러 개의 네트워크 요청을 수행하는 경우, 각 요청을 개별적으로 보낸다면, 각 요청에 대한 네트워크 연결 과정에서 오버헤드가 발생할 것이다. 이 때 buffer를 통해 하나의 배치로 묶어서 요청을 보낸다면 오버헤드를 크게 줄일 수 있다.

 

2. 빈번하게 생기는 이벤트 스트림 상황

buffer 연산자를 사용하면, 빈번한 이벤트를 그룹화하고 일정 시간 간격으로 이벤트 그룹을 처리하도록 Observable을 생성할 수 있다. 이를 통해 각 이벤트를 즉시 처리하는 대신, 일정 수준에 도달하면 처리를 시작하도록 한다.

 

예를 들어 사용자가 검색어를 입력할 때마다 서버에 검색 요청을 보내는 경우, 사용자가 빠르게 여러 입력을 시도하면, 각 요청에 대한 이벤트가 짧은 시간 내에 방대하게 생성되고, 이 많은 이벤트를 모두 즉시 처리하려는 것은 시스템의 리소스를 효율적 사용하지 못하는 것이다.

 

따라서 buffer 연산자를 사용하여 일정 시간이나, 일정 갯수에 따라 이벤트를 그룹화하고, 그룹화된 이벤트를 한 번에 처리하여 작업을 효율적으로 만들 수 있다.

 

 

3. 대량 데이터 수용

반대로 방대한 양의 데이터가 유입되는 상황에서 사용자가 수용할 수 있는 양의 데이터를 그룹화한다. 이를 통해 처리 가능한 속도로 데이터를 제공할 수 있고, 데이터의 손실을 방지할 수 있다.

 

 

 

5. Window

 

window는 Observable 방출하는 아이템을 일정한 그룹, 즉 'window' 로 나누어 처리하고 싶을 때 사용하는 연산자이다. 

그룹으로 나눈다는 점에서는 'buffer' 연산자와 비슷하지만, buffer는 해당 그룹의 데이터를 배열로 바로 전달하고, window는 그룹의 데이터를 하나의 새로운 Observable로 전달한다.

 

이렇게 만들어진 새로운 Observable은 원본 Observable에서 데이터가 발생할 때마다 데이터를 방출하며, 일정 조건을 만족하면 .completed 이벤트를 방출하고 종료된다. 

 

 

window의 사용 예시

 let intervalObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.instance)
        
let windowObservable = intervalObservable.window(timeSpan: .seconds(3), count: 10, scheduler: MainScheduler.instance)
        
windowObservable.subscribe(onNext: { observable in
    
    print("새로운 window 시작")
            
    observable.subscribe(onNext: { value in
          print(value)
    })  
})

/*
새로운 window 시작
0
1
2
새로운 window 시작
3
4
5
...
*/

 

window 연산자는 Observable이 방출하는 데이터를 일정 시간 동안의 그룹(window)으로 나눌 수 있다. 각 window는 새로운 Observable이며, 이 Observable은 window에서 설정된 동안에 원본 Observable이 방출하는 데이터를 방출한다.

 

위 코드에서는 window의 시간을 3초로 설정해두었기 때문에, 매 3초마다 새로운 window에 새로운 Observable이 생성되고, 이 Observable은 3초 동안 원본 Observable이 방출하는 데이터를 방출하게 된다.

 

window의 정의

 public func window(timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)
        -> Observable<Observable<Element>> {
            return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
    }

 

매개변수 'timeSpan'은 새로운 window Observable이 생성되는 시간 간격을 설정한다. 예를 들어, 3초로 설정되면, 매 3초마다 새로운 window Observable이 생성된다.

 

매개변수 'count'는 한 window Observable이 방출할 수 있는 최대 항목의 수를 설정한다. 예를 들어, count가 5로 설정되면, window Observable은 최대 5개의 항목만 방출할 수 있다.

 

함수의 반환 타입을 보면 Observable<Observable<Element>> 타입을 반환하는데, Observable<Element>를 방출하는 Observable이라는 것을 의미한다. 바디 부분에서는 'WindowTimeCount' 클래스의 인스턴스를 생성하고 있고, 이 객체에서 timeSpan이나 count 만큼의 항목을 방출할 때마다 새로운 window Observable이 생성되는 역할을 수행하겠죠?

 

window와 buffer 연산자의 차이

window와 buffer 연산자 모두 데이터를 그룹화하여 처리할 수 있는 면에서 유용한 작업을 많이 수행할 수 있다.

 

먼저 두 연산자의 차이점을 정리해보면,

buffer 연산자는 원본 Observable에서 데이터를 수집하고, 그룹화된 데이터의 배열을 새로운 아이템으로 방출하는 반면, window 연산자는 각 그룹을 별도의 Observable로 방출한다.

 

따라서 window 연산자는 그룹화된 데이터를 별도의 Observable로 만들기 때문에 각각의 스트림에 대해 독립적으로 처리할 수 있다. 예를 들어 복잡한 비동기 처리 상황에서 각 그룹화된 데이터 스트림을 독립적으로 변환하거나, 다른 스트림과 병합하는 등 세분화된 작업을 수행할 수 있기 때문에 buffer에 비해 섬세한 작업이 가능하다.

 

 

 

Reference

https://reactivex.io/documentation/operators/window.html

https://jusung.github.io/scan/