SparkStreaming与Kafka整合遇到的问题及解决方案 (sparks是什么意思)

技术教程 2025-05-04 21:25:43 浏览
sparks是什么意思

SparkStreaming与Kafka整合遇到的问题及解决方案

2017-08-03 09:37:35最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticSearch中,可以实现准实时定位系统日志。

前言

最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志。

实现

Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式。

一. 基于Receiver方式

SparkStreaming与Kafka整合遇到的问题及

这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。代码如下:

刚开始的时候系统正常运行,没有发现问题,但是如果系统异常重新启动sparkstreaming程序后,发现程序会重复处理已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。官方现在也已经不推荐这种整合方式,官网相关地址,下面我们使用官网推荐的第二种方式kafkaUtils的createDirectStream()方式。

二.基于Direct的方式

这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的***的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

代码如下:

这种direct方式的优点如下:

1.简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。

2.一次且仅一次的事务机制:基于receiver的方式,在spark和zk中通信,很有可能导致数据的不一致。

3.高效率:在receiver的情况下,如果要保证数据的不丢失,需要开启wal机制,这种方式下,为、数据实际上被复制了两份,一份在kafka自身的副本中,另外一份要复制到wal中, direct方式下是不需要副本的。

三.基于Direct方式丢失消息的问题

貌似这种方式很***,但是还是有问题的,当业务需要重启sparkstreaming程序的时候,业务日志依然会打入到kafka中,当job重启后只能从***的offset开始消费消息,造成重启过程中的消息丢失。kafka中的offset如下图(使用kafkaManager实时监控队列中的消息):

当停止业务日志的接受后,先重启spark程序,但是发现job并没有将先前打入到kafka中的数据消费掉。这是因为消息没有经过zk,topic的offset也就没有保存

SparkStreaming与Kafka整合遇到的问题及

四.解决消息丢失的处理方案

一般有两种方式处理这种问题,可以先spark streaming 保存offset,使用spark checkpoint机制,第二种是程序中自己实现保存offset逻辑,我比较喜欢第二种方式,以为这种方式可控,所有主动权都在自己手中。

先看下大体流程图,

基本使用这种方式就可以解决数据丢失的问题。


【详细教程】Kafka应用场景、基础组件、架构探索

Kafka应用场景、基础组件、架构探索的详细教程如下:

一、Kafka应用场景

二、Kafka基础组件

三、Kafka架构探索

怎样利用 Spark Streaming 和 Hadoop 实现近实时的会话连接

