网创优客建站品牌官网
为成都网站建设公司企业提供高品质网站建设
热线:028-86922220
成都专业网站建设公司

定制建站费用3500元

符合中小企业对网站设计、功能常规化式的企业展示型网站建设

成都品牌网站建设

品牌网站建设费用6000元

本套餐主要针对企业品牌型网站、中高端设计、前端互动体验...

成都商城网站建设

商城网站建设费用8000元

商城网站建设因基本功能的需求不同费用上面也有很大的差别...

成都微信网站建设

手机微信网站建站3000元

手机微信网站开发、微信官网、微信商城网站...

建站知识

当前位置:首页 > 建站知识

大数据(9g)FlinkCEP-创新互联

文章目录
  • 概述
  • 示例代码
    • 环境和依赖
    • Java代码
      • 上面代码可改成下面

坚守“ 做人真诚 · 做事靠谱 · 口碑至上 · 高效敬业 ”的价值观,专业网站建设服务10余年为成都成都墙体彩绘小微创业公司专业提供企业网站建设营销网站建设商城网站建设手机网站建设小程序网站建设网站改版,从内容策划、视觉设计、底层架构、网页布局、功能开发迭代于一体的高端网站建设服务。
概述
  • CEP
    Complex Event Processing:复合事件处理
    通过分析事件间的关系,从事件流中查询出符合要求的事件序列
  • 例如【切菜=>洗菜=>炒菜】3个事件按时间序串联,是正常的事件流
    当发现【切菜=>炒菜】忽略洗菜的事件流,可认为是异常事件
示例代码 环境和依赖

WIN10+JDK1.8+IDEA2021+Maven3.6.3
CEP额外依赖为flink-cep

881.14.62.121.18.24org.apache.flinkflink-java${flink.version}org.apache.flinkflink-streaming-java_${scala.binary.version}${flink.version}org.apache.flinkflink-clients_${scala.binary.version}${flink.version}org.apache.flinkflink-runtime-web_${scala.binary.version}${flink.version}org.apache.flinkflink-cep_${scala.binary.version}${flink.version}
Java代码

监测 严格近邻的连续三次a的事件流

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CepPractice {public static void main(String[] args) throws Exception {//创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //添加数据源,确定水位线策略
        SingleOutputStreamOperatord = env.fromElements("c", "a", "a", "a", "a", "b", "a", "a")
                .assignTimestampsAndWatermarks(WatermarkStrategy.forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) ->1L));
        //定义模式
        Patternp = Pattern
                .begin("first")
                .where(new SimpleCondition() {@Override
                    public boolean filter(String value) {return value.equals("a");
                    }
                })
                .next("second")
                .where(new SimpleCondition() {@Override
                    public boolean filter(String value) {return value.equals("a");
                    }
                })
                .next("third")
                .where(new SimpleCondition() {@Override
                    public boolean filter(String value) {return value.equals("a");
                    }
                });
        //在流上匹配模型
        PatternStreampatternStream = CEP.pattern(d, p);
        //使用select方法将匹配到的事件流取出
        patternStream.select((PatternSelectFunction) map ->{//Map的key是事件名称(上面的first、second和third)
            //Map的key对应的value是列表,储存匹配到的事件
            String first = map.get("first").toString();
            String second = map.get("second").toString();
            String third = map.get("third").toString();
            return first + "->" + second + "->" + third;
        }).print();
        //执行
        env.execute();
    }
}

打印结果

[a]->[a]->[a]
[a]->[a]->[a]
上面代码可改成下面

留意.times(3).consecutive()

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.List;

public class CepPractice2 {public static void main(String[] args) throws Exception {//创建环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment().setParallelism(1);
        //添加数据源,确定水位线策略
        SingleOutputStreamOperator>d = env.fromElements(
                Tuple2.of("a", 1000L), Tuple2.of("a", 2000L), Tuple2.of("a", 3000L),
                Tuple2.of("a", 4000L), Tuple2.of("b", 5000L), Tuple2.of("a", 6000L))
                .assignTimestampsAndWatermarks(WatermarkStrategy.>forMonotonousTimestamps()
                        .withTimestampAssigner((element, recordTimestamp) ->element.f1));
        //定义模式
        Pattern, Tuple2>p = Pattern
                .>begin("=a")
                .where(new SimpleCondition>() {@Override
                    public boolean filter(Tuple2value) {return value.f0.equals("a");
                    }
                })
                .times(3)
                .consecutive(); //严格连续
        //在流上匹配模型
        PatternStream>patternStream = CEP.pattern(d, p);
        //使用select方法将匹配到的事件流取出
        patternStream.select((PatternSelectFunction, String>) map ->{//Map的key是事件名称(上面的first、second和third)
            //Map的key对应的value是列表,储存匹配到的事件
            List>ls = map.get("=a");
            String first = ls.get(0).f0;
            String second = ls.get(1).f0;
            String third = ls.get(2).f0;
            return String.join("=>", first, second, third);
        }).print();
        //执行
        env.execute();
    }
}

你是否还在寻找稳定的海外服务器提供商?创新互联www.cdcxhl.cn海外机房具备T级流量清洗系统配攻击溯源,准确流量调度确保服务器高可用性,企业级服务器适合批量采购,新人活动首月15元起,快前往官网查看详情吧


网站栏目:大数据(9g)FlinkCEP-创新互联
分享URL:http://bjjierui.cn/article/dgicho.html

其他资讯