Spring Cloud Gateway: How to modify SSE Flux Stream

Question

I am trying to achieve the following:

  1. Service A has a method that responds with an SSE event stream.
  2. A gateway sits in front of the actual service.
  3. The goal is to forward the request, then intercept and modify each event, and forward the modified response back to the client.

I implemented the following AbstractGatewayFilterFactory, which is supposed to modify any response body by appending a simple string, but it failed.

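import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.cloud.gateway.filter.GatewayFilter;
import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Component
public class TaskQueueFilter extends AbstractGatewayFilterFactory<TaskQueueFilter.Config> {

    private final Logger LOG = LoggerFactory.getLogger(TaskQueueFilter.class);

    public TaskQueueFilter() {
        super(Config.class);
    }

    @Override
    public GatewayFilter apply(Config config) {
        return (exchange, chain) -> {
            ServerHttpResponse response = exchange.getResponse();
            DataBufferFactory dataBufferFactory = response.bufferFactory();

            // Wrap the response so that the body can be rewritten before it is sent.
            ServerHttpResponseDecorator decoratedResponse = new ServerHttpResponseDecorator(response) {

                @Override
                public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
                    if (body instanceof Flux) {
                        Flux<? extends DataBuffer> flux = (Flux<? extends DataBuffer>) body;

                        // buffer() with no arguments collects all chunks into one list,
                        // which is emitted only when the upstream completes.
                        return super.writeWith(flux.buffer().map(dataBuffers -> {
                            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
                            // Copy the readable bytes of every chunk into one array.
                            dataBuffers.forEach(i -> {
                                byte[] array = new byte[i.readableByteCount()];
                                i.read(array);
                                try {
                                    outputStream.write(array);
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            });
                            // Append the marker string to the collected body.
                            try {
                                outputStream.write("0123456789abcdefg".getBytes());
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                            return dataBufferFactory.wrap(outputStream.toByteArray());
                        }));
                    }
                    return super.writeWith(body);
                }
            };

            ServerWebExchange swe = exchange.mutate().response(decoratedResponse).build();
            return chain.filter(swe);
        };
    }

    public static class Config {

    }
}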

I have also tried ModifyResponseBody, but that did not work either.
Any suggestions would be appreciated!
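
One thing that may be worth checking: `Flux.buffer()` with no arguments emits its single list only when the upstream completes, so on a long-lived SSE stream nothing would be forwarded until the stream ends. It is also possible that the gateway writes streaming responses through `writeAndFlushWith` rather than `writeWith`, in which case the override shown would not be invoked at all. To make "modify each event" concrete, here is a minimal sketch in plain Java of the per-event step only. `SseEventRewriter` is a hypothetical helper, not a Spring or Reactor class. It assumes chunks have already been decoded to text, that events are terminated by a blank line using `\n` line endings, and that the modification is simply appending a suffix to every `data:` line.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical helper (not part of Spring or Reactor): reassembles SSE events
 * from arbitrarily split text chunks and appends a suffix to each "data:" line.
 * Assumes "\n" line endings and already-decoded text.
 */
final class SseEventRewriter {

    private final String suffix;
    // Holds text that has arrived but does not yet form a complete event.
    private final StringBuilder pending = new StringBuilder();

    SseEventRewriter(String suffix) {
        this.suffix = suffix;
    }

    /** Feeds one chunk and returns every event that the chunk completed, rewritten. */
    List<String> accept(String chunk) {
        pending.append(chunk);
        List<String> events = new ArrayList<>();
        int end;
        // A blank line ("\n\n") marks the end of one event.
        while ((end = pending.indexOf("\n\n")) >= 0) {
            String event = pending.substring(0, end);
            pending.delete(0, end + 2);
            events.add(rewrite(event) + "\n\n");
        }
        return events;
    }

    /** Appends the suffix to each "data:" line; other fields are left untouched. */
    private String rewrite(String event) {
        StringBuilder out = new StringBuilder();
        String[] lines = event.split("\n", -1);
        for (int i = 0; i < lines.length; i++) {
            if (i > 0) {
                out.append('\n');
            }
            out.append(lines[i]);
            if (lines[i].startsWith("data:")) {
                out.append(suffix);
            }
        }
        return out.toString();
    }

    public static void main(String[] args) {
        SseEventRewriter rewriter = new SseEventRewriter(" [gateway]");
        // The second event is split across two chunks on purpose.
        for (String chunk : List.of("data: first\n\nda", "ta: second\n\n")) {
            rewriter.accept(chunk).forEach(System.out::print);
        }
    }
}
```

In a filter, a helper like this would presumably be called from a per-chunk operator on the body `Flux` instead of `buffer()`, with each returned event wrapped back into a `DataBuffer` via `dataBufferFactory.wrap(...)`. That wiring is an assumption and is not shown here.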

huangapple
  • Posted on August 14, 2020, 20:27:56
  • When reposting, please keep the link to this article: https://java.coder-hub.com/63412761.html