1. 소개
SEDA(Staged Event-Driven Architecture)는 Matt Welsh 박사 가 제안한 건축 스타일입니다 . 논문 . 주요 이점은 확장성, 고도의 동시 트래픽 지원 및 유지 관리 용이성입니다.
이 예제에서는 SEDA를 사용하여 Spring Integration 과 Apache Camel 이라는 두 가지 개별 구현을 사용하여 문장에서 고유한 단어를 계산합니다 .
2. 세다
SEDA는 온라인 서비스에 특정한 몇 가지 비기능적 요구 사항을 해결합니다 .
- 높은 동시성 : 아키텍처는 가능한 많은 동시 요청을 지원해야 합니다.
- 동적 콘텐츠 : 소프트웨어 시스템은 종종 복잡한 비즈니스 사용 사례를 지원해야 하며 사용자 요청을 처리하고 응답을 생성하는 데 많은 단계가 필요합니다.
- 로드에 대한 견고성 : 온라인 서비스에 대한 사용자 트래픽은 예측할 수 없으며 아키텍처는 트래픽 양의 변화를 원활하게 처리해야 합니다.
이러한 요구 사항을 해결하기 위해 SEDA는 복잡한 서비스를 이벤트 기반 단계로 분해합니다 . 이러한 단계는 Queue과 간접적으로 연결되므로 서로 완전히 분리될 수 있습니다. 또한 각 단계에는 들어오는 로드에 대처하기 위한 확장 메커니즘이 있습니다.
Matt Welsh의 논문에 있는 위 다이어그램은 SEDA로 구현된 웹 서버의 전체 구조를 보여줍니다. 각 사각형은 들어오는 HTTP 요청에 대한 단일 처리 단계를 나타냅니다. 스테이지는 수신 Queue에서 작업을 독립적으로 소비하고 일부 처리 또는 I/O 작업을 수행한 후 메시지를 다음 Queue로 전달할 수 있습니다.
2.1. 구성품
SEDA의 구성 요소를 더 잘 이해하기 위해 Matt Welsh의 논문에 있는 이 다이어그램이 단일 단계의 내부 작동을 어떻게 보여주는지 살펴보겠습니다.
보시다시피 각 SEDA 단계에는 다음 구성 요소가 있습니다.
- 이벤트: 이벤트는 스테이지에서 처리를 수행하는 데 필요한 모든 데이터를 포함하는 데이터 구조 입니다. 예를 들어 HTTP 웹 서버의 경우 이벤트에는 본문, 헤더 및 요청 매개변수와 같은 사용자 데이터와 사용자의 IP, 요청 타임스탬프 등과 같은 인프라 데이터가 포함될 수 있습니다.
- Event Queue : 스테이지의 들어오는 이벤트를 보관합니다.
- 이벤트 핸들러 : 이벤트 핸들러는 스테이지의 절차적 논리입니다. 이것은 이벤트 큐에서 다른 관련 이벤트 큐로 데이터를 전달하는 단순한 라우팅 단계이거나 어떻게든 데이터를 처리하는 더 복잡한 단계일 수 있습니다. 이벤트 처리기는 이벤트를 개별적으로 또는 일괄적으로 읽을 수 있습니다. 후자는 하나의 쿼리로 여러 데이터베이스 레코드를 업데이트하는 것과 같이 일괄 처리에 성능상의 이점이 있을 때 유용합니다.
- 발신 이벤트 : 비즈니스 사용 사례 및 흐름의 전체 구조를 기반으로 각 단계에서 새 이벤트를 0개 이상의 이벤트 Queue로 보낼 수 있습니다. 발신 메시지 작성 및 전송은 이벤트 핸들러 메소드에서 수행됩니다.
- 스레드 풀 : 스레딩은 잘 알려진 동시성 메커니즘입니다. SEDA에서 스레딩은 각 단계에 대해 지역화되고 사용자 정의됩니다. 즉, 각 단계는 스레드 풀을 유지 관리합니다. 따라서 요청당 하나의 스레드 모델과 달리 각 사용자 요청은 SEDA에서 여러 스레드에 의해 처리됩니다. 이 모델을 사용하면 복잡성에 따라 각 단계를 독립적으로 조정할 수 있습니다.
- 컨트롤러 : SEDA 컨트롤러는 스레드 풀 크기, 이벤트 큐 크기, 스케줄링 등과 같은 리소스 소비를 관리하는 메커니즘입니다. 컨트롤러는 SEDA의 탄력적인 동작을 담당합니다 . 간단한 컨트롤러는 각 스레드 풀의 활성 스레드 수를 관리할 수 있습니다. 보다 정교한 컨트롤러는 런타임 시 전체 애플리케이션을 모니터링하고 다양한 매개변수를 조정하는 복잡한 성능 조정 알고리즘을 구현할 수 있습니다. 또한 컨트롤러는 비즈니스 로직에서 성능 조정 로직을 분리합니다. 관심사를 분리하면 코드를 유지 관리하기가 더 쉬워집니다.
SEDA는 이러한 모든 구성 요소를 결합하여 높고 변동이 심한 트래픽 부하를 처리하기 위한 강력한 솔루션을 제공합니다.
3. 샘플 문제
다음 섹션에서는 SEDA를 사용하여 동일한 문제를 해결하는 두 가지 구현을 생성합니다.
예제 문제는 간단 합니다. 주어진 문자열 내에서 각 단어가 대소문자를 구분하지 않고 나타나는 횟수를 세 십시오.
공백이 없는 일련의 문자로 단어를 정의하고 구두점과 같은 다른 복잡함은 무시합니다. 우리의 출력은 단어를 키로 포함하고 개수를 값으로 포함하는 맵이 될 것입니다. 예를 들어 " 내 이름은 Hesam입니다 "라는 입력이 주어지면 출력은 다음과 같습니다.
{
"my": 1,
"name": 1,
"is": 1,
"hesam": 1
}
3.1. 문제를 SEDA에 적용
SEDA 단계 측면에서 문제를 살펴보겠습니다. 확장성은 SEDA의 핵심 목표이므로 특히 I/O 집약적인 작업이 있는 경우 특정 작업에 초점을 맞춘 작은 단계를 설계하는 것이 좋습니다 . 게다가 작은 스테이지는 각 스테이지의 규모를 더 잘 조율하는 데 도움이 됩니다.
단어 수 문제를 해결하기 위해 다음 단계로 솔루션을 구현할 수 있습니다.
이제 스테이지 디자인이 있으므로 두 가지 다른 엔터프라이즈 통합 기술을 사용하여 다음 섹션에서 이를 구현해 보겠습니다. 이 표에서 구현 시 SEDA가 어떻게 표시되는지 미리 볼 수 있습니다.
SEDA 구성요소 | 스프링 통합 | 아파치 카멜 |
---|---|---|
이벤트 |
org.springframework.messaging.Message | org.apache.camel.Exchange |
이벤트 Queue |
org.springframework.integration.channel |
URI 문자열로 정의된 엔드포인트 |
이벤트 핸들러 |
기능적 인터페이스의 인스턴스 | Camel 프로세서, Camel 유틸리티 클래스 및 Function s |
스레드 풀 |
TaskExecutor 의 스프링 추상화 | SEDA 엔드포인트의 기본 지원 |
4. Spring Integration을 이용한 솔루션
첫 번째 구현에서는 Spring Integration을 사용합니다. Spring Integration은 인기 있는 엔터프라이즈 통합 패턴을 지원하기 위해 Spring 모델을 기반으로 합니다 .
Spring Integration에는 세 가지 주요 구성 요소가 있습니다.
- 메시지는 헤더와 본문을 포함하는 데이터 구조입니다.
- 채널은 한 Endpoints에서 다른 Endpoints으로 메시지를 전달합니다. Spring Integration에는 두 가지 종류의 채널이 있습니다.
- point-to-point: 하나의 Endpoints만 이 채널의 메시지를 소비할 수 있습니다.
- 게시-구독: 여러 엔드포인트가 이 채널의 메시지를 사용할 수 있습니다.
- Endpoints은 일부 비즈니스 논리를 수행하는 응용 프로그램 구성 요소로 메시지를 라우팅합니다. 변환기, 라우터, 서비스 액티베이터 및 필터와 같은 다양한 엔드포인트가 Spring Integration에 있습니다.
Spring 통합 솔루션의 개요를 살펴보겠습니다.
4.1. 의존성
Spring Integration, Spring Boot Test 및 Spring Integration Test 에 대한 의존성을 추가하여 시작하겠습니다 .
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
4.2. 메시지 게이트웨이
메시징 게이트웨이 는 메시지를 통합 플로우로 전송하는 복잡성을 숨기는 프록시입니다. Spring 통합 흐름에 대해 하나를 설정해 보겠습니다.
@MessagingGateway
public interface IncomingGateway {
@Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
public Map<String, Long> countWords(String input);
}
나중에 이 게이트웨이 메서드를 사용하여 전체 흐름을 테스트할 수 있습니다.
incomingGateway.countWords("My name is Hesam");
Spring 은 org.springframework.messaging.Message 의 인스턴스 내 에서 "My name is Hesam" 입력을 래핑하고 그것을 receiveTextChannel 에 전달하고 나중에 returnResponseChannel 의 최종 결과를 제공합니다 .
4.3. 메시지 채널
이 섹션에서는 게이트웨이의 초기 메시지 채널인 receiveTextChannel 을 설정하는 방법을 살펴보겠습니다 .
SEDA에서 채널은 연결된 스레드 풀을 통해 확장 가능해야 하므로 스레드 풀을 생성하여 시작하겠습니다.
@Bean("receiveTextChannelThreadPool")
TaskExecutor receiveTextChannelThreadPool() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(5);
executor.setThreadNamePrefix("receive-text-channel-thread-pool");
executor.initialize();
return executor;
}
다음으로 스레드 풀을 사용하여 채널을 만듭니다.
@Bean(name = "receiveTextChannel")
MessageChannel getReceiveTextChannel() {
return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
.get();
}
MessageChannels 는 다양한 유형의 채널을 만드는 데 도움이 되는 Spring 통합 클래스입니다. 여기서는 executor() 메서드를 사용하여 스레드 풀에서 관리하는 채널 인 ExecutorChannel 을 만듭니다.
다른 채널과 스레드 풀은 위와 동일한 방식으로 설정됩니다.
4.4. 문자 수신 단계
채널이 설정되면 단계 구현을 시작할 수 있습니다. 초기 단계를 만들어 보겠습니다.
@Bean
IntegrationFlow receiveText() {
return IntegrationFlows.from(receiveTextChannel)
.channel(splitWordsChannel)
.get();
}
IntegrationFlows 는 흐름의 단계를 나타내는 IntegrationFlow 개체 를 생성하기 위한 유창한 Spring 통합 API입니다 . from() 메서드 는스테이지의 수신 채널을 구성하고 channel() 은 발신 채널을 구성합니다.
이 예에서 스테이지는 게이트웨이의 입력 메시지를 splitWordsChannel 로 전달 합니다. 이 단계는 프로덕션 애플리케이션에서 더 복잡하고 I/O 집약적일 수 있으며 영구 Queue에서 또는 네트워크를 통해 메시지를 읽습니다.
4.5. 단어 분리 단계
다음 단계에서는 입력 문자열 을 문장의 개별 단어의 문자열 배열로 분할하는 단일 책임이 있습니다 .
@Bean
IntegrationFlow splitWords() {
return IntegrationFlows.from(splitWordsChannel)
.transform(splitWordsFunction)
.channel(toLowerCaseChannel)
.get();
}
이전에 사용한 from() 및 channel() 호출 외에도 여기서는 제공된 함수 를 입력 메시지에 적용하는 transform() 도 사용합니다. splitWordsFunction 구현은 매우 간단합니다 .
final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");
4.6. 소문자 단계로 변환
이 단계는 문자열 배열 의 모든 단어 를 소문자로 변환합니다.
@Bean
IntegrationFlow toLowerCase() {
return IntegrationFlows.from(toLowerCaseChannel)
.split()
.transform(toLowerCase)
.aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
.outputProcessor(buildMessageWithListPayload))
.channel(countWordsChannel)
.get();
}
여기서 사용 하는 첫 번째 새 IntegrationFlows 메서드는 split() 입니다. split() 메서드는 splitter 패턴 을 사용하여 입력 메시지의 각 요소를 개별 메시지 로 toLowerCase 에 보냅니다.
우리가 볼 다음 새 메서드는 집계 패턴을 구현하는 aggregate () 입니다. 애그리게이터 패턴 에는 두 가지 필수 인수가 있습니다.
- 메시지를 하나로 결합할 시기를 결정하는 릴리스 전략
- 메시지를 단일 메시지로 결합하는 방법을 결정하는 프로세서
릴리스 전략 함수는 입력 배열의 모든 요소가 수집되었을 때 집계를 시작하도록 집계자에게 지시하는 listSizeReached 를 사용합니다.
final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();
그러면 buildMessageWithListPayload 프로세서가 소문자 결과를 List 로 패키징합니다 .
final MessageGroupProcessor buildMessageWithListPayload = messageGroup ->
MessageBuilder.withPayload(messageGroup.streamMessages()
.map(Message::getPayload)
.toList())
.build();
4.7. 단어 세기 단계
최종 단계에서는 단어 수를 Map 으로 패키징합니다 . 여기서 키는 원래 입력의 단어이고 값은 각 단어의 발생 횟수입니다.
@Bean
IntegrationFlow countWords() {
return IntegrationFlows.from(countWordsChannel)
.transform(convertArrayListToCountMap)
.channel(returnResponseChannel)
.get();
}
여기에서 카운트를 Map 으로 패키징하기 위해 convertArrayListToCountMap 함수를 사용합니다 .
final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
4.8. 흐름 테스트
게이트웨이 메서드에 초기 메시지를 전달하여 흐름을 테스트할 수 있습니다.
public class SpringIntegrationSedaIntegrationTest {
@Autowired
TestGateway testGateway;
@Test
void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
Map<String, Long> actual = testGateway.countWords("My name is Hesam");
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
assertEquals(expected, actual);
}
}
5. Apache Camel을 사용한 솔루션
Apache Camel은 인기 있고 강력한 오픈 소스 통합 프레임워크입니다. 네 가지 기본 개념을 기반으로 합니다.
- Camel 컨텍스트: Camel 런타임은 서로 다른 부분을 결합합니다.
- 경로: 경로는 메시지를 처리하는 방법과 다음으로 이동할 위치를 결정합니다.
- 프로세서: 다양한 엔터프라이즈 통합 패턴을 바로 사용할 수 있는 구현입니다.
- 구성 요소: 구성 요소는 JMS, HTTP, 파일 IO 등을 통해 외부 시스템을 통합하기 위한 확장 지점입니다.
Apache Camel에는 SEDA 기능 전용 구성 요소가 있어 SEDA 애플리케이션을 간단하게 구축할 수 있습니다.
5.1. 의존성
Apache Camel 및 Apache Camel Test 에 필요한 Maven 의존성을 추가해 보겠습니다 .
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>3.18.0</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test-junit5</artifactId>
<version>3.18.0</version>
<scope>test</scope>
</dependency>
</dependencies>
5.2. SEDA Endpoints 정의
먼저 엔드포인트를 정의해야 합니다. Endpoints은 URI 문자열로 정의된 구성 요소입니다. SEDA Endpoints은 " seda:[endpointName] " 으로 시작해야 합니다 .
static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
static final String returnResponse = "mock:result";
보시다시피 각 엔드포인트는 5명의 동시 소비자를 갖도록 구성됩니다. 이는 각 Endpoints에 대해 최대 5개의 스레드를 갖는 것과 같습니다.
테스트를 위해 returnResponse 는 모의 엔드포인트입니다.
5.3. RouteBuilder 확장
다음으로 Apache Camel의 RouteBuilder 를 확장 하고 해당 configure() 메서드를 재정의하는 클래스를 정의해 보겠습니다. 이 클래스는 모든 SEDA Endpoints을 연결합니다.
public class WordCountRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
}
}
다음 섹션에서는 RouteBuilder 에서 상속한 편리한 메서드를 사용하여 이 configure() 메서드에 줄을 추가하여 단계를 정의합니다 .
5.4. 문자 수신 단계
이 단계는 SEDA Endpoints에서 메시지를 수신하고 처리 없이 다음 단계로 라우팅합니다.
from(receiveTextUri).to(splitWordsUri);
여기에서 상속된 from() 메서드를 사용하여 들어오는 Endpoints을 지정하고 to() 를 사용하여 나가는 Endpoints을 설정했습니다.
5.5. 단어 분리 단계
입력 텍스트를 개별 단어로 분할하는 단계를 구현해 보겠습니다.
from(splitWordsUri)
.transform(ExpressionBuilder.bodyExpression(s -> s.toString().split(" ")))
.to(toLowerCaseUri);
transform() 메서드는 함수를 입력 메시지에 적용 하여 배열 로 분할합니다.
5.6. 소문자 단계로 변환
다음 작업은 입력의 각 단어를 소문자로 변환하는 것입니다. 변환 함수를 메시지의 각 문자열 과 배열 자체에 적용해야 하기 때문에 split() 메서드를 사용하여 처리를 위해 입력 메시지를 분할하고 나중에 결과를 다시 ArrayList 로 집계합니다 .
from(toLowerCaseUri)
.split(body(), new ArrayListAggregationStrategy())
.transform(ExpressionBuilder.bodyExpression(body -> body.toString().toLowerCase()))
.end()
.to(countWordsUri);
end() 메서드는 분할 프로세스의 끝을 표시합니다 . List의 각 항목이 변환되면 Apache Camel은 우리가 지정한 집계 전략 ArrayListAggregationStrategy 를 적용합니다.
ArrayListAggregationStrategy 는 Apache Camel의 AbstractListAggregationStrategy 를 확장 하여 집계해야 하는 메시지 부분을 정의합니다. 이 경우 메시지 본문은 새로 소문자로 된 단어입니다.
class ArrayListAggregationStrategy extends AbstractListAggregationStrategy<String> {
@Override
public String getValue(Exchange exchange) {
return exchange.getIn()
.getBody(String.class);
}
}
5.7. 단어 세기 단계
마지막 단계에서는 변환기를 사용하여 배열을 단어 수와 단어 수의 맵으로 변환합니다.
from(countWordsUri)
.transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
.to(returnResponse);
5.8. 경로 테스트
경로를 테스트해 보겠습니다.
public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
@Test
public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap()
throws InterruptedException {
Map<String, Long> expected = new HashMap<>();
expected.put("my", 1L);
expected.put("name", 1L);
expected.put("is", 1L);
expected.put("hesam", 1L);
getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");
assertMockEndpointsSatisfied();
}
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
RoutesBuilder wordCountRoute = new WordCountRoute();
return wordCountRoute;
}
}
CamelTestSupport 수퍼클래스는 흐름을 테스트하는 데 도움 이 되는 많은 필드와 메서드를 제공합니다. getMockEndpoint() 및 expectedBodiesReceived() 를 사용하여 예상 결과를 설정하고 template.sendBody() 를 사용하여 모의 엔드포인트에 테스트 데이터를 제출합니다. 마지막으로 assertMockEndpointsSatisfied() 를 사용하여 예상 결과가 실제 결과와 일치하는지 테스트합니다.
6. 결론
이 기사에서는 SEDA와 그 구성 요소 및 사용 사례에 대해 배웠습니다. 이후 SEDA를 사용하여 첫 번째 Spring Integration과 Apache Camel을 사용하여 동일한 문제를 해결하는 방법을 탐색했습니다.
항상 그렇듯이 예제의 소스 코드는 GitHub에서 사용할 수 있습니다 .