Spring

자바-스프링 부트-반응 형 Redis 스트림 (TEXT_EVENT_STREAM_VALUE)

기록만이살길 2021. 3. 13. 03:07
반응형

자바-스프링 부트-반응 형 Redis 스트림 (TEXT_EVENT_STREAM_VALUE)

1. 질문(문제점):

redis 스트림의 최신 메시지를 항상 보여주는 엔드 포인트를 작성하고 싶습니다 (반응성).


엔티티는 다음과 같습니다 {'key' : 'some_key', 'status' : 'some_string'}.

따라서 다음과 같은 결과를 얻고 싶습니다.

  1. 페이지가 호출되면 콘텐츠는 예를 들어 엔티티를 표시합니다.
{'key' : 'abc', 'status' : 'status_A'}

페이지가 닫히지 않았습니다

  1. 그런 다음 새 엔티티가 스트림에 추가됩니다.
XADD mystream * key abc status statusB
  1. 이제 탭을 업데이트하지 않고 스트림의 각 항목을보고 싶습니다.
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}

이 동작을 조롱하려고 할 때 작동하고 예상되는 출력을 얻습니다.
    @GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Light> liveLightMock() {
        List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"),
                new Light("key", "on") , new Light("key", "off"));
        return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
    }

List의 개별 요소는 항목 간 500ms 지연으로 차례로 표시됩니다.

그러나 모의 변형 대신 Redis에 액세스하려고하면 더 이상 작동하지 않습니다. 부분 기능을 연속적으로 테스트하려고합니다. 내 아이디어가 먼저 작동하려면 저장 (1) 기능이 작동해야하고, 저장 기능이 작동하면 재 활성화 기능이없는 오래된 레코드를 표시해야하고 (2) 마지막으로 작동해야하지만 적어도 둘 다 작동하면 재 활성화 부분을 가져와야합니다.

아마 당신들은 내가 반응 부품이 작동하도록 도울 수 있습니다. 나는 개선되지 않고 며칠 동안 일하고 있습니다.

Ty들 :)

테스트 1)-저장 기능 (짧은 버전)

작동하는 것처럼 보입니다.

    @GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    public Flux<Light> createTestLight() {
        String status = (++statusIdx % 2 == 0) ? "on" : "off";
        Light light = new Light(Consts.LIGHT_ID, status);
        return LightRepository.save(light).flux();
    }
    @Override
    public Mono<Light> save(Light light) {
        Map<String, String> lightMap = new HashMap<>();
        lightMap.put("key", light.getKey());
        lightMap.put("status", light.getStatus());

        return operations.opsForStream(redisSerializationContext)
                .add("mystream", lightMap)
                .map(__ -> light);
    }

테스트 2)-로딩 / 읽기 기능 (짧은 버전)

작동하는 것 같지만 reaktiv는 아닙니다-> WebView가 열려있는 동안 새 항목을 추가하고 View에 모든 항목이 표시되었지만 새 항목을 추가 한 후에는 업데이트되지 않았습니다. 재 장전 후 나는 모든 항목을 보았다

스트림을 구독하는 것과 getLights작동하는 것을 반환하려면 어떻게 TEXT_EVENT_STREAM_VALUE해야합니까?

    @Override
    public Flux<Object> getLights() {
        ReadOffset readOffset = ReadOffset.from("0");
        StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest

        Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
            Map<Object, Object> kvp = entries.getValue();
            String key = (String) kvp.get("key");
            String status = (String) kvp.get("status");
            Light light = new Light(key, status);
            return Flux.just(light);
        };

        return operations.opsForStream()
                .read(offset)
                .flatMap(mapFunc);
    }
    @GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Object> lightLive() {
        return LightRepository.getLights();
    }

테스트 1)-저장 기능 (Long Version)

끝점 및 저장 기능은 다른 클래스의 일부입니다. String status = (++statusIdx % 2 == 0) ? "on" : "off";플립은 상태를 켜짐에서 꺼짐, 켜짐, 꺼짐, ...

    @GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
    @ResponseBody
    public Flux<Light> createTestLight() {
        String status = (++statusIdx % 2 == 0) ? "on" : "off";
        Light light = new Light(Consts.LIGHT_ID, status);
        return LightRepository.save(light).flux();
    }
    @Override
    public Mono<Light> save(Light light) {
        Map<String, String> lightMap = new HashMap<>();
        lightMap.put("key", light.getKey());
        lightMap.put("status", light.getStatus());

        return operations.opsForStream(redisSerializationContext)
                .add("mystream", lightMap)
                .map(__ -> light);
    }

