Connect创建用于处理实时数据的开源数据管道-如何使用Kafka (connect with)

教程大全 2025-07-18 16:39:53 浏览

如何使用Kafka Connect创建用于处理实时数据的开源数据管道?

译文2021-07-29 08:00:00本文介绍了如何使用完全开源的技术创建实时数据管道,这类开源技术包括 Kafka Connect、Apache Kafka和Kibana 等。

Kafka Connect是一种特别强大的开源数据流工具;有了它,将Kafka与其他数据技术结合使用非常轻松。作为一种分布式技术,Kafka Connect提供了特别高的可用性和独立于Kafka集群的弹性扩展。Kafka Connect使用源或sink连接件发送进出Kafka主题的数据,无需代码即可与多种非Kafka技术实现整合。

图1

可靠的开源Kafka连接件可供许多流行的数据技术使用,您还有机会编写自己的连接件。本文介绍了一个真实的实际数据用例,即如何使用Kafka Connect将来自Kafka的实时流数据与Elasticsearch(以启用索引Kafka记录的可扩展搜索)和Kibana(以便可视化那些结果)整合起来。

Connect创建用于处理实时数据的开源数据管道

图2

针对表明Kafka和Kafka Connect优点的一个用例,我受到CDC新冠疫情数据跟踪器的启发。基于Kafka的跟踪器从多个位置、以多种格式并使用多种协议收集实时新冠病毒检测数据,并将这些事件处理成易于使用的可视化结果。跟踪器还有必要的数据治理机制,以确保结果快速到达,并值得信任。

我开始寻找一个同样复杂且引人注目的用例——但理想情况下,不像新冠疫情那样令人担忧。最终,我发现了一个有趣的领域:月潮,包括公开可用的流REST API和采用简单Json格式的丰富数据。

月潮数据

潮汐遵循太阴日,这是一个24小时50分钟的周期;在此期间,地球完全自转到轨道卫星下方的同一点。每个太阴日有月球引力引起的两个高潮和两个低潮:

图3. 来自美国国家海洋和大气管理局

美国国家海洋和大气管理局(NOAA)提供了一个REST API,可以从全球潮汐站轻松获取详细的传感器数据。

图4

比如说,下列REST调用指定了潮汐站ID、数据类型(我选择了海平面)和数据(平均海平面),并请求一个采用公制单位的最近结果:

该调用返回JSON结果,含有潮汐站的经纬度、时间和水位值。请注意,您必须记住您调用的是什么,以便了解所返回结果的数据类型、数据和单位!

启动数据管道(使用REST源连接件)

要开始创建Kafka Connect流数据管道,我们必须先准备Kafka集群和Kafka Connect集群。

图5

接下来,我们引入一个REST连接件,比如这个可用的开源连接件。我们会将其部署到AWS S3存储桶(如果需要,参照这些说明)。 然后我们将要求Kafka Connect集群使用S3存储桶,对它同步以便在集群中可见,配置连接件,最后让它运行起来。这种“BYOC”(自带连接件)方法确保您有无数的方法来寻找满足特定要求的连接件。

图6

下列示例演示使用“curl”命令将完全开源的Kafka Connect部署环境配置成可使用REST API。请注意,您需要更改URL、名称和密码以匹配您自己的部署:

该代码创建的连接件任务以10分钟为间隔轮询REST API,并将结果写入到“tides-Topic”Kafka主题。通过随机选择五个潮汐传感器以这种方式收集数据,潮汐数据现在通过五个配置和五个连接件填充了潮汐主题。

图7

结束管道(使用Elasticsearch sink连接件)

为了将该潮汐数据放在某个地方,我们将在管道末端引入Elasticsearch集群和Kibana。 我们将配置一个开源Elasticsearch sink连接件,以便向Elasticsearch发送数据。

图8

以下示例配置使用sink名称、类、Elasticsearch索引和我们的Kafka主题。如果索引尚未存在,会创建一个有默认映射的索引。

该管道现在可运作起来。然而,由于默认索引映射,进入到Tides索引的所有潮汐数据是字符串。

图9

每次更改Elasticsearch索引映射时,通常都需要Elasticsearch“重新索引”(删除索引并重新索引所有数据)。数据既可以从现有的Kafka sink连接件重放,就像我们在这个用例中所做的那样,也可以使用Elasticsearch重新索引操作来获取。

使用Kibana可视化数据

