阿里云-云小站(无限量代金券发放中)
【腾讯云】云服务器、云数据库、COS、CDN、短信等热卖云产品特惠抢购

如何在 Apache Kafka 中通过 KSQL 分析 Twitter 数据

427次阅读
没有评论

共计 9584 个字符,预计需要花费 24 分钟才能阅读完成。

如何在 Apache Kafka 中通过 KSQL 分析 Twitter 数据

介绍

KSQL 是 Apache Kafka 中的开源的流式 SQL 引擎。它可以让你在 Kafka 主题 topic 上,使用一个简单的并且是交互式的 SQL 接口,很容易地做一些复杂的流处理。在这个短文中,我们将看到如何轻松地配置并运行在一个沙箱中去探索它,并使用大家都喜欢的演示数据库源:Twitter。我们将从推文的原始流中获取,通过使用 KSQL 中的条件去过滤它,来构建一个聚合,如统计每个用户每小时的推文数量。

 

Confluent

如何在 Apache Kafka 中通过 KSQL 分析 Twitter 数据

首先,获取一个 Confluent 平台的副本。我使用的是 RPM 包,但是,如果你需要的话,你也可以使用 tar、zip 等等。启动 Confluent 系统:

  1. $ confluent start

(如果你感兴趣,这里有一个 Confluent 命令行的快速教程)

我们将使用 Kafka Connect 从 Twitter 上拉取数据。这个 Twitter 连接器可以在 GitHub 上找到。要安装它,像下面这样操作:

  1. #Clone the git repo
  2. cd/home/rmoff
  3. gitclone https://github.com/jcustenborder/kafka-connect-twitter.git
  1. #Compile the code
  2. cd kafka-connect-twitter
  3. mvn clean package

要让 Kafka Connect 去使用我们构建的连接器,你要去修改配置文件。因为我们使用 Confluent 命令行,真实的配置文件是在 etc/schema-registry/connect-avro-distributed.properties,因此去修改它并增加如下内容:

  1. plugin.path=/home/rmoff/kafka-connect-twitter/target/kafka-connect-twitter-0.2-SNAPSHOT.tar.gz

重启动 Kafka Connect:

  1. confluent stop connect
  2. confluent start connect

一旦你安装好插件,你可以很容易地去配置它。你可以直接使用 Kafka Connect 的 REST API,或者创建你的配置文件,这就是我要在这里做的。如果你需要全部的方法,请首先访问 Twitter 来获取你的 API 密钥。

  1. {
  2. "name":"twitter_source_json_01",
  3. "config":{
  4. "connector.class":"com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
  5. "twitter.oauth.accessToken":"xxxx",
  6. "twitter.oauth.consumerSecret":"xxxxx",
  7. "twitter.oauth.consumerKey":"xxxx",
  8. "twitter.oauth.accessTokenSecret":"xxxxx",
  9. "kafka.delete.topic":"twitter_deletes_json_01",
  10. "value.converter":"org.apache.kafka.connect.json.JsonConverter",
  11. "key.converter":"org.apache.kafka.connect.json.JsonConverter",
  12. "value.converter.schemas.enable":false,
  13. "key.converter.schemas.enable":false,
  14. "kafka.status.topic":"twitter_json_01",
  15. "process.deletes":true,
  16. "filter.keywords":"rickastley,kafka,ksql,rmoff"
  17. }
  18. }

假设你写这些到 /home/rmoff/twitter-source.json,你可以现在运行:

  1. $ confluent load twitter_source -d /home/rmoff/twitter-source.json

然后推文就从大家都喜欢的网络明星 [rick] 滚滚而来……

  1. $ kafka-console-consumer --bootstrap-server localhost:9092--from-beginning --topic twitter_json_01|jq '.Text'
  2. {
  3. "string":"RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB"
  4. }
  5. {
  6. "string":"RT @mariteg10: @rickastley @Carfestevent Wonderful Rick!!\nDo not forget Chile!!\nWe hope you get back someday!!\nHappy weekend for you!!\n❤…"
  7. }

 

KSQL

现在我们从 KSQL 开始 ! 马上去下载并构建它:

  1. cd/home/rmoff
  2. gitclone https://github.com/confluentinc/ksql.git
  3. cd/home/rmoff/ksql
  4. mvn clean compile install -DskipTests

