1. 소개

SEDA(Staged Event-Driven Architecture)는 Matt Welsh 박사 가 제안한 건축 스타일입니다 . 논문 . 주요 이점은 확장성, 고도의 동시 트래픽 지원 및 유지 관리 용이성입니다.

이 예제에서는 SEDA를 사용하여 Spring IntegrationApache Camel 이라는 두 가지 개별 구현을 사용하여 문장에서 고유한 단어를 계산합니다 .

2. 세다

SEDA는 온라인 서비스에 특정한 몇 가지 비기능적 요구 사항을 해결합니다 .

  1. 높은 동시성 : 아키텍처는 가능한 많은 동시 요청을 지원해야 합니다.
  2. 동적 콘텐츠 : 소프트웨어 시스템은 종종 복잡한 비즈니스 사용 사례를 지원해야 하며 사용자 요청을 처리하고 응답을 생성하는 데 많은 단계가 필요합니다.
  3. 로드에 대한 견고성 : 온라인 서비스에 대한 사용자 트래픽은 예측할 수 없으며 아키텍처는 트래픽 양의 변화를 원활하게 처리해야 합니다.

이러한 요구 사항을 해결하기 위해 SEDA는 복잡한 서비스를 이벤트 기반 단계로 분해합니다 . 이러한 단계는 Queue과 간접적으로 연결되므로 서로 완전히 분리될 수 있습니다. 또한 각 단계에는 들어오는 로드에 대처하기 위한 확장 메커니즘이 있습니다.

SEDA 개요

Matt Welsh의 논문에 있는 위 다이어그램은 SEDA로 구현된 웹 서버의 전체 구조를 보여줍니다. 각 사각형은 들어오는 HTTP 요청에 대한 단일 처리 단계를 나타냅니다. 스테이지는 수신 Queue에서 작업을 독립적으로 소비하고 일부 처리 또는 I/O 작업을 수행한 후 메시지를 다음 Queue로 전달할 수 있습니다.

2.1. 구성품

SEDA의 구성 요소를 더 잘 이해하기 위해 Matt Welsh의 논문에 있는 이 다이어그램이 단일 단계의 내부 작동을 어떻게 보여주는지 살펴보겠습니다.

세다 스테이지

보시다시피 각 SEDA 단계에는 다음 구성 요소가 있습니다.

  • 이벤트: 이벤트는 스테이지에서 처리를 수행하는 데 필요한 모든 데이터를 포함하는 데이터 구조 입니다. 예를 들어 HTTP 웹 서버의 경우 이벤트에는 본문, 헤더 및 요청 매개변수와 같은 사용자 데이터와 사용자의 IP, 요청 타임스탬프 등과 같은 인프라 데이터가 포함될 수 있습니다.
  • Event Queue : 스테이지의 들어오는 이벤트를 보관합니다.
  • 이벤트 핸들러 : 이벤트 핸들러는 스테이지의 절차적 논리입니다. 이것은 이벤트 큐에서 다른 관련 이벤트 큐로 데이터를 전달하는 단순한 라우팅 단계이거나 어떻게든 데이터를 처리하는 더 복잡한 단계일 수 있습니다. 이벤트 처리기는 이벤트를 개별적으로 또는 일괄적으로 읽을 수 있습니다. 후자는 하나의 쿼리로 여러 데이터베이스 레코드를 업데이트하는 것과 같이 일괄 처리에 성능상의 이점이 있을 때 유용합니다.
  • 발신 이벤트 : 비즈니스 사용 사례 및 흐름의 전체 구조를 기반으로 각 단계에서 새 이벤트를 0개 이상의 이벤트 Queue로 보낼 수 있습니다. 발신 메시지 작성 및 전송은 이벤트 핸들러 메소드에서 수행됩니다.
  • 스레드 풀 : 스레딩은 잘 알려진 동시성 메커니즘입니다. SEDA에서 스레딩은 각 단계에 대해 지역화되고 사용자 정의됩니다. 즉, 각 단계는 스레드 풀을 유지 관리합니다. 따라서 요청당 하나의 스레드 모델과 달리 각 사용자 요청은 SEDA에서 여러 스레드에 의해 처리됩니다. 이 모델을 사용하면 복잡성에 따라 각 단계를 독립적으로 조정할 수 있습니다.
  • 컨트롤러 : SEDA 컨트롤러는 스레드 풀 크기, 이벤트 큐 크기, 스케줄링 등과 같은 리소스 소비를 관리하는 메커니즘입니다. 컨트롤러는 SEDA의 탄력적인 동작을 담당합니다 . 간단한 컨트롤러는 각 스레드 풀의 활성 스레드 수를 관리할 수 있습니다. 보다 정교한 컨트롤러는 런타임 시 전체 애플리케이션을 모니터링하고 다양한 매개변수를 조정하는 복잡한 성능 조정 알고리즘을 구현할 수 있습니다. 또한 컨트롤러는 비즈니스 로직에서 성능 조정 로직을 분리합니다. 관심사를 분리하면 코드를 유지 관리하기가 더 쉬워집니다.

