Dev/Reactive Programming

[Reactive Programming] Scheduler

김세진 2024. 9. 4. 17:23
반응형

 

 

 

 

 

스레드(Thread)의 개념 이해

 

Reactor에서 사용되는 Scheduler는 Reactor Sequence에서 사용되는 스레드를 관리해 주는 관리자 역할을 한다. 이 Scherduler에 대한 설명에 앞서, 스레드에 대한 개념을 간단히 살펴보자면 다음과 같다.

 

코어(Core)

실제 CPU의 코어를 지칭한다. 일반적으로 이 코어의 개수가 많으면 더 많은 수의 명령을 병렬로 빠르게 처리할 수 있다. 

 

물리적 스레드(Physical Thread) / 논리적 코어(Logical Core)

출처: https://www.linkedin.com/pulse/understanding-physical-logical-cpus-akshay-deshpande/

 

과거에는 한 코어 당 하나의 물리적 스레드가 존재하여 한 코어가 한 번에 하나의 스레드를 처리할 수 있었다. 인텔에서는 코어 하나로 두 스레드를 처리할 수 있는 기술인 하이퍼스레딩 개념을 도입하여 하나의 코어로 두 가지 스레드를 병렬 처리할 수 있도록 하였다.

 

즉, 코어 하나에 물리적 스레드를 두 개 할당하여, 실제 물리적인 CPU 코어는 하나지만 논리적으로 코어가 두 개 존재하는 것과 마찬가지이게 되는 것이다. 컴퓨터 성능을 얘기할 때 말하는 듀얼코어 4스레드, 쿼드코어 8스레드 등과 같은 용어에 이 같은 개념이 포함된 것이다.

 

위와 같은 내용으로 미루어 보아, 물리적 스레드와 논리적 코어 두 개념은 거의 같은 의미로 봐도 무방한 것 같다. 

 

논리적 스레드(Logical Thread)

출처: https://flylib.com/books/en/3.482.1.36/1/

소프트웨어적으로 생성되는 스레드를 의미하며, 흔히 프로그램이라고 부르는 프로세스 내에서 실행되는 세부 작업 단위를 일컫는다. 

 

논리적 스레드는 메모리가 허용하는 한 얼마든지 만들 수 있지만, 물리적 스레드의 가용 범위 내에서만 실행될 수 있다.

 

병렬성(Parallelism)과 동시성(Concurrency)

물리적 스레드는 병렬성으로 설명되는데, 실제로 물리적 스레드가 동시에 실행되어 여러 작업을 병렬 처리하기 때문이다.

 

반면 논리적 스레드는 동시성으로 설명되는데, 동시성이라는 것은 실제로 동시에 실행된다는 의미가 아니라 동시에 실행되는 것처럼 보이는 것을 의미한다. 멀티스레딩(Multithreading) 덕분에 이같은 일이 가능한데, 여러 논리적인 스레드가 아주 짧은 시간 동안 물리적 스레드를 번갈아 사용하여 처리하기 때문에 사람의 눈에는 동시에 처리되는 것처럼 보이는 것이다.

 

 


Scheduler란? / Scherduler를 위한 전용 Operator

 

Reactor에서의 Scherduler는 비동기 프로그래밍을 위해 사용되는 스레드를 관리해주는 역할을 한다.

 

개발자가 경쟁 조건(Race condition) 등을 고려하여 직접 멀티스레딩을 제어하는 것은 쉽지 않은 일인데, Reactor에서는 이 Scheduler를 통해 코드를 간결하게 하고, 대신 제어하여 개발자의 부담을 덜어주도록 했다.

 

 

이 Scherduler의 제어를 위한 전용 Operator로 subscribeOn(), publishOn(), parallel()을 사용할 수 있다.

 

subscribeOn()

subscribeOn() Operator는 구독이 발생한 직후 Publisher에서 실행될 스레드를 지정하는 Operator이다. 

 

구독이 발생할 경우 최초로 데이터를 Emit하는 Publisher의 동작을 수행하기 위한 스레드를 지정한다고 볼 수 있다. Operator 체인상에서 어느 위치에 있든 동일한 동작을 하며, publishOn() 등을 통해 다른 Scherduler를 지정하지 않는다면 subscribeOn()에서 지정한 것과 동일한 스레드로 나머지 작업이 수행된다.

 

@Slf4j
public class Example10_1 {
	public static void main(String[] args) throws InterruptedException {
		Flux.fromArray(new Integer[] {1, 3, 5, 7})
			.subscribeOn(Schedulers.boundedElastic())
			.doOnNext(data -> log.info("# doOnNext: {}", data))
			.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(500L);
	}
}

실행 결과

  • 예제 코드의 최초 실행 스레드는 main 스레드이므로, 구독이 발생한 시점에 동작하는 doOnSubscribe()는 main 스레드에서 실행된 것을 확인할 수 있다.
  • doOnNext() Operator로 emit되는 데이터를 로그로 출력하는데, subscribeOn()에서 지정한 대로 boundedElastic 스레드에서 실행되는 것을 확인할 수 있다.
  • subscribe() 이후 다른 Scheduler가 지정되지 않았으므로, subscribe에서 데이터를 전달받아 처리하는 onNext 로그도 boundedElastic 스레드에서 실행되었다.

 

