1. 개요
이 기사에서는 Netflix에서 개발한 Mantis 플랫폼을 살펴보겠습니다.
스트림 처리 작업을 생성, 실행 및 조사하여 주요 Mantis 개념을 탐색합니다.
2. 사마귀는 무엇입니까?
Mantis는 스트림 처리 애플리케이션 (작업) 을 구축하기 위한 플랫폼입니다 . 작업의 배포 및 수명 주기 를 관리 하는 쉬운 방법을 제공합니다 . 또한 이러한 작업 간의 리소스 할당, 검색 및 통신을 용이하게 합니다.
따라서 개발자는 대용량, 짧은 대기 시간, 비차단 애플리케이션을 실행하기 위한 강력하고 확장 가능한 플랫폼을 지원하면서 실제 비즈니스 로직에 집중할 수 있습니다 .
Mantis 작업은 다음 세 부분으로 구성됩니다.
- 소스 외부 소스에서 데이터를 검색하기위한 책임,
- 들어오는 이벤트 스트림 처리를 담당하는 하나 이상의 단계
- 처리된 데이터를 수집 하는 싱크
이제 각각을 살펴보겠습니다.
3. 설정 및 의존성
mantis-runtime 및 jackson-databind 의존성 을 추가하여 시작하겠습니다 .
<dependency>
<groupId>io.mantisrx</groupId>
<artifactId>mantis-runtime</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
이제 작업의 데이터 소스를 설정하기 위해 Mantis Source 인터페이스를 구현해 보겠습니다 .
public class RandomLogSource implements Source<String> {
@Override
public Observable<Observable<String>> call(Context context, Index index) {
return Observable.just(
Observable
.interval(250, TimeUnit.MILLISECONDS)
.map(this::createRandomLogEvent));
}
private String createRandomLogEvent(Long tick) {
// generate a random log entry string
...
}
}
보시다시피 초당 여러 번 랜덤의 로그 항목을 생성합니다.
4. 첫 직장
이제 RandomLogSource 에서 단순히 로그 이벤트를 수집하는 Mantis 작업을 생성해 보겠습니다 . 나중에 더 복잡하고 흥미로운 결과를 위해 그룹 및 집계 변환을 추가할 것입니다.
먼저 LogEvent 엔터티를 생성해 보겠습니다 .
public class LogEvent implements JsonType {
private Long index;
private String level;
private String message;
// ...
}
그런 다음 TransformLogStage를 추가해 보겠습니다 .
ScalarComputation 인터페이스를 구현하고 로그 항목을 분할하여 LogEvent 를 빌드하는 간단한 단계입니다 . 또한 잘못된 형식의 문자열을 필터링합니다.
public class TransformLogStage implements ScalarComputation<String, LogEvent> {
@Override
public Observable<LogEvent> call(Context context, Observable<String> logEntry) {
return logEntry
.map(log -> log.split("#"))
.filter(parts -> parts.length == 3)
.map(LogEvent::new);
}
}
4.1. 작업 실행
이 시점에서 Mantis 작업을 구성하는 데 필요한 구성 요소가 충분합니다.
public class LogCollectingJob extends MantisJobProvider<LogEvent> {
@Override
public Job<LogEvent> getJobInstance() {
return MantisJob
.source(new RandomLogSource())
.stage(new TransformLogStage(), new ScalarToScalar.Config<>())
.sink(Sinks.eagerSubscribe(Sinks.sse(LogEvent::toJsonString)))
.metadata(new Metadata.Builder().build())
.create();
}
}
우리의 직업을 자세히 살펴봅시다.
보시다시피 MantisJobProvider를 확장 합니다. 처음에는 RandomLogSource 에서 데이터를 가져오고 가져온 데이터에 TransformLogStage 를 적용합니다 . 마지막으로 처리된 데이터를 SSE를 통해 데이터를 적극적으로 구독하고 전달하는 내장 싱크로 보냅니다 .
이제 시작 시 로컬에서 실행되도록 작업을 구성해 보겠습니다.
@SpringBootApplication
public class MantisApplication implements CommandLineRunner {
// ...
@Override
public void run(String... args) {
LocalJobExecutorNetworked.execute(new LogCollectingJob().getJobInstance());
}
}
응용 프로그램을 실행해 보겠습니다. 다음과 같은 로그 메시지가 표시됩니다.
...
Serving modern HTTP SSE server sink on port: 86XX
이제 curl을 사용하여 싱크대에 연결해 보겠습니다 .
$ curl localhost:86XX
data: {"index":86,"level":"WARN","message":"login attempt"}
data: {"index":87,"level":"ERROR","message":"user created"}
data: {"index":88,"level":"INFO","message":"user created"}
data: {"index":89,"level":"INFO","message":"login attempt"}
data: {"index":90,"level":"INFO","message":"user created"}
data: {"index":91,"level":"ERROR","message":"user created"}
data: {"index":92,"level":"WARN","message":"login attempt"}
data: {"index":93,"level":"INFO","message":"user created"}
...
4.2. 싱크 구성
지금까지 처리된 데이터를 수집하기 위해 내장 싱크를 사용했습니다. 사용자 지정 싱크를 제공하여 시나리오에 더 많은 유연성을 추가할 수 있는지 봅시다 .
예를 들어 메시지 별로 로그를 필터링하려면 어떻게 합니까?
Sink<LogEvent> 인터페이스 를 구현 하는 LogSink 를 만들어 보겠습니다 .
public class LogSink implements Sink<LogEvent> {
@Override
public void call(Context context, PortRequest portRequest, Observable<LogEvent> logEventObservable) {
SelfDocumentingSink<LogEvent> sink = new ServerSentEventsSink.Builder<LogEvent>()
.withEncoder(LogEvent::toJsonString)
.withPredicate(filterByLogMessage())
.build();
logEventObservable.subscribe();
sink.call(context, portRequest, logEventObservable);
}
private Predicate<LogEvent> filterByLogMessage() {
return new Predicate<>("filter by message",
parameters -> {
if (parameters != null && parameters.containsKey("filter")) {
return logEvent -> logEvent.getMessage().contains(parameters.get("filter").get(0));
}
return logEvent -> true;
});
}
}
이 싱크 구현에서, 우리는 사용하는 술어 구성 필터 의 텍스트 세트가 포함 로그 검색에만 매개 변수를 필터 매개 변수를 :
$ curl localhost:8874?filter=login
data: {"index":93,"level":"ERROR","message":"login attempt"}
data: {"index":95,"level":"INFO","message":"login attempt"}
data: {"index":97,"level":"ERROR","message":"login attempt"}
...
참고 Mantis는 SQL 방식으로 스트림 데이터를 쿼리, 변환 및 분석하는 데 사용할 수 있는 강력한 쿼리 언어인 MQL 도 제공합니다 .
5. 스테이지 체이닝
이제 주어진 시간 간격에 얼마나 많은 ERROR , WARN 또는 INFO 로그 항목이 있는지 알고 싶다고 가정해 보겠습니다 . 이를 위해 작업에 두 단계를 더 추가하고 함께 연결합니다.
5.1. 그룹화
먼저 GroupLogStage를 생성해 보겠습니다 .
이 단계는 기존 TransformLogStage 에서 LogEvent 스트림 데이터 를 수신 하는 ToGroupComputation 구현입니다 . 그런 다음 로깅 수준별로 항목을 그룹화하고 다음 단계로 보냅니다.
public class GroupLogStage implements ToGroupComputation<LogEvent, String, LogEvent> {
@Override
public Observable<MantisGroup<String, LogEvent>> call(Context context, Observable<LogEvent> logEvent) {
return logEvent.map(log -> new MantisGroup<>(log.getLevel(), log));
}
public static ScalarToGroup.Config<LogEvent, String, LogEvent> config(){
return new ScalarToGroup.Config<LogEvent, String, LogEvent>()
.description("Group event data by level")
.codec(JacksonCodecs.pojo(LogEvent.class))
.concurrentInput();
}
}
또한 설명, 출력 직렬화에 사용할 코덱을 제공하여 사용자 지정 단계 구성을 생성했으며, concurrentInput() 을 사용하여 이 단계의 호출 메서드를 동시에 실행할 수 있도록 했습니다 .
한 가지 주목해야 할 점은 이 단계가 수평으로 확장 가능하다는 것입니다. 이 단계의 인스턴스를 필요한 만큼 실행할 수 있음을 의미합니다. 또한 언급할 가치가 있는 것은 Mantis 클러스터에 배포된 경우 이 단계에서 데이터를 다음 단계로 전송하여 특정 그룹에 속한 모든 이벤트가 다음 단계의 동일한 작업자에 도달하도록 한다는 점입니다.
5.2. 집계
다음 단계로 넘어가기 전에 먼저 LogAggregate 엔터티를 추가해 보겠습니다 .
public class LogAggregate implements JsonType {
private final Integer count;
private final String level;
}
이제 체인의 마지막 단계를 생성해 보겠습니다.
이 단계는 GroupToScalarComputation을 구현 하고 로그 그룹 스트림을 스칼라 LogAggregate로 변환 합니다. 각 유형의 로그가 스트림에 나타나는 횟수를 계산하여 이를 수행합니다. 또한 집계 창의 크기를 제어하는 데 사용할 수 있는 LogAggregationDuration 매개변수 도 있습니다.
public class CountLogStage implements GroupToScalarComputation<String, LogEvent, LogAggregate> {
private int duration;
@Override
public void init(Context context) {
duration = (int)context.getParameters().get("LogAggregationDuration", 1000);
}
@Override
public Observable<LogAggregate> call(Context context, Observable<MantisGroup<String, LogEvent>> mantisGroup) {
return mantisGroup
.window(duration, TimeUnit.MILLISECONDS)
.flatMap(o -> o.groupBy(MantisGroup::getKeyValue)
.flatMap(group -> group.reduce(0, (count, value) -> count = count + 1)
.map((count) -> new LogAggregate(count, group.getKey()))
));
}
public static GroupToScalar.Config<String, LogEvent, LogAggregate> config(){
return new GroupToScalar.Config<String, LogEvent, LogAggregate>()
.description("sum events for a log level")
.codec(JacksonCodecs.pojo(LogAggregate.class))
.withParameters(getParameters());
}
public static List<ParameterDefinition<?>> getParameters() {
List<ParameterDefinition<?>> params = new ArrayList<>();
params.add(new IntParameter()
.name("LogAggregationDuration")
.description("window size for aggregation in milliseconds")
.validator(Validators.range(100, 10000))
.defaultValue(5000)
.build());
return params;
}
}
5.3. 작업 구성 및 실행
이제 작업을 구성하는 일만 남았습니다.
public class LogAggregationJob extends MantisJobProvider<LogAggregate> {
@Override
public Job<LogAggregate> getJobInstance() {
return MantisJob
.source(new RandomLogSource())
.stage(new TransformLogStage(), TransformLogStage.stageConfig())
.stage(new GroupLogStage(), GroupLogStage.config())
.stage(new CountLogStage(), CountLogStage.config())
.sink(Sinks.eagerSubscribe(Sinks.sse(LogAggregate::toJsonString)))
.metadata(new Metadata.Builder().build())
.create();
}
}
애플리케이션을 실행하고 새 작업을 실행하자마자 몇 초마다 로그 카운트가 검색되는 것을 볼 수 있습니다.
$ curl localhost:8133
data: {"count":3,"level":"ERROR"}
data: {"count":13,"level":"INFO"}
data: {"count":4,"level":"WARN"}
data: {"count":8,"level":"ERROR"}
data: {"count":5,"level":"INFO"}
data: {"count":7,"level":"WARN"}
...
6. 결론
요약하자면 이 기사에서는 Netflix Mantis가 무엇이며 무엇에 사용할 수 있는지 살펴보았습니다. 또한 주요 개념을 살펴보고 이를 사용하여 작업을 구축하고 다양한 시나리오에 대한 사용자 지정 구성을 탐색했습니다.