博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark streaming和kafka整合,保证数据exactly-once有且只被处理一次
阅读量:6241 次
发布时间:2019-06-22

本文共 1440 字,大约阅读时间需要 4 分钟。

  hot3.png

对于流式计算系统,我们都预期能够完全正确的处理每一条数据,即所有的数据不多也不少的处理每一条数据,为了达到这样的功能,我们还有很多额外的工作需要处理。

1.首先了解spark+kafka解决这个问题的来龙去脉。

为什么spark checkpoint解决不了?为什么前面kafka使用KafkaUtils.createStream创建Dstream,而后面升级了api,推荐使用新的KafkaUtils.createDirectStream。这些疑问都在下面两篇博客里可以找到答案:

2.使用新的KafkaUtils.createDirectStream,需要额外做什么?

先搞清楚createDirectStream,它重要的参数有2个,kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long]。先看fromOffsets,这个参数指topic的每个分区从哪里开始消费,fromOffsets和zookeeper没有什么关系,我们想让这个offset从哪里开始就从哪里开始,我们可以把这个offset存到文件,每次读取文件,也可以存到zookeeper,每次读zookeeper,存到zookeeper的好处就是监控kafka的那些工具会生效,如果存文件,则基于zookeeper的kafka监控工具就会失效。kafkaParams传递的是kafka的一些信息,包括kafka的brokers的链接,尤其要注意参数auto.offset.reset,默认是largest,就是从topic每个分区的最大处开始消费(它不会管zookeeper中的comsumer的消费信息),只有新进kafka的消息才会消费到,smallest则是从头开始消费所有的数据。

保证数据exactly-once有且只被处理一次,我们就只能利用fromOffsets,我们这里把offset存到zookeeper。大体思路就是每次createDirectStream的fromOffsets取得上一批次最后处理的编号,从这个编号开始处理当前批次,当前批次完全处理成功之后,把当前处理的最后编号存到zookeeper中(存到zookeeper之前,一定要判断当前批次是否都成功处理,成功处理了才能写入zookeeper,如果没有成功处理,则要删除这一批次的结果),供下一批次使用。

3.将offset存入zookeeper的方案

实现将offset写入zookeeper有2种方式,一种是使用spark-streaming-kafka的相关类,来实现写zookeeper,还一种是自己调用zookeeper的api,写入offset。

使用spark-streaming-kafka的相关类写入zookeeper,这种方式网上有很多例子,也进行了一些封装,有些思路很值得参考,文章地址:,。但是方案也是我一开始想要实现的方式,但是碰到了困难,一开始报类找不到,后面把类打包进去,又报类中的方法找不到,大概原因是其一是类是私有的private[spark],其二是kafka版本更新等等问题。所以后面直接采用zookeeper的基础api,把offset写入zookeeper。向zookeeper中写入kafka的api:

转载于:https://my.oschina.net/cjun/blog/743257

你可能感兴趣的文章
localStorage 和 sessionStorage 的用法
查看>>
day23-python操作数据库三
查看>>
第二次冲刺——第3天
查看>>
SpringMVC+Hibernate+Junit4+json基本框架近乎0配置
查看>>
Pro Android学习笔记(一三七):Home Screen Widgets(3):配置Activity
查看>>
Hadoop学习笔记(九)HDFS架构分析
查看>>
DB2数据库常用基本操作命令
查看>>
RHEL5.8安装Sybase 15.7_x86_64
查看>>
函数适配器bind2nd 、mem_fun_ref 源码分析、函数适配器应用举例
查看>>
武汉科技大学ACM :1002: A+B for Input-Output Practice (II)
查看>>
extjs中form.reset(true)出现的bug修复
查看>>
Some Android functions
查看>>
ORB-SLAM2学习4 initializer.h
查看>>
正向代理和反向代理
查看>>
1092 回文字符串(LCSL_DP)
查看>>
day01-Python介绍,安装,idea
查看>>
AX函数,将EXCEL列号转为列名
查看>>
UNDO -- Concept
查看>>
养生《一》
查看>>
es6的模块化--AMD/CMD/commonJS/ES6
查看>>