티스토리 뷰

스프링 5부터 Spring Webflux를 통해 reactive 개발이 가능하게 됐습니다. 요청당 스레드가 하나씩 차지했던 기존의 패러다임과 달리 Webflux는 non-blocking 시스템이 가능하게 해줍니다.

하지만 non-blocking을 지원하지 않는 blocking persistence API (ex. jdbc)를 쓰는 경우에는 어떻게 해야할까요? non-blocking을 지원하는 nosql 데이터베이스를 사용하거나 r2dbc를 사용하는 것이 방법이 될 수 있겠으나 아직까진 mysql 등의 rdbms는 지원되지 않습니다. 이번 글에서는 Spring Webflux + JDBC 환경에서 어떤식으로 개발을 해야되는지에 대해 살펴보겠습니다.

일반적인 웹 구조인 controller -> service -> repository 형태로 예를 들어보겠습니다.

@Slf4j
@RestController
public class MemoController {
    private final MemoService memoService;

    public MemoController(MemoService memoService) {
        this.memoService = memoService;
    }

    @PostMapping("/memos")
    public Mono<MemoResponseDTO> save(@Valid @RequestBody MemoRequestDTO memoRequestDTO) {
        log.info("===== MemoController 시작 =====");

        return memoService.save(memoRequestDTO)
            .map(MemoResponseDTO::of)
            .log();
    }
}
@Slf4j
@Service
public class MemoService {
    private final MemoRepository memoRepository;

    public MemoService(MemoRepository memoRepository) {
        this.memoRepository = memoRepository;
    }

    public Mono<Memo> save(MemoRequestDTO memoRequestDTO) {
        log.info("===== MemoService 시작 =====");

        return Mono.just(memoRepository.save(Memo.of(memoRequestDTO)))
            .log();
    }
}
@Slf4j
@Repository
public class MemoRepository {
    public Memo save(Memo memo) {
        log.info("===== MemoRepository 시작 =====");
        // ...blocking persistence APIs (JPA, JDBC) or networking APIs to use
        return memo;
    }
}

위 예에서 MemoRepository는 단순히 Memo 객체를 반환하지만 blocking persistence API를 이용하는 레이어라고 생각하시면 됩니다. JPA 환경이라면 MemoRepository extends JpaRepository<Memo, ID> 이런 식이 되겠지요.

MemoService는 blocking I/O인 MemoRepository의 결과를 단순히 Mono.just로 감싸서 결과를 반환하도록 구현하였습니다. 이제 실행결과를 한번 살펴볼까요?

2018-12-21 14:09:20.705  INFO 67826 --- [ctor-http-nio-2] c.e.demo.interfaces.MemoController       : ===== MemoController 시작 =====
2018-12-21 14:09:20.706  INFO 67826 --- [ctor-http-nio-2] c.e.demo.domain.service.MemoService      : ===== MemoService 시작 =====
2018-12-21 14:09:20.711  INFO 67826 --- [ctor-http-nio-2] c.e.d.domain.repository.MemoRepository   : ===== MemoRepository 시작 =====
2018-12-21 14:09:20.728  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.CallableOnAssembly.1        : onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
2018-12-21 14:09:20.729  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.2                : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
2018-12-21 14:09:20.729  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.2                : | request(unbounded)
2018-12-21 14:09:20.729  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.CallableOnAssembly.1        : request(unbounded)
2018-12-21 14:09:20.730  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.CallableOnAssembly.1        : onNext(com.example.demo.domain.model.Memo@3b09310d)
2018-12-21 14:09:20.730  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.2                : | onNext(com.example.demo.domain.dto.MemoResponseDTO@67c2f017)
2018-12-21 14:09:20.751  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.CallableOnAssembly.1        : onComplete()
2018-12-21 14:09:20.752  INFO 67826 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.2                : | onComplete()

결과를 보면 하나의 쓰레드(ctor-http-nio-2) 위에서 생산과 소비가 이루어졌습니다. 기존의 blocking 코드들과 크게 다를 바가 없어보입니다. synchronous, blocking call이 도중에 포함이 된다면 단순히 Mono, Flux를 이용하여 개발을 했다고 non-blocking이 되진 않습니다.

그럼 jdbc와 같이 blocking I/O가 있는 경우에는 어떻게 처리를 해야할까요? Spring 공식 문서를 살펴보면 다음과 같이 적혀져있습니다.

