들어가며
페스타고 서비스에서는 특정 학교의 재학생임을 인증하기 위해, 학교 웹메일 인증 방식을 채택하였다.
사용자가 자신의 학교웹메일 주소를 작성하고 ‘인증 메일 전송’ 버튼을 클릭하면, 클라이언트에서 서버에게 인증 메일 전송 API 요청을 보낸다.
그러면 서버에서는 학생 인증을 위한 각종 조건을 검증한 후, 랜덤한 인증코드 생성 후 이를 담은 메일을 보낸다.
메일을 전송하는 과정은 평균 3.5초정도 소요되었다.
사용자가 응답을 위해 3.5초를 기다리기 보다는, 검증을 마치고 응답을 보낸 후 이메일 전송은 비동기로 진행하는 것이 더 나은 사용자 경험이라 판단했다.
(만약 메일 전송 과정에서 실패해 메일이 수신되지 않으면 사용자는 메일 재전송 버튼을 누를 것 이다.)
이메일 전송을 비동기로 처리하기 위해 Spring의 @Async 어노테이션을 활용했는데, 이번 포스팅에서는 @Async에 대해 자세히 알아보고자 한다.
비동기 처리 without @Async
아래와 같이 매번 Thread를 생성해 메서드를 별도의 스레드에서 실행시켜줘야한다.
@Override
public void send(MailPayload payload) {
new Thread(() -> {
SimpleMailMessage mail = new SimpleMailMessage();
mail.setFrom(fromMail);
mail.setTo(payload.email());
mail.setSubject("인증 코드");
mail.setText(payload.code());
mailSender.send(mail);
}).start();
}
이는 번거롭기도 하고, 핵심 기능과 부가 기능(비동기 로직)이 한 메서드에 섞여있어 개발자가 핵심 기능 작성에만 집중하기 어렵다.
또한, 위 코드는 스레드를 스레드풀에서 가져오는게 아니라 매번 새롭게 생성하기 때문에 문제가 발생하기 쉽다.
@Async를 활용한 비동기 처리
@Async를 활용한 비동기 처리는 아주 간단하다.
- @EnableAsync 어노테이션으로 Spring에게 우리가 @Async 어노테이션을 활용할 것임을 알린다.
- 비동기로 동작하기 바라는 메서드에 @Async 어노테이션을 붙인다.
아래와 같이 메서드에 @Async를 붙임으로써 개발자는 핵심 기능에만 집중할 수 있다.
@Async
public void send(MailPayload payload) {
SimpleMailMessage mail = new SimpleMailMessage();
mail.setFrom(fromMail);
mail.setTo(payload.email());
mail.setSubject("인증 코드");
mail.setText(payload.code());
mailSender.send(mail);
}
@Async의 원리
@Async는 기본적으로 스프링 AOP에 의해 프록시 패턴 기반으로 동작한다.
실행 과정은 다음과 같다.
- @Async 어노테이션이 붙은 메서드가 호출되면, 스프링은 해당 호출을 가로채서 비동기 실행을 처리하기 위한 프록시 객체를 생성한다.
- 해당 메서드는 TaskExecutor에 의해 스레드풀에 작업으로 등록한다.
- 해당 메서드는 호출자와 별도의 스레드에서 작업이 진행되며, 호출자 메서드는 블러킹되지 않고 즉시 리턴된다.
따라서 @Async가 올바르게 동작하기 위해서 두 가지 조건이 존재한다.
- public 메서드에서만 적용 가능하다.
- self-invocation은 불가하다. (같은 클래스의 메서드를 호출할 수 없다.)
참고로, 이는 EnableAsync의 기본 AdviceMode가 PROXY인 경우이다.
EnableAsync의 AdviceMode를 ASPECTJ로 설정할 경우 동작방식이 다르고 제약조건도 없어진다. (ASPECTJ 모드를 사용하기 위해서는 추가적인 설정이 필요하다.)
@EnableAsync 적용 방법
1. 구성 클래스에 직접 적용
@EnableAsync 어노테이션은 아래와 같이 구성 클래스에 붙일 수 있다.
별도의 TaskExecutor를 설정하지 않으면, SimpleAsyncTaskExecutor가 활용되므로 권장되지 않는 방법이다.
(사실 SpringBoot에서는 autoConfiguration을 통해 ThreadPoolTaskExecutor가 활용되는데 이는 밑에서 자세히 설명하겠다.)
@EnableAsync
@SpringBootApplication
public class EmailApplication {
public static void main(String[] args) {
SpringApplication.run(EmailApplication.class, args);
}
}
2. AsyncConfigurer 인터페이스 활용
AsyncConfigurer 인터페이스를 구현하여 별도의 TaskExecutor를 설정해줄 수 있다.
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(5);
executor.setQueueCapacity(5);
executor.setKeepAliveSeconds(30);
executor.setThreadNamePrefix("async-executor-");
executor.initialize();
return executor;
}
}
3. @Bean 활용
@Bean을 사용해 TaskExecutor를 직접 빈으로 등록하여 사용할 수 있다.
이 때, 다수의 Bean을 등록해서 활용할 수 있다.
프로필(dev/prod)별로 다른 스레드풀을 활용하고 싶을 때 유리하다.
@EnableAsync
@Configuration
public class AsyncConfig {
@Bean
public Executor asyncExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setThreadNamePrefix("exec1-");
executor.initialize();
return executor;
}
@Bean
public Executor asyncExecutor2() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setThreadNamePrefix("exec2-");
executor.initialize();
return executor;
}
}
ThreadPoolTaskExecutor를 활용한 스레드 관리
@Async 어노테이션을 사용할 때, 별도의 TaskExecutor를 설정을 해주지 않으면, SimpleAsyncTaskExecutor가 기본적으로 사용된다.
SimpleAsyncTaskExecutor는 스레드 풀을 사용하지 않고, 매 요청마다 새로운 스레드를 생성해 작업을 수행한다.
만약 1000명의 사용자가 동시에 인증 메일 전송 요청을 한다면, 1000개의 스레드를 생성하려하고 이는 리소스 부족 문제로 이어지기 쉽다.
따라서 ThreadPoolTaskExecutor와 같은 스레드 풀 기반의 TaskExecutor을 사용하도록 설정해야한다.
ThreadPoolTaskExecutor의 주요 옵션들
ThreadPoolTaskExecutor의 주요 옵션의 기본 설정은 아래와 같다. setter를 통해 본인이 원하는 값으로 커스터마이징할 수 있다.
- corePoolSize = 1
- 스레드 풀에 항상 살아있는 최소 스레드 수
- 예상 최대 동시 작업 수에 가까운 값으로 설정하는 것이 좋다. (ex. 8코어 CPU → corePoolSize 8)
- maxPoolSize = Integer.MAX_VALUE
- 스레드 풀이 확장할 수 있는 최대 스레드 수
- [스레드 풀 크기 + 초과 요청을 담는 큐의 크기]가 넘는 요청이 들어온 경우, 스레드풀이 얼만큼 확장할 수 있는지.
- queueCapacity = Integer.MAX_VALUE
- 스레드 풀에서 사용할 최대 큐의 크기
- threadNamePrefix = “클래스이름-”
- 생성된 각 스레드의 이름 접두사
- keepAliveSeconds = 60
- 스레드풀 내 스레드 개수가 corePoolSize 초과인 상태에서, 대기 상태의 스레드가 종료되기까지의 대기 시간
밑에서 자세히 설명하겠지만, SpringBoot를 사용할 경우 autoConfiguration으로 ThreadPoolTaskExecutor가 자동으로 등록되기 때문에 application.yml으로 executor의 옵션을 지정해주는 방법도 있다.
spring:
task:
execution:
pool:
core-size: 5
max-size: 5
queue-capacity: 5
keep-alive: 30s
ThreadPoolTaskExecutor의 동작 방식
위에서 ThreadPoolTaskExecutor의 옵션들을 살펴봤다.
이제 ThreadPoolTaskExecutor가 위 옵션들을 바탕으로 어떻게 동작하는지 알아보자.
- 스레드풀에 작업(task)를 등록하면, 스레드풀에 corePoolSize 만큼의 스레드가 존재하는지 확인한다.
- 스레드에 작업을 할당한다.
- 스레드풀의 스레드 개수가 corePoolSize보다 적으면, 스레드풀에 새로운 스레드를 생성하고 작업을 할당한다. (corePoolSize는 스레드풀에 항상 유지되어야하는 스레드의 최소 수로, 대기중인 기존 스레드가 존재해도 새롭게 생성한다.)
- 스레드풀의 스래드 개수가 corePoolSize보다 크면, 스레드풀의 대기 상태 스레드에게 작업을 할당한다.
- 스레드풀에 존재하는 모든 스레드가 작업중이면(대기중인 스레드가 없으면) BlockingQueue에 작업을 넣어 작업을 대기시킨다.
- BlockingQueue가 가득 찬 경우, 현재 스레드풀의 스레드 수가 maxPoolSize를 넘지 않으면 새로운 스레드를 생성하여 작업을 할당한다.
- 스레드 풀의 스레드 수가 maxPoolSize에 도달한 상태에서 새로운 요청이 들어오면, 더 이상 스레드를 생성할 수 없고 큐에도 대기시킬 수 없다. → TaskRejectedException이 발생한다.
- 작업중인 스레드가 작업을 마치면, BlockingQueue에 대기중인 작업이 있는지 확인한다.
- 대기중인 작업이 있으면, 해당 작업을 가져와 다시 작업을 수행한다.
- 대기중인 작업이 없으면, 해당 스레드는 대기 상태로 돌아간다. 만약, 스레드풀의 스레드 개수가 corePoolSize보다 크면 keepAliveTime이 지나고 해당 스레드는 스레드풀에서 제거된다.
ThreadPoolTaskExecutor 예외 처리
RejectedExecutionHandler를 통해 ThreadPoolTaskExecutor에서 스레드풀 내에서 더 이상 작업을 처리할 수 없을때의 예외 처리 전략을 설정할 수 있다.
RejectedExecutionHandler의 기본 전략은 AbortPolicy이다.
Spring에서 제공하는 전략과 그 특징들은 아래와 같다.
- AbortPolicy: TaskRejectedException을 발생시키며 종료한다.
- CallerRunsPolicy: 스레드풀을 호출한 스레드에서 처리한다. (톰캣에서 스레드풀을 호출했다면, 톰캣 스레드가 요청을 처리한다.)
- DiscardPolicy: 해당 요청들을 무시한다.
- DiscardOldestPolicy: 큐에 있는 가장 오래된 요청을 삭제하고 새로운 요청을 받아들인다. (queueCapacity가 0인 경우 StackOverFlowError가 발생한다.)
아래와 같이 전략을 설정해줄 수 있다.
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
executor.initialize();
return executor;
}
}
Spring에서 제공하는 전략을 활용하는 방식 이외에도 직접 핸들링하는 방법도 있다.
@EnableAsync
@Configuration
public class AsyncConfig implements AsyncConfigurer {
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setRejectedExecutionHandler((r, exec) -> {
throw new IllegalArgumentException("더 이상 요청을 처리할 수 없습니다.");
});
executor.initialize();
return executor;
}
}
실험하기
아래 메서드로 실험을 진행해보았다.
@Component
@Slf4j
public class MockMailClient implements MailClient {
@Async
@Override
public void send(MailPayload payload) {
try {
Thread.sleep(3500);
log.info("MockMailTest");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
실험을 진행한 테스트코드는 아래와 같다.
@SpringBootTest
class AsyncTest {
@Autowired
MailClient mailClient;
@Test
void asyncTest() throws InterruptedException {
for (int i = 0; i < 100; i++) {
mailClient.send(new MailPayload("a", "b"));
}
Thread.sleep(100000);
}
}
마지막에 Thread.sleep(100000);를 해준 이유는, 해당 구문이 없으면 비동기 로직이 완료될 때 까지 기다리지 않고 테스트 함수가 끝나버리기 때문이다.
별도의 TaskExecutor를 설정하지 않은 경우
- 예상 결과: 100개의 스레드를 생성한다. (생성하다 리소스 부족으로 예외가 발생한다.)
- 실제 결과: 8개의 스레드를 재활용한다. (실행 시각을 보면 작업이 8개씩 나눠서 진행되었음을 알 수 있다.)
즉, 스레드풀의 정의가 없어서 SimpleAsyncTaskExecutor을 사용하는 줄 알았지만, corePoolSize가 8인 ThreadPoolTaskExecutor을 사용했음을 알 수 있다.
실제로 디버깅 결과도 그러했다.
그렇다면 왜 SimpleAsyncTaskExecutor이 아닌 ThreadPoolTaskExecutor을 사용했을까?!
@SpringBootApplication 어노테이션을 타고 가면 @EnableAutoConfiguration 어노테이션이 있다.
spring.factories 내부에 수많은 Configuration이 들어있고, 이 어노테이션은 스프링 어플리케이션이 실행 될 때 이 설정파일들을 토대로 빈을 등록해준다.
그리고 수많은 Configuration 중 하나가 TaskExecutionAutoConfiguration이다.
TaskExecutionAutoConfiguration 내부를 살펴보면, TaskExecutor로 ThreadPoolTaskExecutor을 등록함을 알 수 있다.
이 때, ThreadPoolTaskExecutor의 속성들은 TaskExecutionProperties 기반으로 이루어지는데, 여기서 corePoolSize(8)와 threadNamePrefix(”task-”) 등을 확인할 수 있다.
즉, Spring으로 해당 테스트를 진행했으면 SimpleAsyncTaskExecutor를 사용했겠지만 SpringBoot 환경에서 테스트를 진행했기 때문에 ThreadPoolTaskExecutor을 사용했다.
ThreadPoolTaskExecutor 1개를 빈으로 등록
첫번째 실험
- 실험 환경
- corePoolSize: 5
- maxPoolSize: 10
- queueCapacity: 5
- 작업 개수: 15
- 예상 결과: 성공
- [1~5] 1~5번 스레드 생성 → [6~10] queue에 대기 → [11~15] 6~10번 스레드 생성
- 실제 결과: 예상과 일치
실행시각을 보면 1~5번, 11~15번 작업은 거의 동시에 실행되고, 6~10번 작업은 큐에서 대기하다 앞선 작업들이 끝난 후 해당 스레드들에서 순차적으로 실행되었음을 확인할 수 있다.
두번째 실험
- 실험 환경
- corePoolSize: 5
- maxPoolSize: 5
- queueCapacity: Integer.MAX
- 작업 개수: 15
- 예상 결과: 성공
- [1~5] 1~5번 스레드 생성 → [6~15] queue에서 대기 후 실행
- 실제 결과: 예상과 일치
세번째 실험
- 실험 환경
- corePoolSize: 5
- maxPoolSize: 5
- queueCapacity: 5
- 작업 개수: 15
- 예상 결과: 실패
- [1~5] 1~5번 스레드 생성 → [6~10] queue에서 대기 → [11] TaskRejectedException 발생
- 실제 결과: 예상과 일치
11번째 작업에서 큐도 가득차고, 더 이상 스레드의 수도 늘릴 수 없어서 실패했다.
ThreadPoolTaskExecutor 2개를 빈으로 등록
아래는 @EnableAsync의 주석 중 일부이다.
By default, Spring will be searching for an associated thread pool definition: either a unique org.springframework.core.task.TaskExecutor bean in the context, or an java.util.concurrent.Executor bean named "taskExecutor" otherwise. If neither of the two is resolvable, a org.springframework.core.task.SimpleAsyncTaskExecutor will be used to process async method invocations. Besides, annotated methods having a void return type cannot transmit any exception back to the caller. By default, such uncaught exceptions are only logged.
즉, 빈으로 등록된 TaskExecutor이 하나인 경우 그를 사용하고, 2개 이상인 경우 이름이 “taskExecutor”인 빈을 사용한다. 만약 이름이 “taskExecutor”인 빈이 없을 경우, SimpleAsyncTaskExecutor을 대신 사용한다.
아래와 같이 2개의 ThreadPoolTaskExecutor 빈을 등록한 후, 1000개의 작업에 대한 테스트를 실행해보았다.
@EnableAsync
@Configuration
public class AsyncConfig {
@Bean
public Executor asyncExecutor1() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setThreadNamePrefix("exec1-");
executor.initialize();
return executor;
}
@Bean
public Executor asyncExecutor2() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(15);
executor.setThreadNamePrefix("exec2-");
executor.initialize();
return executor;
}
}
아래와 같은 메세지와 함께 테스트 결과가 출력되었다.
2023-08-25T15:06:24.473+09:00 INFO 68398 --- [ Test worker] .s.a.AnnotationAsyncExecutionInterceptor : More than one TaskExecutor bean found within the context, and none is named 'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly as an alias) in order to use it for async processing: [asyncExecutor1, asyncExecutor2]
작업의 개수(1000개) 만큼의 스레드가 생성되었다.
디버깅을 찍어보니 SimpleAsyncTaskExecutor가 실행되었다.
이외에도 다양한 실험을 진행하였으나, 포스팅이 한없이 길어질 것 같아서 여기서 마무리하겠다.
마무리
@Async와 ThreadPoolTaskExecutor에 대해 자세히 알아보았다.
기존에는 ThreadPoolTaskExecutor의 설정을 별 고민 없이 설정했는데, API 스펙과 서버 스펙 등을 따져보고 신중하게 결정해야함을 깨달았다.
특히 corePoolSize가 너무 작으면 너무 낮은 처리량을 보이고, 크면 그 만큼의 스레드는 사용되지 않더라도 항상 대기상태로 유지되기 때문에 리소스 낭비로 이어지기 쉽다.
또한 큐 사이즈를 크게하고 maxPoolSize를 작게 하면 항상 적은수의 스레드가 사용됨을 보장한다는 장점은 있지만 그만큼 낮은 처리량을 보인다는 단점이 있다. 그 반대이면 리소스 낭비로 이어지기 쉽다.
아직 적절한 값을 산정하는 방법은 잘 모르겠다. 경험이 쌓이면 산정하는 방법을 익힐 수 있을까?
참고 자료
'프로그래밍' 카테고리의 다른 글
[오픈소스] 톰캣에 컨트리뷰트하기 🐱 (13) | 2023.09.05 |
---|---|
[Java] I/O Stream (3) | 2023.09.02 |
[Spring] Oauth2 소셜 로그인 확장에 유리하게 개선하기 (Kakao, Google, Naver) (1) | 2023.08.23 |
[Docker / MySQL] Docker로 띄운 mySQL 컨테이너에 employee sample DB 적재하기 (2) | 2023.08.15 |
Git-flow란? (1) | 2023.07.12 |