当前位置: 移动技术网 > IT编程>开发语言>Java > spark streaming中维护kafka偏移量到外部介质

spark streaming中维护kafka偏移量到外部介质

2019年04月04日  | 移动技术网IT编程  | 我要评论

在线电视网站,总裁梁子结大了,天海翼ol痴汉电车

spark streaming中维护kafka偏移量到外部介质

以kafka偏移量维护到redis为例。

redis存储格式

使用的数据结构为string,其中key为topic:partition,value为offset

例如bobo这个topic下有3个分区,则key-value结构如下:

  • bobo:0的偏移量为x
  • bobo:1的偏移量为y
  • bobo:2的偏移量为z

消费时指定offset

主要是如下两个方法:

  • createkafkastream()创建kakfa流
  • getoffsets()从redis中获取offsets
/**
  * kakfa参数
  */
private val kafkaparams = map[string, object](
  "bootstrap.servers" -> "crpprdap25:6667,crpprdap26:6667,crpprdap27:6667",
  "key.deserializer" -> classof[stringdeserializer],
  "value.deserializer" -> classof[stringdeserializer],
  "group.id" -> "use_a_separate_group_id_for_each_stream",
  // 注意这里是none。
  "auto.offset.reset" -> "none",
  "enable.auto.commit" -> (false: java.lang.boolean)
)

// `bobo`topic下有3个分区
private val topicpartitions = map[string, int]("bobo" -> 3)

// 从redis中获取offsets
def getoffsets: map[topicpartition, long] = {
  val jedis = internalredisclient.getresource

  // 设置每个分区起始的offset
  val offsets = mutable.map[topicpartition, long]()

  topicpartitions.foreach { it =>
    val topic = it._1
    val partitions = it._2
    // 遍历分区,设置每个topic下对应partition的offset
    for (partition <- 0 until partitions) {
      val topicpartitionkey = topic + ":" + partition
      var lastoffset = 0l
      val lastsavedoffset = jedis.get(topicpartitionkey)

      if (null != lastsavedoffset) {
        try {
          lastoffset = lastsavedoffset.tolong
        } catch {
          case e: exception =>
            log.error("get lastsavedoffset error", e)
            system.exit(1)
        }
      }
      log.info("from redis topic: {}, partition: {}, lastoffset: {}", topic, partition, lastoffset)

      // 添加
      offsets += (new topicpartition(topic, partition) -> lastoffset)
    }
  }

  internalredisclient.returnresource(jedis)

  offsets.tomap
}

/**
  * 创建kakfa流
  *
  * @param ssc streamingcontext
  * @return inputdstream
  */
def createkafkastream(ssc: streamingcontext): inputdstream[consumerrecord[string, string]] = {
  val offsets = getoffsets

  // 创建kafka stream
  val stream = kafkautils.createdirectstream[string, string](
    ssc,
    locationstrategies.preferconsistent,
    consumerstrategies.assign[string, string](offsets.keys.tolist, kafkaparams, offsets)
  )
  stream
}

其中:核心是通过consumerstrategies.assign方法来指定topic下对应partitionoffset信息。

更新offset到redis

最后将offset信息维护到redis即可。

/**
  * 消费
  *
  * @param stream inputdstream
  */
def consume(stream: inputdstream[consumerrecord[string, string]]): unit = {
  stream.foreachrdd { rdd =>
    // 获取offset信息
    val offsetranges = rdd.asinstanceof[hasoffsetranges].offsetranges

    // 计算相关指标,这里就统计下条数了
    val total = rdd.count()

    val jedis = internalredisclient.getresource
    val pipeline = jedis.pipelined()
    // 会阻塞redis
    pipeline.multi()

    // 更新相关指标
    pipeline.incrby("totalrecords", total)

    // 更新offset
    offsetranges.foreach { offsetrange =>
      log.info("save offsets, topic: {}, partition: {}, offset: {}", offsetrange.topic, offsetrange.partition, offsetrange.untiloffset)
      val topicpartitionkey = offsetrange.topic + ":" + offsetrange.partition
      pipeline.set(topicpartitionkey, offsetrange.untiloffset + "")
    }

    // 执行,释放
    pipeline.exec()
    pipeline.sync()
    pipeline.close()
    internalredisclient.returnresource(jedis)
  }
}

参考

spark代码

顺便贴一下自己整理的spark相关的代码。

github地址:

主要包括:

  • rdd的基本使用
  • sql
    • jdbc(读、写)
    • hive(读、写、动态分区)
  • streaming
    • 消费kafka(手动提交、手动维护offset)
    • 写入hbase
    • 写入hive

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网