Dev/Reactive Programming

[Reactive Programming] Backpressure

김세진 2024. 8. 6. 15:32
반응형

 

 

 

 

개요

 

만약 Publisher가 많은 데이터를 Emit 하는데 Publisher의 처리 속도가 현저히 느려 데이터가 계속해서 쌓인다면 오버플로우가 발생할 수도 있을 것이다.

 

Backpressure는 우리말로 배압, 역압이라고 한다. 이름과 같이, Backpressure는 리액티브 프로그래밍에서 Publisher가 수많은 데이터를 Emit하는 상황에서 과부하가 걸리지 않도록 하는 역할을 한다.

 

 


Reactor에서의 Backpressure 처리 방식

 

Reactor에서의 Backpressure 처리 방식은 크게 두 가지가 있다.

 

1. 데이터 개수 제어

Subscriber가 적절히 처리할 수 있는 수준의 데이터 개수를 Publisher에게 요청하는 방식이다. Subscriber를 상속한 BaseSubscriber를 구현하여 다음과 같이 데이터 요청 개수를 적절하게 제어할 수 있다.

 

@Slf4j
public class Example {
	public static void main(String[] args) {
		Flux.range(1, 5)
			.doOnRequest(data -> log.info("# doOnRequest: {}", data))
			.subscribe(new BaseSubscriber<Integer>() {
				private int count = 0;

				@Override
				protected void hookOnSubscribe(Subscription subscription) {
					request(3); // 3개의 데이터 요청
				}

				@SneakyThrows
				@Override
				protected void hookOnNext(Integer value) {
					Thread.sleep(2000L); // 데이터 처리 시간
					log.info("# hookOnNext: {}", value);
					count++;
					if (count % 3 == 0) {
						request(3); // 다음 3개의 데이터 요청
					}
				}
			});
	}
}

 

위는 데이터 개수를 명시한 만큼 가져온 다음 처리하는 예제 코드이다. 코드에서는 구독이 시작될 때, 그리고 데이터를 처리했을 때의 hook을 제어하여 데이터를 3개씩 불러와 처리하도록 하고 있다. 주요 사항은 다음과 같다.

 

  • subscribe() Operator의 파라미터로 구독과 관련된 이벤트인 hookOnSubscribe와 hookOnNext를 재정의하기 위해 BaseSubscriber 객체 리턴
  •  hookOnSubscribe() 메서드는 Subscriber 인터페이스에 정의된 onSubscribe()를 대신해 request() 메서드를 통해 불러올 최초 데이터 개수를 제어한다.
  • hookOnNext() 메서드는 Subscriber 인터페이스에 정의된 onNext()를 대신해, 데이터가 처리된 다음 Publisher에게 데이터를 요청하는 역할을 한다. 마찬가지로 request() 메서드를 통해 불러올 데이터 개수를 제어한다.

 

실행 결과

 

 

위와 같이 요청 당 3개의 데이터를 받아 처리하는 것을 확인할 수 있다.

 

 

2. Backpressure 전략 사용

종류 설명
IGNORE 전략 Backpressure를 적용하지 않는다.
ERROR 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, Exception을 발생시키는 전략
DROP 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 먼저 emit된 데이터부터 Drop시키는 전략
LATEST 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 밖에서 대기하는 가장 최근에(나중에) emit된 데이터부터 버퍼에 채우는 전략
BUFFER 전략 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, 버퍼 안에 있는 데이터부터 Drop시키는 전략

 

IGNORE 전략

Backpressure를 적용하지 않는 전략이다. 이 경우, Downstream에서의 Backpressure 요청이 무시되기 때문에 결국 데이터의 버퍼 초과로 인해 IllegalStateException이 발생할 수 있다.

 

ERROR 전략

Downstream의 데이터 처리 속도가 느려 Upstream의 emit 속도를 따라가지 못할 경우 IllegalStateException을 발생시킨다. 이 경우 Publisher는 Error Signal을 Subscriber에게 전송하고 삭제한 데이터는 폐기한다.

 

