sw engineering/RxJava

RxJava 소개

monotics 2021. 2. 19. 23:17

RxJava는 최근에 안드로이드 개발 쪽에서 많이 사용되고 있다. 오픈소스 등을 분석할 때 RxJava에 대해서 미리 파악하고 있으면 유용하다. RxJava를 익혀두면 최신 프로그래밍 패러다임 중의 하나인 Reactive Programming을 이해하는데 도움이 된다. 또한 개발에 RxJava를 적극적으로 활용하면 프로그래밍의 능률도 올 릴 수 있다. 그럼 우선 RxJava에서 사용하는 몇몇 개념들에 대해 살표 보자.

ReactiveX logo


RxJava

우리의 주변 혹은 내부 환경은 항상 변하는데 우리는 그것들에 늘 반응하면서 살아간다. 거리를 걸을 때도 의식하고 있지는 않지만 앞에서 다가오는 사람을 피하면서 자연스럽게 걸어가고 배가 고프면 음식을 먹는다. 배고픔, 앞에서 다가오는 사람, 가려움 등은 갑자기 나타난 데이터에 해당하며 이것들에 우리 몸은 반응을 한다. 이 메커니즘을 프로그래밍으로 옮기면 Reactive Programming이 된다. Reactive Programming을 요약하면 "데이터가 발행되어 통지될 때마다 이에 반응(Reactive)하여 받은 데이터를 처리하는 프로그래밍 방식"이 된다. RxJava는 이 Reactive Programming의 구현을 위해 Observable과 함수형 스타일의 Operator를 사용하여 비동기이벤트 기반의 코드를 작성하도록 도와주는 라이브러리이다.

RxKotlin

코틀린은 Java와 서로 상호운용성(interoperability)을 가진다. 즉 이미 많은 부분이 자바로 구현되어있다. 그러므로 코틀린용 RxJava를 완전히 새로 작성하는 것은 시간낭비다. RxKotlin은 기본 RxJava에 다양한 유틸리티와 확장 메서드를 추가하여 RxJava를 확장한 라이브러리이다. 그래서 코틀린에서 훨씬 더 편리하게 RxJava를 사용할 수 있다. 이러한 확장 메서드에는 toObservable(), toFlowable(), subscribeBy() 등이 있다. RxKotlin에 대한 자세한 내용은 여기에서 확인할 수 있다.

RxAndroid

RxAndroid는 안드로이드의 루퍼와 RxJava의 스케줄러를 연결하는 브리지 역할을 한다. 또한 RxAndroid는 RxJava에 대해 이미 의존성을 가지고 있기 때문에 RxJava를 의존성에 추가할 필요는 없지만, 최신 버전의 RxJava를 사용하려면 RxJava를 명시적으로 의존성에 넣는 것이 좋다. RxAndroid에 대해서는 여기에서 자세히 확인할 수 있다.

RxBinding

안드로이드에서는 클릭 이벤트 등과 같이 리스너를 통한 콜백(callback) 스타일을 많이 사용하는데, RxBinding은 이 같은 콜백 스타일을 Observable로 바꿔주는 유틸리티 메서드들을 제공한다. RxBinding은 안드로이드 개발 쪽에서 유명한 JakeWharton이 만들었으며, 여기에서 자세히 확인할 수 있다.

 

dependencies

위에서 설명된 RxJava 관련 라이브러리들을 의존성에 추가하면 아래와 같다.

implementation "io.reactivex.rxjava2:rxjava:2.2.21"
implementation "io.reactivex.rxjava2:rxkotlin:2.4.0"
implementation "io.reactivex.rxjava2:rxandroid:2.1.1"
implementation "com.jakewharton.rxbinding4:rxbinding:4.0.0"

RxJava vs Coroutine

안드로이드에서도 비동기 프로그래밍을 위해 최근에 코루틴(coroutine)을 소개하였으며 구글에서도 밀고 있다. 코루틴은 쓰레딩에 대해 가볍게 접근할 수 있으며 비동기 코드를 동기 방식으로 작성할 수 있도록 해준다. 그에 비해 RxJava는 반응형 애플리케이션을 이벤트 기반으로 작성할 수 있도록 해준다. 둘 다 메인 스레드에서 비동기 작업의 수행이 가능하지만 상황에 맞게 사용한다면 유용한 도구들이 된다. 기존 AsyncTask를 쉽게 대체할 목적이라면 코루틴을 사용하면 되고 반응형 이벤트 기반 아키텍처를 맛보고 싶다면 RxJava를 사용하면 된다.

 

Reactive Stream

