阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

Flink SQL之Over 聚合操作

284次阅读
没有评论

共计 3247 个字符,预计需要花费 9 分钟才能阅读完成。

导读 Over 聚合定义(支持 Batch\Streaming):可以理解为是一种特殊的滑动窗口聚合函数。那这里我们拿 Over 聚合​ 与 窗口聚合 做一个对比,其之间的最大不同之处在于:窗口聚合:不在 group by 中的字段,不能直接在 select 中拿到;Over 聚合:能够保留原始字段. 在生产环境中,Over 聚合的使用场景还是比较少的。在 Hive 中也有相同的聚合,但是小伙伴萌可以想想你在离线数仓经常使用嘛?

应用场景:计算最近一段滑动窗口的聚合结果数据。
实际案例:查询每个产品最近一小时订单的金额总和:

SELECT order_id, order_time, amount,
  SUM(amount) OVER (
    PARTITION BY product
    ORDER BY order_time
    RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
  ) AS one_hour_prod_amount_sum
FROM Orders

Over 聚合的语法总结如下:

SELECT
  agg_func(agg_col) OVER ([PARTITION BY col1[, col2, ...]]
    ORDER BY time_col
    range_definition),
  ...
FROM ...

其中:

  • ORDER BY:必须是时间戳列(事件时间、处理时间)
  • PARTITION BY:标识了聚合窗口的聚合粒度,如上述案例是按照 product 进行聚合
  • range_definition:这个标识聚合窗口的聚合数据范围,在 Flink 中有两种指定数据范围的方式。第一种为按照行数聚合​,第二种为按照时间区间聚合。
  • 如下案例所示:

    时间区间聚合

    按照时间区间聚合就是时间区间的一个滑动窗口,比如下面案例 1 小时的区间,最新输出的一条数据的 sum 聚合结果就是最近一小时数据的 amount 之和。

    CREATE TABLE source_table (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '10',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );
    
    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH ('connector' = 'print');
    
    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- 标识统计范围是一个 product 的最近 1 小时的数据
        RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM source_table

    结果如下:

    +I[2, 2021-12-24T22:08:26.583, 7, 73]
    +I[2, 2021-12-24T22:08:27.583, 7, 80]
    +I[2, 2021-12-24T22:08:28.583, 4, 84]
    +I[2, 2021-12-24T22:08:29.584, 7, 91]
    +I[2, 2021-12-24T22:08:30.583, 8, 99]
    +I[1, 2021-12-24T22:08:31.583, 9, 138]
    +I[2, 2021-12-24T22:08:32.584, 6, 105]
    +I[1, 2021-12-24T22:08:33.584, 7, 145]
    行数聚合

    按照行数聚合就是数据行数的一个滑动窗口,比如下面案例,最新输出的一条数据的 sum 聚合结果就是最近 5 行数据的 amount 之和。

    CREATE TABLE source_table (
        order_id BIGINT,
        product BIGINT,
        amount BIGINT,
        order_time as cast(CURRENT_TIMESTAMP as TIMESTAMP(3)),
        WATERMARK FOR order_time AS order_time - INTERVAL '0.001' SECOND
    ) WITH (
      'connector' = 'datagen',
      'rows-per-second' = '1',
      'fields.order_id.min' = '1',
      'fields.order_id.max' = '2',
      'fields.amount.min' = '1',
      'fields.amount.max' = '2',
      'fields.product.min' = '1',
      'fields.product.max' = '2'
    );
    
    CREATE TABLE sink_table (
        product BIGINT,
        order_time TIMESTAMP(3),
        amount BIGINT,
        one_hour_prod_amount_sum BIGINT
    ) WITH ('connector' = 'print');
    
    INSERT INTO sink_table
    SELECT product, order_time, amount,
      SUM(amount) OVER (
        PARTITION BY product
        ORDER BY order_time
        -- 标识统计范围是一个 product 的最近 5 行数据
        ROWS BETWEEN 5 PRECEDING AND CURRENT ROW
      ) AS one_hour_prod_amount_sum
    FROM source_table

    预跑结果如下:

    +I[2, 2021-12-24T22:18:19.147, 1, 9]
    +I[1, 2021-12-24T22:18:20.147, 2, 11]
    +I[1, 2021-12-24T22:18:21.147, 2, 12]
    +I[1, 2021-12-24T22:18:22.147, 2, 12]
    +I[1, 2021-12-24T22:18:23.148, 2, 12]
    +I[1, 2021-12-24T22:18:24.147, 1, 11]
    +I[1, 2021-12-24T22:18:25.146, 1, 10]
    +I[1, 2021-12-24T22:18:26.147, 1, 9]
    +I[2, 2021-12-24T22:18:27.145, 2, 11]
    +I[2, 2021-12-24T22:18:28.148, 1, 10]
    +I[2, 2021-12-24T22:18:29.145, 2, 10]

    当然,如果你在一个 SELECT 中有多个聚合窗口的聚合方式,Flink SQL 支持了一种简化写法,如下案例:

    SELECT order_id, order_time, amount,
      SUM(amount) OVER w AS sum_amount,
      AVG(amount) OVER w AS avg_amount
    FROM Orders
    -- 使用下面子句,定义 Over Window
    WINDOW w AS (
      PARTITION BY product
      ORDER BY order_time
      RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW)

    阿里云 2 核 2G 服务器 3M 带宽 61 元 1 年,有高配

    腾讯云新客低至 82 元 / 年,老客户 99 元 / 年

    代金券:在阿里云专用满减优惠券

    正文完
    星哥玩云-微信公众号
    post-qrcode
     0
    星锅
    版权声明:本站原创文章,由 星锅 于2024-07-25发表,共计3247字。
    转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
    【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
    阿里云-最新活动爆款每日限量供应
    评论(没有评论)
    验证码
    【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

    星哥玩云

    星哥玩云
    星哥玩云
    分享互联网知识
    用户数
    4
    文章数
    19354
    评论数
    4
    阅读量
    8228121
    文章搜索
    热门文章
    星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

    星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

    星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
    星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

    星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

    星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
    再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

    再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

    再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
    飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

    飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

    飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
    星哥带你玩飞牛NAS-7:手把手教你免费内网穿透-Cloudflare tunnel

    星哥带你玩飞牛NAS-7:手把手教你免费内网穿透-Cloudflare tunnel

    星哥带你玩飞牛 NAS-7:手把手教你免费内网穿透 -Cloudflare tunnel 前言 大家好,我是星...
    阿里云CDN
    阿里云CDN-提高用户访问的响应速度和成功率
    随机文章
    飞牛NAS玩转Frpc并且配置,随时随地直连你的私有云

    飞牛NAS玩转Frpc并且配置,随时随地直连你的私有云

    飞牛 NAS 玩转 Frpc 并且配置,随时随地直连你的私有云 大家好,我是星哥,最近在玩飞牛 NAS。 在数...
    国产开源公众号AI知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率

    国产开源公众号AI知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率

    国产开源公众号 AI 知识库 Agent:突破未认证号限制,一键搞定自动回复,重构运营效率 大家好,我是星哥,...
    把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

    把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

    把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地 大家好,我是星哥,今天教大家在飞牛 NA...
    你的云服务器到底有多强?宝塔跑分告诉你

    你的云服务器到底有多强?宝塔跑分告诉你

    你的云服务器到底有多强?宝塔跑分告诉你 为什么要用宝塔跑分? 宝塔跑分其实就是对 CPU、内存、磁盘、IO 做...
    还在找免费服务器?无广告免费主机,新手也能轻松上手!

    还在找免费服务器?无广告免费主机,新手也能轻松上手!

    还在找免费服务器?无广告免费主机,新手也能轻松上手! 前言 对于个人开发者、建站新手或是想搭建测试站点的从业者...

    免费图片视频管理工具让灵感库告别混乱

    一言一句话
    -「
    手气不错
    云服务器部署OpenClaw:轻量应用服务器+钉钉和QQ机器人

    云服务器部署OpenClaw:轻量应用服务器+钉钉和QQ机器人

      云服务器部署 OpenClaw:轻量应用服务器 + 钉钉和 QQ 机器人 一、前言 最近开源圈爆...
    免费获得大模型的Api-Key的方法:英伟达提供GLM-4.7、Minimax M2.1模型和GitHub的AI大模型API申请

    免费获得大模型的Api-Key的方法:英伟达提供GLM-4.7、Minimax M2.1模型和GitHub的AI大模型API申请

      免费获得大模型的 Api-Key 的方法:英伟达提供 GLM-4.7、Minimax M2.1 ...
    零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

    零成本上线!用 Hugging Face免费服务器+Docker 快速部署HertzBeat 监控平台

    零成本上线!用 Hugging Face 免费服务器 +Docker 快速部署 HertzBeat 监控平台 ...
    告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

    告别Notion焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁”

      告别 Notion 焦虑!这款全平台开源加密笔记神器,让你的隐私真正“上锁” 引言 在数字笔记工...
    仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

    仅2MB大小!开源硬件监控工具:Win11 无缝适配,CPU、GPU、网速全维度掌控

    还在忍受动辄数百兆的“全家桶”监控软件?后台偷占资源、界面杂乱冗余,想查个 CPU 温度都要层层点选? 今天给...