How to analyze Twitter data in Apache Kafka with KSQL


Introduction

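KSQL is the open-source streaming SQL engine for Apache Kafka. It lets you do sophisticated stream processing on Kafka topics easily, through a simple, interactive SQL interface. In this short article we'll see how easy it is to get a sandbox up and running for exploring it, using everyone's favorite demo data source: Twitter. We'll go from ingesting the raw stream of tweets, through filtering it with predicates in KSQL, to building aggregates such as the number of tweets per user per hour.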

 

Confluent


First, get a copy of the Confluent Platform. I used the RPM packages, but you can also use the tar or zip archives if you prefer. Start the Confluent stack:

    $ confluent start

(If you're interested, there's a quick tutorial on the Confluent CLI.)

We'll use Kafka Connect to pull data from Twitter. The Twitter connector is available on GitHub. To install it, do the following:

    # Clone the git repo
    cd /home/rmoff
    git clone https://github.com/jcustenborder/kafka-connect-twitter.git
    # Compile the code
    cd kafka-connect-twitter
    mvn clean package

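To make Kafka Connect use the connector we just built, you need to edit its configuration file. Because we're using the Confluent CLI, the configuration file actually in use is etc/schema-registry/connect-avro-distributed.properties, so edit that file and add the following: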

    plugin.path=/home/rmoff/kafka-connect-twitter/target/kafka-connect-twitter-0.2-SNAPSHOT.tar.gz

Restart Kafka Connect:

    confluent stop connect
    confluent start connect

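Once the plugin is installed, it's easy to configure. You can call the Kafka Connect REST API directly, or create a configuration file, which is what I'll do here. You'll need to head over to Twitter to get your API keys first.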

    {
      "name": "twitter_source_json_01",
      "config": {
        "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
        "twitter.oauth.accessToken": "xxxx",
        "twitter.oauth.consumerSecret": "xxxxx",
        "twitter.oauth.consumerKey": "xxxx",
        "twitter.oauth.accessTokenSecret": "xxxxx",
        "kafka.delete.topic": "twitter_deletes_json_01",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": false,
        "key.converter.schemas.enable": false,
        "kafka.status.topic": "twitter_json_01",
        "process.deletes": true,
        "filter.keywords": "rickastley,kafka,ksql,rmoff"
      }
    }

Assuming you've saved this as /home/rmoff/twitter-source.json, you can now run:

    $ confluent load twitter_source -d /home/rmoff/twitter-source.json
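As an alternative to `confluent load`, the same JSON definition can be submitted straight to the Kafka Connect REST API with a `POST /connectors` request. The Python sketch below only builds that request with the standard library; it assumes the Connect worker listens on `localhost` at the default REST port 8083, and the helper names `load_connector_config` and `build_create_request` are hypothetical.

```python
import json
import urllib.request
from typing import Any, Dict

# Assumption: Kafka Connect's REST API is on localhost at its default port, 8083.
CONNECT_URL = "http://localhost:8083/connectors"


def load_connector_config(path: str) -> Dict[str, Any]:
    """Read a connector definition such as /home/rmoff/twitter-source.json."""
    with open(path, encoding="utf-8") as f:
        return json.load(f)


def build_create_request(payload: Dict[str, Any], url: str = CONNECT_URL) -> urllib.request.Request:
    """Build (but do not send) a POST /connectors request for the given definition."""
    if "name" not in payload or "config" not in payload:
        raise ValueError("connector definition needs 'name' and 'config' keys")
    return urllib.request.Request(
        url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Usage (needs a running Connect worker):
#   req = build_create_request(load_connector_config("/home/rmoff/twitter-source.json"))
#   with urllib.request.urlopen(req) as resp:
#       print(resp.status, resp.read().decode("utf-8"))
```

Separating request construction from sending keeps the payload easy to inspect before anything reaches the worker.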

And then tweets from everyone's favorite internet meme star come Rick-rolling in…

    $ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic twitter_json_01 | jq '.Text'
    {
      "string": "RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB"
    }
    {
      "string": "RT @mariteg10: @rickastley @Carfestevent Wonderful Rick!!\nDo not forget Chile!!\nWe hope you get back someday!!\nHappy weekend for you!!\n❤…"
    }

 

KSQL

Now let's get started with KSQL! Download and build it:

    cd /home/rmoff
    git clone https://github.com/confluentinc/ksql.git
    cd /home/rmoff/ksql
    mvn clean compile install -DskipTests

Once the build has finished, let's run it:

    ./bin/ksql-cli local --bootstrap-server localhost:9092
    ======================================
    =      _  __ _____  ____  _          =
    =     | |/ // ____|/ __ \| |         =
    =     | ' /| (___ | |  | | |         =
    =     |  <  \___ \| |  | | |         =
    =     | . \ ____) | |__| | |____     =
    =     |_|\_\_____/ \___\_\______|    =
    =                                    =
    =   Streaming SQL Engine for Kafka   =
    Copyright 2017 Confluent Inc.
    CLI v0.1, Server v0.1 located at http://localhost:9098
    Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!
    ksql>

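With KSQL, our data stays in the Kafka topic and we can query it there. First, we need to tell KSQL what the schema of the data on the topic is. A tweet is actually a pretty huge JSON object, but for brevity we'll pick just a few of its fields: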

    ksql> CREATE STREAM twitter_raw (CreatedAt BIGINT, Id BIGINT, Text VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON');
     Message
    ----------------
     Stream created
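Declaring a stream does not copy or convert the topic; the column list acts as the schema used when the JSON messages are read. The Python sketch below is only an illustration of that idea, not KSQL's deserializer: it keeps the declared fields of one message, matches names case-insensitively (KSQL upper-cases column names, as the DESCRIBE output later in this article shows), casts the values, and, as an assumption, leaves absent fields as null (`None`). The `project` helper and the sample messages are hypothetical.

```python
from typing import Any, Callable, Dict, List, Optional, Tuple

# Declared columns, mirroring:
#   CREATE STREAM twitter_raw (CreatedAt BIGINT, Id BIGINT, Text VARCHAR) ...
Schema = List[Tuple[str, Callable[[Any], Any]]]
SCHEMA: Schema = [("CreatedAt", int), ("Id", int), ("Text", str)]


def project(message: Dict[str, Any], schema: Schema = SCHEMA) -> Dict[str, Optional[Any]]:
    """Keep only the declared columns of one JSON message; everything else is ignored."""
    by_upper = {key.upper(): value for key, value in message.items()}
    row: Dict[str, Optional[Any]] = {}
    for name, cast in schema:
        value = by_upper.get(name.upper())
        # Column names are stored upper-cased; missing fields become None (assumption).
        row[name.upper()] = None if value is None else cast(value)
    return row
```

Undeclared fields such as `User` or `Lang` are simply skipped, which is why the stream can later be redefined with more columns without touching the topic.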

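With the schema defined, we can query the stream. To have KSQL show data from the beginning of the topic (rather than from the current point in time, which is the default), run: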

    ksql> SET 'auto.offset.reset'='earliest';
    Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'
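`auto.offset.reset` is a standard Kafka consumer setting: it only matters when the consumer has no valid committed offset to resume from, which is assumed here to be the case for a newly started query. The Python function below is a simplified model of that rule for a single partition, not Kafka's code; `starting_offset` and its parameters are hypothetical names.

```python
from typing import Optional


def starting_offset(committed: Optional[int], log_start: int, log_end: int,
                    reset: str = "latest") -> int:
    """Simplified model of where a consumer begins reading one partition."""
    if committed is not None and log_start <= committed <= log_end:
        return committed      # a valid committed offset always wins
    if reset == "earliest":
        return log_start      # replay everything the topic still retains
    if reset == "latest":
        return log_end        # only records produced from now on
    # Kafka's "none" policy also fails when there is no usable offset.
    raise ValueError(f"no usable offset and auto.offset.reset={reset!r}")
```

With `earliest`, a fresh query therefore starts at the oldest retained tweet instead of waiting for new ones.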

Now let's take a look at the data. We'll use the LIMIT clause to retrieve just one row:

    ksql> SELECT text FROM twitter_raw LIMIT 1;
    RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB
    LIMIT reached for the partition.
    Query terminated
    ksql>
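The query stops here only because of `LIMIT 1`; without a limit, a query over a stream keeps waiting for new records. The Python sketch below models that difference with iterators; `unbounded_topic` is a hypothetical stand-in for a topic that never ends (it just replays its input), and `select` is a hypothetical name, not a KSQL API.

```python
import itertools
from typing import Iterable, Iterator, List, Optional


def unbounded_topic(records: List[str]) -> Iterator[str]:
    """Hypothetical stand-in for a topic that keeps receiving records forever."""
    return itertools.cycle(records)


def select(source: Iterable[str], limit: Optional[int] = None) -> Iterator[str]:
    """SELECT ... [LIMIT n]: bounded if a limit is given, otherwise never finishes."""
    if limit is None:
        return iter(source)                 # continuous query: yields as data arrives
    return itertools.islice(source, limit)  # stops after n rows ("LIMIT reached")
```

Calling `list()` on the unlimited version would never return, which mirrors why a continuous query has to be interrupted with Ctrl-C.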

Now let's redefine the stream so that all of the tweet payload's fields are defined and available:

    ksql> DROP stream twitter_raw;
     Message
    --------------------------------
     Source TWITTER_RAW was dropped
    ksql> CREATE STREAM twitter_raw (CreatedAt bigint, Id bigint, Text VARCHAR, SOURCE VARCHAR, \
    Truncated VARCHAR, InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, \
    GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, FavoriteCount VARCHAR, \
    User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, RetweetCount VARCHAR, RetweetedByMe VARCHAR, \
    CurrentUserRetweetId VARCHAR, PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, \
    HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, SymbolEntities VARCHAR, \
    URLEntities VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON');
     Message
    ----------------
     Stream created
    ksql>

Now we can manipulate and examine the data more easily, using ordinary SQL queries:

    ksql> SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
    EXTRACTJSONFIELD(user,'$.ScreenName') AS ScreenName, Text \
    FROM twitter_raw \
    WHERE LCASE(hashtagentities) LIKE '%oow%' OR \
    LCASE(hashtagentities) LIKE '%ksql%';
    2017-09-29 13:59:58.000 | rmoff | Looking forward to talking all about @apachekafka & @confluentinc’s #KSQL at #OOW17 on Sunday 13:45 https://t.co/XbM4eIuzeG

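Note that there's no LIMIT clause this time, so you'll see the results of a "continuous query" on the screen. Unlike a query against a relational table, which returns a finite set of results, a continuous query runs against an unbounded stream of data, so it can always return more records. Press Ctrl-C to interrupt it and return to the KSQL prompt. In the query above we did a few things: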

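  • TIMESTAMPTOSTRING converts the timestamp from epoch format to a human-readable format. (LCTT note: the epoch is the instant 1970-01-01 00:00:00 UTC; the CreatedAt values here are milliseconds since then.)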
  • EXTRACTJSONFIELD shows one of the fields nested inside the source's User field, which looks like this:

        {
          "CreatedAt": 1506570308000,
          "Text": "RT @gwenshap: This is the best thing since partitioned bread :) https://t.co/1wbv3KwRM6",
          [...]
          "User": {
            "Id": 82564066,
            "Name": "Robin Moffatt \uD83C\uDF7B\uD83C\uDFC3\uD83E\uDD53",
            "ScreenName": "rmoff",
            [...]
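  • Predicates filter what is shown, using pattern matching on the hashtags, with LCASE forcing them to lowercase first. (LCTT note: a hashtag is the # label Twitter users attach to a post to mark its topic.)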
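The three functions used above can be approximated in plain Python to make their behaviour concrete. This is a sketch, not KSQL's implementation: the timestamp is rendered in UTC (an assumption; KSQL formats in the server's time zone), the JSON path support is limited to simple `$.a.b` paths, and `sql_like` translates only the `%` and `_` wildcards. All three helper names and the hashtag sample string are hypothetical.

```python
import json
import re
from datetime import datetime, timezone
from typing import Any, Optional


def timestamp_to_string(epoch_ms: int, tz: timezone = timezone.utc) -> str:
    """Rough equivalent of TIMESTAMPTOSTRING(ts, 'yyyy-MM-dd HH:mm:ss.SSS')."""
    dt = datetime.fromtimestamp(epoch_ms // 1000, tz=tz)
    return f"{dt:%Y-%m-%d %H:%M:%S}.{epoch_ms % 1000:03d}"


def extract_json_field(json_text: str, path: str) -> Optional[Any]:
    """Rough equivalent of EXTRACTJSONFIELD for '$.a.b' style paths only."""
    if not path.startswith("$."):
        raise ValueError("only '$.a.b' style paths are supported in this sketch")
    node: Any = json.loads(json_text)
    for key in path[2:].split("."):
        if not isinstance(node, dict) or key not in node:
            return None  # missing field -> null
        node = node[key]
    return node


def sql_like(value: str, pattern: str) -> bool:
    """SQL LIKE: % matches any run of characters, _ exactly one; case-sensitive."""
    regex = "".join(".*" if c == "%" else "." if c == "_" else re.escape(c) for c in pattern)
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None
```

Because LIKE is case-sensitive, `'%ksql%'` would miss a hashtag written `KSQL` unless the value is lower-cased first, which is what LCASE does in the query.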

For the list of supported functions, see the KSQL documentation.

We can create a derived stream from this data:

    ksql> CREATE STREAM twitter AS \
    SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
    EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
    EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
    EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
    EXTRACTJSONFIELD(user,'$.Description') AS user_Description,\
    Text,hashtagentities,lang \
    FROM twitter_raw ;
     Message
    ----------------------------
     Stream created and running
    ksql> DESCRIBE twitter;
     Field            | Type
    ------------------------------------
     ROWTIME          | BIGINT
     ROWKEY           | VARCHAR(STRING)
     CREATEDAT        | VARCHAR(STRING)
     USER_NAME        | VARCHAR(STRING)
     USER_SCREENNAME  | VARCHAR(STRING)
     USER_LOCATION    | VARCHAR(STRING)
     USER_DESCRIPTION | VARCHAR(STRING)
     TEXT             | VARCHAR(STRING)
     HASHTAGENTITIES  | VARCHAR(STRING)
     LANG             | VARCHAR(STRING)
    ksql>
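A `CREATE STREAM ... AS SELECT` keeps running: every record arriving on `twitter_raw` is transformed and written to the new stream. The Python generator below sketches that per-record mapping under stated assumptions: the input rows are dicts keyed by the upper-cased column names, `USER` holds the nested object as JSON text (it was declared VARCHAR), timestamps are rendered in UTC, and the implicit ROWTIME/ROWKEY columns are left out. `derive_twitter` is a hypothetical name.

```python
import json
from datetime import datetime, timezone
from typing import Any, Dict, Iterable, Iterator, Optional

USER_FIELDS = ("Name", "ScreenName", "Location", "Description")


def _user_field(user_json: Optional[str], key: str) -> Optional[Any]:
    """USER was declared VARCHAR, so it carries the nested object as JSON text."""
    if not user_json:
        return None
    return json.loads(user_json).get(key)


def derive_twitter(raw_rows: Iterable[Dict[str, Any]]) -> Iterator[Dict[str, Any]]:
    """Sketch of CREATE STREAM twitter AS SELECT ...: one output row per input row."""
    for raw in raw_rows:
        ms = raw["CREATEDAT"]
        created = datetime.fromtimestamp(ms // 1000, tz=timezone.utc)  # UTC assumed
        row: Dict[str, Any] = {"CREATEDAT": f"{created:%Y-%m-%d %H:%M:%S}.{ms % 1000:03d}"}
        for key in USER_FIELDS:
            # Flatten the nested user object into USER_NAME, USER_SCREENNAME, ...
            row["USER_" + key.upper()] = _user_field(raw.get("USER"), key)
        for col in ("TEXT", "HASHTAGENTITIES", "LANG"):
            row[col] = raw.get(col)
        yield row
```

The output columns come out in the same order as the non-implicit fields listed by DESCRIBE.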

And query the derived stream:

    ksql> SELECT CREATEDAT, USER_NAME, TEXT \
    FROM TWITTER \
    WHERE TEXT LIKE '%KSQL%';
    2017-10-03 23:39:37.000 | Nicola Ferraro | RT @flashdba: Again, I'm really taken with the possibilities opened up by @confluentinc's KSQL engine #Kafka https://t.co/aljnScgvvs

 

Aggregations

Before we finish, let's take a look at how to do some aggregation.

    ksql> SELECT user_screenname, COUNT(*) \
    FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
    GROUP BY user_screenname HAVING COUNT(*) > 1;
    Oracleace | 2
    rojulman | 2
    smokeinpublic | 2
    ArtFlowMe | 2
    [...]
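A tumbling window splits time into fixed-size, non-overlapping buckets, so each tweet falls into exactly one hour per user. The Python sketch below shows that bucketing together with the `HAVING COUNT(*) > 1` filter, assuming windows aligned to the epoch (consistent with the whole-hour window starts shown later in this article). The function names and sample events are hypothetical.

```python
from collections import Counter
from typing import Dict, Iterable, Tuple

HOUR_MS = 60 * 60 * 1000  # WINDOW TUMBLING (SIZE 1 HOUR), in milliseconds


def window_start(ts_ms: int, size_ms: int = HOUR_MS) -> int:
    """Tumbling windows are fixed-size, non-overlapping and aligned to the epoch."""
    return ts_ms - ts_ms % size_ms


def tweets_per_user_window(
    events: Iterable[Tuple[str, int]],
    size_ms: int = HOUR_MS,
    having_more_than: int = 1,
) -> Dict[Tuple[str, int], int]:
    """COUNT(*) GROUP BY user per window, then HAVING COUNT(*) > having_more_than."""
    counts = Counter((user, window_start(ts, size_ms)) for user, ts in events)
    return {key: n for key, n in counts.items() if n > having_more_than}
```

The result is keyed by the pair (user, window start); a single user can therefore appear once per hour in the output.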

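You'll probably get a screen full of results; this is because KSQL actually emits the aggregate value every time a given window is updated. Because we set KSQL to read all the messages on the topic (SET 'auto.offset.reset' = 'earliest';), it reads all of these messages at once and computes the aggregate updates as it goes. There's a subtlety here worth digging into. Our inbound stream of tweets is just that: a stream. But now that we're creating aggregates, we have actually created a table. A table is a snapshot of the value for a given key at a given point in time. KSQL aggregates data based on the event time of each message, and handles late-arriving data by simply restating the relevant window when it updates. Confused? I hope not, but let's see if we can illustrate it with an example. We'll declare our aggregate as an actual table: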

    ksql> CREATE TABLE user_tweet_count AS \
    SELECT user_screenname, count(*) AS tweet_count \
    FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
    GROUP BY user_screenname ;
     Message
    ---------------------------
     Table created and running
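The stream/table distinction can be sketched in a few lines: each incoming tweet updates the count for its (user, window) key and emits the new value, and the table is simply the latest value per key. In the Python sketch below a late tweet for an earlier hour restates that hour's count instead of creating a new row. It is a simplified model under stated assumptions: it ignores the caching that may coalesce intermediate updates and the window retention after which very late records may be dropped. `aggregate_updates` and the sample events are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

HOUR_MS = 3_600_000


def aggregate_updates(
    events: Iterable[Tuple[str, int]], size_ms: int = HOUR_MS
) -> Tuple[List[Tuple[str, int, int]], Dict[Tuple[str, int], int]]:
    """Return (emitted updates, final table) for COUNT(*) per user per tumbling window."""
    table: Dict[Tuple[str, int], int] = {}
    updates: List[Tuple[str, int, int]] = []
    for user, ts in events:                # arrival order; ts is the event time
        key = (user, ts - ts % size_ms)    # window chosen by event time, not arrival
        table[key] = table.get(key, 0) + 1
        updates.append((user, key[1], table[key]))  # every change is emitted
    return updates, table
```

Reading the update list top to bottom gives the "screen full" of results; keeping only the last value per key gives the table.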

Looking at the columns in the table, there are two implicit columns in addition to the ones we asked for:

    ksql> DESCRIBE user_tweet_count;
     Field           | Type
    -----------------------------------
     ROWTIME         | BIGINT
     ROWKEY          | VARCHAR(STRING)
     USER_SCREENNAME | VARCHAR(STRING)
     TWEET_COUNT     | BIGINT
    ksql>

Let's see what's in them:

    ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS'), \
    ROWKEY, USER_SCREENNAME, TWEET_COUNT \
    FROM user_tweet_count \
    WHERE USER_SCREENNAME = 'rmoff';
    2017-09-29 11:00:00.000 | rmoff : Window{start=1506708000000 end=-} | rmoff | 2
    2017-09-29 12:00:00.000 | rmoff : Window{start=1506711600000 end=-} | rmoff | 4
    2017-09-28 22:00:00.000 | rmoff : Window{start=1506661200000 end=-} | rmoff | 2
    2017-09-29 09:00:00.000 | rmoff : Window{start=1506700800000 end=-} | rmoff | 4
    2017-09-29 15:00:00.000 | rmoff : Window{start=1506722400000 end=-} | rmoff | 2
    2017-09-29 13:00:00.000 | rmoff : Window{start=1506715200000 end=-} | rmoff | 6

ROWTIME is the window start time, and ROWKEY is the combination of the GROUP BY column (USER_SCREENNAME) and the window. So we can tidy this up by creating another derived table:

    ksql> CREATE TABLE USER_TWEET_COUNT_DISPLAY AS \
    SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START ,\
    USER_SCREENNAME, TWEET_COUNT \
    FROM user_tweet_count;
     Message
    ---------------------------
     Table created and running
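The tidy-up only reformats what the windowed key already holds: the user and the window start in epoch milliseconds. In the sample output, 1506708000000 (18:00 UTC on 2017-09-29) is displayed as 11:00, so the session appears to have run at UTC-7; the Python sketch below takes that offset as an explicit, assumed parameter. It models ROWKEY as a plain (user, window start) tuple, which is a simplification of KSQL's windowed key, and `display_row` is a hypothetical name.

```python
from datetime import datetime, timedelta, timezone
from typing import Tuple


def window_start_string(rowtime_ms: int, utc_offset_hours: int) -> str:
    """Format a window start like TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS')."""
    tz = timezone(timedelta(hours=utc_offset_hours))  # fixed offset, assumed
    dt = datetime.fromtimestamp(rowtime_ms // 1000, tz=tz)
    return f"{dt:%Y-%m-%d %H:%M:%S}.{rowtime_ms % 1000:03d}"


def display_row(rowkey: Tuple[str, int], count: int, utc_offset_hours: int) -> Tuple[str, str, int]:
    """Split the (user, window start) key into WINDOW_START, USER_SCREENNAME, TWEET_COUNT."""
    user, window_start_ms = rowkey
    return window_start_string(window_start_ms, utc_offset_hours), user, count
```

With an offset of -7 this reproduces the window start times shown in the rmoff rows above.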

Now it's easier to query and see the data we're interested in:

    ksql> SELECT WINDOW_START, USER_SCREENNAME, TWEET_COUNT \
    FROM USER_TWEET_COUNT_DISPLAY WHERE TWEET_COUNT > 20;
    2017-09-29 12:00:00.000 | VikasAatOracle | 22
    2017-09 [...]