Ignore 전략과 다른 점은, Ignore 전략은 버퍼 초과로 인해 IllegalStateException이 발생하지만, Error 전략은 Backpressure 신호를 감지하여 예외를 발생시킨다는 점이다.

 

@Slf4j
public class Example {
	public static void main(String[] args) throws InterruptedException {
		Flux
			.interval(Duration.ofMillis(1L)) // 0.001초마다 emit
			.onBackpressureError()
			.doOnNext(data -> log.info("# doOnNext: {}", data)) // Publisher 가 emit 하는 데이터 화인
			.publishOn(Schedulers.parallel()) // Reactor Sequence를 별도의 parallel 스레드에서 실행
			.subscribe(data -> {
					try {
						Thread.sleep(5L); // Subscriber 가 전달받은 데이터를 처리하는 시간
					} catch (InterruptedException e) {}
					log.info("# onNext: {}", data);
				},
				error -> log.error("# onError: ", error));
		Thread.sleep(2000L); // main 스레드를 종료하지 않기 위해 block 하는 역할
	}
}

 

위는 Error 전략 예제 코드이다.

 

  • 0.005초마다 처리되는 데이터를 0.001초마다 emit 한다.
  • 데이터를 emit하는 속도보다 처리되는 속도가 느리기 때문에 결국 버퍼가 가득 차게 되어 Backpressure 신호가 발생하게 된다. (참고로 publishOn() Operator에 별도의 사이즈를 지정하지 않으면 버퍼 사이즈는 256이 기본값으로 설정된다.)
  • Backpressure 전략이 Error이므로 IllegalStateExcepion이 발생한다.

 

실행 결과

 

결과를 보면 데이터를 emit 하는 doOnNext가 255까지 찍힌 다음 더 이상 찍히지 않다가, onNext가 255까지 찍힌 뒤 OverFlowException(IllegalStateException을 상속한 하위 클래스임)이 반환되는 것을 볼 수 있다. 이는 버퍼 사이즈를 초과하여 데이터가 더 이상 Emit 되지 않고 Subscriber에게 Backpressure 신호를 전달하여, 마지막 데이터를 처리한 뒤 위에서 설정한 전략에 따라 예외를 발생시키는 것을 의미한다.

 

DROP 전략

Drop 전략은 Publisher가 Downstream으로 전달할 데이터가 버퍼에 가득 찰 경우, emit된 데이터를 모두 폐기하는 전략이다.

 

책의 설명에서는 버퍼 밖에서 대기하는 데이터 Pool이 있는 것처럼 느껴졌으나, 다른 커뮤니티에서 저자의 부연 설명이나 다른 문서를 참고한 결과 버퍼에 여유 공간이 생겼다고 판단되기 전까지는 emit 되는 데이터를 모두 버린다는 게 맞는 표현인 것 같다.

 

@Slf4j
public class Example8_3 {
	public static void main(String[] args) throws InterruptedException {
		Flux
			.interval(Duration.ofMillis(1L))
			.onBackpressureDrop(dropped -> log.info("# dropped: {}", dropped))
			.publishOn(Schedulers.parallel())
			.subscribe(data -> {
				try {
					Thread.sleep(5L);
				} catch (InterruptedException e) {}
				log.info("# onNext: {}", data);
			},
				error -> log.error("# onError", error));
		Thread.sleep(2000L);
	}
}

 

onBackpressureDrop()은 Drop된 데이터를 파라미터로 전달받아 처리가 가능하다.

 

실행 결과

 

 

실행 결과를 살펴보면 onNext가 255까지 진행된 다음 1098부터 다시 시작되는 것을 확인할 수 있다. 버퍼가 255까지 가득 찬 다음 emit 된 데이터는 Drop 되다가, 여유 공간이 생긴 1098 데이터부터 다시 버퍼에 들어왔음을 뜻한다.

 

버퍼의 여유 공간은 내부 구현에 따라 다르겠지만, Reactor에서는 버퍼 크기의 75% 이하로 데이터가 있을 경우 여유가 있다고 판단한다. 실제로 아래 결과를 살펴보면 버퍼 크기 256의 75%인 192까지 처리한 다음, 이후 Emit되는 데이터를 다시 버퍼에 넣음을 알 수 있다.

 

 

 