SEDA는 이러한 모든 구성 요소를 결합하여 높고 변동이 심한 트래픽 부하를 처리하기 위한 강력한 솔루션을 제공합니다.

3. 샘플 문제

다음 섹션에서는 SEDA를 사용하여 동일한 문제를 해결하는 두 가지 구현을 생성합니다.

예제 문제는 간단 합니다. 주어진 문자열 내에서 각 단어가 대소문자를 구분하지 않고 나타나는 횟수를 세 십시오.

공백이 없는 일련의 문자로 단어를 정의하고 구두점과 같은 다른 복잡함은 무시합니다. 우리의 출력은 단어를 키로 포함하고 개수를 값으로 포함하는 맵이 될 것입니다. 예를 들어 " 내 이름은 Hesam입니다 "라는 입력이 주어지면 출력은 다음과 같습니다.

{
  "my": 1,
  "name": 1,
  "is": 1,
  "hesam": 1
}

3.1. 문제를 SEDA에 적용

SEDA 단계 측면에서 문제를 살펴보겠습니다. 확장성은 SEDA의 핵심 목표이므로 특히 I/O 집약적인 작업이 있는 경우 특정 작업에 초점을 맞춘 작은 단계를 설계하는 것이 좋습니다 . 게다가 작은 스테이지는 각 스테이지의 규모를 더 잘 조율하는 데 도움이 됩니다.

단어 수 문제를 해결하기 위해 다음 단계로 솔루션을 구현할 수 있습니다.

단어 수 흐름의 예

이제 스테이지 디자인이 있으므로 두 가지 다른 엔터프라이즈 통합 기술을 사용하여 다음 섹션에서 이를 구현해 보겠습니다. 이 표에서 구현 시 SEDA가 어떻게 표시되는지 미리 볼 수 있습니다.

SEDA 구성요소 스프링 통합 아파치 카멜
이벤트
org.springframework.messaging.Message org.apache.camel.Exchange
이벤트 Queue

org.springframework.integration.channel

URI 문자열로 정의된 엔드포인트
이벤트 핸들러
기능적 인터페이스의 인스턴스 Camel 프로세서, Camel 유틸리티 클래스 및 Function s
스레드 풀
TaskExecutor 의 스프링 추상화 SEDA 엔드포인트의 기본 지원

4. Spring Integration을 이용한 솔루션

첫 번째 구현에서는 Spring Integration을 사용합니다. Spring Integration은 인기 있는 엔터프라이즈 통합 패턴을 지원하기 위해 Spring 모델을 기반으로 합니다 .

Spring Integration에는 세 가지 주요 구성 요소가 있습니다.

  1. 메시지는 헤더와 본문을 포함하는 데이터 구조입니다.
  2. 채널은 한 Endpoints에서 다른 Endpoints으로 메시지를 전달합니다. Spring Integration에는 두 가지 종류의 채널이 있습니다.
    • point-to-point: 하나의 Endpoints만 이 채널의 메시지를 소비할 수 있습니다.
    • 게시-구독: 여러 엔드포인트가 이 채널의 메시지를 사용할 수 있습니다.
  3. Endpoints은 일부 비즈니스 논리를 수행하는 응용 프로그램 구성 요소로 메시지를 라우팅합니다. 변환기, 라우터, 서비스 액티베이터 및 필터와 같은 다양한 엔드포인트가 Spring Integration에 있습니다.

Spring 통합 솔루션의 개요를 살펴보겠습니다.

단어 수 EIP 다이어그램

4.1. 의존성

Spring Integration, Spring Boot TestSpring Integration Test 에 대한 의존성을 추가하여 시작하겠습니다 .

<dependencies>
    <dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-integration</artifactId>
    </dependency>
    <dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-test</artifactId>
	<scope>test</scope>
    </dependency>
    <dependency>
	<groupId>org.springframework.integration</groupId>
	<artifactId>spring-integration-test</artifactId>
	<scope>test</scope>
    </dependency>