构建完成后,让我们来运行它:

  1. ./bin/ksql-cli local--bootstrap-server localhost:9092
  1. ======================================
  2. = _ __ _____ ____ _ =
  3. =||/ // ____|/ __ \| |=
  4. =|' /| (___ | | | | | =
  5. = | < \___ \| | | | | =
  6. = | . \ ____) | |__| | |____ =
  7. = |_|\_\_____/ \___\_\______| =
  8. = =
  9. = Streaming SQL Engine for Kafka =
  10. Copyright 2017 Confluent Inc.
  11. CLI v0.1, Server v0.1 located at http://localhost:9098
  12. Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
  13. ksql>

使用 KSQL,我们可以让我们的数据保留在 Kafka 主题上并可以查询它。首先,我们需要去告诉 KSQL 主题上的数据模式 schema 是什么,一个 twitter 消息实际上是一个非常巨大的 JSON 对象,但是,为了简洁,我们只选出其中几行:

  1. ksql> CREATE STREAM twitter_raw (CreatedAt BIGINT,Id BIGINT,Text VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON');
  2. Message
  3. ----------------
  4. Stream created

在定义的模式中,我们可以查询这些流。要让 KSQL 从该主题的开始展示数据(而不是默认的当前时间点),运行如下命令:

  1. ksql> SET 'auto.offset.reset'='earliest';
  2. Successfully changed localproperty'auto.offset.reset'from'null' to 'earliest'

现在,让我们看看这些数据,我们将使用 LIMIT 从句仅检索一行:

  1. ksql> SELECT text FROM twitter_raw LIMIT 1;
  2. RT @rickastley:30 years ago today I said I was NeverGonnaGiveYouUp. I am a man of my word -Rick x https://t.co/VmbMQA6tQB
  3. LIMIT reached for the partition.
  4. Query terminated
  5. ksql>

现在,让我们使用刚刚定义和可用的推文内容的全部数据重新定义该流:

  1. ksql> DROP stream twitter_raw;
  2. Message
  3. --------------------------------
  4. Source TWITTER_RAW was dropped
  5. ksql> CREATE STREAM twitter_raw (CreatedAt bigint,Id bigint,Text VARCHAR, SOURCE VARCHAR,Truncated VARCHAR,InReplyToStatusId VARCHAR,InReplyToUserId VARCHAR,InReplyToScreenName VARCHAR,GeoLocation VARCHAR,Place VARCHAR,Favorited VARCHAR,Retweeted VARCHAR,FavoriteCount VARCHAR,User VARCHAR,Retweet VARCHAR,Contributors VARCHAR,RetweetCount VARCHAR,RetweetedByMe VARCHAR,CurrentUserRetweetId VARCHAR,PossiblySensitive VARCHAR,Lang VARCHAR,WithheldInCountries VARCHAR,HashtagEntities VARCHAR,UserMentionEntities VARCHAR,MediaEntities VARCHAR,SymbolEntities VARCHAR,URLEntities VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01',VALUE_FORMAT='JSON');
  6. Message
  7. ----------------
  8. Stream created
  9. ksql>

现在,我们可以操作和检查更多的最近的数据,使用一般的 SQL 查询:

  1. ksql> SELECT TIMESTAMPTOSTRING(CreatedAt,'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
  2. EXTRACTJSONFIELD(user,'$.ScreenName')asScreenName,Text \
  3. FROM twitter_raw \
  4. WHERE LCASE(hashtagentities) LIKE '%oow%' OR \
  5. LCASE(hashtagentities) LIKE '%ksql%';
  6. 2017-09-2913:59:58.000| rmoff |Looking forward to talking all about @apachekafka&@confluentincs #KSQL at #OOW17 on Sunday13:45 https://t.co/XbM4eIuzeG

注意这里没有 LIMIT 从句,因此,你将在屏幕上看到“continuous query”的结果。不像关系型数据表中返回一个确定数量结果的查询,一个持续查询会运行在无限的流式数据上,因此,它总是可能返回更多的记录。点击 Ctrl-C 去中断然后返回到 KSQL 提示符。在以上的查询中我们做了一些事情:

  • TIMESTAMPTOSTRING 将时间戳从 epoch 格式转换到人类可读格式。(LCTT 译注:epoch 指的是一个特定的时间 1970-01-01 00:00:00 UTC)
  • EXTRACTJSONFIELD 来展示数据源中嵌套的用户域中的一个字段,它看起来像:

    1. {
    2. "CreatedAt":1506570308000,
    3. "Text":"RT @gwenshap: This is the best thing since partitioned bread :) https://t.co/1wbv3KwRM6",
    4. [...]
    5. "User":{
    6. "Id":82564066,
    7. "Name":"Robin Moffatt \uD83C\uDF7B\uD83C\uDFC3\uD83E\uDD53",
    8. "ScreenName":"rmoff",
    9. [...]
  • 应用断言去展示内容,对 #(hashtag)使用模式匹配,使用 LCASE 去强制小写字母。(LCTT 译注:hashtag 是 twitter 中用来标注线索主题的标签)

关于支持的函数列表,请查看 KSQL 文档。

我们可以创建一个从这个数据中得到的流:

  1. ksql> CREATE STREAM twitter AS \
  2. SELECT TIMESTAMPTOSTRING(CreatedAt,'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
  3. EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
  4. EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
  5. EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
  6. EXTRACTJSONFIELD(user,'$.Description') AS user_Description,\
  7. Text,hashtagentities,lang \
  8. FROM twitter_raw ;
  9. Message
  10. ----------------------------
  11. Stream created and running
  12. ksql> DESCRIBE twitter;
  13. Field|Type
  14. ------------------------------------
  15. ROWTIME | BIGINT
  16. ROWKEY | VARCHAR(STRING)
  17. CREATEDAT | VARCHAR(STRING)
  18. USER_NAME | VARCHAR(STRING)
  19. USER_SCREENNAME | VARCHAR(STRING)
  20. USER_LOCATION | VARCHAR(STRING)
  21. USER_DESCRIPTION | VARCHAR(STRING)
  22. TEXT | VARCHAR(STRING)
  23. HASHTAGENTITIES | VARCHAR(STRING)
  24. LANG | VARCHAR(STRING)
  25. ksql>

并且查询这个得到的流:

  1. ksql> SELECT CREATEDAT, USER_NAME, TEXT \
  2. FROM TWITTER \
  3. WHERE TEXT LIKE '%KSQL%';
  4. 2017-10-0323:39:37.000|NicolaFerraro| RT @flashdba:Again, I'm really taken with the possibilities opened up by @confluentinc's KSQL engine #Kafka https://t.co/aljnScgvvs

 

聚合

在我们结束之前,让我们去看一下怎么去做一些聚合。

  1. ksql> SELECT user_screenname, COUNT(*) \
  2. FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
  3. GROUP BY user_screenname HAVING COUNT(*)>1;
  4. Oracleace |2
  5. rojulman |2
  6. smokeinpublic |2
  7. ArtFlowMe|2
  8. [...]

你将可能得到满屏幕的结果;这是因为 KSQL 在每次给定的时间窗口更新时实际发出聚合值。因为我们设置 KSQL 去读取在主题上的全部消息(SET 'auto.offset.reset' = 'earliest';),它是一次性读取这些所有的消息并计算聚合更新。这里有一个微妙之处值得去深入研究。我们的入站推文流正好就是一个流。但是,现有它不能创建聚合,我们实际上是创建了一个表。一个表是在给定时间点的给定键的值的一个快照。KSQL 聚合数据基于消息的事件时间,并且如果它更新了,通过简单的相关窗口重申去操作后面到达的数据。困惑了吗?我希望没有,但是,让我们看一下,如果我们可以用这个例子去说明。我们将申明我们的聚合作为一个真实的表:

  1. ksql> CREATE TABLE user_tweet_count AS \
  2. SELECT user_screenname, count(*) AS tweet_count \
  3. FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
  4. GROUP BY user_screenname ;
  5. Message
  6. ---------------------------
  7. Table created and running

看表中的列,这里除了我们要求的外,还有两个隐含列:

  1. ksql> DESCRIBE user_tweet_count;
  2. Field|Type
  3. -----------------------------------
  4. ROWTIME | BIGINT
  5. ROWKEY | VARCHAR(STRING)
  6. USER_SCREENNAME | VARCHAR(STRING)
  7. TWEET_COUNT | BIGINT
  8. ksql>

我们看一下这些是什么:

  1. ksql> SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss.SSS'), \
  2. ROWKEY, USER_SCREENNAME, TWEET_COUNT \
  3. FROM user_tweet_count \
  4. WHERE USER_SCREENNAME='rmoff';
  5. 2017-09-2911:00:00.000| rmoff :Window{start=1506708000000end=-}| rmoff |2
  6. 2017-09-2912:00:00.000| rmoff :Window{start=1506711600000end=-}| rmoff |4
  7. 2017-09-2822:00:00.000| rmoff :Window{start=1506661200000end=-}| rmoff |2
  8. 2017-09-2909:00:00.000| rmoff :Window{start=1506700800000end=-}| rmoff |4
  9. 2017-09-2915:00:00.000| rmoff :Window{start=1506722400000end=-}| rmoff |2
  10. 2017-09-2913:00:00.000| rmoff :Window{start=1506715200000end=-}| rmoff |6

ROWTIME 是窗口开始时间,ROWKEY 是 GROUP BYUSER_SCREENNAME)加上窗口的组合。因此,我们可以通过创建另外一个衍生的表来整理一下:

  1. ksql> CREATE TABLE USER_TWEET_COUNT_DISPLAY AS \
  2. SELECT TIMESTAMPTOSTRING(ROWTIME,'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START ,\
  3. USER_SCREENNAME, TWEET_COUNT \
  4. FROM user_tweet_count;
  5. Message
  6. ---------------------------
  7. Table created and running

现在它更易于查询和查看我们感兴趣的数据:

  1. ksql> SELECT WINDOW_START , USER_SCREENNAME, TWEET_COUNT \
  2. FROM USER_TWEET_COUNT_DISPLAY WHERE TWEET_COUNT>20;
  3. 2017-09-2912:00:00.000|VikasAatOracle|22
  4. 2017-09-2814:00:00.000|Throne_ie|50
  5. 2017-09-2814:00:00.000| pikipiki_net |22
  6. 2017-09-2909:00:00.000| johanlouwers |22
  7. 2017-09-2809:00:00.000| yvrk1973 |24
  8. 2017-09-2813:00:00.000| cmosoares |22
  9. 2017-09-2911:00:00.000| ypoirier |24
  10. 2017-09-2814:00:00.000| pikisec |22
  11. 2017-09-2907:00:00.000|Throne_ie|22
  12. 2017-09-2909:00:00.000|ChrisVoyance|24
  13. 2017-09-2811:00:00.000|ChrisVoyance|28

 

结论

所以我们有了它!我们可以从 Kafka 中取得数据,并且很容易使用 KSQL 去探索它。而不仅是去浏览和转换数据,我们可以很容易地使用 KSQL 从流和表中建立流处理。

如何在 Apache Kafka 中通过 KSQL 分析 Twitter 数据

如果你对 KSQL 能够做什么感兴趣,去查看:

  • KSQL 公告
  • 我们最近的 KSQL 在线研讨会 和 Kafka 峰会讲演
  • clickstream 演示,它是 KSQL 的 GitHub 仓库 的一部分
  • 我最近做的演讲 展示了 KSQL 如何去支持基于流的 ETL 平台

记住,KSQL 现在正处于开发者预览阶段。欢迎在 KSQL 的 GitHub 仓库上提出任何问题,或者去我们的 community Slack group 的 #KSQL 频道。

下面关于 Kafka 的文章您也可能喜欢,不妨参考下:

CentOS 7.2 部署 Elasticsearch+Kibana+Zookeeper+Kafka  http://www.linuxidc.com/Linux/2016-11/137636.htm

CentOS 7 下安装 Logstash ELK Stack 日志管理系统  http://www.linuxidc.com/Linux/2016-08/134165.htm

Kafka 集群部署与配置手册 http://www.linuxidc.com/Linux/2017-02/141037.htm

CentOS 7 下 Kafka 集群安装  http://www.linuxidc.com/Linux/2017-01/139734.htm

Apache Kafka 教程笔记 http://www.linuxidc.com/Linux/2014-01/94682.htm

CentOS 7 下安装 Kafka 单机版  http://www.linuxidc.com/Linux/2017-01/139732.htm

Apache kafka 原理与特性(0.8V)  http://www.linuxidc.com/Linux/2014-09/107388.htm

Kafka 部署与代码实例  http://www.linuxidc.com/Linux/2014-09/107387.htm

Kafka 介绍及环境搭建  http://www.linuxidc.com/Linux/2016-12/138724.htm

Kafka 介绍和集群环境搭建  http://www.linuxidc.com/Linux/2014-09/107382.htm

CentOS7.0 安装配置 Kafka 集群  http://www.linuxidc.com/Linux/2017-06/144951.htm

Kafka 的详细介绍:请点这里
Kafka 的下载地址:请点这里


via: https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka

作者:Robin Moffatt 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux 中国 荣誉推出

本文永久更新链接地址:http://www.linuxidc.com/Linux/2017-11/148455.htm

正文完
星哥玩云-微信公众号
post-qrcode
 0
星锅
版权声明:本站原创文章,由 星锅 于2022-01-21发表,共计9584字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
【腾讯云】推广者专属福利,新客户无门槛领取总价值高达2860元代金券,每种代金券限量500张,先到先得。
阿里云-最新活动爆款每日限量供应
评论(没有评论)
验证码
【腾讯云】云服务器、云数据库、COS、CDN、短信等云产品特惠热卖中

星哥玩云

星哥玩云
星哥玩云
分享互联网知识
用户数
4
文章数
19351
评论数
4
阅读量
7988074
文章搜索
热门文章
星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛NAS-6:抖音视频同步工具,视频下载自动下载保存

星哥带你玩飞牛 NAS-6:抖音视频同步工具,视频下载自动下载保存 前言 各位玩 NAS 的朋友好,我是星哥!...
星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛NAS-3:安装飞牛NAS后的很有必要的操作

星哥带你玩飞牛 NAS-3:安装飞牛 NAS 后的很有必要的操作 前言 如果你已经有了飞牛 NAS 系统,之前...
我把用了20年的360安全卫士卸载了

我把用了20年的360安全卫士卸载了

我把用了 20 年的 360 安全卫士卸载了 是的,正如标题你看到的。 原因 偷摸安装自家的软件 莫名其妙安装...
再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见zabbix!轻量级自建服务器监控神器在Linux 的完整部署指南

再见 zabbix!轻量级自建服务器监控神器在 Linux 的完整部署指南 在日常运维中,服务器监控是绕不开的...
飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛NAS中安装Navidrome音乐文件中文标签乱码问题解决、安装FntermX终端

飞牛 NAS 中安装 Navidrome 音乐文件中文标签乱码问题解决、安装 FntermX 终端 问题背景 ...
阿里云CDN
阿里云CDN-提高用户访问的响应速度和成功率
随机文章
每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年0.99刀,拿下你的第一个顶级域名,详细注册使用

每年 0.99 刀,拿下你的第一个顶级域名,详细注册使用 前言 作为长期折腾云服务、域名建站的老玩家,星哥一直...
把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地 大家好,我是星哥,今天教大家在飞牛 NA...
一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸

一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸

一句话生成拓扑图!AI+Draw.io 封神开源组合,工具让你的效率爆炸 前言 作为天天跟架构图、拓扑图死磕的...
在Windows系统中通过VMware安装苹果macOS15

在Windows系统中通过VMware安装苹果macOS15

在 Windows 系统中通过 VMware 安装苹果 macOS15 许多开发者和爱好者希望在 Window...
【开源神器】微信公众号内容单篇、批量下载软件

【开源神器】微信公众号内容单篇、批量下载软件

【开源神器】微信公众号内容单篇、批量下载软件 大家好,我是星哥,很多人都希望能高效地保存微信公众号的文章,用于...

免费图片视频管理工具让灵感库告别混乱

一言一句话
-「
手气不错
星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛NAS硬件02:某鱼6张左右就可拿下5盘位的飞牛圣体NAS

星哥带你玩飞牛 NAS 硬件 02:某鱼 6 张左右就可拿下 5 盘位的飞牛圣体 NAS 前言 大家好,我是星...
星哥带你玩飞牛 NAS-10:备份微信聊天记录、数据到你的NAS中!

星哥带你玩飞牛 NAS-10:备份微信聊天记录、数据到你的NAS中!

星哥带你玩飞牛 NAS-10:备份微信聊天记录、数据到你的 NAS 中! 大家对「数据安全感」的需求越来越高 ...
把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地

把小米云笔记搬回家:飞牛 NAS 一键部署,小米云笔记自动同步到本地 大家好,我是星哥,今天教大家在飞牛 NA...
星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

星哥带你玩飞牛NAS-16:飞牛云NAS换桌面,fndesk图标管理神器上线!

  星哥带你玩飞牛 NAS-16:飞牛云 NAS 换桌面,fndesk 图标管理神器上线! 引言 哈...
星哥带你玩飞牛NAS-12:开源笔记的进化之路,效率玩家的新选择

星哥带你玩飞牛NAS-12:开源笔记的进化之路,效率玩家的新选择

星哥带你玩飞牛 NAS-12:开源笔记的进化之路,效率玩家的新选择 前言 如何高效管理知识与笔记,已经成为技术...