redis 스트림의 최신 메시지를 항상 보여주는 엔드 포인트를 작성하고 싶습니다 (반응성).
엔티티는 다음과 같습니다 {'key' : 'some_key', 'status' : 'some_string'}
.
따라서 다음과 같은 결과를 얻고 싶습니다.
- 페이지가 호출되면 콘텐츠는 예를 들어 엔티티를 표시합니다.
{'key' : 'abc', 'status' : 'status_A'}
페이지가 닫히지 않았습니다
- 그런 다음 새 엔티티가 스트림에 추가됩니다.
XADD mystream * key abc status statusB
- 이제 탭을 업데이트하지 않고 스트림의 각 항목을보고 싶습니다.
{'key' : 'abc', 'status' : 'status_A'}
{'key' : 'abc', 'status' : 'status_B'}
이 동작을 조롱하려고 할 때 작동하고 예상되는 출력을 얻습니다.
@GetMapping(value="/light/live/mock", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Light> liveLightMock() {
List<Light> test = Arrays.asList(new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"),
new Light("key", "on") , new Light("key", "off"));
return Flux.fromIterable(test).delayElements(Duration.ofMillis(500));
}
List의 개별 요소는 항목 간 500ms 지연으로 차례로 표시됩니다.
그러나 모의 변형 대신 Redis에 액세스하려고하면 더 이상 작동하지 않습니다. 부분 기능을 연속적으로 테스트하려고합니다. 내 아이디어가 먼저 작동하려면 저장 (1) 기능이 작동해야하고, 저장 기능이 작동하면 재 활성화 기능이없는 오래된 레코드를 표시해야하고 (2) 마지막으로 작동해야하지만 적어도 둘 다 작동하면 재 활성화 부분을 가져와야합니다.
아마 당신들은 내가 반응 부품이 작동하도록 도울 수 있습니다. 나는 개선되지 않고 며칠 동안 일하고 있습니다.
Ty들 :)
테스트 1)-저장 기능 (짧은 버전)
작동하는 것처럼 보입니다.
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
@Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());
return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}
테스트 2)-로딩 / 읽기 기능 (짧은 버전)
작동하는 것 같지만 reaktiv는 아닙니다-> WebView가 열려있는 동안 새 항목을 추가하고 View에 모든 항목이 표시되었지만 새 항목을 추가 한 후에는 업데이트되지 않았습니다. 재 장전 후 나는 모든 항목을 보았다
스트림을 구독하는 것과 getLights
작동하는 것을 반환하려면 어떻게 TEXT_EVENT_STREAM_VALUE
해야합니까?
@Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};
return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
테스트 1)-저장 기능 (Long Version)
끝점 및 저장 기능은 다른 클래스의 일부입니다. String status = (++statusIdx % 2 == 0) ? "on" : "off";
플립은 상태를 켜짐에서 꺼짐, 켜짐, 꺼짐, ...
@GetMapping(value="/light/create", produces = MediaType.APPLICATION_JSON_VALUE)
@ResponseBody
public Flux<Light> createTestLight() {
String status = (++statusIdx % 2 == 0) ? "on" : "off";
Light light = new Light(Consts.LIGHT_ID, status);
return LightRepository.save(light).flux();
}
@Override
public Mono<Light> save(Light light) {
Map<String, String> lightMap = new HashMap<>();
lightMap.put("key", light.getKey());
lightMap.put("status", light.getStatus());
return operations.opsForStream(redisSerializationContext)
.add("mystream", lightMap)
.map(__ -> light);
}
기능을 검증하려면 i
- 스트림을 비우고 비우기 위해
127.0.0.1:6379> del mystream
(integer) 1
127.0.0.1:6379> XLEN myStream
(integer) 0
생성 엔드 포인트를 두 번 호출했습니다. /light/create
이제 스트림에 두 개의 항목이있을 것으로 예상했습니다. 상태는 켜진 상태이고 하나는 꺼짐 상태입니다.
127.0.0.1:6379> XLEN mystream
(integer) 2
127.0.0.1:6379> xread STREAMS mystream 0-0
1) 1) "mystream"
2) 1) 1) "1610456865517-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "off"
2) 1) "1610456866708-0"
2) 1) "key"
2) "light_1"
3) "status"
4) "on"
저장 부분이 작동중인 것 같습니다.
테스트 2)-로딩 / 읽기 기능 (긴 버전)
작동하는 것 같지만 reaktiv가 아닙니다-> 새 엔티티를 추가하고 페이지가 값을 업데이트합니다.
@Override
public Flux<Object> getLights() {
ReadOffset readOffset = ReadOffset.from("0");
StreamOffset<String> offset = StreamOffset.fromStart("mystream"); //fromStart or Latest
Function<? super MapRecord<String, Object, Object>, ? extends Publisher<?>> mapFunc = entries -> {
Map<Object, Object> kvp = entries.getValue();
String key = (String) kvp.get("key");
String status = (String) kvp.get("status");
Light light = new Light(key, status);
return Flux.just(light);
};
return operations.opsForStream()
.read(offset)
.flatMap(mapFunc);
}
@GetMapping(value="/light/live", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
@ResponseBody
public Flux<Object> lightLive() {
return LightRepository.getLights();
}
- 호출
/light/live
->N
항목 이 있어야 함 -> 항목을 볼 수 있으면 정상적인 디스플레이가 작동하는 것입니다 (비 반응성). /light/create
두 번 호출 -> 라이브 몇 개 항목을 추가해야 함->N+2
항목- 안전을 위해 1 분 대기
- 보기
N+2
에 Reactiv 부품이 작동하는 항목 이 표시되어야합니다. - 1에서보기 새로 고침 (
/light/live
), Reactiv가 작동하는 경우에도 동일한 양이 표시되어야합니다.
정보 표시 작업 (1), (2)의 추가 부분 작동, 터미널별로 확인, 4) 작동하지 않음
디스플레이가 작동하지만 반응하지 않습니다.
브라우저를 새로 고친 후 (5) 예상 N+2
항목을 얻었 으므로 (2)도 잘 작동했습니다.