Dev/Reactive Programming

[Reactive Programming] Context

김세진 2024. 9. 12. 15:28
반응형

 

 

 

Context란?

 

Reactor에서 Context는 Reactor 구성요소(Operator 등) 간에 전파되는 key/value 형태의 저장소이다. ServletContext, SecurityContext 등과 유사한 개념이다.

 

  • Downstream에서 Upstream으로 Context가 전파되어 Operator 체인상의 각 Operator가 해당 Context 정보를 동일하게 이용할 수 있음
  • ThreadLocal과 유사하지만, Thread와 매핑되는 것이 아닌 Subscriber와 매핑된다. 즉, 구독이 발생할 때마다 구독과 연결된 하나의 Context가 생성된다.

 

사용 예시

@Slf4j
public class Example11_1 {
	public static void main(String[] args) throws InterruptedException {
		Mono
			.deferContextual(ctx ->
				Mono
					.just("Hello " + ctx.get("firstName"))
					.doOnNext(data -> log.info("# just doOnNext: {}", data))
			)
			.subscribeOn(Schedulers.boundedElastic())
			.publishOn(Schedulers.parallel())
			.transformDeferredContextual(
				(mono, ctx) -> mono.map(data -> data + " " + ctx.get("lastName"))
			)
			.contextWrite(context -> context.put("lastName", "Jobs")) // key/value 형태
			.contextWrite(context -> context.put("firstName", "Steve"))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(100L);
	}
}

실행 결과

Context에 데이터 쓰기

위의 예시 코드에서와 같이 contextWrite() Operator를 통해 Context에 데이터를 쓰는 작업을 처리할 수 있다.

 

예시와 같이 람다 표현식 내 put()을 활용하여 key/value 형태로 값을 입력할 수 있다. 이외에 여러 key/value를 한 번에 입력하는 of() 메서드로 입력할 수도 있다.

 

contextWrite()의 구현은 다음과 같다.

public final Mono<T> contextWrite(Function<Context, Context> contextModifier) {
    return onAssembly(new MonoContextWrite(this, contextModifier));
}

contextWrite() 시그니처의 함수형 인터페이스에서 알 수 있듯, 파라미터 타입과 리턴 값 모두 Context로 되어 있어 Context 객체를 활용하여 값을 입력하는 작업을 처리함을 알 수 있다.

 

여기서 Context의 인스턴스 함수인 put()을 활용해 실제로 값을 입력한 뒤, 불변 객체로 전달하게 된다.

 

Context의 데이터 읽기

Context에서 데이터를 읽는 방법은 아래와 같이 크게 두 가지로 나뉜다.

  1. 원본 데이터 소스 레벨에서 읽는 방식: deferContextual() 사용
  2. Operator 체인의 중간에서 읽는 방식: transformDeferredContextual() 사용

 

그런데 Context에 데이터를 쓸 때와 다른 점이 있는데, 우선 아래의 deferContextual()의 구현을 살펴보도록 하자.

public static <T> Mono<T> deferContextual(Function<ContextView, ? extends Mono<? extends T>> contextualMonoFactory) {
    return onAssembly(new MonoDeferContextual(contextualMonoFactory));
}

아까와 달리 ContextView 타입을 사용하고 있는데, 읽기 작업에서는 원본 객체인 Context 대신 ContextView를 사용하여 처리함을 알 수 있다. transformDeferredContextual() 또한 마찬가지이다.

 

이는 Context를 조회하는 것이 목적인 조회성 API를 사용할 때, 읽기 전용 인터페이스인 ContextView를 통해 데이터를 조회하게 함으로써 개발자가 불변성을 유지할 수 있도록 돕는 역할을 한다.

 

 


자주 사용되는 Context 관련 API

 

Context API

Context API 설명
put(key, value) key/value 형태로 Context에 값을 쓴다.
of(key1, value1, key2, value2, ...) 정적 팩터리 메서드로, key/value 형태로 여러 개의 값을 받아 Context 객체로 반환한다. 
- 최대 5개의 데이터를 파라미터로 입력할 수 있고, 6개 이상의 데이터는 putAll()을 사용해야 한다.
putAll(ContextView) 현재 Context와 파라미터로 입력된 ContextView를 merge한다.
delete(key) Context에서 key에 해당하는 value를 삭제한다.

 

사용 예제

@Slf4j
public class Example11_3 {
	public static void main(String[] args) throws InterruptedException {
		final String key1 = "company";
		final String key2 = "firstName";
		final String key3 = "lastName";

		Mono
			.deferContextual(ctx ->
				Mono.just(ctx.get(key1) + ", " + ctx.get(key2) + " " + ctx.get(key3)))
			.publishOn(Schedulers.parallel())
			.contextWrite(context ->
				context.putAll(Context.of(key2, "Steve", key3, "Jobs").readOnly()))
			.contextWrite(context -> context.put(key1, "Apple"))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(2000L);
	}
}

