다음과 같은 앱 요구 사항이 있습니다.
- RabbitMq에서 메시지를 수신 한 다음
types
속성 (사전 지정된 유형-시간 매핑 사용) 및 큐에서 대기중인 기존 시간 메시지 (old
속성) 에 따라 좀 더 복잡한 규칙을 기반으로 집계 됩니다. - 모든 메시지는 가변 메시지 속도 (예 : 1msg / sec, 최대 100msg / sec)로 릴리스되어야합니다. 이 속도는 rabbitmq 대기열 크기 (이 구성 요소와 관련이없고 파이프 라인에있는 하나의 대기열)를 모니터링하는 서비스에 의해 제어되고 설정되며 대기열에 너무 많은 메시지가있는 경우 속도가 감소합니다.
이미지에서 볼 수 있듯이 하나의 사용 사례 : 세 개의 메시지가 이미 집계되어 다음 초에 릴리스되기를 기다리고 있지만 (현재 속도가이므로 1msg/sec
) 바로 그 시점에으로 MSG
도착 id:10
하고 업데이트 AGGREGATED 2
되어 첫 번째 메시지가됩니다. 우선 순위에 따라. 따라서 다음 틱에서를 AGGREGATED 3
릴리스하는 대신 AGGREGATED 2
이제 더 높은 우선 순위를 갖기 때문에 릴리스 합니다.
이제 질문은-집계 중에 메시지의 우선 순위 지정을 지원하는지 알 수 없기 때문에 Spring Integration Aggregator를 사용할 수 있습니까? 나는 알고 groupTimeout
있지만, 하나는 다른 그룹의 우선 순위를 변경하지 않고 단일 메시지 그룹 만 조정하고 있습니다. MessageGroupStoreReaper
새 MSG가 도착할 때 우선 순위에 따라 다른 모든 집계 메시지를 조정하는 기능 을 사용할 수 있습니까?
최신 정보
나는 이와 같은 구현을했습니다. 지금은 괜찮아 보입니다. 도착하면 메시지를 집계하고 비교기는 내 사용자 지정 논리에 따라 메시지를 정렬합니다.
이것에 몇 가지 문제 (동시성 등)가있을 수 있다고 생각하십니까? 로그에서 폴러가 작업에서 두 번 이상 호출되는 것을 볼 수 있습니다. 이것은 정상입니까?
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277 INFO 16080 --- [ scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
또한이 주석 doit
처리 된 방법, 런타임에서 최대 폴링 메시지 수를 늘리는 적절한 방법입니까?
@Bean
public MessageChannel aggregatingChannel(){
return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
Comparator.comparingInt(x -> x),
(m) -> {
ExampleDTO d = (ExampleDTO) m.getPayload();
return d.getId();
}
));
}
class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
private final Log logger = LogFactory.getLog(getClass());
private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
private final Function<Message<?>, K> keyExtractor;
private final NavigableMap<K, Message<?>> keyToAggregatedMessage;
public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
Comparator<? super K> comparator,
Function<Message<?>, K> keyExtractor) {
this.accumulator = accumulator;
this.keyExtractor = keyExtractor;
keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
}
@Override
public Iterator<Message<?>> iterator() {
return keyToAggregatedMessage.values().iterator();
}
@Override
public int size() {
return keyToAggregatedMessage.size();
}
@Override
public boolean offer(Message<?> m) {
logger.info("OFFER");
return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
}
@Override
public Message<?> poll() {
logger.info("POLL");
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
return m != null ? m.getValue() : null;
}
@Override
public Message<?> peek() {
Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
return m!= null ? m.getValue() : null;
}
}
// @Scheduled(fixedDelay = 10*1000)
// public void doit(){
// System.out.println("INCREASE POLL");
// pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
// }
@Bean(name = PollerMetadata.DEFAULT_POLLER)
public PollerMetadata pollerMetadata(){
PollerMetadata metadata = new PollerMetadata();
metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
metadata.setMaxMessagesPerPoll(1);
return metadata;
}
@Bean
public IntegrationFlow aggregatingFlow(
AmqpInboundChannelAdapter aggregatorInboundChannel,
AmqpOutboundEndpoint aggregatorOutboundChannel,
MessageChannel wtChannel,
MessageChannel aggregatingChannel,
PollerMetadata pollerMetadata
) {
return IntegrationFlows.from(aggregatorInboundChannel)
.wireTap(wtChannel)
.channel(aggregatingChannel)
.handle(aggregatorOutboundChannel)
.get();
}