1. 소개
Spring Cloud Data Flow는 데이터 통합 및 실시간 데이터 처리 파이프라인을 구축하기 위한 툴킷입니다.
이 경우 파이프라인은 Spring Cloud Stream 또는 Spring Cloud Task 프레임워크를 사용하여 빌드된 Spring Boot 애플리케이션입니다.
이 예제에서는 Apache Spark 와 함께 Spring Cloud Data Flow를 사용하는 방법을 보여줍니다 .
2. 데이터 흐름 로컬 서버
먼저 작업을 배포 하려면 Data Flow Server 를 실행해야 합니다.
Data Flow Server를 로컬에서 실행 하려면 spring-cloud-starter-dataflow-server-local 의존성 을 사용하여 새 프로젝트를 생성해야 합니다 .
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
그런 다음 서버의 기본 클래스에 @EnableDataFlowServer 어노테이션을 추가해야 합니다 .
@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
public static void main(String[] args) {
SpringApplication.run(
SpringDataFlowServerApplication.class, args);
}
}
이 애플리케이션을 실행하면 포트 9393에 로컬 데이터 흐름 서버가 생깁니다.
3. 프로젝트 생성
Spark 작업 을 실행하는 데 클러스터가 필요하지 않도록 독립 실행형 로컬 애플리케이션으로 생성합니다.
3.1. 의존성
먼저 Spark 의존성을 추가합니다 .
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>2.4.0</version>
</dependency>
3.2. 직업 만들기
그리고 우리의 작업을 위해 파이를 근사화해 보겠습니다.
public class PiApproximation {
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
JavaSparkContext context = new JavaSparkContext(conf);
int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;
List<Integer> xs = IntStream.rangeClosed(0, n)
.mapToObj(element -> Integer.valueOf(element))
.collect(Collectors.toList());
JavaRDD<Integer> dataSet = context.parallelize(xs, slices);
JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
double x = Math.random() * 2 - 1;
double y = Math.random() * 2 - 1;
return (x * x + y * y ) < 1 ? 1: 0;
});
int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);
System.out.println("The pi was estimated as:" + count / n);
context.stop();
}
}
4. 데이터 흐름 셸
Data Flow Shell은 서버와 상호 작용할 수 있게 해주는 애플리케이션입니다 . Shell은 DSL 명령을 사용하여 데이터 흐름을 설명합니다.
데이터 흐름 셸을 사용 하려면 실행할 수 있는 프로젝트를 만들어야 합니다. 먼저 spring-cloud-dataflow-shell 의존성 이 필요 합니다 .
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dataflow-shell</artifactId>
<version>1.7.4.RELEASE</version>
</dependency>
의존성을 추가한 후 데이터 흐름 셸을 실행할 클래스를 만들 수 있습니다.
@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
public static void main(String[] args) {
SpringApplication.run(SpringDataFlowShellApplication.class, args);
}
}
5. 프로젝트 배포
프로젝트를 배포하기 위해 Apache Spark에서 cluster , yarn 및 client 의 세 가지 버전으로 사용할 수 있는 태스크 러너를 사용합니다 . 로컬 클라이언트 버전 으로 진행할 예정 입니다.
작업 실행기는 Spark 작업을 실행하는 것입니다.
이를 위해서는 먼저 Data Flow Shell을 사용하여 작업을 등록 해야 합니다 .
app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT
이 작업을 통해 여러 매개변수를 지정할 수 있습니다. 그 중 일부는 선택 사항이지만 일부 매개변수는 Spark 작업을 올바르게 배포하는 데 필요합니다.
- spark.app-class , 제출된 작업의 기본 클래스
- spark.app-jar , 작업이 포함된 fat jar의 경로
- spark.app- name , 작업에 사용할 이름
- spark.app-args , 작업에 전달될 인수
등록된 작업 spark-client 를 사용하여 필수 매개 변수를 제공하는 것을 기억하면서 작업을 제출할 수 있습니다.
task create spark1 --definition "spark-client \
--spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
--spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"
spark.app -jar 는 작업이 포함된 fat-jar의 경로입니다.
작업을 성공적으로 생성한 후 다음 명령을 사용하여 실행할 수 있습니다.
task launch spark1
그러면 작업 실행이 호출됩니다.
6. 요약
이 예제에서는 Spring Cloud Data Flow 프레임워크를 사용하여 Apache Spark로 데이터를 처리하는 방법을 보여주었습니다. Spring Cloud Data Flow 프레임워크에 대한 자세한 내용은 설명서 에서 찾을 수 있습니다 .
모든 코드 샘플 은 GitHub에서 찾을 수 있습니다.