当前位置: 移动技术网 > 科技>人工智能>物联网 > Flink CEP 入门第一步

Flink CEP 入门第一步

2020年11月25日  | 移动技术网科技  | 我要评论
Flink CEP 入门第一步CEP 介绍场景描述需求分析主要逻辑源码分享测试结果参考地址CEP 介绍CEP(Complex Event Process) 直译过来就是复杂事件处理,具体到底有多复杂呢?基本上在时序数据场景中要监控某一些有特征的信息,可以理解为复杂事件。Flink 1.0 以后就开始支持复杂事件处理,可以在项目中通过引用 flink-cep.jar 包来做复杂事件处理。场景描述监控物联网中某设备上送到时序数据,对发现某设备的温度在10秒钟内有两次超过100度时,产生一个报警记录

CEP 介绍

CEP(Complex Event Process) 直译过来就是复杂事件处理,具体到底有多复杂呢?基本上在时序数据场景中要监控某一些有特征的信息,可以理解为复杂事件。

Flink 1.0 以后就开始支持复杂事件处理,可以在项目中通过引用 flink-cep.jar 包来做复杂事件处理。

场景描述

  1. 监控物联网中某设备上送到时序数据,对发现某设备的温度在10秒钟内有两次超过100度时,产生一个报警记录;

  2. 对产生的报警信息进行二次监控,如果发现报警信息中两条信息的平均值在20秒内有递增的情况,要发出一条通知消息。

需求分析

  1. 这个需求场景针对的是某一设备的数据进行的报警分析,肯定要针对设备有一个唯一的设备标识

  2. 需求1中要监控设备上送的原始数据,对发现超过100度的数据时刻为开始,在接下来的10秒内如果又有一个超过100度的数据,则产生一个报警记录

  3. 需求2中要监控产生的报警数据的平均值,以20秒为间隔,如果平均值有递增的情况,发出一条通知消息

主要逻辑

  1. 定义 报警记录的匹配模式 Pattern.beging(“first”).where(value > 100).next(“second”).where(value > 100).within(10s)

  2. 对时序数据进行报警模式匹配,并按照设备标识进行分组,形成patternStream

  3. 对匹配到的报警数据做平均值计算,patternStream.select((map<String, List> element) ->{return (“first”.value+"second".value)/2})

  4. 定义 发送通知消息的匹配模式 Pattern.beging(“first”).next(“second”).within(20s)

  5. 对报警记录实行消息4定义的模式匹配,并按照设备标识进行分组,形成patternStream

  6. 对匹配到的数据进行逻辑分析,如果有平均值递增的情况,就发送通知信息

