카테고리 없음

Kafka & Spring Batch-동일한 주제에서 커밋되지 않은 메시지 만 읽는 방법은 무엇입니까?

기록만이살길 2021. 2. 22. 11:13
반응형

Kafka & Spring Batch-동일한 주제에서 커밋되지 않은 메시지 만 읽는 방법은 무엇입니까?

1. 질문(문제점):

나는 Kafka 주제에서 json 데이터를 읽고 Student 객체로 변환하고 값을 변경하고 Kafka 주제로 다시 보내는 Spring 배치 및 Kafka를 사용하여 작은 배치 작업을하고 있습니다. 모든 것이 잘 작동하지만 내 유일한 문제는 내 소비자가 항상 주제에 대한 구걸을 읽고 있다는 것입니다. 사용되지 않은 마지막 메시지에서 읽으려면이 파일이 필요합니다. 이미 해당 속성을 추가했습니다.

ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value

그러나 이것은 작동하지 않는 것 같습니다. 소비자가 시작할 때 모든 메시지를 처리합니다. 누구든지 Spring Batch 및 Kafka로 수행하는 방법에 대한 아이디어가 있습니까? 이것은 내 코드입니다.

BatchStudent.java :

@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {
    public static void main(String[] args) {
        SpringApplication.run(BatchStudent.class, args);
    }

    private final JobBuilderFactory jobBuilderFactory;
    private final StepBuilderFactory stepBuilderFactory;
    private final KafkaTemplate<Integer, Student> template;
    private final KafkaProperties properties;

    @Value("${kafka.topic.consumer}")
    private String topic;

    @Bean
    public ItemProcessor<Student, Student> customItemProcessor() {
        return new CustomProcessor();
    }

    @Bean
    Job job() {
        return this.jobBuilderFactory.get("job")
                .start(start())
                .incrementer(new RunIdIncrementer())
                .build();
    }

    @Bean
    KafkaItemWriter<Integer, Student> writer() {
        return new KafkaItemWriterBuilder<Integer, Student>()
                .kafkaTemplate(template)
                .itemKeyMapper(Student::getId)
                .build();
    }

    @Bean
    public KafkaItemReader<Integer, Student> reader() {
        Properties props = new Properties();
        props.putAll(this.properties.buildConsumerProperties());

        return new KafkaItemReaderBuilder<Integer, Student>()
                .partitions(0)
                .consumerProperties(props)
                .name("students-consumer-reader")
                .saveState(true)
                .topic(topic)
                .build();
    }

    @Bean
    Step start() {
        return this.stepBuilderFactory
                .get("step")
                .<Student, Student>chunk(10)
                .writer(writer())
                .processor(customItemProcessor())
                .reader(reader())
                .build();
    }
}

app.yml

spring.batch.initialize-schema: always

#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student

#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092

#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student

Student.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
    Integer id;
    Integer count;
}

CustomProcessor.java

@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {

    @Override
    public Student process(Student studentRecieved) {
        final Student studentSent = new Student();
        studentSent.setId(studentRecieved.getId());
        studentSent.setCount(200);
        return studentSent;
    }
}

도와 주셔서 감사합니다!

2. 해결방안:

모든 것이 잘 작동하지만 내 유일한 문제는 내 소비자가 항상 주제에 대한 구걸을 읽고 있다는 것입니다. 사용되지 않은 마지막 메시지에서 읽으려면이 파일이 필요합니다.

Spring Batch 4.3은 Kafka에 저장된 오프셋에서 레코드를 소비하는 방법을 도입했습니다. 작년 Spring One에서이 기능에 대해 이야기했습니다. Spring Batch 4.3의 새로운 기능은 무엇입니까? . setPartitionOffsets 를 사용하여 각 파티션에서 사용자 정의 시작 오프셋으로 kafka 판독기를 구성 할 수 있습니다 .

Setter for partition offsets. This mapping tells the reader the offset to start reading
from in each partition. This is optional, defaults to starting from offset 0 in each
partition. Passing an empty map makes the reader start from the offset stored in Kafka
for the consumer group ID.

이 테스트 케이스 에서 완전한 예를 찾을 수 있습니다 .

65835903
반응형