sungwony

[RxJava] 리액티브 프로그래밍 소개 본문

language/RxJava

[RxJava] 리액티브 프로그래밍 소개

일상이상삼상 2021. 4. 20. 21:49

 

리액티브 프로그래밍이란?

여러 API를 취합해서 전달해야 하는 시스템에서는 각 API들의 경과 시간 전체 합산 시간만큼 필요하지만 
반대로 리액티브로 진행할 경우 여러 API 중 가장 긴 경과 시간이 전체 시간

- 데이터 흐름과 전달에 관한 프로그래밍 패러다임

- 데이터 스트림과 변경 사항에 대한 전파(Propagation)를 중심으로 하는 프로그래밍 패러다임

- 데이터가 통지될 때마다 관련 프로그램이 반응(Reaction)해 데이터를 처리하는 방식으로 데이터를 직접 가져와 처리(PULL)하는 것이 아니라 보내온 데이터(PUSH)를 받은 시점에 반응해 이를 처리하는 방식이다.

- 함수형 프로그래밍과 유사한 선언적 코드(declarative code)를 사용하여 비동기 처리 파이프라인을 생성할 수 있는 새로운 패러다임

 

명령형 프로그래밍 vs 리액티브 프로그래밍

명령형 프로그래밍 방식은 변경이 발생했다는 통지를 받아 합계를 새로 계산하는 PULL 방식

리액티브 프로그래밍 방식은 데이터 소스가 변경된 데이터를 밀어주는 PUSH 방식

 

1~5월의 합계를 계산하는 예시

변경 전 매출데이터
변경 후 매출데이터

매월 매출액은 리액티브 프로그래밍의 데이터 소스에 해당

 

명령형 프로그래밍 : 데이터를 다시 읽어와서 합계를 구한다

리액티브 프로그래밍 : 데이터 소스(B2)로 부터 변경된 데이터를 이벤트로 통지 받아 미리 정해놓은 수식을 통해 합계가 갱신

 

자바 언어와 리액티브 프로그래밍의 관계

1. 기존 pull 방식의 프로그래밍 개념을 push 방식의 프로그래밍 개념으로 바꾼다

 

2. 함수형 프로그래밍의 지원을 받는다

 

: RxJava 기반의 리액티브 프로그래밍을 위해서는 함수형 프로그래밍이 필요하다. 멀티 스레드 환경에서 콜백이나 옵저버 패턴은 같은 자원에 여러 스레드가 경쟁 조건(race condition)에 빠지는 부수 효과(side effect)를 일으킬 수 있다. 함수형 프로그래밍은 이런 부수 효과가 없으므로 자바 언어로 리액티브 프로그래밍을 하기 위해서는 함수형 프로그래밍의 지원이 필요하다.

 

RxJava란?

- ReactiveX(Reactive Extension)의 JVM 구현이다

- 자바에서 리액티브 프로그래밍을 구현하는 데 사용하는 라이브러리이다.

- 이벤트 처리와 같은 비동기 처리에 최적화 되었으며 2.0 버전부터 Reactive Streams 사양을 구현한다.

 

* Reactive Streams는 어떤 라이브러리나 프레임워크라도 데이터 스트림을 비동기로 처리하는 공통 메커니즘을 인터페이스로 제공한다. RxJava 2.0 부터는 Reactive Streams API를 의존하고 Reacvite Streams를 제외한 다른 라이브러리는 의존하지 않는다

 

RxJava를 만들게 된 이유

1. 동시성을 적극적으로 끌어안을 필요가 있다(Embrace Concurrency)

 

: 자바의 동시성(많은 쓰레드) 처리는 번거로움이 있다. 이를 해결하기 위해 넷플릭스는 클라이언트의 요청을 처리하는 서비스 계층(service layer)에서 동시성을 적극적으로 끌어안았다. 클라이언트의 요청을 처리할 때 다수의 비동기 실행 흐름(스레드 등)을 생성하고 그것의 결과를 취합하여 최종 리턴하는 방식으로 내부 로직을 변경했다.

 

2. 자바 Future를 조합하기 어렵다는 점을 해결해야 한다(Java Futures are Expensive to Compose)

 

: 자바 8에서 제공하는 CompletableFuture 같은 클래스가 제공되지 않았기 때문에 비동기 흐름을 조합할 방법이 거의 없었다. RxJava에서는 이를 해결하려고 비동기 흐름을 조합할 수 있는 방법을 제공한다. RxJava에서는 조합하는 실행 단위를 리액티브 연산자(Operators)라고 한다.

 

3. 콜백 방식의 문제점을 개선해야 한다(Callbacks Have Their Own Problems)

 

: 비동기 방식으로 동작하는 가장 대표적인 프로그래밍 패턴은 콜백이다. 콜백이 콜백을 부르는 콜백 지옥(Callback Hell) 상황이 코드의 가독성을 떨어뜨리고 문제 발생 시 디버깅을 어렵게 만들기 때문에 RxJava는 콜백을 사용하지 않는 방향으로 설계되어 이 문제를 개선했다.

