如何使用Kafka-Connect创建用于处理实时数据的开源数据管道 (如何使用kafkaAdmin查看kafka有哪些用户)

教程大全 2025-07-12 20:17:47 浏览

如何使用Kafka Connect创建用于处理实时数据的开源数据管道?

译文2021-07-29 08:00:00本文介绍了如何使用完全开源的技术创建实时数据管道,这类开源技术包括 Kafka Connect、Apache Kafka和Kibana 等。

Kafka Connect是一种特别强大的开源数据流工具;有了它,将Kafka与其他数据技术结合使用非常轻松。作为一种分布式技术,Kafka Connect提供了特别高的可用性和独立于Kafka集群的弹性扩展。Kafka Connect使用源或sink连接件发送进出Kafka主题的数据,无需代码即可与多种非Kafka技术实现整合。

图1

可靠的开源Kafka连接件可供许多流行的数据技术使用,您还有机会编写自己的连接件。本文介绍了一个真实的实际数据用例,即如何使用Kafka Connect将来自Kafka的实时流数据与Elasticsearch(以启用索引Kafka记录的可扩展搜索)和Kibana(以便可视化那些结果)整合起来。

图2

针对表明Kafka和Kafka Connect优点的一个用例,我受到CDC新冠疫情数据跟踪器的启发。基于Kafka的跟踪器从多个位置、以多种格式并使用多种协议收集实时新冠病毒检测数据,并将这些事件处理成易于使用的可视化结果。跟踪器还有必要的数据治理机制,以确保结果快速到达,并值得信任。

我开始寻找一个同样复杂且引人注目的用例——但理想情况下,不像新冠疫情那样令人担忧。最终,我发现了一个有趣的领域:月潮,包括公开可用的流REST API和采用简单JSON格式的丰富数据。

月潮数据

潮汐遵循太阴日,这是一个24小时50分钟的周期;在此期间,地球完全自转到轨道卫星下方的同一点。每个太阴日有月球引力引起的两个高潮和两个低潮:

图3. 来自美国国家海洋和大气管理局

美国国家海洋和大气管理局(NOAA)提供了一个REST API,可以从全球潮汐站轻松获取详细的传感器数据。

图4

比如说,下列REST调用指定了潮汐站ID、数据类型(我选择了海平面)和数据(平均海平面),并请求一个采用公制单位的最近结果:

该调用返回JSON结果,含有潮汐站的经纬度、时间和水位值。请注意,您必须记住您调用的是什么,以便了解所返回结果的数据类型、数据和单位!

启动数据管道(使用REST源连接件)

要开始创建Kafka Connect流数据管道,我们必须先准备Kafka集群和Kafka Connect集群。

图5

接下来,我们引入一个REST连接件,比如这个可用的开源连接件。我们会将其部署到AWS S3存储桶(如果需要,参照这些说明)。 然后我们将要求Kafka Connect集群使用S3存储桶,对它同步以便在集群中可见,配置连接件,最后让它运行起来。这种“BYOC”(自带连接件)方法确保您有无数的方法来寻找满足特定要求的连接件。

图6

下列示例演示使用“curl”命令将完全开源的Kafka Connect部署环境配置成可使用REST API。请注意,您需要更改URL、名称和密码以匹配您自己的部署:

该代码创建的连接件任务以10分钟为间隔轮询REST API,并将结果写入到“tides-Topic”Kafka主题。通过随机选择五个潮汐传感器以这种方式收集数据,潮汐数据现在通过五个配置和五个连接件填充了潮汐主题。

图7

结束管道(使用Elasticsearch sink连接件)

为了将该潮汐数据放在某个地方,我们将在管道末端引入Elasticsearch集群和Kibana。 我们将配置一个开源Elasticsearch sink连接件,以便向Elasticsearch发送数据。

图8

以下示例配置使用sink名称、类、Elasticsearch索引和我们的Kafka主题。如果索引尚未存在,会创建一个有默认映射的索引。

该管道现在可运作起来。然而,由于默认索引映射,进入到Tides索引的所有潮汐数据是字符串。

图9

每次更改Elasticsearch索引映射时,通常都需要Elasticsearch“重新索引”(删除索引并重新索引所有数据)。数据既可以从现有的Kafka sink连接件重放,就像我们在这个用例中所做的那样,也可以使用Elasticsearch重新索引操作来获取。

使用Kibana可视化数据

为了可视化潮汐数据,我们先用Kibana创建一个索引模式,将“t”配置为时间过滤器字段。然后,我们将创建一个可视化,选择线图类型。最后,我们将配置图设置,以便y轴显示30分钟内的平均潮位,x 轴显示随时间变化的该数据。

结果是下图显示了五个样本潮汐站的潮汐变化,管道从这些潮汐站收集数据:

图10

结果

我们可以从可视化中清楚地看到潮汐的周期性,每个太阴日出现两次高潮。

图11

更令人惊讶的是,每个全球潮汐站的高潮和低潮之间的间隔不一样。这不仅受月球的影响,还受太阳、当地地理、天气和气候变化的影响。这个示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana帮助演示可视化的优点:它们通常可以揭示原始数据无法揭示的信息!

如何使用Kafka

shell判断文件是否存在

#[-e/tmp/]&&echoyes||echono-e判断对象(文件或目录)是否存在,存在为真祝你好运~~~

嵌入式实时系统,什么是嵌入式实时系统

嵌入式实时操作系统(Embedded Real-time Operation System,RTOS)。 嵌入式系统是“用于控制、监视或者辅助操作机器和设备的装置”,是一种以应用为中心、以计算机技术为基础、软件硬件可裁剪、功能、可靠性、成本、体积、功耗严格要求的专用计算机系统。 而Android是一种基于Linux的自由及开放源代码的操作系统,并不是嵌入式,主要使用于移动设备,如智能手机和平板电脑,由Google公司和开放手机联盟领导及开发。 尚未有统一中文名称,中国大陆地区较多人使用“安卓”或“安致”。 Android操作系统最初由Andy Rubin开发,主要支持手机。 2005年8月由Google收购注资。 2007年11月,Google与84家硬件制造商、软件开发商及电信营运商组建开放手机联盟共同研发改良Android系统。 随后Google以Apache开源许可证的授权方式,发布了Android的源代码

team foundation 和 git 的区别

本文版权声明本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请联系本站客服,一经查实,本站将立刻删除。

发表评论

热门推荐