카테고리 없음

하나의 Kafka 주제 아래에 직렬화 된 Java 객체 2 개 보내기

기록만이살길 2021. 2. 20. 01:55
반응형

하나의 Kafka 주제 아래에 직렬화 된 Java 객체 2 개 보내기

1. 질문(문제점):

Java 객체를 송수신하는 Kafka Consumer 및 Producer를 구현하고 싶습니다. 전체

소스

나는 이것을 시도했다 :

생산자:

    @Configuration
public class KafkaProducerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    @Bean
    public ProducerFactory<String, SaleRequestFactory> saleRequestFactoryProducerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SaleRequestFactorySerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }

    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new DefaultKafkaProducerFactory<>(configProps);
    }


    @Bean
    public KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate() {
        return new KafkaTemplate<>(saleRequestFactoryProducerFactory());
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    @Bean
    public ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> replyKafkaTemplate(ProducerFactory<String, SaleRequestFactory> producerFactory, ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory) {
        ConcurrentMessageListenerContainer<String, SaleResponseFactory> kafkaMessageListenerContainer = factory.createContainer("tp-sale");
        kafkaMessageListenerContainer.getContainerProperties().setGroupId("tp-sale.reply");
        return new ReplyingKafkaTemplate<>(producerFactory, kafkaMessageListenerContainer);
    }
}

개체 보내기 :

 @RestController
@RequestMapping("/checkout")
public class CheckoutController {
    
    private TransactionService transactionService;
    private KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate;
    private ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate;
    private static String topic = "tp-sale";

    @Autowired
    public CheckoutController(ValidationMessage validationMessage, TransactionService transactionService,
                              KafkaTemplate<String, SaleRequestFactory> saleRequestFactoryKafkaTemplate,
                              ReplyingKafkaTemplate<String, SaleRequestFactory, SaleResponseFactory> requestReplyKafkaTemplate){
        this.transactionService = transactionService;
        this.saleRequestFactoryKafkaTemplate = saleRequestFactoryKafkaTemplate;
        this.requestReplyKafkaTemplate = requestReplyKafkaTemplate;
    }

    @PostMapping("test")
    private void performPayment() throws ExecutionException, InterruptedException, TimeoutException {

        Transaction transaction = new Transaction();
        transaction.setStatus(PaymentTransactionStatus.IN_PROGRESS.getText());

        Transaction insertedTransaction = transactionService.save(transaction);

        SaleRequestFactory obj = new SaleRequestFactory();
        obj.setId(100);

        ProducerRecord<String, SaleRequestFactory> record = new ProducerRecord<>("tp-sale", obj);
        RequestReplyFuture<String, SaleRequestFactory, SaleResponseFactory> replyFuture = requestReplyKafkaTemplate.sendAndReceive(record);
        SendResult<String, SaleRequestFactory> sendResult = replyFuture.getSendFuture().get(10, TimeUnit.SECONDS);
        ConsumerRecord<String, SaleResponseFactory> consumerRecord = replyFuture.get(10, TimeUnit.SECONDS);


        SaleResponseFactory value = consumerRecord.value();
        System.out.println("!!!!!!!!!!!! " + value.getUnique_id());
    }
}