源码分享

    // Warning pattern: Two consecutive temperature events whose temperature is higher than the given threshold
    // appearing within a time interval of 10 seconds
    Pattern<MonitoringEvent, ?> warningPattern = Pattern.<MonitoringEvent>begin("first")
            .subtype(TemperatureEvent.class)
            .where(new IterativeCondition<TemperatureEvent>() {
                private static final long serialVersionUID = -6301755149429716724L;

                @Override
                public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                        return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                }
            })
            .next("second")
            .subtype(TemperatureEvent.class)
            .where(new IterativeCondition<TemperatureEvent>() {
                private static final long serialVersionUID = 2392863109523984059L;

                @Override
                public boolean filter(TemperatureEvent value, Context<TemperatureEvent> ctx) throws Exception {
                    return value.getTemperature() >= TEMPERATURE_THRESHOLD;
                }
            })
            .within(Time.seconds(10));

    // Create a pattern stream from our warning pattern
    PatternStream<MonitoringEvent> tempPatternStream = CEP.pattern(
            inputEventStream.keyBy("rackID"),
            warningPattern);

    // Generate temperature warnings for each matched warning pattern
    DataStream<TemperatureWarning> warnings = tempPatternStream.select(
        (Map<String, List<MonitoringEvent>> pattern) -> {
            TemperatureEvent first = (TemperatureEvent) pattern.get("first").get(0);
            TemperatureEvent second = (TemperatureEvent) pattern.get("second").get(0);

            return new TemperatureWarning(first.getRackID(), (first.getTemperature() + second.getTemperature()) / 2);
        }
    );

    // Alert pattern: Two consecutive temperature warnings appearing within a time interval of 20 seconds
    Pattern<TemperatureWarning, ?> alertPattern = Pattern.<TemperatureWarning>begin("first")
            .next("second")
            .within(Time.seconds(20));

    // Create a pattern stream from our alert pattern
    PatternStream<TemperatureWarning> alertPatternStream = CEP.pattern(
            warnings.keyBy("rackID"),
            alertPattern);

    // Generate a temperature alert only if the second temperature warning's average temperature is higher than
    // first warning's temperature
    DataStream<TemperatureAlert> alerts = alertPatternStream.flatSelect(
        (Map<String, List<TemperatureWarning>> pattern, Collector<TemperatureAlert> out) -> {
            TemperatureWarning first = pattern.get("first").get(0);
            TemperatureWarning second = pattern.get("second").get(0);

            if (first.getAverageTemperature() < second.getAverageTemperature()) {
                out.collect(new TemperatureAlert(first.getRackID()));
            }
        },
        TypeInformation.of(TemperatureAlert.class));

    // Print the warning and alert events to stdout
    warnings.print();
    alerts.print();

测试结果

2020-11-25 17:47:35:87  emit data: TemperatureEvent(5, 62.16286476572637)
2020-11-25 17:47:36:17  emit data: TemperatureEvent(5, 76.30725109897276)
2020-11-25 17:47:36:47  emit data: TemperatureEvent(5, 107.76770951345256)
2020-11-25 17:47:36:77  emit data: PowerEvent(3, 106.68654516819112)
2020-11-25 17:47:37:07  emit data: TemperatureEvent(3, 80.30137883422269)
2020-11-25 17:47:37:37  emit data: TemperatureEvent(4, 47.77738055334537)
2020-11-25 17:47:37:67  emit data: PowerEvent(7, 74.51323501555572)
2020-11-25 17:47:37:97  emit data: PowerEvent(9, 114.84340930132853)
2020-11-25 17:47:38:27  emit data: PowerEvent(3, 99.01726074676452)
2020-11-25 17:47:38:57  emit data: TemperatureEvent(7, 65.44951371049709)
2020-11-25 17:47:38:87  emit data: TemperatureEvent(8, 97.25405709950533)
2020-11-25 17:47:39:17  emit data: PowerEvent(9, 103.84754449954623)
2020-11-25 17:47:39:47  emit data: PowerEvent(4, 94.60893342305071)
2020-11-25 17:47:39:77  emit data: TemperatureEvent(2, 78.77024073146522)
2020-11-25 17:47:40:07  emit data: TemperatureEvent(2, 73.52434875689119)
2020-11-25 17:47:40:37  emit data: PowerEvent(4, 86.76448452660071)
2020-11-25 17:47:40:67  emit data: TemperatureEvent(8, 101.17490865381467)
2020-11-25 17:47:40:97  emit data: TemperatureEvent(5, 101.49599620863171)
TemperatureWarning(5, 104.63185286104213)
2020-11-25 17:47:41:27  emit data: TemperatureEvent(5, 115.554622385676)
TemperatureWarning(5, 108.52530929715385)
TemperatureAlert(5)
2020-11-25 17:47:41:57  emit data: TemperatureEvent(3, 72.9746590530073)
2020-11-25 17:47:41:88  emit data: PowerEvent(4, 102.61427127744209)

参考地址

源码可参考https://github.com/tillrohrmann/cep-monitoring.git,我这里对其进行了简单的修改。

本文地址:https://blog.csdn.net/wowSpark/article/details/110142419

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网