실행 결과

 

putAll()에는 파라미터로 Context가 아닌 ContextView 타입을 넘겨야 하는데, Context.of()는 Context를 반환한다. 따라서 ContextView로 변환이 필요한데, 이 작업을 readOnly() 메서드로 수행할 수 있다.

 

여담으로, 사실 readOnly()를 사용하지 않고 Context 타입으로 전달해도 예외가 발생하지는 않는데, 이는 putAll()에 Context 타입을 전달할 경우 내부적으로 ContextView로 알아서 변환해주기 때문이다. 단, API 의도에 명확하게 사용하는 형태가 아니므로 Deprecated되어 있는 메서드이기 때문에 IDE에서 경고를 주기는 한다.

 

ContextView API

ContextView API 설명
get(key) ContextView에서 key에 해당하는 value를 반환한다.
getOrEmpty(key) ContextView에서 key에 해당하는 value를 Optional로 래핑해서 반환한다.
getOrDefault(key, default value) ContextView에서 key에 해당하는 value를 가져온다. key에 해당하는 value가 없으면 default value를 반환한다.
hasKey(key) ContextView에서 특정 key가 존재하는지를 확인한다.
isEmpty() Context가 비어 있는지 확인한다.
size() Context 내에 있는 key/value의 개수를 반환한다.

 

사용 예제

@Slf4j
public class Example11_4 {
	public static void main(String[] args) throws InterruptedException {
		final String key1 = "company";
		final String key2 = "firstName";
		final String key3 = "lastName";

		Mono
			.deferContextual(ctx ->
				Mono.just(
					ctx.get(key1) + ", " +
						ctx.getOrEmpty(key2).orElse("no firstName") + " " +
						ctx.getOrDefault(key3, "no lastName"))
			)
			.publishOn(Schedulers.parallel())
			.contextWrite(context -> context.put(key1, "Apple"))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(2000L);
	}
}

실행 결과

 

 


Context의 특징

 

1. Context는 구독이 발생할 때마다 하나의 Context가 해당 구독에 연결된다.

@Slf4j
public class Example11_5 {
	public static void main(String[] args) throws InterruptedException {
		final String key1 = "company";

		Mono<String> mono = Mono.deferContextual(ctx ->
				Mono.just("Company: " + ctx.get(key1))
			)
			.publishOn(Schedulers.parallel());

		mono.contextWrite(context -> context.put(key1, "Apple"))
			.subscribe(data -> log.info("# subscribe1 onNext: {}", data));

		mono.contextWrite(context -> context.put(key1, "Microsoft"))
			.subscribe(data -> log.info("# subscribe2 onNext: {}", data));

		Thread.sleep(100L);
	}
}

 

Context가 공통적으로 관리되어 한 가지 값으로 출력될 것 같지만, Context의 company가 각 구독 시점마다 다르게 출력하는 것을 확인할 수 있다.

 

만약 두 번째 구독을 시작하는 부분에서 context에 Microsoft를 넣는 부분을 지우고 구독만 하게 된다면, context 내부에 key1에 해당하는 값이 없어 NoSuchElementException이 발생하게 된다.

 

 

2. Context는 Operator 체인의 아래에서 위로 전파된다. 

이같은 특성 때문에 만약 동일한 키에 대한 값을 중복해서 저장한다면 가장 위에 위치한 contextWrite()가 저장한 값으로 덮어쓴다.

@Slf4j
public class Example11_6 {
	public static void main(String[] args) throws InterruptedException {
		String key1 = "company";
		String key2 = "name";

		Mono
			.deferContextual(ctx ->
				Mono.just(ctx.get(key1))
			)
			.publishOn(Schedulers.parallel())
			.contextWrite(context -> context.put(key2, "Bill"))
			.transformDeferredContextual((mono, ctx) ->
				mono.map(data -> data + ", " + ctx.getOrDefault(key2, "Steve"))
			)
			.contextWrite(context -> context.put(key1, "Apple"))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(100L);
	}
}

실행 결과

 

만약 일반적인 절차적 프로그래밍 방식으로 코드가 동작할 것이라고 기대하고 context에 데이터를 쓰는 부분을 해석하려 한다면 다음과 같이 예상할 것이다.

  1. 첫 번째 contextWrite()에 의해 Context에 key2(name)의 Value로 Bill이 저장된다.
  2. transformDeferredContextual() 내부의 getOrDefuault()가 동작할 때, key2에 Bill이 저장되어 있으므로 그 값인 Bill을 Mono의 data에 이어붙여 저장한다.
  3. subscribe()로 구독이 될 때, mono의 데이터로 지정된 Bill을 출력한다.