</dependencies>

4.2. 메시지 게이트웨이

메시징 게이트웨이메시지를 통합 플로우로 전송하는 복잡성을 숨기는 프록시입니다. Spring 통합 흐름에 대해 하나를 설정해 보겠습니다.

@MessagingGateway
public interface IncomingGateway {
    @Gateway(requestChannel = "receiveTextChannel", replyChannel = "returnResponseChannel")
    public Map<String, Long> countWords(String input);
}

나중에 이 게이트웨이 메서드를 사용하여 전체 흐름을 테스트할 수 있습니다.

incomingGateway.countWords("My name is Hesam");

Spring 은 org.springframework.messaging.Message 의 인스턴스 내 에서 "My name is Hesam" 입력을 래핑하고 그것을 receiveTextChannel 에 전달하고 나중에 returnResponseChannel 의 최종 결과를 제공합니다 .

4.3. 메시지 채널

이 섹션에서는 게이트웨이의 초기 메시지 채널인 receiveTextChannel 을 설정하는 방법을 살펴보겠습니다 .

SEDA에서 채널은 연결된 스레드 풀을 통해 확장 가능해야 하므로 스레드 풀을 생성하여 시작하겠습니다.

@Bean("receiveTextChannelThreadPool")
TaskExecutor receiveTextChannelThreadPool() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(5);
    executor.setThreadNamePrefix("receive-text-channel-thread-pool");
    executor.initialize();
    return executor;
}

다음으로 스레드 풀을 사용하여 채널을 만듭니다.

@Bean(name = "receiveTextChannel")
MessageChannel getReceiveTextChannel() {
    return MessageChannels.executor("receive-text", receiveTextChannelThreadPool)
      .get();
}

MessageChannels 는 다양한 유형의 채널을 만드는 데 도움이 되는 Spring 통합 클래스입니다. 여기서는 executor() 메서드를 사용하여  스레드 풀에서 관리하는 채널 인 ExecutorChannel 을 만듭니다.

다른 채널과 스레드 풀은 위와 동일한 방식으로 설정됩니다.

4.4. 문자 수신 단계

채널이 설정되면 단계 구현을 시작할 수 있습니다. 초기 단계를 만들어 보겠습니다.

@Bean
IntegrationFlow receiveText() {
    return IntegrationFlows.from(receiveTextChannel)
      .channel(splitWordsChannel)
      .get();
}

IntegrationFlows 는 흐름의 단계를 나타내는 IntegrationFlow 개체 를 생성하기 위한 유창한 Spring 통합 API입니다  . from() 메서드 는스테이지의 수신 채널을 구성하고 channel() 은 발신 채널을 구성합니다.

이 예에서 스테이지는 게이트웨이의 입력 메시지를 splitWordsChannel 로 전달 합니다. 이 단계는 프로덕션 애플리케이션에서 더 복잡하고 I/O 집약적일 수 있으며 영구 Queue에서 또는 네트워크를 통해 메시지를 읽습니다.

4.5. 단어 분리 단계

다음 단계에서는 입력 문자열 을  문장의 개별 단어의 문자열 배열로 분할하는 단일 책임이 있습니다 .

@Bean
IntegrationFlow splitWords() {
    return IntegrationFlows.from(splitWordsChannel)
      .transform(splitWordsFunction)
      .channel(toLowerCaseChannel)
      .get();
}

이전에 사용한 from()channel() 호출 외에도 여기서는 제공된 함수 를 입력 메시지에 적용하는 transform() 도 사용합니다. splitWordsFunction  구현은 매우 간단합니다 .

final Function<String, String[]> splitWordsFunction = sentence -> sentence.split(" ");

4.6. 소문자 단계로 변환

이 단계는 문자열 배열 의 모든 단어 를 소문자로 변환합니다.

@Bean
IntegrationFlow toLowerCase() {
    return IntegrationFlows.from(toLowerCaseChannel)
      .split()
      .transform(toLowerCase)
      .aggregate(aggregatorSpec -> aggregatorSpec.releaseStrategy(listSizeReached)
        .outputProcessor(buildMessageWithListPayload))
      .channel(countWordsChannel)
      .get();
}

여기서 사용 하는 첫 번째 새 IntegrationFlows 메서드는 split() 입니다. split() 메서드는 splitter 패턴 을 사용하여 입력 메시지의 각 요소를  개별 메시지 로 toLowerCase 에 보냅니다.

