AmqpWriter를 사용하여 RabbitMQ에 데이터를 쓰고, AmqpReader를 사용하여 RabbitMQ에서 데이터를 읽는 솔루션이 필요합니다.
우리는 Apache Kafka를 찾는 것이 아니라 단순히 프로그램 세부 정보를 보내서 소비하고 싶습니다.
라이터(Writer) 코드
JobConfig.java
@Configuration
public class JobConfig {
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Autowired
private JobBuilderFactory jobBuilderFactory;
@Bean
public ConnectionFactory connectionFactory() {
return new CachingConnectionFactory("localhost");
}
@Bean
public AmqpAdmin amqpAdmin() {
return new RabbitAdmin(connectionFactory());
}
@Bean
public RabbitTemplate rabbitTemplate() {
return new RabbitTemplate(connectionFactory());
}
@Bean
public Queue myQueue() {
return new Queue("myqueue");
}
@Bean
public FlatFileItemReader<Customer> customerItemReader() {
FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
reader.setLinesToSkip(1);
reader.setResource(new ClassPathResource("/data/customer.csv"));
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });
DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
customerLineMapper.setLineTokenizer(tokenizer);
customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
customerLineMapper.afterPropertiesSet();
reader.setLineMapper(customerLineMapper);
return reader;
}
@Bean
public AmqpItemWriter<Customer> amqpWriter(){
AmqpItemWriter<Customer> amqpItemWriter = new AmqpItemWriter<>(this.rabbitTemplate());
return amqpItemWriter;
}
@Bean
public Step step1() throws Exception {
return stepBuilderFactory.get("step1")
.<Customer, Customer>chunk(10)
.reader(customerItemReader())
.writer(amqpWriter())
.build();
}
@Bean
public Job job() throws Exception {
return jobBuilderFactory.get("job")
.incrementer(new RunIdIncrementer())
.start(step1())
.build();
}
}
CustomerFieldSetMapper.java
/**
 * Maps one tokenized line of the customer file onto a {@link Customer}.
 */
public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {

    /**
     * Builds a {@link Customer} from the named columns of the given field set.
     *
     * @param fieldSet tokenized line providing the columns {@code id},
     *                 {@code firstName}, {@code lastName} and {@code birthdate}
     * @return the populated customer
     * @throws BindException declared by the {@link FieldSetMapper} contract
     */
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        // Columns are read in the same order as before: id first, then the raw strings.
        final long customerId = fieldSet.readLong("id");
        final String givenName = fieldSet.readRawString("firstName");
        final String familyName = fieldSet.readRawString("lastName");
        final String dateOfBirth = fieldSet.readRawString("birthdate");

        return Customer.builder()
                .id(customerId)
                .firstName(givenName)
                .lastName(familyName)
                .birthdate(dateOfBirth)
                .build();
    }
}
Customer.java
/**
 * Customer item that is read from the CSV file and sent through RabbitMQ.
 *
 * <p>Constructors, builder, accessors, equals/hashCode and toString are generated
 * by the Lombok annotations below. The class is Java-serializable, and unknown
 * JSON properties are ignored when it is deserialized with Jackson.
 */
@AllArgsConstructor
@NoArgsConstructor
@Builder
@Data
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
private static final long serialVersionUID = 1L;
// Value of the "id" column.
private Long id;
// Value of the "firstName" column.
private String firstName;
// Value of the "lastName" column.
private String lastName;
// Value of the "birthdate" column, kept as the raw string from the file.
private String birthdate;
}
SpringBatchAmqpApplication.java
/**
 * Boot entry point of the producing application. Batch processing is enabled
 * here so the job and step builder factories are available to the job
 * configuration.
 *
 * <p>NOTE(review): {@code @EnableBinding(Source.class)} belongs to Spring Cloud
 * Stream; nothing in the visible job configuration uses it. Confirm it is
 * still needed.
 */
