如何使用Kafka Connect创建用于处理实时数据的开源数据管道?
译文2021-07-29 08:00:00本文介绍了如何使用完全开源的技术创建实时数据管道,这类开源技术包括 Kafka Connect、Apache Kafka和Kibana 等。
Kafka Connect是一种特别强大的开源数据流工具;有了它,将Kafka与其他数据技术结合使用非常轻松。作为一种分布式技术,Kafka Connect提供了特别高的可用性和独立于Kafka集群的弹性扩展。Kafka Connect使用源或sink连接件发送进出Kafka主题的数据,无需代码即可与多种非Kafka技术实现整合。
图1
可靠的开源Kafka连接件可供许多流行的数据技术使用,您还有机会编写自己的连接件。本文介绍了一个真实的实际数据用例,即如何使用Kafka Connect将来自Kafka的实时流数据与Elasticsearch(以启用索引Kafka记录的可扩展搜索)和Kibana(以便可视化那些结果)整合起来。
图2
针对表明Kafka和Kafka Connect优点的一个用例,我受到CDC新冠疫情数据跟踪器的启发。基于Kafka的跟踪器从多个位置、以多种格式并使用多种协议收集实时新冠病毒检测数据,并将这些事件处理成易于使用的可视化结果。跟踪器还有必要的数据治理机制,以确保结果快速到达,并值得信任。
我开始寻找一个同样复杂且引人注目的用例——但理想情况下,不像新冠疫情那样令人担忧。最终,我发现了一个有趣的领域:月潮,包括公开可用的流REST API和采用简单JSON格式的丰富数据。
月潮数据
潮汐遵循太阴日,这是一个24小时50分钟的周期;在此期间,地球完全自转到轨道卫星下方的同一点。每个太阴日有月球引力引起的两个高潮和两个低潮:
图3. 来自美国国家海洋和大气管理局
美国国家海洋和大气管理局(NOAA)提供了一个REST API,可以从全球潮汐站轻松获取详细的传感器数据。
图4
比如说,下列REST调用指定了潮汐站ID、数据类型(我选择了海平面)和数据(平均海平面),并请求一个采用公制单位的最近结果:
该调用返回JSON结果,含有潮汐站的经纬度、时间和水位值。请注意,您必须记住您调用的是什么,以便了解所返回结果的数据类型、数据和单位!
启动数据管道(使用REST源连接件)
要开始创建Kafka Connect流数据管道,我们必须先准备Kafka集群和Kafka Connect集群。
图5
接下来,我们引入一个REST连接件,比如这个可用的开源连接件。我们会将其部署到AWS S3存储桶(如果需要,参照这些说明)。 然后我们将要求Kafka Connect集群使用S3存储桶,对它同步以便在集群中可见,配置连接件,最后让它运行起来。这种“BYOC”(自带连接件)方法确保您有无数的方法来寻找满足特定要求的连接件。
图6
下列示例演示使用“curl”命令将完全开源的Kafka Connect部署环境配置成可使用REST API。请注意,您需要更改URL、名称和密码以匹配您自己的部署:
该代码创建的连接件任务以10分钟为间隔轮询REST API,并将结果写入到“tides-Topic”Kafka主题。通过随机选择五个潮汐传感器以这种方式收集数据,潮汐数据现在通过五个配置和五个连接件填充了潮汐主题。
图7
结束管道(使用Elasticsearch sink连接件)
为了将该潮汐数据放在某个地方,我们将在管道末端引入Elasticsearch集群和Kibana。 我们将配置一个开源Elasticsearch sink连接件,以便向Elasticsearch发送数据。
图8
以下示例配置使用sink名称、类、Elasticsearch索引和我们的Kafka主题。如果索引尚未存在,会创建一个有默认映射的索引。
该管道现在可运作起来。然而,由于默认索引映射,进入到Tides索引的所有潮汐数据是字符串。
图9
每次更改Elasticsearch索引映射时,通常都需要Elasticsearch“重新索引”(删除索引并重新索引所有数据)。数据既可以从现有的Kafka sink连接件重放,就像我们在这个用例中所做的那样,也可以使用Elasticsearch重新索引操作来获取。
使用Kibana可视化数据
为了可视化潮汐数据,我们先用Kibana创建一个索引模式,将“t”配置为时间过滤器字段。然后,我们将创建一个可视化,选择线图类型。最后,我们将配置图设置,以便y轴显示30分钟内的平均潮位,x 轴显示随时间变化的该数据。
结果是下图显示了五个样本潮汐站的潮汐变化,管道从这些潮汐站收集数据:
图10
结果
我们可以从可视化中清楚地看到潮汐的周期性,每个太阴日出现两次高潮。
图11
更令人惊讶的是,每个全球潮汐站的高潮和低潮之间的间隔不一样。这不仅受月球的影响,还受太阳、当地地理、天气和气候变化的影响。这个示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana帮助演示可视化的优点:它们通常可以揭示原始数据无法揭示的信息!

shell判断文件是否存在
#[-e/tmp/]&&echoyes||echono-e判断对象(文件或目录)是否存在,存在为真祝你好运~~~
嵌入式实时系统,什么是嵌入式实时系统
嵌入式实时操作系统(Embedded Real-time Operation System,RTOS)。 嵌入式系统是“用于控制、监视或者辅助操作机器和设备的装置”,是一种以应用为中心、以计算机技术为基础、软件硬件可裁剪、功能、可靠性、成本、体积、功耗严格要求的专用计算机系统。 而Android是一种基于Linux的自由及开放源代码的操作系统,并不是嵌入式,主要使用于移动设备,如智能手机和平板电脑,由Google公司和开放手机联盟领导及开发。 尚未有统一中文名称,中国大陆地区较多人使用“安卓”或“安致”。 Android操作系统最初由Andy Rubin开发,主要支持手机。 2005年8月由Google收购注资。 2007年11月,Google与84家硬件制造商、软件开发商及电信营运商组建开放手机联盟共同研发改良Android系统。 随后Google以Apache开源许可证的授权方式,发布了Android的源代码
发表评论