对于流式计算系统,我们都预期能够完全正确的处理每一条数据,即所有的数据不多也不少的处理每一条数据,为了达到这样的功能,我们还有很多额外的工作需要处理。
1.首先了解spark+kafka解决这个问题的来龙去脉。
为什么spark checkpoint解决不了?为什么前面kafka使用KafkaUtils.createStream创建Dstream,而后面升级了api,推荐使用新的KafkaUtils.createDirectStream。这些疑问都在下面两篇博客里可以找到答案:
2.使用新的KafkaUtils.createDirectStream,需要额外做什么?
先搞清楚createDirectStream,它重要的参数有2个,kafkaParams: Map[String, String], fromOffsets: Map[TopicAndPartition, Long]。先看fromOffsets,这个参数指topic的每个分区从哪里开始消费,fromOffsets和zookeeper没有什么关系,我们想让这个offset从哪里开始就从哪里开始,我们可以把这个offset存到文件,每次读取文件,也可以存到zookeeper,每次读zookeeper,存到zookeeper的好处就是监控kafka的那些工具会生效,如果存文件,则基于zookeeper的kafka监控工具就会失效。kafkaParams传递的是kafka的一些信息,包括kafka的brokers的链接,尤其要注意参数auto.offset.reset,默认是largest,就是从topic每个分区的最大处开始消费(它不会管zookeeper中的comsumer的消费信息),只有新进kafka的消息才会消费到,smallest则是从头开始消费所有的数据。
保证数据exactly-once有且只被处理一次,我们就只能利用fromOffsets,我们这里把offset存到zookeeper。大体思路就是每次createDirectStream的fromOffsets取得上一批次最后处理的编号,从这个编号开始处理当前批次,当前批次完全处理成功之后,把当前处理的最后编号存到zookeeper中(存到zookeeper之前,一定要判断当前批次是否都成功处理,成功处理了才能写入zookeeper,如果没有成功处理,则要删除这一批次的结果),供下一批次使用。
3.将offset存入zookeeper的方案
实现将offset写入zookeeper有2种方式,一种是使用spark-streaming-kafka的相关类,来实现写zookeeper,还一种是自己调用zookeeper的api,写入offset。
使用spark-streaming-kafka的相关类写入zookeeper,这种方式网上有很多例子,也进行了一些封装,有些思路很值得参考,文章地址:,。但是方案也是我一开始想要实现的方式,但是碰到了困难,一开始报类找不到,后面把类打包进去,又报类中的方法找不到,大概原因是其一是类是私有的private[spark],其二是kafka版本更新等等问题。所以后面直接采用zookeeper的基础api,把offset写入zookeeper。向zookeeper中写入kafka的api: