개요
위 리액티브 스트림즈를 설명한 게시글에서 Processor에 대해 언급한 적이 있다. Processor는 Publisher와 Subscriber의 기능을 모두 지니는 클래스인데, Reactor에서도 Processor 인터페이스의 구현 클래스인 FluxProcessor, MonoProcessor, EmitterProcessor 등을 지원한다.
하지만 Reactor 3.4.0 부터는 Processor의 기능을 개선한 Sinks가 지원되며, 3.5.0 부터는 Processor와 관련된 API가 제거되어 Sinks가 Processor를 완전히 대체하게 되었다.
Sinks의 주요 특징
1. Signal을 프로그래밍 방식으로 전송할 수 있는 구조이며, Flux 또는 Mono의 의미 체계를 가짐
Flux 혹은 Mono가 일반적으로 onNext같은 Signal을 내부적으로 전송하는 방식과 달리, Sinks는 프로그래밍 방식을 통해 명시적으로 Signal을 전송할 수 있다.
2. generate(), create() 등의 Operator와 달리 멀티스레드 환경에서 스레드 안전성을 보장받을 수 있음
기존에도 generate(), craete() 등의 Operator를 통해 명시적으로 Singal을 전송할 수 있었지만, 이 Operator들은 싱글 스레드 기반에서 Signal을 전송하기 때문에, 멀티스레드 환경에서 예기치 않은 동작으로 이어질 수 있다.
반면, Sinks는 멀티스레드 방식으로 Signal을 전송해도 스레드 안전성을 보장한다.
create() Operator 사용 예제
@Slf4j
public class Example9_1 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Flux
// 처리해야 할 작업의 개수만큼 doTask()를 호출하여 처리
.create((FluxSink<String> sink) -> {
IntStream
.range(1, tasks)
.forEach(n -> sink.next(doTask(n)));
})
.subscribeOn(Schedulers.boundedElastic())
.doOnNext(n -> log.info("# create(): {}", n))
// map으로 가공 처리 후 Subscriber에게 전달
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
// 최종적으로 전달 받은 데이터 출력
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
// now tasking
// complete to task.
return "task " + taskNumber + " result";
}
}
doTask(), map(), subscribe에서의 처리 세 가지 작업은 각각 총 3개의 스레드에서 처리되는 것을 확인할 수 있다. 이렇게 create Operator를 사용하여 프로그래밍 방식으로 Signal을 전송할 수 있고, Reactor Sequence를 단계적으로 나누어서 여러 개의 스레드로 처리할 수 있다.
단, 멀티스레드 환경에서 doTask() 메서드가 여러 스레드에서 동시에 호출되면서 데이터를 발행할 때 경쟁 조건이 발생할 수 있다. 이러한 동시성 문제를 방지하고 안전하게 시그널을 발행하기 위해서는 Sinks를 사용하는 것이 더 적절하다.
Sinks 사용 예제
@Slf4j
public class Example9_2 {
public static void main(String[] args) throws InterruptedException {
int tasks = 6;
Sinks.Many<String> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<String> fluxView = unicastSink.asFlux();
IntStream
.range(1, tasks)
.forEach(n -> {
try {
// 순환할 때마다 새로운 스레드 생성
new Thread(() -> {
unicastSink.emitNext(doTask(n),
Sinks.EmitFailureHandler.FAIL_FAST);
log.info("# emitted: {}", n);
}).start();
Thread.sleep(100L);
} catch (InterruptedException e) {
log.error(e.getMessage());
}
});
fluxView
// map으로 가공 처리 후 Subscriber에게 전달
.publishOn(Schedulers.parallel())
.map(result -> result + " success!")
.doOnNext(n -> log.info("# map(): {}", n))
// 최종적으로 전달 받은 데이터 출력
.publishOn(Schedulers.parallel())
.subscribe(data -> log.info("# onNext: {}", data));
Thread.sleep(500L);
}
private static String doTask(int taskNumber) {
// now tasking
// complete to task.
return "task " + taskNumber + " result";
}
}
Sinks 예제의 경우 unicastSink.emitNext() 부분에서 doTask() 메서드의 작업 처리 결과를 Sinks를 통해서 Downstream에 emit 하는 모습을 볼 수 있다.
그리고 doTask() 메서드는 순환할 때마다 새로운 스레드에서 실행되고, 나머지 작업은 각각 Parallel-1, 2에서 실행되는 것을 확인할 수 있다.
Sinks 종류 및 특징
Sinks의 종류에는 크게 Sinks.One, Sinks.Many 두 가지가 존재한다.
Sinks.One
public final class Sinks {
...
public static <T> One<T> one() {
return SinksSpecs.DEFAULT_ROOT_SPEC.one();
}
...
}
Sinks.One은 위와 같이 Sinks.one() 메서드를 사용해서 한 건의 데이터를 프로그래밍 방식으로 Emit 하거나 Mono 방식으로 Subscriber가 소비할 수 있도록 정의된 Spec의 Sinks를 요청하는 역할을 한다.
사용 예제
@Slf4j
public class Example9_4 {
public static void main(String[] args) {
Sinks.One<String> sinkOne = Sinks.one();
Mono<String> mono = sinkOne.asMono();
sinkOne.emitValue("Hello Reactor", Sinks.EmitFailureHandler.FAIL_FAST);
sinkOne.emitValue("Hi Reactor", Sinks.EmitFailureHandler.FAIL_FAST);
mono.subscribe(data -> log.info("# Subscriber1 {}", data));
mono.subscribe(data -> log.info("# Subscriber2 {}", data));
}
}
FAIL_FAST는 EmitFailureHandler를 통해 emit 도중 에러가 발생했을 때, 재시도하지 않고 즉시 실패 처리를 하라는 의미이다. 이렇게 함으로써 스레드 간의 경합 등으로 발생하는 교착 상태 등을 방지하고 스레드 안전성을 보장할 수 있다.
두 번째 라인을 보면 "Hi Reactor" 라는 문자를 드랍했음을 알 수 있다. 그리고 Subscriber는 첫 번째 Emit된 데이터만 출력하고 있다. 이를 통해 처음 Emit한 데이터는 정상적으로 Emit 되지만, 나머지 데이터들은 Drop된다는 사실을 알 수 있다.
Sinks.Many
public final class Sinks {
...
public static ManySpec many() {
return SinksSpecs.DEFAULT_ROOT_SPEC.many();
}
...
}
Sinks.Many는 Sinks.One과 다르게 Sinks.Many가 아닌 ManySpec이라는 인터페이스를 리턴한다. 이 인터페이스 내부에 데이터 emit을 위한 여러 가지 Spec이 존재하는데, 다음과 같이 세 종류가 있다.
public final class Sinks {
...
public interface ManySpec {
UnicastSpec unicast();
MulticastSpec multicast();
MulticastReplaySpec replay();
}
...
}
1. Unicast
@Slf4j
public class Example9_8 {
public static void main(String[] args) {
Sinks.Many<Integer> unicastSink = Sinks.many().unicast().onBackpressureBuffer();
Flux<Integer> fluxView = unicastSink.asFlux();
unicastSink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
unicastSink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
unicastSink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
}
위는 ManySpec의 unicast() 메서드를 활용하는 예제이다. unicast()는 UnicastSpec을 사용하여 단 하나의 Subscrbier에게만 데이터를 Emit한다. 따라서 위 결과에서는 두 번째 Subscriber가 구독을 시도할 때, IllegalStateException이 발생한 것을 확인할 수 있다.
2. Multicast
@Slf4j
public class Example9_9 {
public static void main(String[] args) {
Sinks.Many<Integer> multicastSink = Sinks.many().multicast().onBackpressureBuffer();
Flux<Integer> fluxView = multicastSink.asFlux();
multicastSink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
multicastSink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
multicastSink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
}
}
ManySpec의 multicast() 메서드를 호출하는 예제이다. 이름에서 알 수 있듯, 하나 이상의 Subscriber에게 데이터를 Emit할 수 있다.
결과를 보면 Subscriber2는 구독 이후 Emit된 데이터만 전달받았음을 알 수 있는데, 이는 Sinks가 Publisher의 역할을 할 경우 기본적으로 Hot Publisher로 동작하기 때문이다.
3. Replay
@Slf4j
public class Example9_10 {
public static void main(String[] args) {
Sinks.Many<Integer> replaySink = Sinks.many().replay().limit(2);
Flux<Integer> fluxView = replaySink.asFlux();
replaySink.emitNext(1, Sinks.EmitFailureHandler.FAIL_FAST);
replaySink.emitNext(2, Sinks.EmitFailureHandler.FAIL_FAST);
replaySink.emitNext(3, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber1: {}", data));
replaySink.emitNext(4, Sinks.EmitFailureHandler.FAIL_FAST);
fluxView.subscribe(data -> log.info("# Subscriber2: {}", data));
}
}
replay() 메서드를 호출하는 예제이다. replay() 메서드를 호출하면 MulticastReplaySpec을 리턴하여 해당 Spec에서 제공하는 기능을 사용할 수 있다. 이름과 같이, 이미 Emit되었던 데이터를 새로운 구독자도 Cold Sequence 처럼 다시 전달받을 수 있다.
예제의 경우 limit() 을 사용하여 마지막으로 emit된 데이터 2개만 전달하였고, 위와 같이 그 결과를 확인할 수 있다.
정리
- Sinks는 Publisher와 Subscriber의 기능을 모두 지닌 Processor의 향상된 기능을 제공한다.
- 데이터를 Emit하는 Sinks에는 크게 Sinks.One, Sinks.Many가 있다.
- Sinks.One은 한 건의 데이터를, Sinks.Many는 여러 건의 데이터를 프로그래밍 방식으로 Emit 한다.
- Sinks.Many에서 ManySpec을 구현한 세 가지 타입의 구현체의 기능을 사용할 수 있다.
- UnicastSpec: unicast()를 통해 단 하나의 Subscriber에게만 데이터를 Emit한다.
- MulticastSpec: multicast()를 통해 하나 이상의 Subscriber에게 데이터를 Emit한다.
- MulticastReplaySpec: replay()를 통해 emit된 데이터 중에서 특정 시점으로 되돌린 데이터부터 Emit한다.
'Dev > Reactive Programming' 카테고리의 다른 글
[Reactive Programming] Context (0) | 2024.09.12 |
---|---|
[Reactive Programming] Scheduler (0) | 2024.09.04 |
[Reactive Programming] Backpressure (0) | 2024.08.06 |
[Reactor] Cold Sequence와 Hot Sequence (0) | 2024.07.29 |
[Reactor] 마블 다이어그램(Marble Diagram) (0) | 2024.07.15 |