博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
flink流计算随笔(5)
阅读量:6325 次
发布时间:2019-06-22

本文共 7614 字,大约阅读时间需要 25 分钟。

Windows

聚合事件(例如计数、和)在流上的工作方式与批处理不同。例如,不可能计算流中的所有元素,因为流通常是无限的(×××的)。相反,流上的聚合(计数、和等)是由窗口 windows限定作用域的,例如“过去5分钟的计数”或“最后100个元素的总和”。

Windows可以是时间驱动(示例:每30秒)或数据驱动(示例:每100个元素)。一个典型的方法是区分不同类型的窗口,比如翻筋斗窗口(没有重叠)、滑动窗口(有重叠)和会话窗口(中间有一个不活跃的间隙)。

Time

当提到流程序中的时间(例如定义窗口)时,可以指不同的时间概念:

事件时间Event Time 是创建事件的时间。它通常由事件中的时间戳描述,例如由生产传感器或生产服务附加的时间戳。Flink通过timestamp assigners(时间戳指定人)访问事件时间戳。

摄入时间Ingestion time 是事件进入源操作符的Flink数据流的时间。

处理时间Processing Time是执行基于时间的操作的每个操作符的本地时间。

有状态操作(Stateful Operations)

虽然数据流中的许多操作一次只查看一个单独的事件(例如事件解析器),但是一些操作记住了跨多个事件的信息(例如窗口操作符)。这些操作称为有状态操作。

有状态操作的状态被维护在可以认为是嵌入式键/值存储中。状态与有状态操作符读取的流一起被严格地分区和分布。因此,在keyBy()函数之后,只能在键控流上访问键/值状态,并且只能访问与当前事件的键相关联的值。对齐流和状态的键确保所有的状态更新都是本地操作,保证一致性而不增加事务开销。这种对齐还允许Flink透明地重新分配状态和调整流分区。

容错检查点Checkpoints for Fault Tolerance
Flink通过流回放和检查点的组合实现了容错。检查点与每个输入流中的特定点以及每个操作符的对应状态相关。通过恢复操作符的状态并从检查点重新播放事件,流数据流可以在检查点恢复,同时保持一致性(准确地说是一次处理语义)。

检查点间隔是在执行期间用恢复时间(需要重放的事件数量)来权衡容错开销的一种方法。

关于容错的内部描述提供了关于Flink如何管理检查点和相关主题的更多信息。有关启用和配置检查点的详细信息在检查点API文档中。

批处理流Batch on Streamin

Flink执行批处理程序作为流程序的特殊情况,其中流是有界的(有限的元素数量)。数据集在内部被视为数据流。因此,上述概念同样适用于批处理程序,也适用于流程序,但有少数例外:

批处理程序的容错不使用检查点。恢复通过完全重放流来实现。这是可能的,因为输入是有界的。这将使成本更多地用于恢复,但使常规处理更便宜,因为它避免了检查点。

数据集API中的有状态操作使用简化的内存/核心外数据结构,而不是键/值索引。

DataSet API引入了特殊的synchronized(基于超步的)迭代,这只能在有界的流上实现。

Flink中的DataStream程序是在数据流上实现转换的常规程序(例如,过滤、更新状态、定义窗口、聚合)。数据流最初是从各种来源(例如,消息队列、套接字流、文件)创建的。结果通过sink返回,它可以将数据写入文件或写入标准输出(例如命令行终端)。Flink程序在各种上下文中运行,独立运行,或嵌入到其他程序中。执行可以在本地JVM中执行,也可以在许多机器的集群中执行。
下面的程序是一个完整的流窗口单词计数应用程序的工作示例,它在5秒的窗口中对来自web套接字的单词进行计数。您可以复制并粘贴代码在本地运行它。

import org.apache.flink.streaming.api.scala._import org.apache.flink.streaming.api.windowing.time.Timeobject WindowWordCount {  def main(args: Array[String]) {    val env = StreamExecutionEnvironment.getExecutionEnvironment    val text = env.socketTextStream("localhost", 9999)    val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } }      .map { (_, 1) }      .keyBy(0)      .timeWindow(Time.seconds(5))      .sum(1)    counts.print()    env.execute("Window Stream WordCount")  }}

要运行示例程序,首先从终端启动netcat的输入流:

只需键入一些单词,然后按下回车键就可以得到一个新词。这些将是单词计数程序的输入。如果你想看到数大于1,输入相同的单词一遍又一遍地在5秒(如果你不能快速敲键盘,增加窗口大小的5秒内☺)

Table API & SQL