Spark Streaming 是Apache Spark 中最有趣的组件之一。 你用Spark Streaming可以创建数据管道来用批量加载数据一样的API处理流式数据。 此外,Spark Steaming的“micro-batching”方式提供相当好的弹性来应对一些原因造成的任务失败。 在这篇文章中,我将通过网站的事件近实时回话的例子演示使你熟悉一些常见的和高级的Spark Streaming功能,然后加载活动有关的统计数据到Apache HBase,用不喜欢的BI用具来绘图分析。 (Sessionization指的是捕获的单一访问者的网站会话时间范围内所有点击流活动。 )你可以在这里找到了这个演示的代码。 像这样的系统对于了解访问者的行为(无论是人还是机器)是超级有用的。 通过一些额外的工作它也可以被设计成windowing模式来以异步方式检测可能的欺诈。 Spark Streaming 代码我们的例子中的main class是让我们来看看这段代码段(忽略1-59行,其中包含imports 和其他无聊的东西)。 60到112行:设置Spark Streaming 这些行是非常基本的,用来设置的Spark Streaming,同时可以选择从HDFS或socket接收数据流。 如果你在Spark Streaming方面是一个新手,我已经添加了一些详细的注释帮助理解代码。 (我不打算在这里详谈,因为仍然在样例代码里。 )//This is just creating a Spark Config object.I dont do much here but//add the app are tons of options to put into the Spark config,//but none are needed for this simple sparkConf = new SparkConf()(SessionizeData + args(0))(, )//These two lines will get us out SparkContext and our StreamingContext.//These objects have all the root functionality we need to get sc = new SparkContext(sparkConf)val ssc = new StreamingContext(sc, Seconds(10))//Here are are loading our HBase Configuration will have//all the information needed to connect to our HBase cluster.//There is nothing different here from when you normally interact with conf = ();(new Path(/etc/hbase/conf/));(new Path(/etc/hbase/conf/));//This is a HBaseContext is a nice abstraction that will hide//any complex HBase stuff from us so we can focus on our business case//HBaseContext is from the SparkOnHBase project which can be found at//hbaseContext = new HBaseContext(sc, conf);//This is create a reference to our root are like RDDs but//with the context of being in micro batch world.I set this to null now//because I later give the option of populating this data from HDFS or from//a is no reason this could not also be populated by Kafka,//Flume, MQ system, or anything else.I just focused on these because//there are the easiest to set lines: DStream[String] = null//Options for data be adding Kafka and Flume at some pointif (args(0)(socket)) {val host = args(FIXED_ARGS);val port = args(FIXED_ARGS + 1);println(host: + host)println(port: + (port))//Simple example of how you set up a receiver from a Socket Streamlines = (host, )} else if (args(0)(newFile)) {val directory = args(FIXED_ARGS)println(directory: + directory)//Simple example of how you set up a receiver from a HDFS folderlines = [LongWritable, Text, TextInputFormat](directory, (t: Path) => true, true)(_._)} else {throw new RuntimeException(bad input type)}114到124行: 字符串解析 这里是Spark Streaming的开始的地方. 请看下面四行::val ipKeyLines = [(String, (Long, Long, String))](eventRecord => {//Get the time and ip address out of the original eventval time = ((([) + 1, (])))()val ipAddress = (0, ( ))//We are return the time twice because we will use the first at the start time//and the second as the end time(ipAddress, (time, time, eventRecord))})上面第一命令是在DSTREAM对象“lines”上进行了map函数和,解析原始事件来分离出的IP地址,时间戳和事件的body。 对于那些Spark Streaming的新手,一个DSTREAM保存着要处理的一批记录。 这些记录由以前所定义的receiver对象填充,并且此map函数在这个micro-batch内产生另一个DSTREAM存储变换后的记录来进行额外的处理。 当看像上面的Spark Streaming示意图时,有一些事情要注意::每个micro-batch在到达构建StreamingContext时设定的那一秒时被销毁Receiver总是用被下一个micro-batch中的RDDS填充之前micro batch中老的RDDs将被清理丢弃126到135行:产生Sessions 现在,我们有从网络日志中获得的IP地址和时间,是时候建立sessions了。 下面的代码是通过micro-batch内的第一聚集事件建立session,然后在DSTREAM中reduce这些会话。 val latestSessionInfo = [(String, (Long, Long, Long))](a => {//transform to (ipAddress, (time, time, counter))(a._1, (a._2._1, a._2._2, 1))})((a, b) => {//transform to (ipAddress, (lowestStartTime, MaxFinishTime, sumOfCounter))((a._1, b._1), (a._2, b._2), a._3 + b._3)})(updateStatbyOfSessions)这里有一个关于records如何在micro-batch中被reduce的例子:在会话范围内的 micro-batch 内加入,我们可以用超酷的updateStateByKey功能(做join/reduce-like操作)下图说明了就DStreams而言,随着时间变化这个处理过程是怎样的。 现在,让我们深入到updateStatbyOfSessions函数,它被定义在文件的底部。 此代码(注意详细注释)含有大量的魔法,使sessionization发生在micro-batch的连续模式中。 /*** This function will be called for to union of keys in the Reduce DStream* with the active sessions from the last micro batch with the ipAddress* being the key** To goal is that this produces a stateful RDD that has all the active* we add new sessions and remove sessions that have timed* out and extend sessions that are still going*/def updateStatbyOfSessions(//(sessionStartTime, sessionFinishTime, countOfEvents)a: Seq[(Long, Long, Long)],//(sessionStartTime, sessionFinishTime, countOfEvents, isNewSession)b: Option[(Long, Long, Long, Boolean)]): Option[(Long, Long, Long, Boolean)] = {//This function will return a Optional value.//If we want to delete the value we can return a optional None.//This value contains four parts//(startTime, endTime, countOfEvents, isNewSession)var result: Option[(Long, Long, Long, Boolean)] = null// These if statements are saying if we didnt get a new event for//this sessions ip address for longer then the session//timeout + the batch time then it is safe to remove this key value//from the future Stateful DStreamif ( == 0) {if (() - ._2 > SESSION_TIMEOUT + ) {result = None} else {if (._4 == false) {result = b} else {result = Some((._1, ._2, ._3, false))}}}//Now because we used the reduce function before this function we are//only ever going to get at most one event in the (c => {if () {//If there was no value in the Stateful DStream then just add it//new, with a true for being a new sessionresult = Some((c._1, c._2, c._3, true))} else {if (c._1 - ._2 < SESSION_TIMEOUT) {//If the session from the stateful DStream has not timed out//then extend the sessionresult = Some(((c._1, ._1),//(c._2, ._2),//._3 + c._3,//newSumOfEventsfalse //This is not a new session))} else {//Otherwise remove the old session with a new oneresult = Some((c._1,//newStartTimec._2,//._3,//newSumOfEventstrue //new session))}}})result}}在这段代码做了很多事,而且通过很多方式,这是整个工作中最复杂的部分。 总之,它跟踪活动的会话,所以你知道你是继续现有的会话还是启动一个新的。 126到207行:计数和HBase 这部分做了大多数计数工作。 在这里有很多是重复的,让我们只看一个count的例子,然后一步步地我们把生成的同一个记录counts存储在HBase中。 val onlyActiveSessions = (t => () - t._2._2 < SESSION_TIMEOUT)…val newSessionCount = (t => {//is the session newer then that last micro batch//and is the boolean saying this is a new session true(() - t._2._2 > && t._2._4)})[HashMap[String, Long]](t => HashMap((NEW_SESSION_COUNTS, t)))总之,上面的代码是过滤除了活动的会话其他所有会话,对他们进行计数,并把该最终计记录到一个的HashMap实例中。 它使用HashMap作为容 器,所以在所有的count做完后,我们可以调用下面的reduce函数把他们都到一个单一的记录。 (我敢肯定有更好的方法来实现这一点,但这种方法工 作得很好。 )接下来,下面的代码处理所有的那些HashMap,并把他们所有的值在一个HashMap中。 val allCounts = (totalSessionCount)(totals)(totalEventsCount)(deadSessionsCount)(totalSessionEventCount)((a, b) => b ++ a)用HBaseContext来使Spark Streaming与HBase交互超级简单。 所有你需要做的就是用HashMap和函数将其转换为一个put对象提供给DSTREAM。 [HashMap[String, Long]](allCounts,//The input RDDhTableName,//The name of the table we want to put Too(t) => {//Here we are converting our input record into a put//The rowKey is C for Count and a backward counting time so the newest//count show up first in HBases sorted orderval put = new Put((C. + ( - ())))//We are iterating through the HashMap to make all the columns with their (kv => ((hFamily), (kv._1), (kv._)))put},false)现在,HBase的这些信息可以用Apache Hive table包起来,然后通过你喜欢的BI工具执行一个查询来获取像下面这样的图,它每次micro-batch会刷新。 209到215行:写入HDFS 最后的任务是把拥有事件数据的活动会话信息加入,然后把事件以会话的开始时间来持久化到HDFS。 //Persist to (onlyActiveSessions)(t => {//Session root start time | Event (new Date(t._2._2._1)) + t + t._2._1._3})(outputDir + /session, txt)

