Flink 学习笔记十

2017/12/02 Flink

Flink2 学习笔记十

Table API 的使用介绍

Flink提供了Sql interface ,能够从数据流中的抽取出SQL Operator。一旦数据流注册到一个数据表,就可以使用一些SQL操作如aggregations,joins,selections。 数据表可以运行Sql查询,当操作执行完毕后,再次将table转化为数据流和数据集就可以了。Flink使用了Apache Calcite来处理查询的优化过程。 需要使用Table API的,只需要引入对应的Maven依赖就可以了。

数据表注册

为了使用数据表API,我们需要把数据表注册到TableEnvironment中,一旦数据表注册了一个名称,然后就可以在TableEnvironment中使用它了。 TableEnvironment维护了一份内在的表目录数据,如下图所示: 图一 数据表的名称必须是唯一的。

注册数据集

为了在数据集上执行SQL操作,我们需要把它注册到BatchTableEnvironment中。注册的同时需要定义Java POJO类。 例如:

public static class WC{
    public String word;
    public long frequency;
    public WC(){
    }

    public WC(String word,long frequency){
        this.word=word;
        this.frequency=frequency;
    }

    @Override
    public String toString(){
        return "WC"+word+"  "+frequency;
    }
}

    ExecutionEnviroment env=ExecutionEnvironment.getExecutionEnvironment();

    BatchTableEnvironment tEnv=TableEnvironment.getTableEnvironment(env);

    DataSet<WC> input=env.fromElements(new WC("Hello",1),new WC("World",1),new WC("Hello",1));

    tEnv.registerDataSet("WordCount",input,"word,frequency");

Scala:

case class WordCount(word:String,frequency:Long)

val env=ExecutionEnvironment.getExecutionEnvironment

val tEnv=TableEnvironment.getTableEnvironment(env)

val input=env.fromElements(WordCount("hello",1),WordCount("hello",1),WordCount("world",1),WordCount("hello",1))

tEnv.registerDataSet("WordCount",input,'word,'frequency)

注册数据流

可以把数据流也注册到StreamTableEnviroment中。也需要定义一个POJO 类即可。 env.registerDataStream("wordCount",input,'word,'frequency)

注册数据表

和数据集和数据流相似,可以直接注册数据表。 Java:

ExecutionEnvironment env=ExecutionEnvironment.getExecutionEnvironment();
BatchTabbleEnvironment tEnv=TableEnvironment.getTableEnvironment(env);
DataSet<WC> input=env.fromElements(new WC("hello",1),new WC("world",1),new WC("Hello",1))

tEnv.registerDataSet("WordCount",input,"word,frequency");

Table table=tEnv.sql("SELECT word,SUM(frequency) as frequency FROM Word-Count GROUP BY word having word='Hello'")

tEnv.registerTable("selected",selectedTable);

Scala:

val env=ExecutionEnvironment.getExecutionEnvironment
val tEnv=TableEnvironment.getTableEnvironment(env)
val input=env.fromElements(WordCount("hello",1),WordCount("hello",1),WordCount("word",1),WordCount("hello",1))
tEnv.registerDataSet("WordCount",input,'word,'frequency)
val table=tEnv.sql("SELECT word,SUM(frequency) FROM WordCount GROUP BY word")
val selected=tEnv.sql("SELECt word,SUM(frequency) FROM WordCount GROUP BY word where word='hello'")
tEnv.registerTable("selected",selected)

注册外部数据源

Flink支持外部数据源,例如:

  • CVS
  • Kafka Json

访问数据表

tableEnvironment.scan("tableName")

数据表操作

Flink API 支持的数据表操作有:

  • select
  • where
  • filter
  • groupby
  • join
  • leftOuterJoin
  • rightOuterJoin
  • fullOuterJoin
  • union
  • unionAll
  • intersect
  • insersectAll
  • minus
  • minusAll
  • distinct
  • orderbBy
  • limit

数据表的数据类型和SQL,Java类型的关系

|Table API | SQL | Java Type| | —— | —— | —— | | Type.String | VARCHAR | String | |TYPE.BOOLEAN|BOOLEAN|Boolean| |TYPE.BYTE|TINYINT|Byte| |TYPE.SHORT|SMALLINT|Short| |TYPE.INT|INTEGER,INT|INTEGER| |TYPE.LONG|BIGINT|Long| |TYPE.FLOAT|REAL,FLOAT|Float| |TYPE.DOUBLE|DOUBLE|Double| |TYPE.DECIMAL|DECIMAL|BigDecimal| |TYPE.DATE|DATE|Date| |TYPE.TIME|TIME|Time| |TYPE.TIMESTAMP|TIMESTAMP(3)|TimeStamp| |TYPE.INTERVAL_MONTHS|INTERVAL YEAR TO MONTH|Integer| |TYPE.INTERVAL_MILLIOS|INTERVAL DAY TO SECONDS(3)|Long|

Show Disqus Comments

Search

    Table of Contents