1. 개요

이 기사에서는 Netflix에서 개발한 Mantis 플랫폼을 살펴보겠습니다.

스트림 처리 작업을 생성, 실행 및 조사하여 주요 Mantis 개념을 탐색합니다.

2. 사마귀는 무엇입니까?

Mantis는 스트림 처리 애플리케이션 (작업) 을 구축하기 위한 플랫폼입니다 . 작업의 배포 및 수명 주기관리 하는 쉬운 방법을 제공합니다 . 또한 이러한 작업 간의 리소스 할당, 검색 및 통신을 용이하게 합니다.

따라서 개발자는 대용량, 짧은 대기 시간, 비차단 애플리케이션을 실행하기 위한 강력하고 확장 가능한 플랫폼을 지원하면서 실제 비즈니스 로직에 집중할 수 있습니다 .

Mantis 작업은 다음 세 부분으로 구성됩니다.

  • 소스 외부 소스에서 데이터를 검색하기위한 책임,
  • 들어오는 이벤트 스트림 처리를 담당하는 하나 이상의 단계
  • 처리된 데이터를 수집 하는 싱크

이제 각각을 살펴보겠습니다.

3. 설정 및 의존성

mantis-runtimejackson-databind 의존성 을 추가하여 시작하겠습니다 .

<dependency>
    <groupId>io.mantisrx</groupId>
    <artifactId>mantis-runtime</artifactId>
</dependency>

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
</dependency>

이제 작업의 데이터 소스를 설정하기 위해 Mantis Source 인터페이스를 구현해 보겠습니다 .

public class RandomLogSource implements Source<String> {

    @Override
    public Observable<Observable<String>> call(Context context, Index index) {
        return Observable.just(
          Observable
            .interval(250, TimeUnit.MILLISECONDS)
            .map(this::createRandomLogEvent));
    }

    private String createRandomLogEvent(Long tick) {
        // generate a random log entry string
        ...
    }

}

보시다시피 초당 여러 번 랜덤의 로그 항목을 생성합니다.

4. 첫 직장

이제 RandomLogSource 에서 단순히 로그 이벤트를 수집하는 Mantis 작업을 생성해 보겠습니다 . 나중에 더 복잡하고 흥미로운 결과를 위해 그룹 및 집계 변환을 추가할 것입니다.

먼저 LogEvent 엔터티를 생성해 보겠습니다 .

public class LogEvent implements JsonType {
    private Long index;
    private String level;
    private String message;

    // ...
}

그런 다음 TransformLogStage를 추가해 보겠습니다 .

ScalarComputation 인터페이스를 구현하고 로그 항목을 분할하여 LogEvent 를 빌드하는 간단한 단계입니다 . 또한 잘못된 형식의 문자열을 필터링합니다.

public class TransformLogStage implements ScalarComputation<String, LogEvent> {

    @Override
    public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
        return logEntry
          .map(log -> log.split("#"))
          .filter(parts -> parts.length == 3)
          .map(LogEvent::new);
    }

}

4.1. 작업 실행

이 시점에서 Mantis 작업을 구성하는 데 필요한 구성 요소가 충분합니다.

public class LogCollectingJob extends MantisJobProvider<LogEvent> {

    @Override
    public Job<LogEvent> getJobInstance() {
        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), new ScalarToScalar.Config<>())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }

}

우리의 직업을 자세히 살펴봅시다.

보시다시피 MantisJobProvider를 확장 합니다. 처음에는 RandomLogSource 에서 데이터를 가져오고 가져온 데이터에 TransformLogStage적용합니다 . 마지막으로 처리된 데이터를 SSE를 통해 데이터를 적극적으로 구독하고 전달하는 내장 싱크로 보냅니다 .

이제 시작 시 로컬에서 실행되도록 작업을 구성해 보겠습니다.

@SpringBootApplication
public class MantisApplication implements CommandLineRunner {

    // ...
 
    @Override
    public void run(String... args) {
        LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
    }
}

응용 프로그램을 실행해 보겠습니다. 다음과 같은 로그 메시지가 표시됩니다.

...
Serving modern HTTP SSE server sink on port: 86XX

이제 curl을 사용하여 싱크대에 연결해 보겠습니다 .

$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...

4.2. 싱크 구성

지금까지 처리된 데이터를 수집하기 위해 내장 싱크를 사용했습니다. 사용자 지정 싱크를 제공하여 시나리오에 더 많은 유연성을 추가할 수 있는지 봅시다 .

예를 들어 메시지 별로 로그를 필터링하려면 어떻게 합니까?

Sink<LogEvent> 인터페이스 를 구현 하는 LogSink만들어 보겠습니다 .

public class LogSink implements Sink<LogEvent> {
    @Override
    public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
        SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
          .withEncoder(LogEvent::toJsonString)
          .withPredicate(filterByLogMessage())
          .build();
        logEventObservable.subscribe();
        sink.call(context, portRequest, logEventObservable);
    }
    private Predicate<LogEvent> filterByLogMessage() {
        return new Predicate<>("filter by message",
          parameters -> {
            if (parameters != null && parameters.containsKey("filter")) {
                return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
            }
            return logEvent -> true;
        });
    }
}