publishOn()

publishOn() Operator는 코드상에서 publishOn() 아래에 위치한 downstream의 실행 스레드를 변경하는 역할을 한다. subscribeOn()과 달리 두 개 이상 사용할 수 있어, 실행 스레드를 목적에 맞게 적절하게 분리할 수 있다.

 

subscribeOn()과 마찬가지로, 이후에 publishOn()으로 다른 Scheduler를 지정하지 않을 경우, 해당 구문에서 지정된 것과 동일한 스레드로 downstream의 작업이 수행된다.

 

@Slf4j
public class Example10_2 {
	public static void main(String[] args) throws InterruptedException {
		Flux.fromArray(new Integer[] {1, 3, 5, 7})
			.doOnNext(data -> log.info("# doOnNext: {}", data))
			.doOnSubscribe(subscription -> log.info("# doOnSubscribe"))
			.publishOn(Schedulers.parallel())
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(500L);
	}
}

실행 결과

  • 최초 실행 스레드는 main 스레드이고, subscribeOn()을 사용하지 않았으므로, 데이터를 Emit하는 로그를 출력하는 부분인 doOnNext 동작 또한 main 스레드에서 실행되었음을 확인할 수 있다.
  • publishOn()을 기준으로 Downstream인 subscribe()의 동작이 publishOn()에서 지정된 대로 parallel 스레드에서 실행된 것을 확인할 수 있다.

 

다음은 publishOn()을 두 개 사용하여, 각 목적에 맞게 스레드를 분리한 예제이다.

 

@Slf4j
public class Example10_7 {
	public static void main(String[] args) throws InterruptedException {
		Flux.fromArray(new Integer[] {1, 3, 5, 7})
			.doOnNext(data -> log.info("# doOnNext fromArray: {}", data))
			.publishOn(Schedulers.parallel())
			.filter(data -> data > 3)
			.doOnNext(data -> log.info("# doOnNext filter: {}", data))
			.publishOn(Schedulers.parallel())
			.map(data -> data * 10)
			.doOnNext(data -> log.info("# doOnNext map: {}", data))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(500L);
	}
}

실행 결과

  • filter와 map을 기준으로 각각 publishOn()에서 Scheduler를 지정하여, 각 동작에 따라 다른 스레드가 사용된 모습을 볼 수 있다.
  • subscribe() 구문에서 로그로 출력하는 onNext의 경우, 별도로 지정한 Scheduler가 없으므로 map에서 사용한 스레드를 동일하게 사용한 것을 확인할 수 있다.

 

parallel()

subscribeOn()과 publishOn() Operator는 작업을 어떤 스레드에서 실행할지 논리적인 스레드를 지정하는 역할을 한다. 반면 parallel() Operator는 스레드의 실제 병렬 처리가 가능하도록 물리적 스레드를 할당하는 역할을 한다.

 

이 parallel()은 기본적으로 라운드 로빈(Round Robin) 방식으로 CPU 코어 개수만큼의 스레드를 병렬로 동작하게 하는데, 할당할 스레드 개수를 직접 파라미터로 넘겨 제한할 수도 있다.

 

단, parallel()은 Emit된 데이터를 물리적 스레드에 분배하는 역할만 하기 때문에, 실제 병렬 처리를 수행하기 위해 runOn(), publishOn() 등을 통해 병렬 처리에 적합한 Scheduler를 지정해야 정상적으로 병렬 처리가 동작한다.

 

@Slf4j
public class Example10_3 {
	public static void main(String[] args) throws InterruptedException {
		Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
			.parallel()
			.runOn(Schedulers.parallel())
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(500L);
	}
}

실행 결과

필자의 컴퓨터 CPU 사양은 8코어16스레드이다. 따라서 parallel()은 자동으로 CPU 사양만큼 emit된 데이터를 각 물리적 스레드에 할당할 수 있고, 실행 결과에서는 총 10개의 데이터를 10개의 스레드에 할당하여 병렬 처리하는 것을 확인할 수 있다.

 

만약, 작업을 처리하기 위해 물리적 스레드를 전부 사용할 필요가 없다면, 위에서 언급한 것과 같이 사용하고자 하는 물리적 스레드의 개수를 명시적으로 파라미터에 담아 전달할 수도 있다. 다음은 물리적 스레드를 4개까지만 활용하는 코드이다.

 

@Slf4j
public class Example10_4 {
	public static void main(String[] args) throws InterruptedException {
		Flux.fromArray(new Integer[] {1, 3, 5, 7, 9, 11, 13, 15, 17, 19})
			.parallel(4)
			.runOn(Schedulers.parallel())
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(500L);
	}
}

실행 결과

