Ch13_ProcessFunction
Yang Haoran 4/5/2021 FlinkJavaBigData
# ProcessFunction


案例1 :检测一段时间内的温度连续上升,输出报警
// 实现自定义处理函数,检测一段时间内的温度连续上升,输出报警
public static class TempConsIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String>{
// 定义私有属性,当前统计的时间间隔
private Integer interval;
public TempConsIncreWarning(Integer interval) {
this.interval = interval;
}
// 定义状态,保存上一次的温度值,定时器时间戳
private ValueState<Double> lastTempState;
private ValueState<Long> timerTsState;
@Override
public void open(Configuration parameters) throws Exception {
lastTempState = getRuntimeContext().getState(new ValueStateDescriptor<Double>("last-temp", Double.class, Double.MIN_VALUE));
timerTsState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer-ts", Long.class));
}
@Override
public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
// 取出状态
Double lastTemp = lastTempState.value();
Long timerTs = timerTsState.value();
// 如果温度上升并且没有定时器,注册10秒后的定时器,开始等待
if( value.getTemperature() > lastTemp && timerTs == null ){
// 计算出定时器时间戳
Long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
ctx.timerService().registerProcessingTimeTimer(ts);
timerTsState.update(ts);
}
// 如果温度下降,那么删除定时器
else if( value.getTemperature() < lastTemp && timerTs != null ){
ctx.timerService().deleteProcessingTimeTimer(timerTs);
timerTsState.clear();
}
// 更新温度状态
lastTempState.update(value.getTemperature());
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// 定时器触发,输出报警信息
out.collect("传感器" + ctx.getCurrentKey().getField(0) + "温度值连续" + interval + "s上升");
timerTsState.clear();
}
@Override
public void close() throws Exception {
lastTempState.clear();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
案例2:实现高低温分流
// 定义一个OutputTag,用来表示侧输出流低温流
OutputTag<SensorReading> lowTempTag = new OutputTag<SensorReading>("lowTemp") {
};
// 测试ProcessFunction,自定义侧输出流实现分流操作
SingleOutputStreamOperator<SensorReading> highTempStream = dataStream.process(new ProcessFunction<SensorReading, SensorReading>() {
@Override
public void processElement(SensorReading value, Context ctx, Collector<SensorReading> out) throws Exception {
// 判断温度,大于30度,高温流输出到主流;小于低温流输出到侧输出流
if( value.getTemperature() > 30 ){
out.collect(value);
}
else {
ctx.output(lowTempTag, value);
}
}
});
highTempStream.print("high-temp");
highTempStream.getSideOutput(lowTempTag).print("low-temp");
env.execute();
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22