Flink 学习笔记七

2017/06/29 Flink

Flink2 学习笔记七

Data Sinks

Flink提供了一些数据输出的操作:

  • writeAsText()
  • writeAsCsv()
  • print()/printErr()
  • writeUsingOutputFormat()
  • writeToSocket()

Event Time 和 WaterMark

Event time

数据时间是指数据产生的时间,例如在IoT项目中,这个时间是指采集器收集到的时间。大体上来说,这些时间在数据进入flink的时候就已经确实下来了,在处理过程中,这些时间会被提取出来,参于时间的计算,数据时间可以用来对无序数据进行运算。

Processing time

处理时间是流数据在执行过程中的时间,Processing Time Window只考虑处理过程中添加的timestamp。处理时间是在流处理中最简单的数据同步方法。在分布式异步执行环境中,处理时间不是决定性的,不同机器的处理速度不同。

Ingestion time

这是数据进入flink的时间,所有的基于时间的操作都会引用这个时间,摄入时间比处理时间的代价开销更大,但是它提供了准确的时间,可以预测的结果。使用摄入时间的程序不能处理处理乱序的数据,因为时间戳是在数据进入系统的时候定义的。 如下是一个例子,是显示如何使用消息暑和Watermarks。在使用摄入时间的情况下和处理时间的情况下,时间处理会是能用方法处理的,watermarks会自动生成。 Java:

final StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
//or
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

Scala:

val env=StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
//or
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

在使用event time的情况下,需要手动指定程序的watermarks和时间戳,有两种方式处理watermarks和时间戳。

  • 直接从数据中取
  • 使用时间赋值器 Java:
    final StreamExectuionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    

    Scala:

    val env=StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EvenTime)
    

    处理时间过程中的最值实践是直接把时间信息加入到数据中,Flink支持预先定义的时间提取函数和watermarks生成器。

Connectors

Flink有许多种类的connector来读写多种数据源。

Kafka connector

Kafak是一种消息队列服务,Flink支持预定义的Kafka组件。下图展示了kafka connectors如何工作的。 图一 只需求添加依赖即可。 Java:

//consumer
Properties prop=new Properties();
prop.setProperty("bootstrap.servers","localhost:9092");
prop.setProperty("group.id","test");
DataStream<String> input = env.addSource(new FlinkKafkaConsumer09<String>("topic",new SimpleStringSchema(),prop));

//producer
stream.addSink(new FlinkKafkaProducer09<String>("localhost:9092","topic",new SimpleStringSchema());

Scala:

//consumer
val prop=new Properties();
prop.setProperty("bootstrap.server","localhost:9092");
prop.setProperty("zookeeper.connect","localhost:2181");
prop.setProperty("group.id","test");
stream=env.addSource(new FlinkKafkaConsumer09[String]("topic",new SimpleStringSchema(),prop)).print
//producer
stream.addSink(new FlinkKafkaProducer09[String]("localhost:9092","topic",new SimpleStringSchema()))

RabbitMQ Connector

RabbitMQ 是一种广泛使用的,高性能的消息队列系统。下图是RabbitMQ工作图示: 图二 只需要添加依赖就可以了。

ElasticSearch Connector

ES Connector工作模式如图: 图三

Embedded node mode

在嵌入式运行模式下,sink使用BulkProcessor把文档发送到ES中。

Transport client mode

ES 允许程序通过9300端口和ES通信。

Cassandra connector

Cassandra 是分布式的,低延迟的,NOSQL数据库。下图是工作模式: 图四

Show Disqus Comments

Search

    Table of Contents