A simple way to evaluate an application is to check its dependencies. If you have blocking persistence APIs (JPA, JDBC) or networking APIs to use, Spring MVC is the best choice for common architectures at least. It is technically feasible with both Reactor and RxJava to perform blocking calls on a separate thread but you would not be making the most of a non-blocking web stack.

요약하자면, blocking call 부분을 별도의 쓰레드에서 background로 돌려라고 적혀져있습니다. 그럼 구체적으로 어떻게 쓰면 될까요? Reactor 공식 문서를 살펴보면 How do I wrap a synchronous, blocking call?라는 주제로 자세히 잘 설명을 해주고 있습니다.

Mono blockingWrapper = Mono.fromCallable(() -> { 
    return /* make a remote synchronous call */ 
}).subscribeOn(Schedulers.elastic());

Mono.fromCallable를 사용하여 blocking call 부분의 실행을 미루고, 이를 Schedulers.elastic()를 사용하여 blocking 자원들을 기다리는 별도의 쓰레드를 생성하여 실행시키라고 설명되어 있습니다(직접 custom scheduler를 만들어 사용하여도 무관합니다).

reactor.core.scheduler.Schedulers에서 기본으로 제공하는 스케쥴러들은 다음과 같습니다.

  • Schedulers.immediate() : Current thread.
  • Schedulers.single() : A single, reusable thread.
  • Schedulers.newSingle() : A per-call dedicated thread.
  • Schedulers.elastic() : An elastic thread pool. It creates new worker pools as needed, and reuse idle ones. This is a good choice for I/O blocking work for instance.
  • Schedulers.parallel() : A fixed pool of workers that is tuned for parallel work.

그럼 이제 처음에 살펴봤던 예제 코드로 다시 돌아가 MemoSerivce 부분을 변경해보겠습니다.

@Slf4j
@Service
public class MemoService {
    private final MemoRepository memoRepository;

    public MemoService(MemoRepository memoRepository) {
        this.memoRepository = memoRepository;
    }

    public Mono<Memo> save(MemoRequestDTO memoRequestDTO) {
        log.info("===== MemoService 시작 =====");

        return Mono.fromCallable(() -> memoRepository.save(Memo.of(memoRequestDTO)))
                .subscribeOn(Schedulers.elastic())
                .log();
    }
}

그리고 다시 실행결과를 살펴봅시다.

2018-12-21 13:34:36.926  INFO 52520 --- [ctor-http-nio-2] c.e.demo.interfaces.MemoController       : ===== MemoController 시작 =====
2018-12-21 13:34:36.926  INFO 52520 --- [ctor-http-nio-2] c.e.demo.domain.service.MemoService      : ===== MemoService 시작 =====
2018-12-21 13:34:36.982  INFO 52520 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.1                : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
2018-12-21 13:34:36.982  INFO 52520 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.2                : | onSubscribe([Fuseable] FluxOnAssembly.OnAssemblySubscriber)
2018-12-21 13:34:36.983  INFO 52520 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.2                : | request(unbounded)
2018-12-21 13:34:36.983  INFO 52520 --- [ctor-http-nio-2] reactor.Mono.OnAssembly.1                : | request(unbounded)
2018-12-21 13:34:36.990  INFO 52520 --- [      elastic-2] c.e.d.domain.repository.MemoRepository   : ===== MemoRepository 시작 =====
2018-12-21 13:34:41.994  INFO 52520 --- [      elastic-2] reactor.Mono.OnAssembly.1                : | onNext(com.example.demo.domain.model.Memo@3c0aa92b)
2018-12-21 13:34:41.995  INFO 52520 --- [      elastic-2] reactor.Mono.OnAssembly.2                : | onNext(com.example.demo.domain.dto.MemoResponseDTO@1c14c9b0)
2018-12-21 13:34:42.013  INFO 52520 --- [      elastic-2] reactor.Mono.OnAssembly.1                : | onComplete()
2018-12-21 13:34:42.013  INFO 52520 --- [      elastic-2] reactor.Mono.OnAssembly.2                : | onComplete()

blocking call 부분인 MemoRepository 호출이 별도의 스레드(elastic-2)에서 돌아가는 것을 확인할 수 있습니다.

Spring Webflux와 jdbc(혹은 blocking call이 포함된 로직)를 사용하고 계시다면 나의 생각과 달리 blocking으로 돌아가고 있지 않은지 잘 살펴보는 습관이 필요할 것 같습니다.

참고

댓글