하지만 실행 결과는 Bill이 아닌 Steve가 출력되었다. 분명 Bill을 저장했음에도, getOrDefault()에서 key2에 대한 값이 없다고 판단했다는 것인데, 이는 위에서 언급했듯 Context는 Operator 체인의 아래에서 위로 전파되는 특성이 있기 때문이다.

 

따라서 일반적으로 모든 Operator에서 Context에 저장된 데이터를 읽을 수 있도록, contextWrite()를 Operator 체인의 맨 마지막에 둔다.

 

3. Inner Sequence 내부에서는 외부 Context의 데이터를 읽을 수 있지만, Inner Sequence 외부에서는 내부 Context에 저장된 데이터를 읽을 수 없다.

@Slf4j
public class Example11_7 {
	public static void main(String[] args) throws InterruptedException {
		String key1 = "company";

		Mono
			.just("Steve")
			// .transformDeferredContextual((stringMono, ctx) ->
			// 	ctx.get("role"))
			.flatMap(name ->
				Mono.deferContextual(ctx ->
					Mono
						.just(ctx.get(key1) + ", " + name)
						.transformDeferredContextual((mono, innerCtx) ->
							mono.map(data -> data + ", " + innerCtx.get("role"))
						)
						.contextWrite(context -> context.put("role", "CEO"))
				)
			)
			.publishOn(Schedulers.parallel())
			.contextWrite(context -> context.put(key1, "Apple"))
			.subscribe(data -> log.info("# onNext: {}", data));

		Thread.sleep(100L);
	}
}

실행 결과

 

flatMap() Operator 내부에 있는 Sequence를 Inner Sequence라고 한다. 예제 코드를 보면 Inner Sequence 외부에서 key1에 "Apple"을 세팅하고, flatMap() 내부에서 ctx.get(key1)로 그 값을 가져와 사용하는 것을 볼 수 있다. 따라서 위 실행 결과와 같이 출력이 가능하다.

 

주석되어있는 부분을 살펴보면 ctx.get("role") 구문으로 Inner Sequence에 세팅된 Context 값을 가져오려고 한다는 것을 알 수 있다. 만약 이 주석을 해제한다면 다음과 같이 role을 key값으로 하는 Context 값이 존재하지 않기 때문에 NoSuchElementException이 발생하게 된다.

 

 

4. Context는 인증 정보 같은 직교성(독립성)을 가지는 정보를 전송하는 데 적합하다.

Context에 저장된 정보는 Operator 체인에 독립적으로 존재하기 때문에, 데이터 처리 로직에 간섭 없이 따로 관리될수 있어 독립적인 정보를 전송하는데 적합하다.

 

@Slf4j
public class Example11_8 {
	public static final String HEADER_AUTH_TOKEN = "authToken";

	public static void main(String[] args) {
		Mono<String> mono =
			postBook(Mono.just(
				new Book("abcd-1111-3533-2809"
					, "Reactor's Bible"
					, "Kevin"))
				)
				.contextWrite(Context.of(HEADER_AUTH_TOKEN, "eyJhbGci0i"));

		mono.subscribe(data -> log.info("# onNext: {}", data));
	}

	private static Mono<String> postBook(Mono<Book> book) {
		return Mono
			.zip(book,
				Mono
					.deferContextual(ctx ->
						Mono.just(ctx.get(HEADER_AUTH_TOKEN)))
			)
			.flatMap(tuple -> {
				String response = "POST the book(" + tuple.getT1().getBookName() +
					"," + tuple.getT1().getAuthor() + ") with token: " + tuple.getT2();
				return Mono.just(response); // HTTP POST 전송을 했다고 가정
			});
	}
}

@AllArgsConstructor
@Data
class Book {
	private String isbn;
	private String bookName;
	private String author;
}

 

 

HTTP 통신을 모방한 예제 코드이다. Mono가 어떤 과정을 거치든 상관없이 가장 마지막에 리턴된 Mono를 구독하기 직전에 contextWrite()로 Operator 체인의 아래에서 위쪽으로 데이터를 전파하기 때문에, Operator 체인의 어느 위치에서든 Context에 접근할 수 있다.

 

 

 

 

 

 

 

반응형

'Dev > Reactive Programming' 카테고리의 다른 글

[Reactive Programming] Debugging  (2) 2024.09.26
[Reactive Programming] Scheduler  (0) 2024.09.04
[Reactive Programming] Sinks  (0) 2024.08.21
[Reactive Programming] Backpressure  (0) 2024.08.06
[Reactor] Cold Sequence와 Hot Sequence  (0) 2024.07.29