flink吧 关注:442贴子:931
  • 0回复贴,共1

flink什么时间做输出啊

只看楼主收藏回复

env.socketTextStream("10.3.87.23", 6666).assignAscendingTimestamps(_.times) //事务时间
.keyBy(k => (k.db, k.tbl))
.window(SlidingEventTimeWindows.of(Time.days(3), Time.days(1)))
.reduce((x, y) => {
TableCall(x.db, x.tbl, x.conuts + 1, y.times)})
.print()
{"timestamp":"2021-08-01 15:31:56,895","msg":" INFO org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: get_table : db=hivetest tbl=chinese_part"}
{"timestamp":"2021-08-02 15:31:56,895","msg":" INFO org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: get_table : db=hivetest tbl=chinese_part"}
{"timestamp":"2021-08-03 15:31:56,895","msg":" INFO org.apache.hadoop.hive.metastore.HiveMetaStore: [pool-9-thread-97]: 88: get_table : db=hivetest tbl=chinese_part"}
我已经输入了三天的数据了。为什么没有输出。写jdbc也不行啊。等我结束了nc就有输入了。换kafka source有输出了。但是都输入好好多天的数据才有输出。也不知道应该输入多少天的数据才会输出


IP属地:湖北1楼2021-08-26 16:21回复