SparkStreaming入门案例
1 | //wordcount案例 |
SparkStreaming概念
SparkStreaming是一个SparkCore API的一个扩展.底层基于SparkCore,是一个高吞吐可扩展可容错的分布式实时流式计算框架
原理:Driver端将任务划分微批次小任务(周期性)提交到spark计算引擎中执行,任务一直在运行,在sparkcore上对driver端的增强
Dstream可以从很多的数据源创建,对Dstream进行操作,本质上是对Dstream中某个时间点的RDD进行操作,对RDD进行操作,本质上是对每个RDD的分一个分区进行操作,Dstream上的方法也分为Transformation和Action,还有一些方法既不是Transformation也不是Action,比如foreachRDD,调用Transformation后可以生成一个新的Dstream
特点:可通错性 高吞吐 可扩展 丰富的API 可以保证精确一次性语义,但是SparkStreaming是一个微批次准实时的流式计算系统,由于在Driver端定期生成微批次的job,然后在调度到executor中,会有一定的延迟性
SparkStreaming 整合kafka
方案一:Receiver方式
类似于接自来水,但是如果自来水流速过快就会出现问题,为了保证数据安全性,要将数据写入磁盘记录日志中(write ahead log),效率低.使用kafka老版本的消费API,将偏移量写入到zookeeper中,效率低
方案二:直连kafka
建议使用kafka0.10及以上的版本
一个kafka Topic分区对应一个RDD的分区,即RDD的分区的数量和kafka的分区时一一对应的,生成的task直连kafka Topic的leader分区拉取数据,一个消费者task对应一个kafka的活跃分区
在Driver端调用foreachRDD获取Dstram中的kafkaRDD,传入的foreachRDD,传入的foreachRDD方法的函数在Driver端会周期性的执行.KafkaDstream必须先调用foreachRDD方法或者Tansform,才能获取kafkaRDD,只有kafkaRDD中才有偏移量.kafkaRDD实现了HasoffsetRanger接口,可以获取kafka分区的偏移量,一个批次处理完毕之后,偏移量可以写入到kafak Redis mysql中
自动提交偏移量
自动提交偏移量到主题 consumer_offsets 比较省事但是假设出现一种情况:消费者消费完数据之后挂掉了,偏移量信息没有来得及更 新,不太准确,会重复消费数据,无法保证数据的一致性
(去掉即可)
1 | "enable.auto.commit" -> (false: java.lang.Boolean) //不让消费者自动提交偏移量 |
1 | inputDStream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges) |
手动提交偏移量
官方提供的异步提交方法
一个队列实现的commitAsync 但是这会出现如下的问题,例如还没有运算完,就提交了偏移量或者数据处理失败了,但是偏移量却提交了
1 | //kafka集成 |
聚类运算Mysql优化版
1 | //获取历史的偏移量 |
1 | //mysql的事务回顾 |
1 | -- 数据表和偏移量表 |
1 | import org.apache.kafka.clients.consumer.ConsumerRecord |
聚类运算Redis版
1 | //Redis事务复习 |
1 | //连接池 |
1 | //获取历史偏移量 |
1 | /保证ExactlyOnce,精确一次性语义(要求数据处理且被处理一次) |
非聚类运算Hbase版
数据在kafka中已经有唯一的id,可作为hbase中的rowkey
每一行的最后一个数据记录偏移量,即可无需读取,查找以一个组内的最大的偏移量即可
但是hbase是不支持分组查询的
解决方案:
1.自定义协处理器
2.使用Phoenix来SQL查询
如果消费者多读了,但是hbase的id是唯一的,所以会把先前的数据给覆盖掉
1 | //每一次启动该程序,都要从Hbase查询历史偏移量 |
1 | object KafkaToHbase { |