Reactive Stream은 데이터의 흐름(stream)을 비동기적으로 다루는 메커니즘을 말하며 하나의 표준 상양이라고 보면 된다. RxJava 2.x도 이 표준을 따르는 라이브러리들 중 하나이다. Reactive Stream은 생산자(Publisher)와 소비자(Subscriber)로 구성되며 아래와 같은 규칙을 가진다. (자세한 규칙은 여기(한글)에 있다.)

  - 구독 시작 통지(onSubscribe)는 해당 구독에서 한 번만 발생한다.
  - 통지는 순차적으로 이루어진다.
  - null을 통지하지 않는다.
  - 생산자는 onComplete 또는 onError를 통해 발행 종료를 처리한다.

 

Reactive Stream의 인터페이스

인터페이스 설명
Publisher 데이터를 생산하고 통지
Subscriber 통지된 데이터를 받아 처리
Subscription 원하는 데이터의 개수를 요청하고 구독을 해지
Processor Publisher와 Subscriber의 기능을 모두 가짐

Reactive Stream이 제공하는 API

API 설명
onSubscribe 데이터를 통지할 준비가 되었다고 알림
onNext 데이터 통지
onError 에러 통지
onComplete 완료 통지

 

Flowable에서 데이터 흐름

Flowable

RxJava 기본 구조

Observable과 Observer

기본적인 메커니즘은 Flowable과 Subscriber 구성과 거의 비슷하지만 아래와 같이 몇 가지 다른 점도 있다.

  - 통지되는 데이터의 개수를 제어하는 배압 기능이 없다. 그래서 배압 기능을 사용하려면 Flowable을 사용해야 한다.

  - 데이터의 개수를 요청하지 않는다.

  - Subscription을 사용하지 않고 Disposable 인터페이스를 사용한다. Disposable에는 구독을 해지할 수 있는 메서드(dispose())가 있다.

  - 구독을 시작하는 시점에 Observer의 onSubscribe()이 호출되는데 이때 Disposable이 인자로 전달된다. 그래서 전달받은 Disposable의 dispose()를 사용하면 구독을 해지할 수 있다.

 

Flowable/Observable과 Subscriber/Observer의 구분

구분 생산자 소비자
Reactive Stream 지원 Flowable Subscriber
Reactive Stream 미지원 Observable Observer

 

Disposable의 메서드

메서드 설명
dispose() 구독을 해지한다.
isDisposed() 구독이 해지되었는지 확인하기 위해 사용한다.
 - 해지 되었으면 : true 반환
 - 해지 되지 않았다면 : false 반환

함수형 인터페이스(Functional Interface)

RxJava가 함수형 프로그래밍이라고 보기는 어렵지만 함수형 프로그래밍의 영향을 받아 함수형 인터페이스를 많이 사용한다. io.reactive.functions package에 정의되어있다. 다양한 함수형 인터페이스는 여기에서 자세히 확인할 수 있다.

예) map(Func1<? super T,? extends R> func)

 

Method chain

함수형 프로그래밍의 영향

 - 함수형 인터페이스

 - 부가 작용을 피해야 함

 - 부가 작용이 발생하는 처리는 소비자 측에서

예) 1 ~ 9까지 숫자 발행(just) -> 짝수 데이터만 걸러냄(filter) -> 데이터에 100을 곱함(map) -> 구독된 데이터 출력(onNext)

fun main(argc: Array<String>) {
    val flowable: Flowable<Int> =
        // 인자의 데이터를 순서대로 통지하는 Flowable을 생성한다.
        Flowable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .filter { data -> data % 2 == 0 }// 짝수에 해당하는 데이터만 통지한다.
            .map { data -> data * 100 }// 데이터를 100배로 변환한다

    // 구독하고 받은 데이터를 출력한다
    flowable.subscribe { data -> println("data=$data") }
}

==결과==
data=200
data=400
data=600
data=800
data=1000

 

RxJava를 이해하는 도움이 되는 Designe Pattern

  • Observer Pattern

Observer Pattern

  • Iterator Pattern

Iterator Pattern

Marble Diagram

RxJava에서는 데이터의 발행과 구독을 데이터의 흐름과 함께 표현하기 위해 Marble Diagram을 사용한다. 데이터들은 도형과 색으로 구별되게 표현하며 데이터의 통지 순서는 화살표를 사용한다. 또한 완료는(Complete)는 '|'로 에러(Error)는 'X'로 표현한다. Marble Diagram을 사용하면 다양한 Operator들을 한눈에 쉽게 이해하는데 도움이 된다.

 

참고도서

Reactive Programming with Kotlin

RxJava 리액티브 프로그래밍