本文共 7614 字,大约阅读时间需要 25 分钟。
Windows
聚合事件(例如计数、和)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口 windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。Windows可以是时间驱动(示例:每30秒)或数据驱动(示例:每100个元素)。一个典型的方法是区分不同类型的窗口,比如翻筋斗窗口(没有重叠)、滑动窗口(有重叠)和会话窗口(中间有一个不活跃的间隙)。
Time
当提到流程序中的时间(例如定义窗口)时,可以指不同的时间概念:事件时间Event Time 是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感器或生产服务附加的时间戳。Flink通过timestamp assigners(时间戳指定人)访问事件时间戳。
摄入时间Ingestion time 是事件进入源操作符的Flink数据流的时间。
处理时间Processing Time是执行基于时间的操作的每个操作符的本地时间。
有状态操作(Stateful Operations)
虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但是一些操作记住了跨多个事件的信息(例如窗口操作符)。这些操作称为有状态操作。有状态操作的状态被维护在可以认为是嵌入式键/值存储中。状态与有状态操作符读取的流一起被严格地分区和分布。因此,在keyBy()函数之后,只能在键控流上访问键/值状态,并且只能访问与当前事件的键相关联的值。对齐流和状态的键确保所有的状态更新都是本地操作,保证一致性而不增加事务开销。这种对齐还允许Flink透明地重新分配状态和调整流分区。
容错检查点Checkpoints for Fault ToleranceFlink通过流回放和检查点的组合实现了容错。检查点与每个输入流中的特定点以及每个操作符的对应状态相关。通过恢复操作符的状态并从检查点重新播放事件,流数据流可以在检查点恢复,同时保持一致性(准确地说是一次处理语义)。检查点间隔是在执行期间用恢复时间(需要重放的事件数量)来权衡容错开销的一种方法。
关于容错的内部描述提供了关于Flink如何管理检查点和相关主题的更多信息。有关启用和配置检查点的详细信息在检查点API文档中。
批处理流Batch on Streamin
Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限的元素数量)。数据集在内部被视为数据流。因此,上述概念同样适用于批处理程序,也适用于流程序,但有少数例外:批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可能的,因为输入是有界的。这将使成本更多地用于恢复,但使常规处理更便宜,因为它避免了检查点。
数据集API中的有状态操作使用简化的内存/核心外数据结构,而不是键/值索引。
DataSet API引入了特殊的synchronized(基于超步的)迭代,这只能在有界的流上实现。
Flink中的DataStream程序是在数据流上实现转换的常规程序(例如,过滤、更新状态、定义窗口、聚合)。数据流最初是从各种来源(例如,消息队列、套接字流、文件)创建的。结果通过sink返回,它可以将数据写入文件或写入标准输出(例如命令行终端)。Flink程序在各种上下文中运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中执行,也可以在许多机器的集群中执行。下面的程序是一个完整的流窗口单词计数应用程序的工作示例,它在5秒的窗口中对来自web套接字的单词进行计数。您可以复制并粘贴代码在本地运行它。import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject WindowWordCount { def main(args: Array[String]) { val env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("localhost", 9999) val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1) counts.print() env.execute("Window Stream WordCount") }}
要运行示例程序,首先从终端启动netcat的输入流:
只需键入一些单词,然后按下回车键就可以得到一个新词。这些将是单词计数程序的输入。如果你想看到数大于1,输入相同的单词一遍又一遍地在5秒(如果你不能快速敲键盘,增加窗口大小的5秒内☺)Table API & SQL
Apache Flink具有两个用于统一流和批处理的关系API——Table API和SQL。Table API是Scala和Java的语言集成查询API,允许从关系操作符(如选择、筛选和以非常直观的方式连接)中组合查询。Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批输入(数据集)还是流输入(DataStream),任何接口中指定的查询都具有相同的语义并指定相同的结果。Table API和SQL接口以及Flink的DataStream和DataSet API紧密集成在一起。您可以很容易地在所有api和基于这些api的库之间切换。例如,您可以使用CEP库从数据流中提取模式,然后使用表API分析模式,或者在对预处理数据运行Gelly图形算法之前,您可以使用SQL查询扫描、过滤和聚合批处理表。
请注意,Table API和SQL的特性还不完整,正在积极开发中。不是所有的操作都被[Table API, SQL]和[stream, batch]输入的每个组合所支持。
SQL标准的Apache Calcite
statement: setStatement | resetStatement | explain | describe | insert | update | merge | delete | querysetStatement: [ ALTER ( SYSTEM | SESSION ) ] SET identifier '=' expressionresetStatement: [ ALTER ( SYSTEM | SESSION ) ] RESET identifier | [ ALTER ( SYSTEM | SESSION ) ] RESET ALLexplain: EXPLAIN PLAN [ WITH TYPE | WITH IMPLEMENTATION | WITHOUT IMPLEMENTATION ] [ EXCLUDING ATTRIBUTES | INCLUDING [ ALL ] ATTRIBUTES ] [ AS JSON | AS XML ] FOR ( query | insert | update | merge | delete )describe: DESCRIBE DATABASE databaseName | DESCRIBE CATALOG [ databaseName . ] catalogName | DESCRIBE SCHEMA [ [ databaseName . ] catalogName ] . schemaName | DESCRIBE [ TABLE ] [ [ [ databaseName . ] catalogName . ] schemaName . ] tableName [ columnName ] | DESCRIBE [ STATEMENT ] ( query | insert | update | merge | delete )insert: ( INSERT | UPSERT ) INTO tablePrimary [ '(' column [, column ]* ')' ] queryupdate: UPDATE tablePrimary SET assign [, assign ]* [ WHERE booleanExpression ]assign: identifier '=' expressionmerge: MERGE INTO tablePrimary [ [ AS ] alias ] USING tablePrimary ON booleanExpression [ WHEN MATCHED THEN UPDATE SET assign [, assign ]* ] [ WHEN NOT MATCHED THEN INSERT VALUES '(' value [ , value ]* ')' ]delete: DELETE FROM tablePrimary [ [ AS ] alias ] [ WHERE booleanExpression ]query: values | WITH withItem [ , withItem ]* query | { select | selectWithoutFrom | query UNION [ ALL | DISTINCT ] query | query EXCEPT [ ALL | DISTINCT ] query | query MINUS [ ALL | DISTINCT ] query | query INTERSECT [ ALL | DISTINCT ] query } [ ORDER BY orderItem [, orderItem ]* ] [ LIMIT [ start, ] { count | ALL } ] [ OFFSET start { ROW | ROWS } ] [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]withItem: name [ '(' column [, column ]* ')' ] AS '(' query ')'orderItem: expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]select: SELECT [ STREAM ] [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* } FROM tableExpression [ WHERE booleanExpression ] [ GROUP BY { groupItem [, groupItem ]* } ] [ HAVING booleanExpression ] [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]selectWithoutFrom: SELECT [ ALL | DISTINCT ] { * | projectItem [, projectItem ]* }projectItem: expression [ [ AS ] columnAlias ] | tableAlias . *tableExpression: tableReference [, tableReference ]* | tableExpression [ NATURAL ] [ ( LEFT | RIGHT | FULL ) [ OUTER ] ] JOIN tableExpression [ joinCondition ] | tableExpression CROSS JOIN tableExpression | tableExpression [ CROSS | OUTER ] APPLY tableExpressionjoinCondition: ON booleanExpression | USING '(' column [, column ]* ')'tableReference: tablePrimary [ matchRecognize ] [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]tablePrimary: [ [ catalogName . ] schemaName . ] tableName '(' TABLE [ [ catalogName . ] schemaName . ] tableName ')' | tablePrimary [ EXTEND ] '(' columnDecl [, columnDecl ]* ')' | [ LATERAL ] '(' query ')' | UNNEST '(' expression ')' [ WITH ORDINALITY ] | [ LATERAL ] TABLE '(' [ SPECIFIC ] functionName '(' expression [, expression ]* ')' ')'columnDecl: column type [ NOT NULL ]values: VALUES expression [, expression ]*groupItem: expression | '(' ')' | '(' expression [, expression ]* ')' | CUBE '(' expression [, expression ]* ')' | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')'windowRef: windowName | windowSpecwindowSpec: [ windowName ] '(' [ ORDER BY orderItem [, orderItem ]* ] [ PARTITION BY expression [, expression ]* ] [ RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING } | ROWS numericExpression { PRECEDING | FOLLOWING } ] ')'
在insert中,如果insert或UPSERT语句没有指定目标列的列表,查询的列数必须与目标表相同,除非是在某些一致性级别。
在merge中,至少有一个匹配时和未匹配时的子句必须出现。
tablePrimary可能只包含特定符合性级别的扩展子句;在这些相同的一致性级别中,insert中的任何列都可以被columnDecl替换,其效果类似于将其包含在EXTEND子句中。
在orderItem中,如果表达式是正整数n,它表示SELECT子句中的第n项。
在查询中,count和start可以是无符号整型字面值,也可以是值为整型的动态参数。
aggregate聚合查询是包含GROUP BY或HAVING子句或SELECT子句中的聚合函数的查询。在SELECT中,具有和ORDER BY子句的聚合查询中,所有表达式都必须是当前组中的常量(即,按照group BY子句或常量的定义对常量进行分组)、聚合函数或常量与聚合函数的组合。聚合和分组函数只能出现在聚合查询中,而且只能出现在SELECT、HAVING或ORDER BY子句中。标量子查询是用作表达式的子查询。如果子查询不返回行,则该值为空;如果它返回多个行,则为错误。IN、EXISTS和scalar子查询可以出现在表达式的任何地方(例如SELECT子句、where子句、ON子句连接或聚合函数的参数)。
一个IN、EXISTS或scalar子查询可能相互关联;也就是说,它可以引用包含查询的FROM子句中的表。
selectWithoutFrom等价于值,但不是标准SQL,只允许在某些符合级别中使用。
MINUS相当于EXCEPT,但不是标准SQL,只允许在某些一致性级别上使用。
交叉应用和外部应用只允许在某些符合级别。
“限制开始,计数”相当于“限制计数偏移开始”,但只允许在某些符合级别。“LIMIT start, count” is equivalent to “LIMIT count OFFSET start” but is only allowed in certain conformance levels.
转载于:https://blog.51cto.com/13959448/2316204