Flink Study Notes, Part 11

2017/12/03 Flink

SQL

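Flink's Table API lets you run SQL queries through the sql() method. The method uses Apache Calcite to parse, validate, and optimize the SQL. The result of a query can be converted into a DataSet or a DataStream, or written to a TableSink.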

Running SQL on a DataStream

A SQL query on a stream is written with SELECT STREAM and executed on a TableEnvironment. For example:

Java:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// Register the stream as table "Products" with fields id, name, stock
DataStream<Tuple3<Long, String, Integer>> ds = env.addSource(...);
tableEnv.registerDataStream("Products", ds, "id, name, stock");

// Run a streaming SQL query on the registered table
Table result = tableEnv.sql("SELECT STREAM * FROM Products WHERE name LIKE '%Apple%'");

Scala:

val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// Register the stream as table "Products" with fields id, name, stock
val ds: DataStream[(Long, String, Integer)] = env.addSource(...)
tableEnv.registerDataStream("Products", ds, 'id, 'name, 'stock)

// Run a streaming SQL query on the registered table
val result = tableEnv.sql("SELECT STREAM * FROM Products WHERE name LIKE '%Apple%'")
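To make the meaning of the WHERE clause concrete, here is a minimal plain-Java sketch of what the query above selects. It does not use Flink at all: the Product class, the selectApples helper, and the sample rows are made up for illustration, and it assumes LIKE is case-sensitive, as in standard SQL.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java illustration only; this is not Flink code.
final class LikeFilterSketch {

    // Hypothetical row type mirroring the (id, name, stock) fields of "Products".
    static final class Product {
        final long id;
        final String name;
        final int stock;

        Product(long id, String name, int stock) {
            this.id = id;
            this.name = name;
            this.stock = stock;
        }
    }

    // Rough equivalent of: SELECT * FROM Products WHERE name LIKE '%Apple%'
    // '%' matches any sequence of characters, so the pattern amounts to a
    // case-sensitive substring test (assumption: standard LIKE semantics).
    static List<Product> selectApples(List<Product> products) {
        return products.stream()
                .filter(p -> p.name.contains("Apple"))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product(1L, "Green Apple", 10),
                new Product(2L, "Banana", 5),
                new Product(3L, "Apple Juice", 0),
                new Product(4L, "pineapple", 7));
        // Prints the rows with id 1 and id 3; "pineapple" has a lowercase 'a'.
        for (Product p : selectApples(products)) {
            System.out.println(p.id + "," + p.name + "," + p.stock);
        }
    }
}
```

In the real streaming job the same predicate is applied to each record as it arrives, rather than to a finished list.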

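Identifiers in SQL, such as column names, follow a lexical policy similar to Java's: case is preserved and names are matched case-sensitively. An identifier can be quoted with back-ticks (`), which also lets it contain non-alphanumeric characters.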
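As a small illustration of back-tick quoting, the sketch below wraps a column name in back-ticks before placing it into a query string. The quote helper is hypothetical (it is not part of Flink or Calcite), and it simply rejects names that already contain a back-tick rather than guessing at an escaping rule.

```java
// Hypothetical helper for building query strings; not a Flink or Calcite API.
final class IdentifierQuoting {

    // Wraps an identifier in back-ticks so it can contain spaces or other
    // non-alphanumeric characters.
    static String quote(String identifier) {
        if (identifier == null || identifier.isEmpty()) {
            throw new IllegalArgumentException("identifier must not be empty");
        }
        // Conservative choice for this sketch: refuse embedded back-ticks
        // instead of assuming how they would have to be escaped.
        if (identifier.indexOf('`') >= 0) {
            throw new IllegalArgumentException("identifier must not contain a back-tick");
        }
        return "`" + identifier + "`";
    }

    public static void main(String[] args) {
        String sql = "SELECT " + quote("name") + " AS " + quote("product name")
                + " FROM Products";
        // Prints: SELECT `name` AS `product name` FROM Products
        System.out.println(sql);
    }
}
```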

Supported SQL Syntax

The supported SQL syntax is described by the following BNF grammar:

query:
  values
  | {
      select
      | selectWithoutFrom
      | query UNION [ ALL ] query
      | query EXCEPT query
      | query INTERSECT query
    }
    [ ORDER BY orderItem [, orderItem ]* ]
    [ LIMIT { count | ALL } ]
    [ OFFSET start { ROW | ROWS } ]
    [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY]

orderItem:
  expression [ ASC | DESC ]

select:
  SELECT [ STREAM ] [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }
  FROM tableExpression
  [ WHERE booleanExpression ]
  [ GROUP BY { groupItem [, groupItem ]* } ]
  [ HAVING booleanExpression ]

selectWithoutFrom:
  SELECT [ ALL | DISTINCT ]
  { * | projectItem [, projectItem ]* }

projectItem:
  expression [ [ AS ] columnAlias ]
  | tableAlias . *

tableExpression:
  tableReference [, tableReference ]*
  | tableExpression [ NATURAL ] [ LEFT | RIGHT | FULL ] JOIN tableExpression [ joinCondition ]

joinCondition:
  ON booleanExpression
  | USING '(' column [, column ]* ')'

tableReference:
  tablePrimary
  [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]

tablePrimary:
  [ TABLE ] [ [ catalogName . ] schemaName . ] tableName

values:
  VALUES expression [, expression ]*

groupItem:
  expression
  | '(' ')'
  | '(' expression [, expression ]* ')'
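To see how the select production fixes the order of the clauses, here is a minimal sketch that assembles a query string in grammar order: SELECT, optional STREAM, the projection list, FROM, then the optional WHERE and GROUP BY. The SelectSketch class and its parameters are hypothetical, it covers only part of the grammar, and it does no validation of the expressions passed in.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical string builder that follows the clause order of the
// "select" production; not a Flink or Calcite API.
final class SelectSketch {

    static String select(boolean stream,
                         List<String> projectItems,
                         String table,
                         String where,
                         List<String> groupItems) {
        StringBuilder sql = new StringBuilder("SELECT");
        if (stream) {
            sql.append(" STREAM");                       // SELECT [ STREAM ]
        }
        // { * | projectItem [, projectItem ]* }
        sql.append(' ').append(projectItems.isEmpty() ? "*" : String.join(", ", projectItems));
        sql.append(" FROM ").append(table);              // FROM tableExpression
        if (where != null) {
            sql.append(" WHERE ").append(where);         // [ WHERE booleanExpression ]
        }
        if (!groupItems.isEmpty()) {
            // [ GROUP BY { groupItem [, groupItem ]* } ]
            sql.append(" GROUP BY ").append(String.join(", ", groupItems));
        }
        return sql.toString();
    }

    public static void main(String[] args) {
        // Prints: SELECT STREAM * FROM Products WHERE name LIKE '%Apple%'
        System.out.println(select(true, Collections.<String>emptyList(),
                "Products", "name LIKE '%Apple%'", Collections.<String>emptyList()));
        // Prints: SELECT name, SUM(stock) AS total FROM Products GROUP BY name
        System.out.println(select(false, Arrays.asList("name", "SUM(stock) AS total"),
                "Products", null, Arrays.asList("name")));
    }
}
```

Whether a particular query is actually accepted still depends on the Flink version in use; the sketch only shows the shape the grammar requires.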
