카테고리 없음

스프링 통합-우선 순위 수집기

기록만이살길 2021. 3. 1. 01:57
반응형

스프링 통합-우선 순위 수집기

1. 질문(문제점):

다음과 같은 앱 요구 사항이 있습니다.

  • RabbitMq에서 메시지를 수신 한 다음 types속성 (사전 지정된 유형-시간 매핑 사용) 및 큐에서 대기중인 기존 시간 메시지 ( old속성) 에 따라 좀 더 복잡한 규칙을 기반으로 집계 됩니다.
  • 모든 메시지는 가변 메시지 속도 (예 : 1msg / sec, 최대 100msg / sec)로 릴리스되어야합니다. 이 속도는 rabbitmq 대기열 크기 (이 구성 요소와 관련이없고 파이프 라인에있는 하나의 대기열)를 모니터링하는 서비스에 의해 제어되고 설정되며 대기열에 너무 많은 메시지가있는 경우 속도가 감소합니다.

이미지에서 볼 수 있듯이 하나의 사용 사례 : 세 개의 메시지가 이미 집계되어 다음 초에 릴리스되기를 기다리고 있지만 (현재 속도가이므로 1msg/sec) 바로 그 시점에으로 MSG도착 id:10하고 업데이트 AGGREGATED 2되어 첫 번째 메시지가됩니다. 우선 순위에 따라. 따라서 다음 틱에서를 AGGREGATED 3릴리스하는 대신 AGGREGATED 2이제 더 높은 우선 순위를 갖기 때문에 릴리스 합니다.

여기에 이미지 설명 입력

이제 질문은-집계 중에 메시지의 우선 순위 지정을 지원하는지 알 수 없기 때문에 Spring Integration Aggregator를 사용할 수 있습니까? 나는 알고 groupTimeout있지만, 하나는 다른 그룹의 우선 순위를 변경하지 않고 단일 메시지 그룹 만 조정하고 있습니다. MessageGroupStoreReaper새 MSG가 도착할 때 우선 순위에 따라 다른 모든 집계 메시지를 조정하는 기능 을 사용할 수 있습니까?

최신 정보

나는 이와 같은 구현을했습니다. 지금은 괜찮아 보입니다. 도착하면 메시지를 집계하고 비교기는 내 사용자 지정 논리에 따라 메시지를 정렬합니다.

이것에 몇 가지 문제 (동시성 등)가있을 수 있다고 생각하십니까? 로그에서 폴러가 작업에서 두 ​​번 이상 호출되는 것을 볼 수 있습니다. 이것은 정상입니까?

2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL
2021-01-18 13:52:05.277  INFO 16080 --- [   scheduling-1] ggregatorConfig$PriorityAggregatingQueue : POLL

또한이 주석 doit처리 된 방법, 런타임에서 최대 폴링 메시지 수를 늘리는 적절한 방법입니까?

@Bean
    public MessageChannel aggregatingChannel(){
        return new QueueChannel(new PriorityAggregatingQueue<>((m1, m2) -> {//aggr here},
                Comparator.comparingInt(x -> x),
                (m) -> {
                    ExampleDTO d = (ExampleDTO) m.getPayload();
                    return d.getId();
                }
        ));
    }

    class PriorityAggregatingQueue<K> extends AbstractQueue<Message<?>> {
        private final Log logger = LogFactory.getLog(getClass());
        private final BiFunction<Message<?>, Message<?>, Message<?>> accumulator;
        private final Function<Message<?>, K> keyExtractor;
        private final NavigableMap<K, Message<?>> keyToAggregatedMessage;

        public PriorityAggregatingQueue(BiFunction<Message<?>, Message<?>, Message<?>> accumulator,
                                        Comparator<? super K> comparator,
                                        Function<Message<?>, K> keyExtractor) {
            this.accumulator = accumulator;
            this.keyExtractor = keyExtractor;
            keyToAggregatedMessage = new ConcurrentSkipListMap<>(comparator);
        }

        @Override
        public Iterator<Message<?>> iterator() {
            return keyToAggregatedMessage.values().iterator();
        }

        @Override
        public int size() {
            return keyToAggregatedMessage.size();
        }

        @Override
        public boolean offer(Message<?> m) {
            logger.info("OFFER");
            return keyToAggregatedMessage.compute(keyExtractor.apply(m), (k,old) -> accumulator.apply(old, m)) != null;
        }

        @Override
        public Message<?> poll() {
            logger.info("POLL");
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.pollLastEntry();
            return m != null ? m.getValue() : null;
        }

        @Override
        public Message<?> peek() {
            Map.Entry<K, Message<?>> m = keyToAggregatedMessage.lastEntry();
            return m!= null ? m.getValue() : null;
        }
    }

//    @Scheduled(fixedDelay = 10*1000)
//    public void doit(){
//        System.out.println("INCREASE POLL");
//        pollerMetadata().setMaxMessagesPerPoll(pollerMetadata().getMaxMessagesPerPoll() * 2);
//    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata pollerMetadata(){
        PollerMetadata metadata = new PollerMetadata();
        metadata.setTrigger(new DynamicPeriodicTrigger(Duration.ofSeconds(30)));
        metadata.setMaxMessagesPerPoll(1);
        return metadata;
    }

    @Bean
    public IntegrationFlow aggregatingFlow(
            AmqpInboundChannelAdapter aggregatorInboundChannel,
            AmqpOutboundEndpoint aggregatorOutboundChannel,
            MessageChannel wtChannel,
            MessageChannel aggregatingChannel,
            PollerMetadata pollerMetadata
    ) {
    return IntegrationFlows.from(aggregatorInboundChannel)
        .wireTap(wtChannel)
        .channel(aggregatingChannel)
        .handle(aggregatorOutboundChannel)
        .get();
    }

2. 해결방안:

글쎄, 그룹이 완료해야 할 새 메시지가 있으면 집계 자에 도착하면 해당 그룹이 즉시 해제됩니다 ( ReleaseStrategy그래도 그렇게 말하면). 타임 아웃 된 나머지 그룹은 계속해서 일정을 기다립니다.

단일 공통 일정에 의존하여 MessageGroupStoreReaper해당 부분 그룹을 해제해야하는지 아니면 그냥 폐기해야하는지 결정 하는 스마트 알고리즘을 생각해내는 것이 가능할 것입니다. 다시 말하지만 ReleaseStrategy, 부분적이라 할지라도 릴리스 할 것인지 아닌지에 대한 단서를 제공해야합니다. 폐기가 발생하고 해당 메시지를 수집기에 보관하려면 약간의 지연 후 수집 자에게 다시 보내야합니다. 만료 후에는 그룹이 저장소에서 제거되며 이는 이미 폐기 채널로 전송 한 경우 발생하므로이를 지연하고 집계자가 해당 그룹을 정리하도록하는 것이 좋습니다. 따라서 지연 후 안전하게 다시 보낼 수 있습니다. 새 그룹의 일부로 새 만료 기간에 대한 애그리 게이터.

다음 만료 시간을 위해 헤더의 일부 시간 키를 조정하기 위해 일반 그룹을 해제 한 후 저장소의 모든 메시지를 반복 할 수도 있습니다.

나는 이것이 어려운 문제라는 것을 알고 있지만, 우리가 방금 다루었던 그룹의 다른 그룹에 영향을 미치도록 설계되지 않았기 때문에 즉시 사용 가능한 솔루션은 없습니다.

65735002
반응형