在线电视网站,总裁梁子结大了,天海翼ol痴汉电车
以kafka偏移量维护到redis为例。
使用的数据结构为string
,其中key为topic:partition
,value为offset
。
例如bobo
这个topic
下有3个分区,则key-value结构如下:
bobo:0
的偏移量为xbobo:1
的偏移量为ybobo:2
的偏移量为z主要是如下两个方法:
createkafkastream()
创建kakfa流getoffsets()
从redis中获取offsets/** * kakfa参数 */ private val kafkaparams = map[string, object]( "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667", "key.deserializer" -> classof[stringdeserializer], "value.deserializer" -> classof[stringdeserializer], "group.id" -> "use_a_separate_group_id_for_each_stream", // 注意这里是none。 "auto.offset.reset" -> "none", "enable.auto.commit" -> (false: java.lang.boolean) ) // `bobo`topic下有3个分区 private val topicpartitions = map[string, int]("bobo" -> 3) // 从redis中获取offsets def getoffsets: map[topicpartition, long] = { val jedis = internalredisclient.getresource // 设置每个分区起始的offset val offsets = mutable.map[topicpartition, long]() topicpartitions.foreach { it => val topic = it._1 val partitions = it._2 // 遍历分区,设置每个topic下对应partition的offset for (partition <- 0 until partitions) { val topicpartitionkey = topic + ":" + partition var lastoffset = 0l val lastsavedoffset = jedis.get(topicpartitionkey) if (null != lastsavedoffset) { try { lastoffset = lastsavedoffset.tolong } catch { case e: exception => log.error("get lastsavedoffset error", e) system.exit(1) } } log.info("from redis topic: {}, partition: {}, lastoffset: {}", topic, partition, lastoffset) // 添加 offsets += (new topicpartition(topic, partition) -> lastoffset) } } internalredisclient.returnresource(jedis) offsets.tomap } /** * 创建kakfa流 * * @param ssc streamingcontext * @return inputdstream */ def createkafkastream(ssc: streamingcontext): inputdstream[consumerrecord[string, string]] = { val offsets = getoffsets // 创建kafka stream val stream = kafkautils.createdirectstream[string, string]( ssc, locationstrategies.preferconsistent, consumerstrategies.assign[string, string](offsets.keys.tolist, kafkaparams, offsets) ) stream }
其中:核心是通过consumerstrategies.assign
方法来指定topic
下对应partition
的offset
信息。
最后将offset信息维护到redis即可。
/** * 消费 * * @param stream inputdstream */ def consume(stream: inputdstream[consumerrecord[string, string]]): unit = { stream.foreachrdd { rdd => // 获取offset信息 val offsetranges = rdd.asinstanceof[hasoffsetranges].offsetranges // 计算相关指标,这里就统计下条数了 val total = rdd.count() val jedis = internalredisclient.getresource val pipeline = jedis.pipelined() // 会阻塞redis pipeline.multi() // 更新相关指标 pipeline.incrby("totalrecords", total) // 更新offset offsetranges.foreach { offsetrange => log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetrange.topic, offsetrange.partition, offsetrange.untiloffset) val topicpartitionkey = offsetrange.topic + ":" + offsetrange.partition pipeline.set(topicpartitionkey, offsetrange.untiloffset + "") } // 执行,释放 pipeline.exec() pipeline.sync() pipeline.close() internalredisclient.returnresource(jedis) } }
顺便贴一下自己整理的spark相关的代码。
github地址:
主要包括:
如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复
浅析我对 String、StringBuilder、StringBuffer 的理解
使用IDEA搭建SSM框架的详细教程(spring + springMVC +MyBatis)
Springboot整合freemarker 404问题解决方案
引入mybatis-plus报 Invalid bound statement错误问题的解决方法
网友评论