
SparkStreaming与Kafka整合遇到的问题及解决方案
2017-08-03 09:37:35最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志。

前言
最近工作中是做日志分析的平台,采用了sparkstreaming+kafka,采用kafka主要是看中了它对大数据量处理的高性能,处理日志类应用再好不过了,采用了sparkstreaming的流处理框架 主要是考虑到它本身是基于spark核心的,以后的批处理可以一站式服务,并且可以提供准实时服务到elasticsearch中,可以实现准实时定位系统日志。
实现
Spark-Streaming获取kafka数据的两种方式-Receiver与Direct的方式。
一. 基于Receiver方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的,然后Spark Streaming启动的job会去处理那些数据。代码如下:
刚开始的时候系统正常运行,没有发现问题,但是如果系统异常重新启动sparkstreaming程序后,发现程序会重复处理已经处理过的数据,这种基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。官方现在也已经不推荐这种整合方式,官网相关地址,下面我们使用官网推荐的第二种方式kafkaUtils的createDirectStream()方式。
二.基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的***的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
代码如下:
这种direct方式的优点如下:
1.简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
2.一次且仅一次的事务机制:基于receiver的方式,在spark和zk中通信,很有可能导致数据的不一致。

3.高效率:在receiver的情况下,如果要保证数据的不丢失,需要开启wal机制,这种方式下,为、数据实际上被复制了两份,一份在kafka自身的副本中,另外一份要复制到wal中, direct方式下是不需要副本的。
三.基于Direct方式丢失消息的问题
貌似这种方式很***,但是还是有问题的,当业务需要重启sparkstreaming程序的时候,业务日志依然会打入到kafka中,当job重启后只能从***的offset开始消费消息,造成重启过程中的消息丢失。kafka中的offset如下图(使用kafkaManager实时监控队列中的消息):
当停止业务日志的接受后,先重启spark程序,但是发现job并没有将先前打入到kafka中的数据消费掉。这是因为消息没有经过zk,topic的offset也就没有保存
四.解决消息丢失的处理方案
一般有两种方式处理这种问题,可以先spark streaming 保存offset,使用spark checkpoint机制,第二种是程序中自己实现保存offset逻辑,我比较喜欢第二种方式,以为这种方式可控,所有主动权都在自己手中。
先看下大体流程图,
基本使用这种方式就可以解决数据丢失的问题。
各位大哥大姐能不能帮我解决个问题啊??
你说的情况我也遇到过,可能是你QQ游戏内部分文件损坏造成的,你可以将原来的游戏大厅卸载了,在另一个分区重新安装游戏大厅(如:原来安装在C区,这次你可以安装在D区或E区),登陆后一切OK!
重装系统出现这个开不开机
技术教程精华区[系统] U盘启动时提示starting cmain,3种终极解决方案索菲亚DE智慧|08-18| 只看楼主U盘启动时提示“starting cmain”一般是这样子的:这种情况,一般是制作好了PE启动U盘之后,启动不了才会这样,一般正常情况的话,这一句英文是一闪而过直接进入PE菜单的。 到这里卡住了,说明:已经从U盘启动了,可是加载不了U盘的PE文件。 当然,不同的情况都会导致这样的问题,索菲亚在网上随便搜索了一下,基本上都是说“硬盘没有格式化”,这样的答案我就不能理解了,难道每次进一次PE还要格式一下硬盘?很明显这又是一大伪技术。 我总结了一下,这种问题不外乎三种解决方案:方法一:换USB插口这种方法最简单,很多情况下换一个插口就可以了,建议不要使用3.0插口。 方法二:修改BIOS如果你的本本买来的时候是自带win8系统的,那么可能需要在bios里简单设置一下开机,看见本本LOGO 的时候按F2进入bios,如图找到“BOOT”选项,不同的电脑位置会有所不同,默认的会是UEFI,改成legacy。 不改的话U盘启动会进不去如果以上两种方法都不行的话那么可以在其他电脑上尝试一下,如果都不行那么可以尝试下下面的方法:方法三:使用ZIP-FAT32模式 制作启动工具的时候,选择ZIP-FAT32模式 以老毛桃为例:默认是hdd-fat32的,那么点击下拉框,选择ZIP-FAT32模式 ,重新制作USB启动盘即可
三国志12安装时出现 “无法定位序数20于动态链接库HttpFile.dll上”,请问怎么解决
这个问题我是用虚拟光驱安装搞定的
发表评论