Flink Study Notes, Part 8

2017/06/30 Flink

These notes cover the batch processing API.

Batch Processing API

The batch processing API covers the following:

  • Data sources
  • Transformations
  • Data sinks
  • Connectors

The basic structure of a Flink program is shown in Figure 1.

Data Sources

Data sources are where data enters a program: records flow from a data source into the DataSet API.

File-based

  • readTextFile(Path)
  • readTextFileWithValue(path)
  • readCsvFile(path)
  • readFileOfPrimitives(path,delimiter,class)
  • readHadoopFile(FileInputFormat,Key,Value,path)
  • readSequenceFile(Key,Value,path)

Collection-based

  • fromCollection(Collection)
  • fromCollection(Iterator,class)
  • fromElements(T ...)
  • fromParallelCollection(SplittableIterator,class)
  • generateSequence(from,to)

Generic Sources

  • readFile(inputFormat,path): reads a file using a FileInputFormat
  • createInput(inputFormat): creates a DataSet from a generic input format

Compressed files

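Flink can read compressed files that carry certain file extensions. Files ending in .gz, .gzip, or .deflate are decompressed automatically. Such files are not read in parallel: each one is read and processed sequentially.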
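To illustrate why a gzip file is handled sequentially, here is a minimal sketch that uses only the JDK, not Flink itself. The class and method names (GzipSequentialRead, gzipLines, readGzipLines) are hypothetical, and an in-memory byte array stands in for a .gz file. The point is that the decompressor consumes the stream from its first byte onward, so the work on a single file is hard to split across parallel readers.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical illustration class; this is plain JDK code, not Flink API.
class GzipSequentialRead {

    // Compresses the lines into an in-memory gzip stream (stand-in for a .gz file).
    static byte[] gzipLines(List<String> lines) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            for (String line : lines) {
                gz.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Decompresses the stream from its beginning and collects one entry per line.
    static List<String> readGzipLines(byte[] compressed) {
        List<String> lines = new ArrayList<>();
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(compressed)),
                StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                lines.add(line);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return lines;
    }

    public static void main(String[] args) {
        byte[] compressed = gzipLines(Arrays.asList("to be", "or not", "to be"));
        System.out.println(readGzipLines(compressed));
    }
}
```

In a Flink job the decompression happens inside the file input format, so no such code is needed there. The sketch only shows the single-reader access pattern that the note above describes.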

Transformations

The following transformations are supported:

  • Map
  • Flat Map
  • Filter
  • Project
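  • Reduce on grouped datasets: a user-defined function merges each group of records into a single record, as follows:

Java:
```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;

// An ordinary POJO; Flink POJOs need a public no-argument constructor.
public class WC {
    public String word;
    public int count;

    public WC() {}

    public WC(String word, int count) {
        this.word = word;
        this.count = count;
    }
}

// Sums the counts of two records that share the same word.
public class WordCounter implements ReduceFunction<WC> {
    @Override
    public WC reduce(WC in1, WC in2) {
        return new WC(in1.word, in1.count + in2.count);
    }
}

DataSet<WC> words = // [...]
DataSet<WC> wordCounts = words
    .groupBy("word")              // group on the field "word"
    .reduce(new WordCounter());   // apply the ReduceFunction to each group
```

Scala:
```scala
// An ordinary class with a no-argument auxiliary constructor.
class WC(val word: String, val count: Int) {
  def this() {
    this(null, -1)
  }
}

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy("word").reduce {
  (w1, w2) => new WC(w1.word, w1.count + w2.count)
}
```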
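  • Reduce on grouped datasets by field position key
  • Group combine: lets you insert a combine step as an intermediate operation, for example to pre-aggregate groups before a later transformation. The combine runs on each partition separately, so its results may be partial.

Java:
```java
DataSet<String> input = [..]

DataSet<Tuple2<String, Integer>> combinedWords = input
    .groupBy(0) // group identical words
    .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() {

        @Override
        public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) {
            String key = null;
            int count = 0;
            // count the occurrences of the word within this group
            for (String word : words) {
                key = word;
                count++;
            }
            // emit a (word, count) tuple
            out.collect(new Tuple2<>(key, count));
        }
    });
```

Scala:
```scala
val input: DataSet[String] = [...]
val combinedWords: DataSet[(String, Int)] = input.groupBy(0).combineGroup {
    (words, out: Collector[(String, Int)]) =>
        var key: String = null // var, because it is reassigned in the loop
        var count = 0

        for (word <- words) {
            key = word
            count += 1
        }
        out.collect((key, count))
}
```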
  • Aggregate on a grouped tuple dataset
  • MinBy on a grouped tuple dataset
  • MaxBy on a grouped tuple dataset
  • Reduce on full dataset
  • GroupReduce on a full dataset
  • Aggregate on a full tuple dataset
  • MinBy on a full tuple dataset
  • MaxBy on a full tuple dataset
  • Distinct
  • Join
  • Cross
  • Union
  • Rebalance
  • Hash Partition
  • Range Partition
  • Sort Partition
  • First-n
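
To make the group combine entry in the list above more concrete, here is a plain-JDK sketch of the idea, not Flink API. It assumes the words are spread over two partitions, represented as ordinary lists. The class and method names (CombineSketch, combinePartition, mergePartials) are hypothetical. The combine step counts words inside one partition only, so the same word can appear in several partial results, and a later step has to merge them.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration class; it models the semantics, not Flink's implementation.
class CombineSketch {

    // Combine step: counts words within ONE partition, so the result is partial.
    static Map<String, Integer> combinePartition(List<String> partition) {
        Map<String, Integer> partial = new LinkedHashMap<>();
        for (String word : partition) {
            partial.merge(word, 1, Integer::sum);
        }
        return partial;
    }

    // Final step: merges the partial counts of all partitions into totals.
    static Map<String, Integer> mergePartials(List<Map<String, Integer>> partials) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (Map<String, Integer> partial : partials) {
            partial.forEach((word, count) -> totals.merge(word, count, Integer::sum));
        }
        return totals;
    }

    public static void main(String[] args) {
        // Two lists stand in for two parallel partitions (an assumption of this sketch).
        List<Map<String, Integer>> partials = new ArrayList<>();
        partials.add(combinePartition(Arrays.asList("a", "b", "a")));
        partials.add(combinePartition(Arrays.asList("b", "a")));

        System.out.println(partials);                // partial counts per partition
        System.out.println(mergePartials(partials)); // totals after the final merge
    }
}
```

Here the word "a" is counted twice in the first partition and once in the second, and only the merge step yields the total of 3. In a Flink program that final step would typically be a second grouping followed by a reduce.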