为了可视化潮汐数据,我们先用Kibana创建一个索引模式,将“t”配置为时间过滤器字段。然后,我们将创建一个可视化,选择线图类型。最后,我们将配置图设置,以便y轴显示30分钟内的平均潮位,x 轴显示随时间变化的该数据。

结果是下图显示了五个样本潮汐站的潮汐变化,管道从这些潮汐站收集数据:

图10

结果

我们可以从可视化中清楚地看到潮汐的周期性,每个太阴日出现两次高潮。

图11

更令人惊讶的是,每个全球潮汐站的高潮和低潮之间的间隔不一样。这不仅受月球的影响,还受太阳、当地地理、天气和气候变化的影响。这个示例Kafka Connect管道利用Kafka、Elasticsearch和Kibana帮助演示可视化的优点:它们通常可以揭示原始数据无法揭示的信息!


求java增删改查例题!!

***********对人员表TUser的业务逻辑:package bean;import .*;import ;import ;import ;public class UserDAO{public UserDAO() {}/*更新数据,通过传递许更新的对象以及SQL语句,对数据进行添加、删除或更新操作*obj为实体类的对象,sql为SQL语句*/public boolean insertUser(UserBean ub){boolean b=false;Connection conn=null;PreparedStatement stmt=null;String sql=null;try{//插入一条记录sql=insert into TUser values(user_,?,?,?,?,?);//创建一个连接conn=();//创建PreparedStatement的对象stmt=(sql);//给SQL语句内的?赋值//(1,_ID());(1,_Number());(2,_Status());(3,_Level());(4,_ID());(5,_ID());//返回执行更新操作后受影响的行数int rst=();if(rst!=0){b=true;}//关闭相关的连接(stmt);(conn);}catch(SQLException se){();}catch(Exception e){();} return b;}/*修改一条数据*直接调用insertUser()方法*/ public boolean updateUser(UserBean ub){ boolean b=false;Connection conn=null;PreparedStatement stmt=null;String sql=null;try{//插入一条记录sql=update TUser set Mobile_Number=?,Roaming_Status=?,Com_Level=?,Customer_ID=?,Account_ID=? where User_ID=+_ID();//创建一个连接conn=();//创建PreparedStatement的对象stmt=(sql);//给SQL语句内的?赋值//(1,_ID());(1,_Number());(2,_Status());(3,_Level());(4,_ID());(5,_ID());//返回执行更新操作后受影响的行数int rst=();if(rst!=0){b=true;}//关闭相关的连接(stmt);(conn);}catch(SQLException se){();}catch(Exception e){();} return b; }/*删除一条记录*@返回boolean类型的标志,通过传递UserBean的实例对象,得到该对象的User_ID*/ public boolean deleteUser(UserBean ub){boolean b=false; //测试变量,无实在意义Connection conn=null;Statement stmt=null;String sql=null;int id=0;try{id=_ID();//删除记录为id的记录sql=delete from TUser where User_ID=+id;//创建一个连接conn=();//创建PreparedStatement的对象stmt=();//检测删除的对象是否存在if(getUser(id)!=null&&getUser(id)_ID()==id){//返回执行更新操作后受影响的行数int rst = (sql);if (rst > 0) {b = true;}}else{(此条记录不存在!);}//关闭相关的连接(stmt);(conn);//捕获异常}catch(SQLException se){();}catch(Exception e){();} return b;}/*查询数据/得到数据*str为数据库对应的列名,sql为传入的SQL语句*/public UserBean getUser(int user_id){Connection conn=null;Statement stmt=null;ResultSet rs=null;String sql=null;UserBean user=null;try{//从数据库中查询User_ID值为user_id的记录sql=select * from TUser where User_ID=+user_id;//创建一个连接conn=();//创建Statement的对象stmt=();//实例化UserBean的一个对象user =new UserBean();//执行操作,返回一个结果集rs=(sql);//从结果集中读取一条记录,并且将对应的属性值赋值给userif(()){ //((User_Address)); _ID((User_ID)); _Number((Mobile_Number)); _Status((Roaming_Status)); _Level((Com_Level)); _ID((Customer_ID)); _ID((Account_ID));//(_ID());}//关闭相关的连接(rs);(stmt);(conn);//捕获异常}catch(SQLException se){();}catch(Exception e){();}//返回user对象// (_ID());return user;}/*查询数据*str为数据库对应的列名,sql为传入的SQL语句*@返回一个数据列表*/ public ArrayList getUsers(){ Connection conn=null; Statement stmt=null; ResultSet rs=null; String sql=null; UserBean user=null; ArrayList list=null; try{ //从数据库中查询User_ID值为user_id的记录 sql=select * from TUser; //创建一个连接 conn=(); //创建Statement的对象 stmt=(); list=new ArrayList(); //执行操作,返回一个结果集 rs=(sql); //从结果集中循环读取记录,并且将对应的属性值赋值给user while(()){ //实例化UserBean的一个对象 user =new UserBean(); _ID((User_ID)); _Number((Mobile_Number)); _Status((Roaming_Status)); _Level((Com_Level)); _ID((Customer_ID)); _ID((Account_ID));//将user添加到list中(user); } //关闭相关的连接 (rs); (stmt); (conn); //捕获异常 }catch(SQLException se){ (); }catch(Exception e){ (); } //返回list对象 return list;}}********************与数据库的连接类DBUtil:package bean;import .*;public class DBUtil {static String serverName=localhost;//主机地址static String sDBDriver=;//oracle驱动static String dbInstance=cloud; //数据库的名称static String sConnStr=jdbc:oracle:thin:@+serverName+:1521:+dbInstance; //数据库的连接字符串static String dbUser=system;//数据库的登陆名static String userPwd=manager;//数据库的登陆密码/**得到一个Connection对象*@return */ public static Connection getConnection(){ Connection conn=null; try{ (sDBDriver); conn=(sConnStr,dbUser,userPwd); }catch(ClassNotFoundException e){ (); }catch(SQLException se){ (); } return conn; } //关闭指定的结果集rs public static void closeResultSet(ResultSet rs){ if(rs!=null){ try{ (); }catch(SQLException e){ (); } } } //关闭指定的Statement public static void closeStatement(Statement stmt){ if(stmt!=null){ try{ (); }catch(SQLException e){ (); } } } //关闭连接conn public static void closeConnection(Connection conn){ if(conn!=null){ try{ (); }catch(SQLException e){ (); } } }}

现在企业流行的java框架技术是什么,有什么不同点

我将简短分析被用于支持这些框架的企业开发环境或工具箱,例如Borland JBuilder,Eclipse以及BEA Workbench。 请记住,市场上有许多有关这些开发框架的图书;然而,在任何一篇文章中,要对它们进行深入描述是不可能的。 不过,我将尽力讨论最广泛地使用的概念。 1. 共同点 几乎所有现代的网络开发框架都遵循了模型-视图-控制(MVC)设计模式--商业逻辑和描述被分开,由一个逻辑流控制器来协调来自客户端的请求和服务器上将采取的行动。 这条途径成为了网络开发的事实上的标准。 每个框架的内在的机制当然是不同的,但是开发者们使用来设计和实现他们的Web应用软件的API是很类似的。 差别还存在于每个框架提供的扩展方面,例如标签库,JavaServer Faces或JavaBean包装器等。 所有的框架使用不同的技术来协调在Web应用程序之内的导航,例如XML配制文件,java属性文件或定制属性。 所有的框架在控制器模块实现的方法方面也存在明显的不同。 例如,EJB可能实例化在每个请求中需要的类或使用Java反射动态地调用一个适当的行动(Action)类。 另外,不同框架在各自引入的概念上也有所不同。 例如,一个框架可能定义用户请求和反应(以及错误)场所,而另外一个框架可能仅仅定义一个完整的流--从一个请求到多个响答和随后的再请求…… 各种Java框架在它们组织数据流的方法方面是很类似的。 在请求发出后,在应用程序服务器上产生一些行动;而作为响应,一些可能包含对象集的数据总是被发送到JSP层。 然后,从那些对象--可能是有setter和getter方法的简单类,javabeans,值对象,或者一些集合对象--中提取数据。 现代的Java框架还想方设法简化开发者的开发任务,如通过使用简易的API,数据库连接池,甚至数据库调用包等提供自动化的追踪方式来实现。 一些框架或者能够钩进(hooked into)另外的J2EE技术中,例如JMS(Java消息服务)或JMX,或把这些技术集成到一起。 服务器数据持续性和日志也有可能成为框架的一部分。 2. 企业开发环境 一些框架在Web开发者社区和企业发展领域变得相当流行。 随着这些框架的日渐成熟并开始发行稳定的版本,商业的IDE(集成发展环境)开始为这些框架提供支持并把他们纳入到自己的产品中。 一些IDE甚至基于框架的概念开发出整个的产品,例如,BEA WebLogic Workshop就是基于Struts框架建立起来的。 Borland Jbuilder为Struts提供了内建的支持,也支持JSF和JSTL。 Eclipse平台已成为一个很流行的开发工具,部分因为它是基于插件的,部分因为它对于Web框架的支持。 现在,出现了众多的Eclipse插件,甚至完整的基于Eclipse的IDE。 许多插件被设计适合于Struts框架开发,例如MyEclipse()或M7。 大多数IDE都具有图形化的流程和可视化对象(类代理)。 例如,下面是一个JBuilder的行动(Action)设计器,用于规划Web应用程序的页面顺序。 WebLogic Workshop引入Java页面流程技术,它扩展了Struts框架而提供了一个简化的开发模型并增加了另外一些特性。 Workshop使用页面流(Page Flows),实现轻易地把用户接口与导航和商业逻辑分离开来。 页面流由JSP页组成,这些页面包含用户接口元素和一个控制器文件(JPF)--它包含由用户提供的数据将怎样被处理的指令以及下一步什么页面将被返回到用户的信息。 页面流动提供给开发者一个可视化的Web应用程序总体轮廓,它让开发者能够看到直观地分析不同的JSP页彼此相关联,并实现Web应用程序整体结构的快速建立。 MyEclipse提供类似的特征,并带有更多吸引人的代价标签。 3. Apache Struts框架 Struts框架是一开源产品,基于模型-视图-控制器(MVC)设计范例来开发Web应用软件。 它使用并且扩展了Java Servlet API,最初由Craig McClanahan创建。 在2000年5月,它被捐赠到Apache Foundation。 Struts框架展示了一个强有力的定制标签库,平铺显示,表单检验和I18N(国际化)。 另外,Struts支持许多描述层,包括JSP,XML/XSLT,JavaServerFaces(JSF)和Velocity;还支持一些模型层,包括JavaBeans和EJB。 4. Spring框架 Spring框架是一个分层的Java/J2EE应用程序框架,基于Expert One-on-One J2EE设计和发行的代码。 Spring框架提供一种简单的开发技术,用于自动化处理工程中大量的属性文件和助理类。 Spring框架包括的主要特色有: 1 强有力的基于JavaBeans的配置管理,使用Inversion-of-Control(IoC)原则。 2 一个核心bean工厂,可用在任何环境,从applets到J2EE容器程序。 3 通用的抽象层适合于数据库事务管理,允许可插入的事务管理器,并且不需要处理低层次的问题就可容易地划分各事务的界限。 4 一个很有意义的异常处理的JDBC抽象层。 5 与Hibernate集成到一起,DAO实现支持以及事务策略。 5. Hibernate框架 Hibernate是一适合于Java语言的对象-关系映射(ORM)解决方案。 它也是开源软件,类似Struts,并且在LGPL保护下发布。 Hibernate被一群来自世界各地的Java软件开发者所共同开发。 它提供一个易用的框架来实现把一个面向对象的域模型映射到一传统的关系数据库。 它不仅负责从Java类到数据库表格(以及来自Java数据类型的SQL数据类型)的映射,而且还提供数据查询和检索能力,并能大大减少花在SQL和JDBC手工数据处理上的开发时间。 Hibernate的目标是减轻开发者的与大量普通的数据持续性相联系的编程任务。 Hibernate还能够适应开发进程,无论它是刚开始设计还是来自一现成的数据库。 Hibernate可以自动生成SQL,使开发者摆脱了手工处理结果集和进行对象转化的繁琐任务,并能使应用程序移植到所有的SQL数据库。 它还能提供透明的持续性,对持续性类的唯一的要求的是实现一个无参数的构造器。 这个框架典型地使用在JavaSwing应用软件、基于Servlet的Java应用软件和使用EJBsession beans的J2EE应用软件中。 6. 结论 本文概述了现代最流行的Java Web开发框架。 当然,还有更多框架尚未描述,开源和商业化的都有,例如WebWork(或Tapestry(而许多框架通过扩展另外的MVC框架在内部被成功开发。 当前,最流行的框架是Apache Struts。 当Web开发竞技场继续演变它的工具和编程方法时,Java应用程序框架也将继续成长下去。 Java Web开发框架的未来一片明亮!

team foundation 和 git 的区别

本文版权声明本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请联系本站客服,一经查实,本站将立刻删除。

发表评论

热门推荐