기능을 검증하려면 i

  1. 스트림을 비우고 비우기 위해
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0

생성 엔드 포인트를 두 번 호출했습니다. /light/create 이제 스트림에 두 개의 항목이있을 것으로 예상했습니다. 상태는 켜진 상태이고 하나는 꺼짐 상태입니다.

127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
   2) 1) 1) "1610456865517-0"
         2) 1) "key"
            2) "light_1"
            3) "status"
            4) "off"
      2) 1) "1610456866708-0"
         2) 1) "key"
            2) "light_1"
            3) "status"
            4) "on"

저장 부분이 작동중인 것 같습니다.

테스트 2)-로딩 / 읽기 기능 (긴 버전)

작동하는 것 같지만 reaktiv가 아닙니다-> 새 엔티티를 추가하고 페이지가 값을 업데이트합니다.

    @Override
    public Flux<Object> getLights() {
        ReadOffset readOffset = ReadOffset.from("0");
        StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest

        Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
            Map<Object, Object> kvp = entries.getValue();
            String key = (String) kvp.get("key");
            String status = (String) kvp.get("status");
            Light light = new Light(key, status);
            return Flux.just(light);
        };

        return operations.opsForStream()
                .read(offset)
                .flatMap(mapFunc);
    }
    @GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    @ResponseBody
    public Flux<Object> lightLive() {
        return LightRepository.getLights();
    }
  1. 호출 /light/live-> N항목 이 있어야 함 -> 항목을 볼 수 있으면 정상적인 디스플레이가 작동하는 것입니다 (비 반응성).
  2. /light/create두 번 호출 -> 라이브 몇 개 항목을 추가해야 함-> N+2항목
  3. 안전을 위해 1 분 대기
  4. 보기 N+2에 Reactiv 부품이 작동하는 항목 이 표시되어야합니다.
  5. 1에서보기 새로 고침 ( /light/live), Reactiv가 작동하는 경우에도 동일한 양이 표시되어야합니다.

정보 표시 작업 (1), (2)의 추가 부분 작동, 터미널별로 확인, 4) 작동하지 않음

디스플레이가 작동하지만 반응하지 않습니다.

브라우저를 새로 고친 후 (5) 예상 N+2항목을 얻었 으므로 (2)도 잘 작동했습니다.


2. 해결방안:

There's a misconception here, reading from Redis reactively does not mean you have subscribed for new events.

Reactive will not provide you live updates, it will call Redis once and it will display whatever is there. So even if you wait for a day or two nothing is going to change in UI/Console, you will still seeing N entries.

You need to either use Redis PUB/SUB or you need to call Redis repetitively to get the latest update.

EDIT:

A working solution..

  private List<Light> reactiveReadToList() {
    log.info("reactiveReadToList");
    return read().collectList().block();
  }

  private Flux<Light> read() {
    StreamOffset<Object> offset = StreamOffset.fromStart("mystream");
    return redisTemplate
        .opsForStream()
        .read(offset)
        .flatMap(
            e -> {
              Map<Object, Object> kvp = e.getValue();
              String key = (String) kvp.get("key");
              String id = (String) kvp.get("id");
              String status = (String) kvp.get("status");
              Light light = new Light(id, key, status);
              log.info("{}", light);
              return Flux.just(light);
            });
  }

A reader that reads data from Redis on demand using reactive template and send it to the client as it sees using offset, it sends only one event at once we can send all of them.

  @RequiredArgsConstructor
  class DataReader {
    @NonNull FluxSink<Light> sink;
    private List<Light> readLights = null;
    private int currentOffset = 0;
    void register() {
      readLights = reactiveReadToList();
      sink.onRequest(
          e -> {
            long demand = sink.requestedFromDownstream();
            for (int i = 0; i < demand && currentOffset < readLights.size(); i++, currentOffset++) {
              sink.next(readLights.get(currentOffset));
            }
            if (currentOffset == readLights.size()) {
              readLights = reactiveReadToList();
              currentOffset = 0;
            }
          });
    }
  }

A method that uses DataReader to generate flux

  public Flux<Light> getLights() {
    return Flux.create(e -> new DataReader(e).register());
  }

Now we've added an onRequest method on the sink to handle the client demand, this reads data from the Redis stream as required and sends it to the client.

This looks to be very CPU intensive maybe we should delay the calls if there're no more new events, maybe add a sleep call inside register method if we see there're not new elements in the stream.

65685159
반응형