우리가 볼 다음 새 메서드는 집계 패턴을 구현하는 aggregate  () 입니다. 애그리게이터 패턴 에는 두 가지 필수 인수가 있습니다.

  1. 메시지를 하나로 결합할 시기를 결정하는 릴리스 전략
  2. 메시지를 단일 메시지로 결합하는 방법을 결정하는 프로세서

릴리스 전략 함수는  입력 배열의 모든 요소가 수집되었을 때 집계를 시작하도록 집계자에게 지시하는 listSizeReached 를 사용합니다.

final ReleaseStrategy listSizeReached = r -> r.size() == r.getSequenceSize();

그러면 buildMessageWithListPayload 프로세서가 소문자 결과를 List 로 패키징합니다 .

final MessageGroupProcessor buildMessageWithListPayload = messageGroup ->
  MessageBuilder.withPayload(messageGroup.streamMessages()
      .map(Message::getPayload)
      .toList())
    .build();

4.7. 단어 세기 단계

최종 단계에서는 단어 수를 Map 으로 패키징합니다 . 여기서 키는 원래 입력의 단어이고 값은 각 단어의 발생 횟수입니다.

@Bean
IntegrationFlow countWords() {
    return IntegrationFlows.from(countWordsChannel)
      .transform(convertArrayListToCountMap)
      .channel(returnResponseChannel)
      .get();
}

여기에서 카운트를 Map 으로 패키징하기 위해 convertArrayListToCountMap 함수를 사용합니다 .

final Function<List<String>, Map<String, Long>> convertArrayListToCountMap = list -> list.stream()
  .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));

4.8. 흐름 테스트

게이트웨이 메서드에 초기 메시지를 전달하여 흐름을 테스트할 수 있습니다.

public class SpringIntegrationSedaIntegrationTest {
    @Autowired
    TestGateway testGateway;

    @Test
    void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenCallingCountWordOnGateway_thenWordCountReturnedAsMap() {
        Map<String, Long> actual = testGateway.countWords("My name is Hesam");
        Map<String, Long> expected = new HashMap<>();
        expected.put("my", 1L);
        expected.put("name", 1L);
        expected.put("is", 1L);
        expected.put("hesam", 1L);

        assertEquals(expected, actual);
    }
}

5. Apache Camel을 사용한 솔루션

Apache Camel은 인기 있고 강력한 오픈 소스 통합 프레임워크입니다. 네 가지 기본 개념을 기반으로 합니다.

  1. Camel 컨텍스트: Camel 런타임은 서로 다른 부분을 결합합니다.
  2. 경로: 경로는 메시지를 처리하는 방법과 다음으로 이동할 위치를 결정합니다.
  3. 프로세서: 다양한 엔터프라이즈 통합 패턴을 바로 사용할 수 있는 구현입니다.
  4. 구성 요소: 구성 요소는 JMS, HTTP, 파일 IO 등을 통해 외부 시스템을 통합하기 위한 확장 지점입니다.

Apache Camel에는 SEDA 기능 전용 구성 요소가 있어 SEDA 애플리케이션을 간단하게 구축할 수 있습니다.

5.1. 의존성

Apache Camel  및 Apache Camel Test 에 필요한 Maven 의존성을 추가해 보겠습니다 .

<dependencies>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-core</artifactId>
        <version>3.18.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.camel</groupId>
        <artifactId>camel-test-junit5</artifactId>
        <version>3.18.0</version>
        <scope>test</scope>
    </dependency>
</dependencies>

5.2. SEDA Endpoints 정의

먼저 엔드포인트를 정의해야 합니다. Endpoints은 URI 문자열로 정의된 구성 요소입니다. SEDA Endpoints은 " seda:[endpointName] " 으로 시작해야 합니다 .

static final String receiveTextUri = "seda:receiveText?concurrentConsumers=5";
static final String splitWordsUri = "seda:splitWords?concurrentConsumers=5";
static final String toLowerCaseUri = "seda:toLowerCase?concurrentConsumers=5";
static final String countWordsUri = "seda:countWords?concurrentConsumers=5";
static final String returnResponse = "mock:result";

보시다시피 각 엔드포인트는 5명의 동시 소비자를 갖도록 구성됩니다. 이는 각 Endpoints에 대해 최대 5개의 스레드를 갖는 것과 같습니다.

테스트를 위해 returnResponse 는 모의 엔드포인트입니다.

5.3. RouteBuilder 확장

다음으로 Apache Camel의 RouteBuilder 를 확장 하고 해당 configure() 메서드를 재정의하는 클래스를 정의해 보겠습니다. 이 클래스는 모든 SEDA Endpoints을 연결합니다.

