Flink 学习笔记十三

2017/07/05 Flink

Flink2 学习笔记十三

Event Streams

CEP的重要组件之一就是输入流, 我们需要定义一个Java POJO类来处理消息。在定义Java POJO类的时候,需要保证实现了hashCode()和equals()方法。 Java:

class TemEvent(){...}

StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<TemEvent> inputEventStream=env.fromElements(new TemEvent("aaa",21),new TemEvent("bbb",22),new TemEvent("ccc",23));

Scala:

val env:StreamExecutionEnvironment=StreamExecutionEnvironment.getExecutionEnvironment
val input:DataStream[TemEvent]=env.fromElements(new TemEvent("aaa",1),new TemEvent("bbb",2))

Pattern API

模式API允许定义事件模式,每一个模式都包含多个状态。从一个状态到另一个状态,需要定义条件,条件可以连续或者是对消息进行过滤。 图一

起点

Java:

Pattern<Event,?> start=Pattern.<Event>begin("start")

Scala:

val start:Pattern[Event,_]= Pattern.begin("start")

过滤器

Java:

start.where(new FilterFunction<Event>(){
    @Override
    public boolean filter(Event value){
        return ...//condition
    }
})

Scala:

start.where(event=> ... /*condition*/)

子类型

我们可以通过消息的子类型来过滤,使用subtype()方法: Java:

start.subType(SubEvent.class).where(new FilterFunction<SubEvent>(){
    @Override
    public boolean filter(SubEvent value){
        return ...//condition
    }
})

Scala:

start.subType(ClassOf[SubEvent]).where(subEvent=> ... /*condition*/)

基本逻辑关系

可以使用OR和AND操作符。 Java:

pattern.where(new FilterFunction<Event>(){
    @Override
    public boolean filter(Event value){
        return ...//condition
    }
}).or(new FilterFunction<Event>(){
    @Override
    public boolean filter(Event event){
        return ...//or condiciton
    }
})

Scala:

pattern.where(event=>/*condition*/).or(event=>/*condition*/)

连续性(Continuity)

连续性有两种,一种是严格连续,一种是不严格连续。

严格连续

严格连续需要两个事件直接连续,这两个事件中没有其它事件。这种Pattern使用next()。 Java:

Pattern<Event,?> strictNext=start.next("middle")l

Scala:

val strictNext:Pattern[Event,_] = start.next("middle")

非严格连续

可以两个事件中有其它事件,这种Pattern通过followedBy()定义。 Java:

Pattern<Event,?> nonStrictNext=start.followdBy("middle");

Scala:

val nonStrictNext:Pattern[Event,_]=start.followedBy("middle")

时间范围(Within)

Pattern API 同样允许根据时间范围进行模式发现。 Java:

next.within(Time.seconds(30));

Scala:

next.within(Time.seconds(30))

模式发现

发现流中的模式,使用CEP.pattern()这个函数返回PatternStream。 如下是一个如何发现Pattern的示例: Java:

Pattern<Temperature,?> warningPattern=Pattern.<TemEvent> begin("first").subType(TemEvent.class).where(new FilterFunction<TemEvent>(){
    public boolean filter(TemEvent value){
        if(value.getTem()>=26.0){
            return true;
        }
        return false;
    }
}).with(Time.seconds(10));

PatternStream<TemEvent> patternStream=CEP.pattern(inputEventStream,warningPattern);

Scala:

val env:StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

val input=//data

val pattern:Pattern[TemEvent,_]=Pattern.begin("start").where(event=>event.temp=>26.0)

val patternStream:PatternStream[TemEvent]=CEP.pattern(input,pattern)

从Pattern中查询数据

发现Pattern之后,是可以从Pattern中把数据查询出来进行下一步的操作,可以使用select或者flatSelect方法从Pattern中选择数据。

Select

选择方法需要实现PatternSelectionFunction ,它对所有的消息序列有一个选择方法,会对所有数据执行。这个方法接收的参数是一个Map,select方法返回的结果是一个结果。 为了接收结果,我们需要定义一个POJO类。 Java:

class MyPatternSelectFunction<IN,OUT> implements PatternSelectFunction<IN,OUT>{
    @Override
    public OUT select(Map<String,IN> pattern){
        IN startEvent=pattern.get("start");
        IN endEvent=pattern.get("end");
        return new OUT(startEvent,endEvent);
    }
}

Scala:

def selectFn(pattern:mutable.Map[String,IN]):OUT = {
    val startEvent=pattern.get("start").get
    val endEvent=pattern.get("end").get
    OUT(startEvent,endEvent)
}

flatSelect

两种select方法很类似,唯一的区别是flatSelect返回的结果数量是不定的,flatSelect方法有一个额外的Collector 参数,是为输出结果使用的。 Java:

class MyPatternFlatSelectFunction<IN,OUT> implements PatternFlatSelectFunction<IN,OUT>{
    @Override
    public void select(Map<String,IN> pattern,Collector<OUT> collector){
        IN startEvent=pattern.get("start");
        IN endEvent=pattern.get("end");

        for(int i=0;i<startEvent.getValue();i++){
            collector.collect(new OUT(startEvent,endEvent));
        }
    }
}

Scala:

def flatSelectFn(pattern:mutable.Map[String,IN],collector:Collector[OUT])={
    val start=pattern.get("start").get
    val end=pattern.get("end").get
    for(i <- 0 to startEvent.getValue){
        collector.collect(OUT(start,end))
    }
}
Show Disqus Comments

Search

    Table of Contents