Apache Flink具有两个用于统一流和批处理的关系API——Table API和SQL。Table API是Scala和Java的语言集成查询API,允许从关系操作符(如选择、筛选和以非常直观的方式连接)中组合查询。Flink的SQL支持基于实现SQL标准的Apache Calcite。无论输入是批输入(数据集)还是流输入(DataStream),任何接口中指定的查询都具有相同的语义并指定相同的结果。

Table API和SQL接口以及Flink的DataStream和DataSet API紧密集成在一起。您可以很容易地在所有api和基于这些api的库之间切换。例如,您可以使用CEP库从数据流中提取模式,然后使用表API分析模式,或者在对预处理数据运行Gelly图形算法之前,您可以使用SQL查询扫描、过滤和聚合批处理表。

请注意,Table API和SQL的特性还不完整,正在积极开发中。不是所有的操作都被[Table API, SQL]和[stream, batch]输入的每个组合所支持。

SQL标准的Apache Calcite

statement:      setStatement  |   resetStatement  |   explain  |   describe  |   insert  |   update  |   merge  |   delete  |   querysetStatement:      [ ALTER ( SYSTEM | SESSION ) ] SET identifier '=' expressionresetStatement:      [ ALTER ( SYSTEM | SESSION ) ] RESET identifier  |   [ ALTER ( SYSTEM | SESSION ) ] RESET ALLexplain:      EXPLAIN PLAN      [ WITH TYPE | WITH IMPLEMENTATION | WITHOUT IMPLEMENTATION ]      [ EXCLUDING ATTRIBUTES | INCLUDING [ ALL ] ATTRIBUTES ]      [ AS JSON | AS XML ]      FOR ( query | insert | update | merge | delete )describe:      DESCRIBE DATABASE databaseName   |  DESCRIBE CATALOG [ databaseName . ] catalogName   |  DESCRIBE SCHEMA [ [ databaseName . ] catalogName ] . schemaName   |  DESCRIBE [ TABLE ] [ [ [ databaseName . ] catalogName . ] schemaName . ] tableName [ columnName ]   |  DESCRIBE [ STATEMENT ] ( query | insert | update | merge | delete )insert:      ( INSERT | UPSERT ) INTO tablePrimary      [ '(' column [, column ]* ')' ]      queryupdate:      UPDATE tablePrimary      SET assign [, assign ]*      [ WHERE booleanExpression ]assign:      identifier '=' expressionmerge:      MERGE INTO tablePrimary [ [ AS ] alias ]      USING tablePrimary      ON booleanExpression      [ WHEN MATCHED THEN UPDATE SET assign [, assign ]* ]      [ WHEN NOT MATCHED THEN INSERT VALUES '(' value [ , value ]* ')' ]delete:      DELETE FROM tablePrimary [ [ AS ] alias ]      [ WHERE booleanExpression ]query:      values  |   WITH withItem [ , withItem ]* query  |   {          select      |   selectWithoutFrom      |   query UNION [ ALL | DISTINCT ] query      |   query EXCEPT [ ALL | DISTINCT ] query      |   query MINUS [ ALL | DISTINCT ] query      |   query INTERSECT [ ALL | DISTINCT ] query      }      [ ORDER BY orderItem [, orderItem ]* ]      [ LIMIT [ start, ] { count | ALL } ]      [ OFFSET start { ROW | ROWS } ]      [ FETCH { FIRST | NEXT } [ count ] { ROW | ROWS } ONLY ]withItem:      name      [ '(' column [, column ]* ')' ]      AS '(' query ')'orderItem:      expression [ ASC | DESC ] [ NULLS FIRST | NULLS LAST ]select:      SELECT [ STREAM ] [ ALL | DISTINCT ]          { * | projectItem [, projectItem ]* }      FROM tableExpression      [ WHERE booleanExpression ]      [ GROUP BY { groupItem [, groupItem ]* } ]      [ HAVING booleanExpression ]      [ WINDOW windowName AS windowSpec [, windowName AS windowSpec ]* ]selectWithoutFrom:      SELECT [ ALL | DISTINCT ]          { * | projectItem [, projectItem ]* }projectItem:      expression [ [ AS ] columnAlias ]  |   tableAlias . *tableExpression:      tableReference [, tableReference ]*  |   tableExpression [ NATURAL ] [ ( LEFT | RIGHT | FULL ) [ OUTER ] ] JOIN tableExpression [ joinCondition ]  |   tableExpression CROSS JOIN tableExpression  |   tableExpression [ CROSS | OUTER ] APPLY tableExpressionjoinCondition:      ON booleanExpression  |   USING '(' column [, column ]* ')'tableReference:      tablePrimary      [ matchRecognize ]      [ [ AS ] alias [ '(' columnAlias [, columnAlias ]* ')' ] ]tablePrimary:      [ [ catalogName . ] schemaName . ] tableName      '(' TABLE [ [ catalogName . ] schemaName . ] tableName ')'  |   tablePrimary [ EXTEND ] '(' columnDecl [, columnDecl ]* ')'  |   [ LATERAL ] '(' query ')'  |   UNNEST '(' expression ')' [ WITH ORDINALITY ]  |   [ LATERAL ] TABLE '(' [ SPECIFIC ] functionName '(' expression [, expression ]* ')' ')'columnDecl:      column type [ NOT NULL ]values:      VALUES expression [, expression ]*groupItem:      expression  |   '(' ')'  |   '(' expression [, expression ]* ')'  |   CUBE '(' expression [, expression ]* ')'  |   ROLLUP '(' expression [, expression ]* ')'  |   GROUPING SETS '(' groupItem [, groupItem ]* ')'windowRef:      windowName  |   windowSpecwindowSpec:      [ windowName ]      '('      [ ORDER BY orderItem [, orderItem ]* ]      [ PARTITION BY expression [, expression ]* ]      [          RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING }      |   ROWS numericExpression { PRECEDING | FOLLOWING }      ]      ')'