public class WordCountRoute extends RouteBuilder {
    @Override
    public void configure() throws Exception {
    }
}

다음 섹션에서는 RouteBuilder 에서 상속한 편리한 메서드를 사용하여 이 configure() 메서드에 줄을 추가하여 단계를 정의합니다 .

5.4. 문자 수신 단계

이 단계는 SEDA Endpoints에서 메시지를 수신하고 처리 없이 다음 단계로 라우팅합니다.

from(receiveTextUri).to(splitWordsUri);

여기에서 상속된 from() 메서드를 사용하여 들어오는 Endpoints을 지정하고 to() 를 사용하여 나가는 Endpoints을 설정했습니다.

5.5. 단어 분리 단계

입력 텍스트를 개별 단어로 분할하는 단계를 구현해 보겠습니다.

from(splitWordsUri)
  .transform(ExpressionBuilder.bodyExpression(s -> s.toString().split(" ")))
  .to(toLowerCaseUri);

transform() 메서드는 함수를 입력 메시지에 적용 하여  배열 로 분할합니다.

5.6. 소문자 단계로 변환

다음 작업은 입력의 각 단어를 소문자로 변환하는 것입니다. 변환 함수를 메시지의 각 문자열 과 배열 자체에 적용해야 하기 때문에  split() 메서드를 사용하여 처리를 위해 입력 메시지를 분할하고 나중에 결과를 다시 ArrayList 로 집계합니다 .

from(toLowerCaseUri)
  .split(body(), new ArrayListAggregationStrategy())
  .transform(ExpressionBuilder.bodyExpression(body -> body.toString().toLowerCase()))
  .end()
  .to(countWordsUri);

end() 메서드는 분할 프로세스의 끝을 표시합니다 . List의 각 항목이 변환되면 Apache Camel은 우리가 지정한 집계 전략 ArrayListAggregationStrategy 를 적용합니다.

ArrayListAggregationStrategy 는 Apache Camel의 AbstractListAggregationStrategy 를 확장 하여 집계해야 하는 메시지 부분을 정의합니다. 이 경우 메시지 본문은 새로 소문자로 된 단어입니다.

class ArrayListAggregationStrategy extends AbstractListAggregationStrategy<String> {
    @Override
    public String getValue(Exchange exchange) {
        return exchange.getIn()
          .getBody(String.class);
    }
}

5.7. 단어 세기 단계

마지막 단계에서는 변환기를 사용하여 배열을 단어 수와 단어 수의 맵으로 변환합니다.

from(countWordsUri)
  .transform(ExpressionBuilder.bodyExpression(List.class, body -> body.stream()
    .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))))
  .to(returnResponse);

5.8. 경로 테스트

경로를 테스트해 보겠습니다.

public class ApacheCamelSedaIntegrationTest extends CamelTestSupport {
    @Test
    public void givenTextWithCapitalAndSmallCaseAndWithoutDuplicateWords_whenSendingTextToInputUri_thenWordCountReturnedAsMap()
      throws InterruptedException {
        Map<String, Long> expected = new HashMap<>();
        expected.put("my", 1L);
        expected.put("name", 1L);
        expected.put("is", 1L);
        expected.put("hesam", 1L);
        getMockEndpoint(WordCountRoute.returnResponse).expectedBodiesReceived(expected);
        template.sendBody(WordCountRoute.receiveTextUri, "My name is Hesam");

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RoutesBuilder createRouteBuilder() throws Exception {
        RoutesBuilder wordCountRoute = new WordCountRoute();
        return wordCountRoute;
    }
}

CamelTestSupport 수퍼클래스는 흐름을 테스트하는 데 도움 이 되는 많은 필드와 메서드를 제공합니다. getMockEndpoint()expectedBodiesReceived() 를 사용하여 예상 결과를 설정하고 template.sendBody() 를 사용하여 모의 엔드포인트에 테스트 데이터를 제출합니다. 마지막으로 assertMockEndpointsSatisfied() 를 사용하여 예상 결과가 실제 결과와 일치하는지 테스트합니다.

6. 결론

이 기사에서는 SEDA와 그 구성 요소 및 사용 사례에 대해 배웠습니다. 이후 SEDA를 사용하여 첫 번째 Spring Integration과 Apache Camel을 사용하여 동일한 문제를 해결하는 방법을 탐색했습니다.

항상 그렇듯이 예제의 소스 코드는 GitHub에서 사용할 수 있습니다 .

Generic footer banner