소비자:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {

    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;

    private String groupId = "test";

    @Bean
    public ConsumerFactory<String, SaleResponseFactory> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, SaleResponseFactoryDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, SaleResponseFactory> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

개체 받기

@Component
public class ProcessingSaleListener {

    private static String topic = "tp-sale";

    @KafkaListener(topics = "tp-sale")
    public SaleResponseFactory process(@Payload SaleRequestFactory tf, @Headers MessageHeaders headers) throws Exception {

        System.out.println(tf.getId());

        SaleResponseFactory resObj = new SaleResponseFactory();
        resObj.setUnique_id("123123");

        return resObj;
    }
}

사용자 지정 개체

import java.io.Serializable;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleRequestFactory implements Serializable {

    private static final long serialVersionUID = 1744050117179344127L;
    
    private int id;

}

직렬 변환기

import org.apache.kafka.common.serialization.Serializer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SaleRequestFactorySerializer implements Serializable, Serializer<SaleRequestFactory> {

    @Override
    public byte[] serialize(String topic, SaleRequestFactory data)
    {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try
        {
            ObjectOutputStream outputStream = new ObjectOutputStream(out);
            outputStream.writeObject(data);
            out.close();
        }
        catch (IOException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return out.toByteArray();
    }
}

응답 개체

import java.io.Serializable;
import java.time.LocalDateTime;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@Builder(toBuilder = true)
public class SaleResponseFactory implements Serializable {

    private static final long serialVersionUID = 1744050117179344127L;

    private String unique_id;
}

응답 클래스

import org.apache.kafka.common.serialization.Deserializer;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;

public class SaleResponseFactoryDeserializer implements Serializable, Deserializer<SaleRequestFactory> {

    @Override
    public SaleRequestFactory deserialize(String topic, byte[] data)
    {
        SaleRequestFactory saleRequestFactory = null;
        try
        {
            ByteArrayInputStream bis = new ByteArrayInputStream(data);
            ObjectInputStream in = new ObjectInputStream(bis);
            saleRequestFactory = (SaleRequestFactory) in.readObject();
            in.close();
        }
        catch (IOException | ClassNotFoundException e)
        {
            throw new RuntimeException("Unhandled", e);
        }
        return saleRequestFactory;
    }
}
객체 유형에 따라 다른 직렬화 된 Java 객체를 보내고 받고 싶습니다. 예를 들어 때때로

SaleRequestFactory

및 수신

SaleResponseFactory

또는 송신

AuthRequestFactory

및 수신

AuthResponseFactory

. 하나의 주제를 사용하여 다른 Java Obects를 보내고받을 수 있습니까?

Full example code

2. 해결방안:

사용 Object값 유형으로 - 여기 부트의 자동 구성 인프라 bean을 사용하는 예입니다 ...

@SpringBootApplication
public class So65866763Application {

    public static void main(String[] args) {
        SpringApplication.run(So65866763Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, Object> template) {
        return args -> {
            template.send("so65866763", new Foo());
            template.send("so65866763", new Bar());
        };
    }

    @Bean
    public NewTopic topic() {
        return TopicBuilder.name("so65866763").partitions(1).replicas(1).build();
    }

}

class Foo implements Serializable {

}

class Bar implements Serializable {

}

@Component
@KafkaListener(id = "so65866763", topics = "so65866763")
class Listener {

    @KafkaHandler
    void fooListener(Foo foo) {
        System.out.println("In fooListener: " + foo);
    }

    @KafkaHandler
    void barListener(Bar bar) {
        System.out.println("In barListener: " + bar);
    }

}
public class JavaSerializer implements Serializer<Object> {

    @Override
    public byte[] serialize(String topic, Object data) {
        return null;
    }

    @Override
    public byte[] serialize(String topic, Headers headers, Object data) {
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(baos)) {
            oos.writeObject(data);
            return baos.toByteArray();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

}
public class JavaDeserializer implements Deserializer<Object> {

    @Override
    public Object deserialize(String topic, byte[] data) {
        return null;
    }

    @Override
    public Object deserialize(String topic, Headers headers, byte[] data) {
        ByteArrayInputStream bais = new ByteArrayInputStream(data);
        try (ObjectInputStream ois = new ObjectInputStream(bais)) {
            return ois.readObject();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

}
spring.kafka.consumer.auto-offset-reset=earliest

spring.kafka.producer.value-serializer=com.example.demo.JavaSerializer
spring.kafka.consumer.value-deserializer=com.example.demo.JavaDeserializer
In fooListener: com.example.demo.Foo@331ca660
In barListener: com.example.demo.Bar@26f54288
65866763
반응형