在insert中,如果insert或UPSERT语句没有指定目标列的列表,查询的列数必须与目标表相同,除非是在某些一致性级别。

在merge中,至少有一个匹配时和未匹配时的子句必须出现。

tablePrimary可能只包含特定符合性级别的扩展子句;在这些相同的一致性级别中,insert中的任何列都可以被columnDecl替换,其效果类似于将其包含在EXTEND子句中。

在orderItem中,如果表达式是正整数n,它表示SELECT子句中的第n项。

在查询中,count和start可以是无符号整型字面值,也可以是值为整型的动态参数。

aggregate聚合查询是包含GROUP BY或HAVING子句或SELECT子句中的聚合函数的查询。在SELECT中,具有和ORDER BY子句的聚合查询中,所有表达式都必须是当前组中的常量(即,按照group BY子句或常量的定义对常量进行分组)、聚合函数或常量与聚合函数的组合。聚合和分组函数只能出现在聚合查询中,而且只能出现在SELECT、HAVING或ORDER BY子句中。
标量子查询是用作表达式的子查询。如果子查询不返回行,则该值为空;如果它返回多个行,则为错误。

IN、EXISTS和scalar子查询可以出现在表达式的任何地方(例如SELECT子句、where子句、ON子句连接或聚合函数的参数)。

一个IN、EXISTS或scalar子查询可能相互关联;也就是说,它可以引用包含查询的FROM子句中的表。

selectWithoutFrom等价于值,但不是标准SQL,只允许在某些符合级别中使用。

MINUS相当于EXCEPT,但不是标准SQL,只允许在某些一致性级别上使用。

交叉应用和外部应用只允许在某些符合级别。

“限制开始,计数”相当于“限制计数偏移开始”,但只允许在某些符合级别。“LIMIT start, count” is equivalent to “LIMIT count OFFSET start” but is only allowed in certain conformance levels.

转载于:https://blog.51cto.com/13959448/2316204

你可能感兴趣的文章
TDSemiModal
查看>>
Onion Browser
查看>>
开源中国iOS客户端学习——(二)下拉刷新特效EGOTableViewPullRefresh
查看>>
安装eclipse的git插件报错的解决
查看>>
项目总结:定时给微博用户的最新微博回复
查看>>
Log4j或者Logback的NDC和MDC功能
查看>>
构造方法
查看>>
802.11(wifi)的MAC层功能
查看>>
单链表二
查看>>
使用 ssh -R 穿透局域网访问内部服务器主机,反向代理 无人值守化
查看>>
springmvc接口耗时统计&&拦截器
查看>>
Rack 源码分析
查看>>
每天一个linux命令(50):crontab命令
查看>>
XMPP JID的组成部分
查看>>
分布式系统测试那些事儿——理念
查看>>
JPA使用Hibernate实现,使用UUID.主键的生成策略.
查看>>
记一次JAVA连接hbase报getMaster attempt 0 of 10 failed异常的解决
查看>>
Scala之数组
查看>>
Linux入门教程:Apache服务配置(一)
查看>>
Connection to https://dl-ssl.google.com refused
查看>>