Dev/Java

[Reactive Programming] 리액티브 스트림즈(Reactive Streams)

김세진 2024. 6. 26. 19:40
반응형

 

 

 

 

리액티브 스트림즈(Reactive Streams)란?

 

리액티브 스트림즈란 데이터 스트림을 Non-Blocking이면서 비동기적으로 처리하기 위한 리액티브 라이브러리의 표준 사양이다.

 

그 구현체로는 RxJava, Reactor, Akka Streams, Java 9 Flow API 등이 있다.

 

 


리액티브 스트림즈 구성요소

 

리액티브 스트림즈를 통해 구현해야 되는 API 컴포넌트는 아래와 같이 Publisher, Subscriber, Subscription, Processor 가 있다.

 

컴포넌트 설명
Publisher 데이터를 생성하고 통지(발행, 게시, 방출)하는 역할을 한다.
Subscriber 구독한 Publisher로부터 통지(발행, 게시 방출)된 데이터를 전달받아서 처리하는 역할을 한다.
Subscription Publisher에 요청할 데이터의 개수를 지정하고, 데이터의 구독을 취소하는 역할을 한다.
Processor Publisher와 Subscriber의 기능을 모두 가지고 있다. 즉, Subscriber로서 다른 Publisher를 구독할 수 있고, Publisher로서 다른 Subscirber가 구독할 수 있다.

 

 

동작 과정

  1. Subscriber가 전달받을 데이터를 구독(Subscribe)
  2. Publisher가 데이터를 통지(발행, 게시, 방출)할 준비가 되었음을 Subscriber에게 알림(onSubscribe)
  3. 알림을 받은 Subscriber가 전달받기 원하는 데이터의 개수를 Publisher에게 요청(Subscription.request)
  4. Publisher가 요청받은 만큼의 데이터를 통지(onNext)
  5. 위 과정을 반복하다가 모든 데이터가 통지되면 Publisher는 데이터 전송의 완료를 알림(onComplete). 만약 데이터를 처리하는 과정에서 에러가 발생하면 Subscriber에게 에러가 발생한 사실을 알림(onError)

 

Subscriber가 데이터의 요청 개수를 지정하는 이유

Publisher와 Subscriber는 비동기적으로 동작하는데, Publisher가 데이터를 통지하는 속도보다 Subscriber가 처리하는 속도가 느리다면 처리를 기다리는 데이터가 쌓이게 된다. 즉, 병목 현상이 발생하여 시스템 부하를 커지게 하므로, 데이터 개수를 적절하게 지정하여 Subscriber가 감당할 수 있는 만큼의 데이터를 받아 처리할 수 있도록 해야 한다.

 

 


리액티브 스트림즈 컴포넌트 실제 코드

 

리액티브 스트림즈 컴포넌트가 포함된 실제 코드는 자바 9 버전 이상을 사용할 경우 Flow 클래스 내부에서도 확인할 수 있다.

 

Publisher

public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

subscribe 메서드만 구현하면 된다. 이 메서드는 전달받은 subscriber를 등록하는 역할을 한다.

 

Subscriber

public interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}
  • onSubscribe: 전달받은 subscription 객체를 통해 구독 시작 시점에 특정 작업을 처리하는 역할. Publisher에게 요청할 데이터의 개수를 지정하거나 특정 조건에 맞지 않는 경우 구독 요청을 취소하는 등의 작업을 처리
  • onNext: Publisher가 통지한 데이터를 처리하는 역할
  • onError: Publisher가 데이터 통지를 위한 처리 과정 중 에러가 발생했을 때 해당 에러를 처리하는 역할
  • onComplete: Publisher가 데이터 통지를 완료했음을 알릴 때 특정 작업을 처리하는 역할

 

Subscription

public interface Subscription {
    public void request(long n);
    public void cancel();
}
  • request: 파라미터 만큼의 데이터를 요청
  • cancel: 구독 취소(데이터 요청의 취소)

 

Processor

public interface Processor<T,R> extends Subscriber<T>, Publisher<R> {
}

Subscriber와 Publisher를 상속하여 둘의 기능을 모두 사용할 수 있는 인터페이스이다. 별도로 구현해야 하는 메서드가 없다.

 

 

구체적인 동작 과정

  1. Publisher가 Subscriber 인터페이스 구현 객체를 subscribe 메서드의 파라미터로 전달
  2. Publisher 내부에서 전달받은 Subscriber 구현 객체의 onSubscribe 메서드를 호출하면서 Subscriber의 구독을 의미하는 Subscription 구현 객체를 Subscriber에게 전달
  3. 호출된 Subscriber 인터페이스 구현 객체의 onSubscribe 메서드에서 전달 받은 Subscription 객체를 통해 전달받은 데이터의 개수를 Publisher에게 요청
  4. Publisher는 Subscriber로부터 전달받은 요청 개수만큼의 데이터를 onNext 메서드를 호출하여 Subscriber에게 전달
  5. Publisher는 더 이상 통지할 데이터가 없을 경우 onComplete 메서드를 호출해서 Subscriber에게 데이터 처리 종료를 알림

 


리액티브 스트림즈 관련 용어 정의

 

Signal

Publisher와 Subscriber간에 주고받는 상호작용을 의미한다. 두 인터페이스의 onSubscribe, onNext, onComplete, onError, request 또는 cancel 메서드 모두 Signal 이라고 볼 수 있다.

 

서로에게 보내는 Signal을 메서드가 정의되는 쪽이 아닌, 실제로 호출해서 사용하는 주체를 기준으로 정리하면 다음과 같다.

 