이전과 달리, 총 4개의 스레드를 사용하여 작업을 처리하는 것을 확인할 수 있다.

 

 


Scheduler의 종류

 

Scheduler.immediate()

별도의 스레드를 생성하지 않고, 현재 호출된 스레드에서 작업을 즉시 처리하고자 할 때 사용한다.

 

사실 해당 구문을 호출하지 않아도 Scheduler를 별도로 지정하지 않는다면 downstream의 작업은 현재 호출된 스레드에서 처리가 되지만, API 호출에서 Scheduler를 파라미터로 전달해야 할 때, 스레드 전환 없이 즉시 실행하고 싶은 경우 해당 구문을 사용하여 해결할 수 있다.

 

Scheduler.single()

스레드를 하나만 생성하여 Scheduler가 제거되기 전까지 재사용하는 방식이다.

 

@Slf4j
public class Example10_10 {
	public static void main(String[] args) throws InterruptedException {
		doTask("task1")
			.subscribe(data -> log.info("# onNext: {}", data));

		doTask("task2")
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(200L);
	}

	private static Flux<Integer> doTask(String taskName) {
		return Flux.fromArray(new Integer[] {1, 3, 5, 7})
			.publishOn(Schedulers.single())
			.filter(data -> data > 3)
			.doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
			.map(data -> data * 10)
			.doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
	}
}

실행 결과

doTask()를 두 번 호출하더라도 첫 번째 호출에서 이미 생성된 스레드를 재사용하는 모습을 볼 수 있다.

 

Scheduler.newSingle()

Scheduler.single()과 달리, 호출할 때마다 매번 새로운 스레드를 하나 생성하여 사용하는 방식이다.

 

@Slf4j
public class Example10_11 {
	public static void main(String[] args) throws InterruptedException {
		doTask("task1")
			.subscribe(data -> log.info("# onNext: {}", data));

		doTask("task2")
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(200L);
	}

	private static Flux<Integer> doTask(String taskName) {
		return Flux.fromArray(new Integer[] {1, 3, 5, 7})
			.publishOn(Schedulers.newSingle("new-single", true)) // thread이름, daemon 스레드 여부
			.filter(data -> data > 3)
			.doOnNext(data -> log.info("# {} doOnNext filter: {}", taskName, data))
			.map(data -> data * 10)
			.doOnNext(data -> log.info("# {} doOnNext map: {}", taskName, data));
	}
}

실행 결과

doTask()가 호출될 때, 새로운 스레드를 생성하여 할당함을 확인할 수 있다.

 

* Daemon 스레드: 주 스레드가 종료될 때, 자동으로 종료되는 스레드. 보조 스레드라고도 함

 

Schedulers.boundedElastic()

ExecutorService 기반의 스레드 풀(Thread Pool)을 생성하여 작업을 처리하는 방식이다.

 

기본적으로 CPU * 10만큼의 스레드를 생성하고, 모든 스레드가 사용중이라면 이용 가능한 스레드가 생길 때까지 최대 10만개의 작업이 큐에서 대기할 수 있다.

 

CPU 대비 유휴 스레드 수가 많고 유연한 특성이 있어 대기 시간이 긴 Blocking I/O 작업을 처리하는 데 효과적이다. 실행 시간이 긴 Blocking I/O 작업을 전용 스레드에서 처리하여 비동기 작업과 병행하면서도 Non-Blocking 처리에 영향을 주지 않도록 한다.

 

Schedulers.parallel()

boundedElastic과 달리 Non-Blocking I/O에 최적화되어 있는 Scheduler이다.

 

기본적으로 CPU 수 만큼의 고정 크기 스레드 풀을 사용한다. 스레드 수가 제한되어 있기 때문에, 대기 시간이 긴 Blocking 작업을 수행할 경우, 병렬 처리 성능이 떨어져 다른 작업들이 지연될 수 있다.

 

Schedulers.fromExecutorService()

기존에 이미 사용하고 있는 ExecutorService가 있다면 이 ExecutorService로부터 Scheduler를 생성하는 방식이다.

 

해당 기능을 사용하지 않고 ExecutorService에서 직접 생성할 수도 있지만, Reactor에서는 권장하지 않는다.

 

Schedulers.newXXXX()

Schedulers.newSingle, Schedulers.newBoundedElastic(), Schedulers.newParallel() 등의 메서드를 사용하여 사용자 정의 Scheduler 인스턴스를 생성할 수 있다. 

 

스레드 이름, 생성 가능한 디폴트 스레드의 개수, 스레드 유휴 시간, 데몬 스레드로의 동작 여부 등을 지정할 수 있다.

 

 

 

 

반응형

'Dev > Reactive Programming' 카테고리의 다른 글

[Reactive Programming] Debugging  (2) 2024.09.26
[Reactive Programming] Context  (0) 2024.09.12
[Reactive Programming] Sinks  (0) 2024.08.21
[Reactive Programming] Backpressure  (0) 2024.08.06
[Reactor] Cold Sequence와 Hot Sequence  (0) 2024.07.29