A Look at Flink Table Time Attributes
Published: 2019-06-21


This post looks at how time attributes are defined for a Flink Table.

Processing time

Defining it via fromDataStream

    DataStream<Tuple2<String, String>> stream = ...;

    // declare an additional logical field as a processing time attribute
    Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

    WindowedTable windowedTable = table.window(
        Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
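  • When a Table is created from a DataStream, the processing time attribute can be declared in the field expression passed to fromDataStream, using the .proctime suffix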
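The window in the snippet above is a 10-minute tumbling window over the time attribute. As a rough illustration of what "tumbling" means, here is a minimal plain-Java sketch of the arithmetic that maps a timestamp to its window. The class and method names are hypothetical and the code does not use or reproduce Flink's implementation; it assumes windows are aligned to the epoch with no offset.

```java
import java.time.Duration;

/** Illustrative only: models tumbling-window assignment; hypothetical names, not Flink code. */
final class TumblingWindowSketch {

    /** Returns the inclusive start of the window that contains the timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        // floorMod keeps the result correct for negative timestamps as well
        return timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
    }

    /** Returns the exclusive end of the window that contains the timestamp. */
    static long windowEnd(long timestampMillis, long windowSizeMillis) {
        return windowStart(timestampMillis, windowSizeMillis) + windowSizeMillis;
    }

    public static void main(String[] args) {
        long size = Duration.ofMinutes(10).toMillis(); // 600000 ms
        long ts = Duration.ofMinutes(25).toMillis();   // 25 minutes after the epoch
        System.out.println(windowStart(ts, size));     // prints 1200000
        System.out.println(windowEnd(ts, size));       // prints 1800000
    }
}
```

With a processing time attribute the timestamp fed into this arithmetic would be the machine's wall-clock time when the row is processed; with an event time attribute it would be the timestamp carried by the row.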

Defining it via TableSource

    // define a table source with a processing attribute
    public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

        @Override
        public TypeInformation<Row> getReturnType() {
            String[] names = new String[] {"Username", "Data"};
            TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
            return Types.ROW(names, types);
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            // create stream
            DataStream<Row> stream = ...;
            return stream;
        }

        @Override
        public String getProctimeAttribute() {
            // field with this name will be appended as a third field
            return "UserActionTime";
        }
    }

    // register table source
    tEnv.registerTableSource("UserActions", new UserActionSource());

    WindowedTable windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • When a Table is created from a TableSource, the processing time attribute is defined by implementing the DefinedProctimeAttribute interface

Event time

Defining it via fromDataStream

    // Option 1:

    // extract timestamp and assign watermarks based on knowledge of the stream
    DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

    // declare an additional logical field as an event time attribute
    Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");

    // Option 2:

    // extract timestamp from first field, and assign watermarks based on knowledge of the stream
    DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

    // the first field has been used for timestamp extraction, and is no longer necessary
    // replace first field with a logical event time attribute
    Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

    // Usage:

    WindowedTable windowedTable = table.window(
        Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
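  • When a Table is created from a DataStream, the event time attribute can be declared in the field expression passed to fromDataStream, using the .rowtime suffix. There are two ways to do it: append an additional logical field, or replace an existing field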
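To make the difference between the two options concrete, here is a small plain-Java sketch that classifies each entry of a positional field expression as a physical field, a rowtime attribute that replaces a physical field, or a rowtime attribute appended after the physical fields. The helper is hypothetical and only models the idea; it assumes fields are mapped by position and that a .rowtime entry beyond the stream's arity is an appended logical field.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustrative model of positional field mapping; a hypothetical helper, not Flink code. */
final class RowtimeFieldSketch {

    private static final String SUFFIX = ".rowtime";

    /** Describes how each entry of the expression maps onto a stream with physicalArity fields. */
    static List<String> describe(String fieldExpression, int physicalArity) {
        List<String> result = new ArrayList<>();
        String[] parts = fieldExpression.split(",");
        for (int i = 0; i < parts.length; i++) {
            String part = parts[i].trim();
            if (part.endsWith(SUFFIX)) {
                String name = part.substring(0, part.length() - SUFFIX.length());
                // Inside the physical arity the attribute takes over an existing position;
                // beyond it, the attribute is an extra logical field.
                result.add(i < physicalArity
                        ? name + ": rowtime, replaces physical field " + i
                        : name + ": rowtime, appended as logical field");
            } else {
                result.add(part + ": physical field " + i);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Option 1: two physical fields, attribute declared at position 2
        System.out.println(describe("Username, Data, UserActionTime.rowtime", 2));
        // Option 2: three physical fields, attribute declared at position 0
        System.out.println(describe("UserActionTime.rowtime, Username, Data", 3));
    }
}
```

In both cases the attribute's values come from the timestamps assigned by assignTimestampsAndWatermarks, which is why the original first field in Option 2 is no longer needed.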

Defining it via TableSource

    // define a table source with a rowtime attribute
    public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

        @Override
        public TypeInformation<Row> getReturnType() {
            String[] names = new String[] {"Username", "Data", "UserActionTime"};
            TypeInformation[] types =
                new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
            return Types.ROW(names, types);
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
            // create stream
            // ...
            // assign watermarks based on the "UserActionTime" attribute
            DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
            return stream;
        }

        @Override
        public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
            // Mark the "UserActionTime" attribute as event-time attribute.
            // We create one attribute descriptor of "UserActionTime".
            RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
                "UserActionTime",
                new ExistingField("UserActionTime"),
                new AscendingTimestamps());
            List<RowtimeAttributeDescriptor> listRowtimeAttrDescr =
                Collections.singletonList(rowtimeAttrDescr);
            return listRowtimeAttrDescr;
        }
    }

    // register the table source
    tEnv.registerTableSource("UserActions", new UserActionSource());

    WindowedTable windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
  • When a Table is created from a TableSource, the event time attribute is defined by implementing the DefinedRowtimeAttributes interface
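The descriptor in the example pairs the ExistingField extractor with the AscendingTimestamps watermark strategy. As a rough mental model of what an ascending-timestamps strategy does, the plain-Java sketch below tracks the largest timestamp seen so far and reports a watermark one millisecond behind it. The class is hypothetical and the "maximum minus one" rule is an assumption about the strategy's behavior, not a copy of Flink's source.

```java
/** Illustrative model of an ascending-timestamps watermark strategy; hypothetical, not Flink code. */
final class AscendingWatermarkSketch {

    // Start one above Long.MIN_VALUE so that "max - 1" cannot underflow
    private long maxTimestamp = Long.MIN_VALUE + 1;

    /** Records the timestamp of a newly arrived row. */
    void observe(long timestampMillis) {
        maxTimestamp = Math.max(maxTimestamp, timestampMillis);
    }

    /** Current watermark: one millisecond behind the largest timestamp observed so far. */
    long currentWatermark() {
        return maxTimestamp - 1;
    }

    public static void main(String[] args) {
        AscendingWatermarkSketch wm = new AscendingWatermarkSketch();
        for (long ts : new long[] {1_000L, 2_000L, 2_000L, 5_000L}) {
            wm.observe(ts);
            System.out.println("row at " + ts + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

Trailing by one millisecond means a row whose timestamp equals the current maximum is still considered on time, which suits sources whose timestamps never decrease.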

definedTimeAttributes

flink-table_2.11-1.7.0-sources.jar!/org/apache/flink/table/sources/definedTimeAttributes.scala

    /**
      * Extends a [[TableSource]] to specify a processing time attribute.
      */
    trait DefinedProctimeAttribute {

      /**
        * Returns the name of a processing time attribute or null if no processing time attribute is
        * present.
        *
        * The referenced attribute must be present in the [[TableSchema]] of the [[TableSource]] and of
        * type [[Types.SQL_TIMESTAMP]].
        */
      @Nullable
      def getProctimeAttribute: String
    }

    /**
      * Extends a [[TableSource]] to specify rowtime attributes via a
      * [[RowtimeAttributeDescriptor]].
      */
    trait DefinedRowtimeAttributes {

      /**
        * Returns a list of [[RowtimeAttributeDescriptor]] for all rowtime attributes of the table.
        *
        * All referenced attributes must be present in the [[TableSchema]] of the [[TableSource]] and of
        * type [[Types.SQL_TIMESTAMP]].
        *
        * @return A list of [[RowtimeAttributeDescriptor]].
        */
      def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor]
    }

    /**
      * Describes a rowtime attribute of a [[TableSource]].
      *
      * @param attributeName The name of the rowtime attribute.
      * @param timestampExtractor The timestamp extractor to derive the values of the attribute.
      * @param watermarkStrategy The watermark strategy associated with the attribute.
      */
    class RowtimeAttributeDescriptor(
      val attributeName: String,
      val timestampExtractor: TimestampExtractor,
      val watermarkStrategy: WatermarkStrategy) {

      /** Returns the name of the rowtime attribute. */
      def getAttributeName: String = attributeName

      /** Returns the [[TimestampExtractor]] for the attribute. */
      def getTimestampExtractor: TimestampExtractor = timestampExtractor

      /** Returns the [[WatermarkStrategy]] for the attribute. */
      def getWatermarkStrategy: WatermarkStrategy = watermarkStrategy

      override def equals(other: Any): Boolean = other match {
        case that: RowtimeAttributeDescriptor =>
            Objects.equals(attributeName, that.attributeName) &&
            Objects.equals(timestampExtractor, that.timestampExtractor) &&
            Objects.equals(watermarkStrategy, that.watermarkStrategy)
        case _ => false
      }

      override def hashCode(): Int = {
        Objects.hash(attributeName, timestampExtractor, watermarkStrategy)
      }
    }
  • DefinedProctimeAttribute declares the getProctimeAttribute method, which returns a String naming the processing time field. DefinedRowtimeAttributes declares the getRowtimeAttributeDescriptors method, which returns a List of RowtimeAttributeDescriptor. A RowtimeAttributeDescriptor has three properties: attributeName, timestampExtractor, and watermarkStrategy

Summary

  • Time attributes can be specified when a Table is created from a DataStream or a TableSource. Once specified, they can be used as ordinary fields or in time-based operations
  • For processing time: when creating a Table from a DataStream, declare the attribute in fromDataStream; when creating a Table from a TableSource, implement the DefinedProctimeAttribute interface, whose getProctimeAttribute method returns a String naming the processing time field
  • For event time: when creating a Table from a DataStream, declare the attribute in fromDataStream, either by appending an additional field or by replacing an existing one; when creating a Table from a TableSource, implement the DefinedRowtimeAttributes interface, whose getRowtimeAttributeDescriptors method returns a List of RowtimeAttributeDescriptor, each with three properties: attributeName, timestampExtractor, and watermarkStrategy


Reposted from: http://anxna.baihongyu.com/