이 싱크 구현에서, 우리는 사용하는 술어 구성 필터 의 텍스트 세트가 포함 로그 검색에만 매개 변수를 필터 매개 변수를 :

$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...

참고 Mantis는 SQL 방식으로 스트림 데이터를 쿼리, 변환 및 분석하는 데 사용할 수 있는 강력한 쿼리 언어인 MQL 도 제공합니다 .

5. 스테이지 체이닝

이제 주어진 시간 간격에 얼마나 많은 ERROR , WARN 또는 INFO 로그 항목이 있는지 알고 싶다고 가정해 보겠습니다 . 이를 위해 작업에 두 단계를 더 추가하고 함께 연결합니다.

5.1. 그룹화

먼저 GroupLogStage를 생성해 보겠습니다 .

이 단계는 기존 TransformLogStage 에서 LogEvent 스트림 데이터 를 수신 하는 ToGroupComputation 구현입니다 . 그런 다음 로깅 수준별로 항목을 그룹화하고 다음 단계로 보냅니다.

public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {

    @Override
    public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
        return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
    }

    public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
        return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
          .description("Group event data by level")
          .codec(JacksonCodecs.pojo(LogEvent.class))
          .concurrentInput();
    }
    
}

또한 설명, 출력 직렬화에 사용할 코덱을 제공하여 사용자 지정 단계 구성을 생성했으며,   concurrentInput() 을 사용하여 이 단계의 호출 메서드를 동시에 실행할 수 있도록 했습니다 .

한 가지 주목해야 할 점은 이 단계가 수평으로 확장 가능하다는 것입니다. 이 단계의 인스턴스를 필요한 만큼 실행할 수 있음을 의미합니다. 또한 언급할 가치가 있는 것은 Mantis 클러스터에 배포된 경우 이 단계에서 데이터를 다음 단계로 전송하여 특정 그룹에 속한 모든 이벤트가 다음 단계의 동일한 작업자에 도달하도록 한다는 점입니다.

5.2. 집계

다음 단계로 넘어가기 전에 먼저 LogAggregate 엔터티를 추가해 보겠습니다 .

public class LogAggregate implements JsonType {

    private final Integer count;
    private final String level;

}

이제 체인의 마지막 단계를 생성해 보겠습니다.

이 단계는 GroupToScalarComputation을 구현 하고 로그 그룹 스트림을 스칼라 LogAggregate로 변환 합니다. 각 유형의 로그가 스트림에 나타나는 횟수를 계산하여 이를 수행합니다. 또한 집계 창의 크기를 제어하는 ​​데 사용할 수 있는 LogAggregationDuration 매개변수 있습니다.

public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {

    private int duration;

    @Override
    public void init(Context context) {
        duration = (int)context.getParameters().get("LogAggregationDuration", 1000);
    }

    @Override
    public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
        return mantisGroup
          .window(duration, TimeUnit.MILLISECONDS)
          .flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
            .flatMap(group -> group.reduce(0, (count, value) ->  count = count + 1)
              .map((count) -> new LogAggregate(count, group.getKey()))
            ));
    }

    public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
        return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
          .description("sum events for a log level")
          .codec(JacksonCodecs.pojo(LogAggregate.class))
          .withParameters(getParameters());
    }

    public static List<ParameterDefinition<?>> getParameters() {
        List<ParameterDefinition<?>> params = new ArrayList<>();

        params.add(new IntParameter()
          .name("LogAggregationDuration")
          .description("window size for aggregation in milliseconds")
          .validator(Validators.range(100, 10000))
          .defaultValue(5000)
          .build());

        return params;
    }
    
}

5.3. 작업 구성 및 실행

이제 작업을 구성하는 일만 남았습니다.

public class LogAggregationJob extends MantisJobProvider<LogAggregate> {

    @Override
    public Job<LogAggregate> getJobInstance() {

        return MantisJob
          .source(new RandomLogSource())
          .stage(new TransformLogStage(), TransformLogStage.stageConfig())
          .stage(new GroupLogStage(), GroupLogStage.config())
          .stage(new CountLogStage(), CountLogStage.config())
          .sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
          .metadata(new Metadata.Builder().build())
          .create();
    }
}

애플리케이션을 실행하고 새 작업을 실행하자마자 몇 초마다 로그 카운트가 검색되는 것을 볼 수 있습니다.

$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}

data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...

6. 결론

요약하자면 이 기사에서는 Netflix Mantis가 무엇이며 무엇에 사용할 수 있는지 살펴보았습니다. 또한 주요 개념을 살펴보고 이를 사용하여 작업을 구축하고 다양한 시나리오에 대한 사용자 지정 구성을 탐색했습니다.

항상 그렇듯이 전체 코드는 GitHub 에서 사용할 수 있습니다 .

Generic footer banner