Spring Cloud Gateway: How to modify SSE Flux Stream
Question
I am trying to achieve the following:
- Service A exposes a method that responds with an SSE event stream.
- A gateway sits in front of the actual service.
- The objective is to forward the request, then intercept and modify each event, and forward the modified response back to the client.
I implemented the following AbstractGatewayFilterFactory, which is supposed to modify any response body by appending a simple string, but it failed.
@Component
public class TaskQueueFilter extends AbstractGatewayFilterFactory<TaskQueueFilter.Config> {

    private final Logger LOG = LoggerFactory.getLogger(TaskQueueFilter.class);

    public TaskQueueFilter() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            ServerHttpResponse response = exchange.getResponse();
            DataBufferFactory dataBufferFactory = response.bufferFactory();
            ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) {
                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    if (body instanceof Flux) {
                        Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;
                        return super.writeWith(flux.buffer().map(dataBuffers -> {
                            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                            dataBuffers.forEach(i -> {
                                byte[] array = new byte[i.readableByteCount()];
                                i.read(array);
                                try {
                                    outputStream.write(array);
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            });
                            try {
                                outputStream.write("0123456789abcdefg".getBytes());
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return dataBufferFactory.wrap(outputStream.toByteArray());
                        }));
                    }
                    return super.writeWith(body);
                }
            };
            ServerWebExchange swe = exchange.mutate().response(decoratedResponse).build();
            return chain.filter(swe);
        };
    }

    public static class Config {
    }
}
I have tried ModifyResponseBody, but it did not work either.
Any suggestions would be appreciated!
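For reference, the "modify each event" step can be sketched on its own, without the gateway. The class below is a hypothetical helper (SseEventRewriter and appendToData are illustrative names, not part of Spring Cloud Gateway). It assumes that each chunk contains one or more complete SSE events, which the transport does not guarantee. It appends the suffix to every `data:` line instead of to the end of the chunk, because bytes written after the blank line that terminates an event are no longer part of that event.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper for illustration; not part of Spring Cloud Gateway. */
final class SseEventRewriter {

    private SseEventRewriter() {
    }

    /**
     * Appends {@code suffix} to every "data:" line of the given SSE text.
     * Assumes the text holds complete events (an assumption of this sketch).
     */
    static String appendToData(String sseText, String suffix) {
        // Limit -1 keeps trailing empty strings, so the blank line that
        // terminates each event is preserved when the lines are re-joined.
        String[] lines = sseText.split("\n", -1);
        StringBuilder out = new StringBuilder(sseText.length() + suffix.length());
        for (int i = 0; i < lines.length; i++) {
            String line = lines[i];
            // Tolerate CRLF line endings by setting the '\r' aside.
            boolean hasCr = line.endsWith("\r");
            String content = hasCr ? line.substring(0, line.length() - 1) : line;
            if (content.startsWith("data:")) {
                content = content + suffix;
            }
            out.append(content);
            if (hasCr) {
                out.append('\r');
            }
            if (i < lines.length - 1) {
                out.append('\n');
            }
        }
        return out.toString();
    }

    /** Byte-level variant for chunks read from a buffer; assumes UTF-8. */
    static byte[] appendToData(byte[] chunk, String suffix) {
        String text = new String(chunk, StandardCharsets.UTF_8);
        return appendToData(text, suffix).getBytes(StandardCharsets.UTF_8);
    }
}
```

Inside a response decorator, a function like this would probably be applied to each DataBuffer with Flux.map rather than after buffer(): the no-argument buffer() collects every element into a single list and emits it only when the upstream completes, which for a long-lived SSE stream may be never. It is also worth checking whether the gateway writes streaming responses through writeAndFlushWith instead of writeWith, in which case a decorator that only overrides writeWith would not see the body at all.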