1. 개요
RxJava 는 비동기 및 이벤트 기반 프로그램을 만들기 위한 인기 있는 라이브러리로, Reactive Extensions 이니셔티브에서 제시한 주요 아이디어에서 영감을 얻었습니다.
Eclipse 의 산하 프로젝트인 Vert.x 는 처음부터 반응형 패러다임을 완전히 활용하도록 설계된 여러 구성 요소를 제공합니다.
함께 사용하면 반응성이 필요한 모든 Java 프로그램 의 유효한 기반이 될 수 있습니다.
이 문서에서는 도시 이름 List이 있는 파일을 로드하고 각각에 대해 일출부터 일몰까지 하루가 몇 개인지 출력합니다.
우리는 공개 www.metaweather.com REST API 에서 게시된 데이터를 사용 하여 일광의 길이를 계산하고 Vert.x 와 함께 RxJava 를 사용하여 순전히 반응적인 방식으로 계산합니다.
2. 메이븐 의존성
vertx-rx-java2 를 가져오는 것으로 시작하겠습니다 .
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>3.5.0-Beta1</version>
</dependency>
작성 당시 Vert.x 와 최신 RxJava 2 간의 통합 은 베타 릴리스로만 사용할 수 있습니다. 즉, 우리가 구축하고 있는 프로그램에 대해 충분히 안정적입니다.
io.vertx :vertx-rx-java2 는 io.reactivex.rxjava2:rxjava 에 의존 하므로 RxJava 관련 패키지를 명시적으로 가져올 필요가 없습니다 .
RxJava 와의 Vert.x 통합 의 최신 버전은 Maven Central 에서 찾을 수 있습니다 .
3. 설정
Vert.x 를 사용하는 모든 애플리케이션에서와 마찬가지로 모든 Vert.x 기능 에 대한 기본 진입점인 vertx 개체를 만들기 시작 합니다.
Vertx vertx = io.vertx.reactivex.core.Vertx.vertx();
vertx-rx-java2 라이브러리는 io.vertx.core.Vertx 및 io.vertx.reactivex.core.Vertx 의 두 가지 클래스를 제공합니다 . 첫 번째는 고유하게 Vert.x 를 기반으로 하는 애플리케이션의 일반적인 진입점이지만 후자는 RxJava 와의 통합을 위해 사용하는 것 입니다.
나중에 사용할 개체를 계속 정의합니다.
FileSystem fileSystem = vertx.fileSystem();
HttpClient httpClient = vertx.createHttpClient();
Vert.x 의 FileSystem 은 반응적인 방식으로 파일 시스템에 대한 액세스를 제공하는 반면 Vert.x 의 HttpClient 는 HTTP 에 대해 동일한 작업을 수행합니다 .
4. 리액티브 체인
의미 있는 계산을 얻기 위해 몇 가지 간단한 반응 연산자를 연결하는 것은 반응적 맥락에서 쉽습니다.
우리의 예 를 위해 그렇게 합시다 :
fileSystem
.rxReadFile("cities.txt").toFlowable()
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)
.map(extractingWoeid())
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())
.map(Buffer::toJsonObject)
.map(toCityAndDayLength())
.subscribe(System.out::println, Throwable::printStackTrace);
이제 코드의 각 논리적 청크가 어떻게 작동하는지 살펴보겠습니다.
5. 도시 이름
첫 번째 단계는 도시 이름 List이 포함된 파일을 한 줄에 하나씩 읽는 것입니다.
fileSystem
.rxReadFile("cities.txt").toFlowable()
.flatMap(buffer -> Flowable.fromArray(buffer.toString().split("\\r?\\n")))
rxReadFile() 메서드 는 반응적으로 파일을 읽고 RxJava 의 Single<Buffer> 를 반환합니다 . 그래서 우리는 RxJava 의 데이터 구조에서 Vert.x 의 비동기성이라는 우리가 찾고 있는 통합을 얻었습니다 .
파일이 하나뿐이므로 파일 의 전체 콘텐츠가 포함 된 Buffer 의 단일 방출을 얻습니다. 그 입력을 RxJava 의 Flowable 로 변환하고 대신 각 도시 이름에 대한 이벤트를 내보내는 Flowable 을 갖도록 파일의 라인을 평면 매핑합니다 .
6. JSON 도시 설명자
도시 이름이 있으면 다음 단계는 Metaweather REST API 를 사용하여 해당 도시의 식별자 코드를 얻는 것입니다. 이 식별자는 도시의 일출 및 일몰 시간을 가져오는 데 사용됩니다. 일련의 호출을 계속합시다.
일련의 호출을 계속합시다.
.flatMap(city -> searchByCityName(httpClient, city))
.flatMap(HttpClientResponse::toFlowable)
searchByCityName() 메서드는 첫 번째 단계에서 만든 HttpClient 를 사용 하여 도시 식별자를 제공하는 REST 서비스 를 호출합니다 . 그런 다음 두 번째 flatMap() 을 사용 하여 응답을 포함하는 Buffer 를 얻습니다 .
searchByCityName() 의 본문 을 작성하는 이 단계를 완료해 보겠습니다 .
Flowable<HttpClientResponse> searchByCityName(HttpClient httpClient, String cityName) {
HttpClientRequest req = httpClient.get(
new RequestOptions()
.setHost("www.metaweather.com")
.setPort(443)
.setSsl(true)
.setURI(format("/api/location/search/?query=%s", cityName)));
return req
.toFlowable()
.doOnSubscribe(subscription -> req.end());
}
Vert.x 의 HttpClient 는 반응형 HTTP 응답 을 내보내는 RxJava 의 Flowable 을 반환합니다 . 이것은 차례로 Buffers 에서 분할된 Response body을 내보냅니다 .
적절한 URL에 대한 새로운 반응형 요청을 만들었지만 Vert.x 는 요청을 보낼 수 있다는 신호를 보내기 위해 HttpClientRequest.end() 메서드를 호출 해야 하며 end() 가 전송되기 전에 적어도 하나의 구독이 필요하다는 점에 주목했습니다. 성공적으로 호출됩니다.
이를 달성하기 위한 솔루션은 RxJava 의 doOnSubscribe() 를 사용 하여 소비자가 구독하는 즉시 end( ) 를 호출 하는 것입니다.
7. 도시 식별자
이제 사용자 지정 메서드를 통해 도시를 고유하게 식별하는 반환된 JSON 개체의 woeid 속성 값을 가져와야 합니다.
.map(extractingWoeid())
extractingWoeid() 메서드 는 REST 서비스 응답 에 포함된 JSON 에서 도시 식별자를 추출하는 함수를 반환합니다 .
private static Function<Buffer, Long> extractingWoeid() {
return cityBuffer -> cityBuffer
.toJsonArray()
.getJsonObject(0)
.getLong("woeid");
}
Buffer 에서 제공 하는 편리한 toJson…() 메서드를 사용하여 필요한 속성에 빠르게 액세스할 수 있습니다.
8. 도시 세부 정보
REST API 에서 필요한 세부 정보를 검색하기 위해 반응 체인을 계속 진행해 보겠습니다 .
.flatMap(cityId -> getDataByPlaceId(httpClient, cityId))
.flatMap(toBufferFlowable())
getDataByPlaceId() 메서드 를 자세히 살펴보겠습니다 .
static Flowable<HttpClientResponse> getDataByPlaceId(
HttpClient httpClient, long placeId) {
return autoPerformingReq(
httpClient,
format("/api/location/%s/", placeId));
}
여기에서는 이전 단계에서 적용한 것과 동일한 접근 방식을 사용했습니다. getDataByPlaceId() 는 Flowable<HttpClientResponse> 를 반환합니다 . HttpClientResponse 는 몇 바이트보다 긴 경우 청크 로 API 응답을 내보냅니다.
toBufferFlowable() 메서드를 사용 하여 전체 JSON 개체에 액세스할 수 있도록 응답 청크를 단일 청크로 줄입니다.
static Function<HttpClientResponse, Publisher<? extends Buffer>>
toBufferFlowable() {
return response -> response
.toObservable()
.reduce(
Buffer.buffer(),
Buffer::appendBuffer).toFlowable();
}
9. 일몰 및 일출 시간
JSON 개체 에서 관심 있는 정보를 검색하여 반응 체인에 계속 추가해 보겠습니다 .
.map(toCityAndDayLength())
toCityAndDayLength() 메서드 를 작성해 보겠습니다 .
static Function<JsonObject, CityAndDayLength> toCityAndDayLength() {
return json -> {
ZonedDateTime sunRise = ZonedDateTime.parse(json.getString("sun_rise"));
ZonedDateTime sunSet = ZonedDateTime.parse(json.getString("sun_set"));
String cityName = json.getString("title");
return new CityAndDayLength(
cityName, sunSet.toEpochSecond() - sunRise.toEpochSecond());
};
}
일출과 일몰 사이의 시간을 단순히 시간 단위로 계산 하는 POJO 를 생성하기 위해 JSON 에 포함된 정보를 매핑하는 함수를 반환합니다 .
10. 구독
반응 사슬이 완성되었습니다. 이제 생성 된 CityAndDayLength 인스턴스 또는 오류 발생 시 스택 추적 을 출력하는 처리기로 결과 Flowable 을 구독할 수 있습니다.
.subscribe(
System.out::println,
Throwable::printStackTrace)
애플리케이션을 실행하면 List에 포함된 도시와 애플리케이션이 실행되는 날짜에 따라 다음과 같은 결과를 볼 수 있습니다.
In Chicago there are 13.3 hours of light.
In Milan there are 13.5 hours of light.
In Cairo there are 12.9 hours of light.
In Moscow there are 14.1 hours of light.
In Santiago there are 11.3 hours of light.
In Auckland there are 11.2 hours of light.
HTTP API 에 대한 모든 요청이 비동기식으로 실행 되기 때문에 도시는 파일에 지정된 순서와 다른 순서로 나타날 수 있습니다.
11. 결론
이 기사에서는 Vert.x 반응형 모듈을 RxJava 에서 제공하는 연산자 및 논리적 구성과 혼합하는 것이 얼마나 쉬운지 살펴보았습니다 .
우리가 구축한 리액티브 체인은 길지만 복잡한 시나리오를 작성하기 쉽게 만드는 방법을 보여주었습니다.
항상 그렇듯이 전체 소스 코드는 GitHub에서 사용할 수 있습니다 .