나는 Kafka 주제에서 json 데이터를 읽고 Student 객체로 변환하고 값을 변경하고 Kafka 주제로 다시 보내는 Spring 배치 및 Kafka를 사용하여 작은 배치 작업을하고 있습니다. 모든 것이 잘 작동하지만 내 유일한 문제는 내 소비자가 항상 주제에 대한 구걸을 읽고 있다는 것입니다. 사용되지 않은 마지막 메시지에서 읽으려면이 파일이 필요합니다. 이미 해당 속성을 추가했습니다.
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to false
ConsumerConfig.GROUP_ID_CONFIG to a random value
그러나 이것은 작동하지 않는 것 같습니다. 소비자가 시작할 때 모든 메시지를 처리합니다. 누구든지 Spring Batch 및 Kafka로 수행하는 방법에 대한 아이디어가 있습니까? 이것은 내 코드입니다.
BatchStudent.java :
@SpringBootApplication
@EnableBatchProcessing
@RequiredArgsConstructor
public class BatchStudent {
public static void main(String[] args) {
SpringApplication.run(BatchStudent.class, args);
}
private final JobBuilderFactory jobBuilderFactory;
private final StepBuilderFactory stepBuilderFactory;
private final KafkaTemplate<Integer, Student> template;
private final KafkaProperties properties;
@Value("${kafka.topic.consumer}")
private String topic;
@Bean
public ItemProcessor<Student, Student> customItemProcessor() {
return new CustomProcessor();
}
@Bean
Job job() {
return this.jobBuilderFactory.get("job")
.start(start())
.incrementer(new RunIdIncrementer())
.build();
}
@Bean
KafkaItemWriter<Integer, Student> writer() {
return new KafkaItemWriterBuilder<Integer, Student>()
.kafkaTemplate(template)
.itemKeyMapper(Student::getId)
.build();
}
@Bean
public KafkaItemReader<Integer, Student> reader() {
Properties props = new Properties();
props.putAll(this.properties.buildConsumerProperties());
return new KafkaItemReaderBuilder<Integer, Student>()
.partitions(0)
.consumerProperties(props)
.name("students-consumer-reader")
.saveState(true)
.topic(topic)
.build();
}
@Bean
Step start() {
return this.stepBuilderFactory
.get("step")
.<Student, Student>chunk(10)
.writer(writer())
.processor(customItemProcessor())
.reader(reader())
.build();
}
}
app.yml
spring.batch.initialize-schema: always
#Conf Kafka Consumer
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
spring.kafka.consumer.value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
#spring.kafka.consumer.group-id: student-group
spring.kafka.consumer.properties.spring.json.trusted.packages: '*'
spring.kafka.consumer.properties.spring.json.value.default.type: com.org.model.Student
#Conf Kafka Producer
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
spring.kafka.producer.bootstrap-servers: localhost:9092
#Conf topics
spring.kafka.template.default-topic: producer.student
kafka.topic.consumer: consumer.student
Student.java
@Data
@NoArgsConstructor
@AllArgsConstructor
public class Student {
Integer id;
Integer count;
}
CustomProcessor.java
@NoArgsConstructor
public class CustomProcessor implements ItemProcessor<Student, Student> {
@Override
public Student process(Student studentRecieved) {
final Student studentSent = new Student();
studentSent.setId(studentRecieved.getId());
studentSent.setCount(200);
return studentSent;
}
}
도와 주셔서 감사합니다!