LATEST 전략

Latest 전략은 기본적으로 Drop 전략과 크게 다르지 않다. 차이점은 아래와 같다.

 

  1. 버퍼에 여유 공간이 없어도 최신 데이터 한 개는 버리지 않고 보관한다. 이후 버퍼에 여유 공간이 생기면 해당 데이터를 먼저 넣는다.
  2. onBackpressureDrop()과 달리 onBackpressureLatest()는 drop 되는 데이터를 파라미터로 전달받지는 못한다.

 

@Slf4j
public class Example8_4 {
	public static void main(String[] args) throws InterruptedException {
		Flux
			.interval(Duration.ofMillis(1L))
			.onBackpressureLatest()
			.publishOn(Schedulers.parallel())
			.subscribe(data -> {
				try {
					Thread.sleep(5L);
				} catch (InterruptedException e) {}
				log.info("# onNext: {}", data);
			},
				error -> log.error("# onError", error));
		Thread.sleep(2000L);
	}
}

 

실행 결과

 

 

BUFFER 전략

Buffer 전략은 내부적으로 다음과 같이 세 가지가 존재한다.

 

  1. ERROR: 버퍼가 가득 찰 경우 IllegalStateException 발생
  2. DROP_LATEST: 버퍼가 가득 찼을 때 Emit 되는 데이터가 있다면, 해당 데이터를 Drop함
  3. DROP_OLDEST: 버퍼가 가득 찼을 때 Emit 되는 데이터가 있다면, 버퍼 내부에 가장 오래된 데이터를 제거하고 신규 데이터를 버퍼에 채움

 

BUFFER DROP_LATEST

@Slf4j
public class Example8_5 {
	public static void main(String[] args) throws InterruptedException {
		Flux
			.interval(Duration.ofMillis(300L))
			.doOnNext(data -> log.info("# emitted by original Flux: {}", data)) // emit 되는 원본 데이터
			.onBackpressureBuffer(2,
				dropped -> log.info("** Overflow & Dropped: {} **", dropped),
				BufferOverflowStrategy.DROP_LATEST)
			.doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) // 버퍼에서 Downstream으로 emit 하는 데이터
			.publishOn(Schedulers.parallel(), false, 1) // Prefetch 파라미터는 한 번에 요청할 데이터 개수를 의미
			.subscribe(data -> {
					try {
						Thread.sleep(1000L);
					} catch (InterruptedException e) {}
					log.info("# onNext: {}", data);
				},
				error -> log.error("# onError", error));
		Thread.sleep(5000L);
	}
}

 

 

버퍼가 가득 찼을 때, Emit 되는 데이터가 Overflow를 발생시켜 Drop 처리되는 것을 확인할 수 있다.

 

BUFFER DROP_OLDEST

@Slf4j
public class Example8_6 {
	public static void main(String[] args) throws InterruptedException {
		Flux
			.interval(Duration.ofMillis(300L))
			.doOnNext(data -> log.info("# emitted by original Flux: {}", data)) // emit 되는 원본 데이터
			.onBackpressureBuffer(2,
				dropped -> log.info("** Overflow & Dropped: {} **", dropped),
				BufferOverflowStrategy.DROP_OLDEST)
			.doOnNext(data -> log.info("[ # emitted by Buffer: {} ]", data)) // 버퍼에서 Downstream으로 emit 하는 데이터
			.publishOn(Schedulers.parallel(), false, 1) // Prefetch 파라미터는 한 번에 요청할 데이터 개수를 의미
			.subscribe(data -> {
					try {
						Thread.sleep(1000L);
					} catch (InterruptedException e) {}
					log.info("# onNext: {}", data);
				},
				error -> log.error("# onError", error));
		Thread.sleep(5000L);
	}
}

 

버퍼 내부에 남아있는 데이터 중, 가장 오래된 것부터 Drop하는 것을 확인할 수 있다.

 

 

 

반응형