【Flink 精选】如何分析及处理反压?

反压(backpressure)是流式计算中十分常见的问题。 反压意味着数据管道中某个节点成为瓶颈,处理速率跟不上上游发送数据的速率,而需要对上游进行限速 。 由于实时计算应用通常使用消息队列来进行生产端和消费端的解耦,消费端数据源是pull-based的,所以 反压通常是从某个节点传导至数据源并降低数据源(比如 Kafka consumer)的摄入速率。 反压并不会直接影响作业的可用性,它表明作业处于亚健康的状态,有潜在的性能瓶颈并可能导致更大的数据处理延迟 。 通常来说,对于一些对延迟要求不高或者数据量较少的应用,反压的影响可能并不明显。 然而对于规模比较大的 Flink 作业,反压可能会导致严重的问题。 网络流控的实现: 动态反馈/自动反压 Flink 的数据交换有3种:① 同一个 Task 的数据交换 ,② 不同 Task 同 JVM 下的数据交换 ,③ 不同 Task 且不同 TaskManager 之间的交换 。 通过 算子链 operator chain 串联多个算子 ,主要作用是避免了 序列化 和 网络通信 的开销。 在 TaskA 中,算子输出的数据首先通过 record Writer 进行序列化,然后传递给 result Partition 。 接着,数据通过 local channel 传递给 TaskB 的 Input Gate,然后传递给 record reader 进行反序列。 与上述(2)的不同点是数据先传递给 netty ,通过 netty 把数据推送到远程端的 Task 。 1.5 版本之前是采用 TCP 流控机制,而没有采用feedback机制 。 发送端 Flink 有一层Network Buffer,底层用Netty通信即有一层Channel Buffer,最后Socket通信也有Buffer,同理接收端也有对应的3级 Buffer。 Flink (before V1.5)实质是利用 TCP 的流控机制来实现 feedback 。 TCP报文段首部有16位窗口字段,当接收方收到发送方的数据后,ACK响应报文中就将自身缓冲区的剩余大小设置到放入16位窗口字段 。 该窗口字段值是随网络传输的情况变化的,窗口越大,网络吞吐量越高。 参考:1. 【计算机网络】3.1 运输层 - TCP/UDP协议 2. Apache Flink 进阶教程(七):网络流控及反压剖析 例子:TCP 利用滑动窗口限制流量步骤1 :发送端将4,5,6发送,接收端也能接收全部数据。 步骤2 :consumer 消费了 2 ,接收端的窗口会向前滑动一格,即窗口空余1格。 接着向发送端发送ACK = 7、window = 1 。 步骤3:发送端将 7 发送后,接收端接收到 7 ,但是接收端的 consumer 故障不能消费数据。 这时候接收端向发送端发送ACK = 8、window = 0,由于这个时候 window = 0,发送端是不能发送任何数据,也就会使发送端的发送速度降为 0。 在 Flink 层面实现反压机制,通过 ResultPartition 和 InputGate 传输 feedback。 Storm 在每一个 Bolt 都会有一个监测反压的线程(Backpressure Thread),这个线程一但检测到 Bolt 里的接收队列(recv queue)出现了严重阻塞就会把这个情况写到 ZooKeeper 里,ZooKeeper 会一直被 Spout 监听,监听到有反压的情况就会停止发送 。 因此,通过这样的方式匹配上下游的发送接收速率。 组件 RateController 监听负责监听“OnBatchCompleted”事件,然后从中抽取processingDelay 及schedulingDelay信息。 RateEstimator 依据这些信息估算出最大处理速度(rate),最后由基于Receiver的Input Stream将 rate 转发给 Executor 的 BlockGenerator,并更新RateLimiter 。 Flink、Storm、Spark Streaming 的反压机制都采用动态反馈/自动反压原理,可以动态反映节点限流情况,进而实现自动的动态反压。 Flink Web UI 的反压监控提供了Subtask 级别 的反压监控。 监控的原理是 通过() 采集在 TaskManager 上正在运行的所有线程,收集在缓冲区请求中阻塞的线程数(意味着下游阻塞),并计算缓冲区阻塞线程数与总线程数的比值 rate 。 其中,rate < 0.1 为 OK,0.1 <= rate <= 0.5 为 LOW,rate > 0.5 为 HIGH。 Network 和 task I/O metrics 是轻量级反压监视器,用于正在持续运行的作业,其中一下几个 metrics 是最有用的反压指标。 采用 Metrics 分析反压的思路: 如果一个 Subtask 的发送端 Buffer 占用率很高,则表明它被下游反压限速了;如果一个 Subtask 的接受端 Buffer 占用很高,则表明它将反压传导至上游 。 下表把inPoolUsage 分为floatingBuffersUsage 和 exclusiveBuffersUsage ,并且总结上游 TaskoutPoolUsage 与floatingBuffersUsage 、 exclusiveBuffersUsage 的关系,进一步的分析一个 Subtask 和其上游 Subtask 的反压情况。 上述主要通过 TaskThread 定位反压,而分析反压原因 类似一个普通程序的性能瓶颈 。 通过 Web UI 各个 SubTask 的 Records Sent 和 Record Received 来确认 ,另外 Checkpoint detail 里不同 SubTask 的 State size 也是一个分析数据倾斜的有用指标。 解决方式把数据分组的 key 进行本地/预聚合来消除/减少数据倾斜。 对TaskManager 进行 CPU profile ,分析 TaskThread 是否跑满一个 CPU 核:如果没有跑满,需要分析 CPU 主要花费在哪些函数里面,比如生产环境中偶尔会卡在 Regex 的用户函数(ReDoS);如果没有跑满,需要看 Task Thread 阻塞在哪里,可能是 用户函数本身有些同步的调用 ,可能是checkpoint 或者 GC 等系统活动 。 TaskManager JVM 各区内存不合理导致的频繁 Full GC甚至失联。 可以加上-XX:+PrintGCDetails 来打印 GC 日志的方式来观察 GC 的问题。 推荐TaskManager 启用 G1 垃圾回收器来优化 GC。

本文版权声明本文内容由互联网用户自发贡献,该文观点仅代表作者本人。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如发现本站有涉嫌抄袭侵权/违法违规的内容,请联系本站客服,一经查实,本站将立刻删除。

发表评论

热门推荐