@EnableBatchProcessing
@SpringBootApplication
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    /**
     * Starts the Spring application with this class as the primary source.
     *
     * @param args command-line arguments passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Instance form of SpringApplication.run(Class, String...).
        new SpringApplication(SpringBatchAmqpApplication.class).run(args);
    }
}
리더 코드
JobConfiguration.java
/**
 * Spring Batch configuration for the consuming side: items are pulled from
 * RabbitMQ through an {@link AmqpItemReader} and printed to standard output.
 */
@Configuration
public class JobConfiguration {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Connection factory pointing at a RabbitMQ broker on {@code localhost}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** Admin built on top of {@link #connectionFactory()}. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /** Listener container factory sharing the connection factory and JSON converter. */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory containerFactory =
                new SimpleRabbitListenerContainerFactory();
        containerFactory.setConnectionFactory(connectionFactory());
        containerFactory.setMessageConverter(jsonMessageConverter());
        return containerFactory;
    }

    /** JSON message converter used by the template and the listener factory. */
    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    /**
     * Template handed to the item reader: receives from {@code myqueue} by
     * default and converts message bodies with {@link #jsonMessageConverter()}.
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setDefaultReceiveQueue("myqueue");
        template.setMessageConverter(jsonMessageConverter());
        return template;
    }

    /** Declares the queue the reader consumes from. */
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    /** Item reader backed by {@link #rabbitTemplate()}. */
    @Bean
    public ItemReader<Customer> customerReader() {
        AmqpItemReader<Customer> amqpReader = new AmqpItemReader<>(rabbitTemplate());
        return amqpReader;
    }

    /** Item writer that prints every customer of a chunk on its own line. */
    @Bean
    public ItemWriter<Customer> customerItemWriter() {
        return items -> items.forEach(customer -> System.out.println(customer.toString()));
    }

    /** Chunk-oriented step (10 items per chunk) with the step listener attached. */
    @Bean
    public Step step1() {
        return stepBuilderFactory
                .get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    /** Job consisting of the single step above. */
    @Bean
    public Job job() {
        return jobBuilderFactory
                .get("job")
                .start(step1())
                .build();
    }

    /** Listener that logs around the step execution. */
    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}
CustomerStepListener.java
/**
 * Step listener that prints a marker before the step and the number of items
 * read after it.
 */
public class CustomerStepListener implements StepExecutionListener {

    /**
     * Prints a separator line before the step starts.
     *
     * @param stepExecution the execution about to start (not inspected)
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        System.out.println("==");
    }

    /**
     * Prints the number of items the step read.
     *
     * @param stepExecution the finished step execution
     * @return {@link ExitStatus#COMPLETED}
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Print the count the label promises, not the whole StepExecution dump.
        System.out.println("READ COUNT = " + stepExecution.getReadCount());
        return ExitStatus.COMPLETED;
    }
}
로그
2021-01-18 18 : 41 : 05.023 INFO 25532 --- [main] osbatch.core.job.SimpleStepHandler : 실행 단계 : [step1] == 2021-01-18 18 : 41 : 05.031 INFO 25532 --- [ main] osarcCachingConnectionFactory : 연결 시도 중 : localhost : 5672 2021-01-18 18 : 41 : 05.072 INFO 25532 --- [main] osarcCachingConnectionFactory : 새 연결 생성 : connectionFactory # 20a14b55 : 0 / SimpleConnection @ 4650a407 [delegate = amqp : //guest@127.0.0.1:5672/, localPort = 55797] 읽기 횟수 = StepExecution : id = 1, version = 2, name = step1, status = COMPLETED, exitStatus = COMPLETED, readCount = 0, filterCount = 0, writeCount = 0 readSkipCount = 0, writeSkipCount = 0, processSkipCount = 0, commitCount = 1, rollbackCount = 0, exitDescription = 2021-01-18 18 : 41 : 05.097 INFO 25532 --- [main] osbatch.core.step.AbstractStep : Step :[step1] 73ms에서 실행 됨 2021-01-18 18 : 41 : 05.099 INFO 25532 --- [main] osbclsupport.SimpleJobLauncher : 작업 : [SimpleJob : [name = job]]이 다음 매개 변수로 완료되었습니다. [{-spring. output.ansi.enabled = always}] 및 다음 상태 : [COMPLETED] (87ms)