Dev/Reactive Programming

[Reactor] Cold Sequence와 Hot Sequence

김세진 2024. 7. 29. 09:34
반응형

 

 

 

 

 

개요

 

컴퓨터 시스템에서 사용되는 Cold와 Hot의 의미는 보통 Cold는 어떤 변경점 등을 적용할 때 시스템을 새로 시작(재기동)해야 하고, Hot은 새로 시작할 필요가 없는 것을 의미한다. Reactor에서의 Cold Sequence와 Hot Sequence의 Cold, Hot 역시 이와 비슷한 의미를 내포하고 있다.

 

설명에 앞서 Sequence의 의미를 되짚어보자면, 위 게시글에서 설명한 것과 같이  Publisher가 emit하는 데이터의 연속적인 흐름을 정의해 놓은 것을 의미하며, 코드로는 Operator 체인 형태로 정의된다.

 

 


Cold Sequence

 

Cold Sequence는 Subscriber가 구독할 때마다 데이터 흐름이 처음부터 다시 시작되는 Sequence이다. 이를 마블 다이어그램으로 표현하자면 다음과 같다.

 

 

출처: https://projectreactor.io/docs/core/release/reference/#reactor.hotCold

 

위 다이어그램을 보면 n 명의 구독자가 subscribe를 했을 때, 동일한 데이터를 Emit하는 과정부터 가공되어 반환되는 데이터까지 모두 동일한 것을 확인할 수 있다. 즉 최초의 데이터가 변경되지 않는 한, 구독 시점과 관계 없이 동일한 데이터가 보장될 것이다.

 

코드로 살펴보자면 다음과 같다.

 

public static void main(String[] args) throws InterruptedException {
    Flux<String> source = Flux.fromIterable(Arrays.asList("blue", "green", "orange", "purple"))
        .map(String::toUpperCase);

    source.subscribe(d -> System.out.println("Subscriber 1: "+d));
    Thread.sleep(2000L);
    System.out.println("------------------");
    source.subscribe(d -> System.out.println("Subscriber 2: "+d));
}

 

첫 번째 구독자가 subscribe 한 다음, 2초 뒤 두 번째 구독자가 subscribe를 해도 같은 결과를 반환받는 것을 볼 수 있다.

 

 


Hot Sequence

 

Cold Sequence와 반대로, Hot Sequence는 구독이 발생할 때 Publisher가 데이터를 처음부터 emit하지 않는다는 것을 의미한다.

 

즉, Publisher는 하나의 타임라인에 데이터를 Emit하며, 구독자는 구독한 시점 이후에 emit되는 데이터만 전달 받을 수 있다.

 

출처: https://projectreactor.io/docs/core/release/reference/#reactor.hotCold

 

위의 마블 다이어그램을 살펴 보면 맨 윗 줄은 데이터를 emit 하는 Publisher, 가운데와 아랫 줄은 각각 Subscriber1, 2에 해당한다. 처음부터 구독한 Subscriber1은 모든 데이터를 전달 받은 것에 반해 중간부터 구독한 Subscriber는 데이터를 두 개만 전달받은 것을 확인할 수 있다.

 

@Slf4j
public class Example7_2 {
	public static void main(String[] args) throws InterruptedException {
		String[] singers = {"Singer A", "Singer B", "Singer C", "Singer D", "Singer E"};
		log.info("# Begin concert:");
		Flux<String> concertFlux = Flux.fromArray(singers)
			.delayElements(Duration.ofSeconds(1)) // 각 데이터의 emit을 입력된 시간 만큼 지연한다.
			.share(); // Cold Sequence를 Hot Sequence로 변환하여 여러 Subscriber가 하나의 Flux를 공유하도록 함

		concertFlux.subscribe(
			singer -> log.info("# Subscriber1 is watching {}'s song", singer)
		);

		Thread.sleep(2500L);

		concertFlux.subscribe(
			singer -> log.info("# Subscriber2 is watching {}'s song", singer)
		);

		Thread.sleep(3000L);
	}
}

 

첫 번째 구독자가 구독하는 순간 데이터가 Emit 되기 시작하고, 이후에 중간부터 구독한 구독자는 이미 emit된 데이터는 전달받지 못하며 해당 시점부터 emit되는 데이터만 전달받는 모습을 볼 수 있다.

 

여기서 share() Operator는 Cold Sequence를 Hot Sequence로 변환하여 여러 구독자가 하나의 데이터 타임라인을 공유하도록 하는 역할을 한다. (Sequence에 cold, hot을 구분하는 타입이 명시적으로 있는 것은 아니고, 그러한 동작을 하게끔 한다는 추상적인 의미이다.)

 

또한 delayElements() Operator의 디폴트 스레드 스케줄러가 Parallel이기 때문에 위 로그에서 parallel 스레드로 실행된 것을 확인할 수 있다.

 

Http 요청 예제

@Slf4j
public class Example7_3 {
	public static void main(String[] args) throws InterruptedException {
		URI worldTimeUri = UriComponentsBuilder.newInstance().scheme("http")
			.host("worldtimeapi.org")
			.port(80)
			.path("/api/timezone/Asia/Seoul")
			.build()
			.encode()
			.toUri();

		Mono<String> mono = getWorldTime(worldTimeUri).cache();
		mono.subscribe(dateTime -> log.info("# datetime 1: {}", dateTime));
		Thread.sleep(2000);
		mono.subscribe(dateTime -> log.info("# datetime 2: {}", dateTime));

		Thread.sleep(2000);
	}

	private static Mono<String> getWorldTime(URI worldTimeUri) {
		return WebClient.create()
			.get()
			.uri(worldTimeUri)
			.retrieve()
			.bodyToMono(String.class)
			.map(response -> {
				DocumentContext jsonContext = JsonPath.parse(response);
				return jsonContext.read("$.datetime");
			});
	}
}

 

위는 현재 서울 시간을 불러오는 api를 호출하는 코드 조각이다.

 

여기서 12번 라인의 cache() Operator는 share()와 마찬가지로 Cold Sequence를 Hot Sequence로 변경하여 emit된 데이터를 캐시한 뒤, 구독이 발생할 때마다 캐시된 데이터를 전달한다. 즉, 이미 Emit된 데이터가 있을 경우 구독이 일어날 때 api를 다시 호출하지 않고 emit된 데이터를 전달한다.

 

cache() Operator의 대표적인 활용처는 REST API 요청을 위해 인증 토큰이 필요한 경우가 있다. 인증 토큰을 요청할 때마다 매번 새로운 인증 토큰을 요청하지 않고, 캐시된 인증 토큰을 사용하여 효율적인 동작 과정을 구성할 수 있다.

 

Reactor에서의 Hot

Reactor에서의 hot은 다음과 같이 두 종류가 있다.

  • Warm up: 최초 구독이 발생하기 전까지는 데이터의 emit이 발생하지 않음 (ex: share() 등의 Operator를 사용한 sequence)
  • Hot: 구독 여부와 상관 없이 데이터가 emit됨 (ex: DirectProcessor(), ReplayProcessor() 등)

 

 

 

반응형