데이터 방향 메서드 종류
Publisher → Subscriber onSubscribe, onNext, onComplete, onError
Subscriber → Publisher  request, cancel

 

Demand

사전적 정의로 수요, 요구 등을 의미한다. Subscriber가 Publisher에게 요청하였으나, 아직 전달받지 않은 데이터들을 의미한다.

 

Emit

사전적 정의로 통지, 발행, 게시, 방출 등을 의미한다. Publisher가 Subscriber에게 데이터를 전달하는 것을 의미한다.

 

Upstream/Downstream

데이터가 흐르는 방향을 의미한다. 아래 코드를 살펴보자.

 

public class Example2_5 {
	public static void main(String[] args) {
		Flux
			.just(1, 2, 3, 4, 5, 6)
			.filter(n -> n % 2 == 0)
			.map(n -> n * 2)
			.subscribe(System.out::println);
	}
}


메서드 체인으로 이루어진 함수형 프로그래밍 코드이다. 각 메서드의 반환 타입이 모두 Flux 타입의 객체이므로 가능한 기법이다.

 

just에서 filter를 바라볼 때, just의 입장에선 filter 에서 반환하는 Flux가 본인보다 하위에 있으므로 Downstream이다. 반대로 filter에서 just를 바라볼 때 just에서 반환되는 Flux가 본인보다 상위에 있으므로 Upstream이다.

 

Sequence

Publisher가 emit하는 데이터의 연속적인 흐름을 정의해 놓은 것을 의미하며, Operator 체인 형태로 정의된다. 위 코드에서 just → filter → map 같은 데이터 변환 과정 자체를 Sequence라 볼 수 있다.

 

Operator

연산자를 의미한다. 앞에서 살펴본 just, filter, map 같은 메서드들을 Operator라고 볼 수 있다. 

 

Source

보통 최초에 생성된 데이터를 의미하며, 비슷한 의미로 Original이라는 용어도 사용된다.

 

 


리액티브 스트림즈의 구현 규칙

 

Publisher 구현을 위한 규칙

번호 규칙
1 Publisher가 Subscriber에게 보내는 onNext signal의 총 개수는 항상 해당 Subscriber의 구독을 통해 요청된 데이터의 총 개수보다 더 작거나 같아야 한다.
2 Publisher는 요청된 것보다 적은 수의 onNext signal을 보내고 onComplete 또는 onError를 호출하여 구독을 종료할 수 있다.
3 Publisher의 데이터 처리가 실패하면 onError signal을 보내야 한다. (Subscriber가 에러를 처리할 수 있도록)
4 Publisher의 데이터 처리가 성공적으로 종료되면 onComplete signal을 보내야 한다. (Subscriber가 후처리를 할 수 있도록)
5 Publisher가 Subscriber에게 onError 또는 onComplete signal을 보내는 경우 해당 Subscriber의 구독은 취소된 것으로 간주되어야 한다.
6 일단 종료 상태 signal을 받으면(onError, onComplete) 더 이상 signal이 발생되지 않아야 한다.
7 구독이 취소되면 Subscriber는 결국 signal을 받는 것을 중지해야 한다.

 

Subscriber 구현을 위한 규칙

번호 규칙
1 Subscriber는 Publisher로부터 onNext signal을 수신하기 위해 Subscription.request(n)를 통해 Demand signal을 Publisher에게 보내야 한다.
2 Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 Subscription 또는 Publisher의 메서드를 호출해서는 안 된다. (순환 및 경쟁 조건(Race Condition) 방지)
3 Subscriber.onComplete() 및 Subscriber.onError(Throwable t)는 signal을 수신한 후 구독이 취소된 것으로 간주해야 한다.
4 구독이 더 이상 필요하지 않은 경우 Subscriber는 Subscription.cancel()을 호출해야 한다. (리소스 반환)
5 Subscriber.onSubscribe()는 지정된 Subscriber에 대해 최대 한 번만 호출되어야 한다.

 

Subscription 구현을 위한 규칙

번호 규칙
1 구독은 Subscriber가 onNext 또는 onSubscribe 내에서 동기적으로 Subscription.request를 호출하도록 허용해야 한다. (request와 onNext 사이의 순환 참조를 방지)
2 구독이 취소된 후 추가적으로 호출되는 Subscription.request(long n)는 효력이 없어야 한다.
3 구독이 취소된 후 추가적으로 호출되는 Subscription.cancel()은 효력이 없어야 한다.
4 구독이 취소되지 않은 동안 Subscription.request(long n)의 매개변수가 0보다 작거나 같으면 java.lang.IllegalArgumentException과 함께 onError signal을 보내야 한다.
5 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher가 Subscriber에게 보내는 signal을 결국 중지하도록 요청해야 한다.
6 구독이 취소되지 않은 동안 Subscription.cancel()은 Publisher에게 해당 구독자에 대한 참조를 결국 삭제하도록 요청해야 한다.
7 Subscription.cancel(), Subscription.request() 호출에 대한 응답으로 예외를 던지는 것은 허용하지 않는다. (Return normally)
8 구독은 무제한 수의 request 호출을 지원해야 하고 최대 2^63 -1 개의 Demand를 지원해야 한다. (무한 스트림)

 

 


리액티브 스트림즈 구현체

 

  • RxJava
  • Project Reactor
  • Akka Streams
  • Java Flow API (Java 9부터)
  • 그 외 (RxAndroid, RxJS 등)

 

 

 

 

 

반응형