step1(function (err, value1) {
    if (err) {
        console.log(err);
        return;
    }
    step2(function (err, value2) {
        if (err) {
            console.log(err);
            return;
        }
        step3(function (err, value3) {
            if (err) {
                console.log(err);
                return;
            }
            step4(function (err, value4) {
                // 정신 건강을 위해 생략
            });
        });
    });
});

 

Hello RxJava

import io.reactivex.Observable;

public class FirstExample {

	public void emit(){
		Observable.just("Hello", "RxJava 2 !!")
				  .subscribe(System.out::println);
	}

	public static void main(String[] args) {
		FirstExample firstExample = new FirstExample();
		firstExample.emit();
	}
}

io.reactivex

: RxJava2의 기본 패키지 명은 io.reactivex 이다. ReactiveX의 홈페이지 주소(http://reactivex.io)를 거꾸로 쓴 것과 같다.

 

Observable

: 데이터의 변화가 발생하는 데이터 소스(data source)이다.

 

just() 함수

: 가장 단순한 Observable 선언 방식이다. 위 예에서는 데이터 소스에서 'Hello'와 'RxJava 2 !!'를 발행했다. Integer와 같은 래퍼 타입부터 Order 같은 사용자 정의 클래스의 객체도 인자로 넣을 수 있다.

 

subscribe() 함수

: subscribe() 함수는 Observable을 구독한다. Observable은 subscribe() 함수를 호출해야 비로소 변화한 데이터를 구독자에게 발행한다(just() 함수만 호출하면 데이터가 발행되지 않는다). 이 부분은 옵저버 패턴과 동일하다고 생각하면 된다. 반드시 데이터를 수신할 구독자가 subscribe() 함수를 호출해야 Observable에서 데이터가 발행된다.

 

emit() 메서드

: 사용자가 만든 메서드로 어떤 것을 내보낸다 라는 의미. RxJava 개발 문서에서는 Observable이 subscribe() 함수를 호출한 구독자에게 데이터를 발행하는 것을 표현하는 용어로 사용한다.

 

RxJava를 어떻게 공부할 것인가?

 

1. Observable 클래스를 명확하게 이해하자. 특히 Hot ObservableCold Observable의 개념을 꼭 이해해야 한다.

 

Cold Observable

 

: Cold Observable은 옵저버블은 구독(subscribe)되기 전까지 동작하지 않는다. Cold Observable은 구독되기 이전에는 데이터 스트림을 방출(emit)하지 않으며 Cold Observable을 옵저버가 구독하면 처음부터 동작하기 시작한다. 따라서 옵저버는 옵저버블이 방출하는 모든 데이터 스트림을 빠짐없이 처음부터 받을 수 있다.

 

옵저버블을 구독하는 옵저버는 하나 이상일 수 있으며 Cold Observable을 구독하는 모든 옵저버들은 구독하는 시점과 상관없이 Cold Observable이 방출하는 모든 데이터를 처음부터 빠짐없이 모두 받을 수 있다. 이것은 Cold Observable을 구독하는 모든 옵저버는 자신만을 위해 독립적으로 실행하는 옵저버블을 갖게 된다고 볼 수 있다. 이러한 특징을 유니캐스트(unicast)라고 한다.

 

Hot Observable

 

: Hot Observable은 옵저버블을 생성하자마자 구독과 상관없이 바로 데이터 스트림을 방출(emit)하기 시작한다. 따라서 일정 시간이 경과한 시점에 옵저버블을 구독하면 데이터 스트림의 앞부분은 구독할 수 없고 중간 부분부터 구독하게 된다. Hot observable은 구독 시점부터 방출되는 데이터를 받는 것을 기본으로 한다.

 

2. 간단한 예제로 map(), filter(), reduce(), flatMap() 함수의 사용법을 익힌다.

3. 생성 연산자, 결합 연산자, 변환 연산자 등 카테고리별 주요 함수를 공부한다.

4. 스케줄러의 의미를 배우고 subscribeOn()observeOn() 함수의 차이를 알아 둔다.

5. 그 밖의 디버깅, 흐름 제어 함수를 익힌다.

 

마블 다이어그램

- 리액티브 프로그래밍을 통해 발생하는 비동기적인 데이터의 흐름을 시간의 흐름에 따라 시각적으로 표시한 다이어그램

- map(), flatMap() 함수 등 수많은 리액티브 연산자들을 이해하는 데 큰 도움을 준다.

- ReactiveX 홈페이지(http://reactivex.io)에서 제공

- reactivex.io/RxJava/2.x/javadoc/io/reactivex/Observable.htm

- rxmarbles.com/#from

 

Map 연산자의 마블 다이어그램

 

 

더 알아보기

 

Reactor

 

Pivotal의 오픈소스 프로젝트로, JVM 위에서 동작하는 논블러킹 애플리케이션을 만들기 위한 리액티브 라이브러리이다. RxJava2와 함께 Reactive Stream의 구현체이기도 하고, Spring Framework 5부터 리액티브 프로그래밍을 위해 지원되는 라이브러리이다. RxJava와 많은 공통점이 있지만 최소 Java8에서 동작하며 Java8의 피쳐를 잘 지원한다는 큰 차이점이 있다.

 

리액티브 프로그래밍의 단점

 

- 코드 가독성이 쓰레드 모델보다 읽기 힘들다