网站建设jw100高粱seo博客
使用Flink做流数据处理时,除了主流数据输出,还自定义侧流输出即旁路输出,以实现灵活的数据拆分。
定义旁路输出标签
首先需要定义一个OutputTag,代码如下:
// 这需要是一个匿名的内部类,以便我们分析类型
OutputTag<String> outputTag = new OutputTag<String>("side-output") {};
在ProcessFunction使用Context调用
可以通过以下Function中,将outputTag作为参数传递到Context中
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
代码示例:
DataStream<Integer> input = ...;final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};SingleOutputStreamOperator<Integer> mainDataStream = input.process(new ProcessFunction<Integer, Integer>() {@Overridepublic void processElement(Integer value,Context ctx,Collector<Integer> out) throws Exception {// 发送数据到主要的输出out.collect(value);// 发送数据到旁路输出ctx.output(outputTag, "sideout-" + String.valueOf(value));}});
在 DataStream 运算结果上使用 getSideOutput(OutputTag) 方法获取旁路输出流:
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Integer> mainDataStream = ...;
// 获取到侧流输出DataStream,输出结果类型要与outputTag 定义的一致
DataStream<String> sideOutputStream = mainDataStream.getSideOutput(outputTag);
本文中只列出了Java代码的实现;
Flink官网还有Scala/python代码实现
参考链接:https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/dev/datastream/side_output/