카테고리 없음

Spring Batch-AmqpWriter 및 AmqpReader 예제

기록만이살길 2021. 2. 23. 11:07
반응형

Spring Batch-AmqpWriter 및 AmqpReader 예제

1. 질문(문제점):

AmqpWriter를 사용하여 RabbitMQ에 데이터를 쓰고, AmqpReader를 사용하여 RabbitMQ에서 데이터를 읽는 솔루션이 필요합니다. 우리는 Apache Kafka를 찾는 것이 아니라, 단순히 프로그램 세부 정보를 보내고 소비하고 싶습니다.

작성기(Writer) 코드

JobConfig.java

/**
 * Spring Batch configuration for the writer application.
 *
 * <p>Defines a single-step job that reads {@link Customer} records from a CSV file on
 * the classpath and publishes each record to RabbitMQ through an {@link AmqpItemWriter}.
 */
@Configuration
public class JobConfig {

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    /** Connection factory for the broker on {@code localhost}. */
    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    /** AMQP admin built on the shared connection factory. */
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    /**
     * Template used by the item writer to publish messages.
     *
     * <p>The routing key is set to the queue name on purpose: a template without a
     * routing key publishes to the nameless exchange with an empty routing key, so no
     * message ever reaches {@code myqueue} and the reading job sees nothing to consume.
     *
     * <p>NOTE(review): this template keeps its default message converter. If the
     * consuming application expects JSON payloads, confirm that both sides agree on
     * the message format.
     */
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setRoutingKey(myQueue().getName());
        return rabbitTemplate;
    }

    /** The queue the writer targets. */
    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    /**
     * Reads customers from {@code /data/customer.csv} on the classpath.
     *
     * <p>The first line is skipped (presumably a header row); the remaining lines are
     * tokenized into {@code id, firstName, lastName, birthdate} and mapped by
     * {@link CustomerFieldSetMapper}.
     */
    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1);
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        customerLineMapper.afterPropertiesSet();

        reader.setLineMapper(customerLineMapper);

        return reader;
    }

    /** Item writer that sends every customer through {@link #rabbitTemplate()}. */
    @Bean
    public AmqpItemWriter<Customer> amqpWriter() {
        return new AmqpItemWriter<>(this.rabbitTemplate());
    }

    /** Chunk-oriented step: CSV reader to AMQP writer, ten items per chunk. */
    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }

    /** Job consisting of {@link #step1()}, with a run-id incrementer. */
    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}

CustomerFieldSetMapper.java

public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readRawString("firstName"))
                .lastName(fieldSet.readRawString("lastName"))
                .birthdate(fieldSet.readRawString("birthdate"))
                .build();
    }
}

Customer.java

/**
 * Customer record that travels through the batch job and over the message broker.
 *
 * <p>Lombok generates the accessors, builder and constructors. Unknown JSON
 * properties are ignored when the class is bound with Jackson.
 */
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Customer identifier. */
    private Long id;

    /** Given name. */
    private String firstName;

    /** Family name. */
    private String lastName;

    /** Birth date, kept as the text it was read as. */
    private String birthdate;
}

SpringBatchAmqpApplication.java

/**
 * Entry point of the writer application: a Spring Boot application with batch
 * processing enabled and the {@code Source} binding declared.
 */
@SpringBootApplication
@EnableBatchProcessing
@EnableBinding(Source.class)
public class SpringBatchAmqpApplication {

    /**
     * Boots the application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(SpringBatchAmqpApplication.class);
        application.run(args);
    }
}

판독기(Reader) 코드

JobConfiguration.java

@Configuration
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setMessageConverter(jsonMessageConverter());
        return factory; 
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ItemReader<Customer> customerReader(){
        return new AmqpItemReader<>(this.rabbitTemplate());
    }

    @Bean
    public ItemWriter<Customer> customerItemWriter(){
        return items -> {
            for(Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer> chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}

CustomerStepListener.java

/**
 * Step listener that prints a marker before the step starts and the step execution
 * after it finishes.
 */
public class CustomerStepListener implements StepExecutionListener {

    /** Prints a separator line before the step runs. */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        final String marker = "==";
        System.out.println(marker);
    }

    /**
     * Prints the step execution (its full string form, not only the read count)
     * and reports {@link ExitStatus#COMPLETED}.
     */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        final String summary = "READ COUNT = " + stepExecution;
        System.out.println(summary);
        return ExitStatus.COMPLETED;
    }
}

로그

2021-01-18 18 : 41 : 05.023 INFO 25532 --- [main] osbatch.core.job.SimpleStepHandler : 실행 단계 : [step1] == 2021-01-18 18 : 41 : 05.031 INFO 25532 --- [ main] osarcCachingConnectionFactory : 연결 시도 중 : localhost : 5672 2021-01-18 18 : 41 : 05.072 INFO 25532 --- [main] osarcCachingConnectionFactory : 새 연결 생성 : connectionFactory # 20a14b55 : 0 / SimpleConnection @ 4650a407 [delegate = amqp : //guest@127.0.0.1:5672/, localPort = 55797] 읽기 횟수 = StepExecution : id = 1, version = 2, name = step1, status = COMPLETED, exitStatus = COMPLETED, readCount = 0, filterCount = 0, writeCount = 0 readSkipCount = 0, writeSkipCount = 0, processSkipCount = 0, commitCount = 1, rollbackCount = 0, exitDescription = 2021-01-18 18 : 41 : 05.097 INFO 25532 --- [main] osbatch.core.step.AbstractStep : Step :[step1] 73ms에서 실행 됨 2021-01-18 18 : 41 : 05.099 INFO 25532 --- [main] osbclsupport.SimpleJobLauncher : 작업 : [SimpleJob : [name = job]]이 다음 매개 변수로 완료되었습니다. [{-spring. output.ansi.enabled = always}] 및 다음 상태 : [COMPLETED] (87ms)

2. 해결방안:

"작성기(Writer) 코드" 쪽에서는 RabbitTemplate으로 AmqpItemWriter를 생성하고 있습니다. 기본적으로 메시지는 이름 없는 익스체인지(nameless exchange)로 전송됩니다. 다음은 Javadoc에서 발췌한 내용입니다.

Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.

작성기 구성에는 RabbitTemplate과 큐 사이에 "연결"이 없습니다. 따라서 큐로 메시지를 보내도록 RabbitTemplate을 구성해야 합니다.

/**
 * Template whose routing key is the name of the target queue, so that published
 * messages are routed to that queue.
 */
@Bean
public RabbitTemplate rabbitTemplate() {
    final String queueName = myQueue().getName();
    final RabbitTemplate template = new RabbitTemplate(connectionFactory());
    template.setRoutingKey(queueName);
    return template;
}

이것은 판독기(Reader) 쪽에서 rabbitTemplate.setDefaultReceiveQueue("myqueue");로 설정한 것과 유사합니다.

65750319
반응형