Spring Batch - AmqpWriter and AmqpReader example

기록만이살길 2021. 2. 23. 11:07

1. Question (problem):

We need a solution that writes data to RabbitMQ using AmqpWriter and reads that data back from RabbitMQ using AmqpReader. We are not looking for Apache Kafka; we simply want to send program details and consume them.

Writer code


@Configuration
@EnableBatchProcessing
public class JobConfig {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        return new RabbitTemplate(connectionFactory());
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public FlatFileItemReader<Customer> customerItemReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("/data/customer.csv"));

        // Split each CSV line into named fields.
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[] { "id", "firstName", "lastName", "birthdate" });

        // Map the tokenized fields onto a Customer.
        DefaultLineMapper<Customer> customerLineMapper = new DefaultLineMapper<>();
        customerLineMapper.setLineTokenizer(tokenizer);
        customerLineMapper.setFieldSetMapper(new CustomerFieldSetMapper());
        reader.setLineMapper(customerLineMapper);
        return reader;
    }

    @Bean
    public AmqpItemWriter<Customer> amqpWriter() {
        return new AmqpItemWriter<>(this.rabbitTemplate());
    }

    @Bean
    public Step step1() throws Exception {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerItemReader())
                .writer(amqpWriter())
                .build();
    }

    @Bean
    public Job job() throws Exception {
        return jobBuilderFactory.get("job")
                .incrementer(new RunIdIncrementer())
                .start(step1())
                .build();
    }
}


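public class CustomerFieldSetMapper implements FieldSetMapper<Customer> {
    @Override
    public Customer mapFieldSet(FieldSet fieldSet) throws BindException {
        // Field names match those set on the DelimitedLineTokenizer.
        return Customer.builder()
                .id(fieldSet.readLong("id"))
                .firstName(fieldSet.readString("firstName"))
                .lastName(fieldSet.readString("lastName"))
                .birthdate(fieldSet.readString("birthdate"))
                .build();
    }
}

// Lombok annotations are assumed here, since the mapper calls Customer.builder().
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
public class Customer implements Serializable {
    private static final long serialVersionUID = 1L;
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}

@SpringBootApplication
public class SpringBatchAmqpApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBatchAmqpApplication.class, args);
    }
}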

Reader code


@Configuration
@EnableBatchProcessing
public class JobConfiguration {
    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Bean
    public ConnectionFactory connectionFactory() {
        return new CachingConnectionFactory("localhost");
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        return factory;
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        // The reader receives from this queue.
        rabbitTemplate.setDefaultReceiveQueue("myqueue");
        return rabbitTemplate;
    }

    @Bean
    public Queue myQueue() {
        return new Queue("myqueue");
    }

    @Bean
    public ItemReader<Customer> customerReader() {
        return new AmqpItemReader<>(this.rabbitTemplate());
    }

    @Bean
    public ItemWriter<Customer> customerItemWriter() {
        return items -> {
            for (Customer c : items) {
                System.out.println(c.toString());
            }
        };
    }

    @Bean
    public Step step1() {
        return stepBuilderFactory.get("step1")
                .<Customer, Customer>chunk(10)
                .reader(customerReader())
                .writer(customerItemWriter())
                .listener(customerStepListener())
                .build();
    }

    @Bean
    public Job job() {
        return jobBuilderFactory.get("job")
                .start(step1())
                .build();
    }

    @Bean
    public CustomerStepListener customerStepListener() {
        return new CustomerStepListener();
    }
}


public class CustomerStepListener implements StepExecutionListener {

    @Override
    public void beforeStep(StepExecution stepExecution) {
    }

    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        // Prints the whole StepExecution, which includes readCount and writeCount.
        System.out.println("READ COUNT = " + stepExecution);
        return ExitStatus.COMPLETED;
    }
}


2021-01-18 18:41:05.023  INFO 25532 --- [main] o.s.batch.core.job.SimpleStepHandler : Executing step: [step1]
2021-01-18 18:41:05.031  INFO 25532 --- [main] o.s.a.r.c.CachingConnectionFactory : Attempting to connect to: localhost:5672
2021-01-18 18:41:05.072  INFO 25532 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: connectionFactory#20a14b55:0/SimpleConnection@4650a407 [delegate=amqp://guest@, localPort=55797]
READ COUNT = StepExecution: id=1, version=2, name=step1, status=COMPLETED, exitStatus=COMPLETED, readCount=0, filterCount=0, writeCount=0 readSkipCount=0, writeSkipCount=0, processSkipCount=0, commitCount=1, rollbackCount=0, exitDescription=
2021-01-18 18:41:05.097  INFO 25532 --- [main] o.s.batch.core.step.AbstractStep : Step: [step1] executed in 73ms
2021-01-18 18:41:05.099  INFO 25532 --- [main] o.s.b.c.l.support.SimpleJobLauncher : Job: [SimpleJob: [name=job]] completed with the following parameters: [{-spring.output.ansi.enabled=always}] and the following status: [COMPLETED] in 87ms
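
The readCount = 0 with status COMPLETED in the log above is what you would expect when the reader finds the queue empty on its first attempt. As far as I understand, AmqpItemReader.read() delegates to the template's receiveAndConvert(), which returns null when no message is waiting, and Spring Batch treats a null item as the end of input. The sketch below is only a toy model of that read loop in plain Java, not Spring Batch code; the class and method names (EmptyQueueReadSketch, readAll) are made up for illustration.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;

// Toy model of a step's read loop; this is NOT Spring Batch code.
class EmptyQueueReadSketch {

    // Keeps reading until the reader returns null (the end-of-input signal)
    // and returns how many items were read.
    static int readAll(Supplier<String> reader) {
        int readCount = 0;
        while (reader.get() != null) {
            readCount++;
        }
        return readCount;
    }

    public static void main(String[] args) {
        Queue<String> emptyQueue = new ArrayDeque<>();
        // Queue.poll() returns null on an empty queue, standing in for a
        // non-blocking receive that finds no message.
        System.out.println("readCount=" + readAll(emptyQueue::poll));
    }
}
```

So the step finishing cleanly does not mean anything was consumed; it only means the first receive came back empty.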

2. Solution:

On the "writer code" side, the AmqpItemWriter uses the RabbitTemplate to send messages. By default, messages are sent to the nameless exchange. Here is an excerpt from the Javadoc:

Messages will be sent to the nameless exchange if not specified on the provided AmqpTemplate.
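
To see why the nameless exchange matters: the broker's default exchange delivers a message to the queue whose name equals the message's routing key, and a RabbitTemplate with no routing key configured publishes with an empty key. The sketch below is a toy model of that routing rule in plain Java, not RabbitMQ client code; DefaultExchangeSketch and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Toy model of the default (nameless) exchange; this is NOT RabbitMQ client code.
class DefaultExchangeSketch {
    private final Map<String, Queue<String>> queues = new HashMap<>();

    // Every declared queue is reachable through the default exchange under its own name.
    void declareQueue(String name) {
        queues.putIfAbsent(name, new ArrayDeque<>());
    }

    // Returns true if the message reached a queue, false if it was unroutable and dropped.
    boolean publish(String routingKey, String body) {
        Queue<String> target = queues.get(routingKey);
        if (target == null) {
            return false;
        }
        target.add(body);
        return true;
    }

    // Number of messages currently sitting in the named queue.
    int depth(String queueName) {
        Queue<String> queue = queues.get(queueName);
        return queue == null ? 0 : queue.size();
    }

    public static void main(String[] args) {
        DefaultExchangeSketch broker = new DefaultExchangeSketch();
        broker.declareQueue("myqueue");
        // Empty routing key, as with a template that has no routing key set.
        System.out.println(broker.publish("", "customer-1"));
        // Routing key equal to the queue name.
        System.out.println(broker.publish("myqueue", "customer-1"));
        System.out.println(broker.depth("myqueue"));
    }
}
```

In this model the first publish is dropped and only the second one lands in myqueue, which matches the symptom of a writer that completes without error while the reader sees nothing.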

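In the writer configuration, there is no "link" between the rabbit template and the queue. You therefore need to configure the rabbit template so that it sends messages to the queue: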

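@Bean
public RabbitTemplate rabbitTemplate() {
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
    rabbitTemplate.setRoutingKey("myqueue"); // on the nameless exchange, the routing key is the queue name
    return rabbitTemplate;
}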

This mirrors what the reader side already does with rabbitTemplate.setDefaultReceiveQueue("myqueue");.
