나는 '예상대로 작동하지 않는다'고 말하지만 실제로는 '내가 여기에서 적절한 작업을하고 있는지 잘 모르겠다'에 가깝고, 다른 접근 방식의 것들을 혼합하고 있고 실제로 상관 관계가없는 것처럼 느낍니다.
지금은 Spring Cloud Streams를 사용하여 PubSub 구독의 문자열 유형 메시지를 처리하고 있으며 지금까지 많은 번거 로움없이 메시지에서 메시지를 출력합니다.
내가 지금 달성하기 위해 노력하고있어 수집하는 것입니다 의이 말을하자, 1000 메시지 , 이를 처리하고 다른 PubSub 주제로 모두 보내 . List으로 보내거나 지금처럼 개별적으로 보내는 것에 대해 여전히 확실하지 않지만 모두 동시에 (이 질문과 관련이 없어야 함).
이제 방금 다음 속성을 발견했습니다.
spring.cloud.stream.bindings.input.consumer.batch-mode=true
다음 항목과 함께 GCP 항목에 대해 더 구체적입니다.
spring.cloud.gcp.pubsub.publisher.batching.enabled=true
spring.cloud.gcp.pubsub.publisher.batching.delay-threshold-seconds=300
spring.cloud.gcp.pubsub.publisher.batching.element-count-threshold=100
그래서 첫 번째 질문은 ... 어떤 방식 으로든 연결되어 있습니까? 첫 번째와 나머지 세 개를 함께 가져야합니까?
이전 속성을 내 application.properties 파일에 추가 한 후 발생한 일은 실제로 전혀 변경되지 않습니다. 메시지는 아무런 문제없이 배치 접근 방식없이 애플리케이션에 계속 도착하고 떠납니다.
현재 다음과 같은 기능을 사용하고 있습니다.
@Bean
public Function<Message<String>, String> sampleFunction() {
... // Stream processing in here
return processedString;
}
메서드는 String List이 아닌 String 만 수신하기 때문에 일부 메시지와 함께 충돌 할 것으로 예상했습니다. 충돌하지 않았기 때문에 위의 메서드를 수정하여 String List을 수신했습니다 (Spring이 백그라운드에서 여전히 메시지를 String으로 수신하지만 나중에 처리 할 메서드 List에 수집 할 수있는 마술을 수행 할 수 있습니까?).
@Bean
public Function<Message<List<String>>, String> sampleFunction() {
... // Stream processing in here
return processedString;
}
그러나 이것은 단일 문자열 메시지를 문자열 List으로 구문 분석하려고하기 때문에 충돌합니다.
모든 문자열 메시지를 List으로 일괄 처리하는 코드를 어떻게 준비 할 수 있습니까? 이것에 대한 예가 있습니까?