1. 소개

Spring Cloud Data Flow는 데이터 통합 ​​및 실시간 데이터 처리 파이프라인을 구축하기 위한 툴킷입니다.  

이 경우 파이프라인은 Spring Cloud Stream  또는 Spring Cloud Task 프레임워크를 사용하여 빌드된 Spring Boot 애플리케이션입니다.

이 예제에서는 Apache Spark 와 함께 Spring Cloud Data Flow를 사용하는 방법을 보여줍니다 .

2. 데이터 흐름 로컬 서버

먼저 작업을 배포 하려면 Data Flow Server 를 실행해야 합니다.

Data Flow Server를 로컬에서 실행 하려면 spring-cloud-starter-dataflow-server-local 의존성 을 사용하여 새 프로젝트를 생성해야 합니다 .

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-dataflow-server-local</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

그런 다음 서버의 기본 클래스에 @EnableDataFlowServer 어노테이션을 추가해야 합니다 .

@EnableDataFlowServer
@SpringBootApplication
public class SpringDataFlowServerApplication {
 
    public static void main(String[] args) {
        SpringApplication.run(
          SpringDataFlowServerApplication.class, args);
    }
}

이 애플리케이션을 실행하면 포트 9393에 로컬 데이터 흐름 서버가 생깁니다.

3. 프로젝트 생성

Spark 작업실행하는 데 클러스터가 필요하지 않도록 독립 실행형 로컬 애플리케이션으로 생성합니다.

3.1. 의존성

먼저 Spark 의존성을 추가합니다 .

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.10</artifactId>
    <version>2.4.0</version>
</dependency>

3.2. 직업 만들기

그리고 우리의 작업을 위해 파이를 근사화해 보겠습니다.

public class PiApproximation {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("BaeldungPIApproximation");
        JavaSparkContext context = new JavaSparkContext(conf);
        int slices = args.length >= 1 ? Integer.valueOf(args[0]) : 2;
        int n = (100000L * slices) > Integer.MAX_VALUE ? Integer.MAX_VALUE : 100000 * slices;

        List<Integer> xs = IntStream.rangeClosed(0, n)
          .mapToObj(element -> Integer.valueOf(element))
          .collect(Collectors.toList());

        JavaRDD<Integer> dataSet = context.parallelize(xs, slices);

        JavaRDD<Integer> pointsInsideTheCircle = dataSet.map(integer -> {
           double x = Math.random() * 2 - 1;
           double y = Math.random() * 2 - 1;
           return (x * x + y * y ) < 1 ? 1: 0;
        });

        int count = pointsInsideTheCircle.reduce((integer, integer2) -> integer + integer2);

        System.out.println("The pi was estimated as:" + count / n);

        context.stop();
    }
}

4. 데이터 흐름 셸

Data Flow Shell은 서버와 상호 작용할 수 있게 해주는 애플리케이션입니다 . Shell은 DSL 명령을 사용하여 데이터 흐름을 설명합니다.

데이터 흐름 셸을 사용 하려면 실행할 수 있는 프로젝트를 만들어야 합니다. 먼저 spring-cloud-dataflow-shell 의존성 이 필요 합니다 .

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dataflow-shell</artifactId>
    <version>1.7.4.RELEASE</version>
</dependency>

의존성을 추가한 후 데이터 흐름 셸을 실행할 클래스를 만들 수 있습니다.

@EnableDataFlowShell
@SpringBootApplication
public class SpringDataFlowShellApplication {
     
    public static void main(String[] args) {
        SpringApplication.run(SpringDataFlowShellApplication.class, args);
    }
}

5. 프로젝트 배포

프로젝트를 배포하기 위해 Apache Spark에서 cluster , yarnclient 의 세 가지 버전으로 사용할 수 있는 태스크 러너를 사용합니다 . 로컬 클라이언트 버전 으로 진행할 예정 입니다.

작업 실행기는 Spark 작업을 실행하는 것입니다.

이를 위해서는 먼저 Data Flow Shell을 사용하여 작업을 등록 해야 합니다 .

app register --type task --name spark-client --uri maven://org.springframework.cloud.task.app:spark-client-task:1.0.0.BUILD-SNAPSHOT

이 작업을 통해 여러 매개변수를 지정할 수 있습니다. 그 중 일부는 선택 사항이지만 일부 매개변수는 Spark 작업을 올바르게 배포하는 데 필요합니다.

  • spark.app-class , 제출된 작업의 기본 클래스
  • spark.app-jar , 작업이 포함된 fat jar의 경로
  • spark.app- name , 작업에 사용할 이름
  • spark.app-args , 작업에 전달될 인수

등록된 작업  spark-client 를 사용하여 필수 매개 변수를 제공하는 것을 기억하면서 작업을 제출할 수 있습니다.

task create spark1 --definition "spark-client \
  --spark.app-name=my-test-pi --spark.app-class=com.baeldung.spring.cloud.PiApproximation \
  --spark.app-jar=/apache-spark-job-0.0.1-SNAPSHOT.jar --spark.app-args=10"

spark.app -jar 는 작업이 포함된 fat-jar의 경로입니다.

작업을 성공적으로 생성한 후 다음 명령을 사용하여 실행할 수 있습니다.

task launch spark1

그러면 작업 실행이 호출됩니다.

6. 요약

이 예제에서는 Spring Cloud Data Flow 프레임워크를 사용하여 Apache Spark로 데이터를 처리하는 방법을 보여주었습니다. Spring Cloud Data Flow 프레임워크에 대한 자세한 내용은 설명서 에서 찾을 수 있습니다 .

모든 코드 샘플 은 GitHub에서 찾을 수 있습니다.

Generic footer banner