当前位置: 移动技术网 > IT编程>网页制作>HTML > 使用sparkstreaming计算uv并存入redis集群

使用sparkstreaming计算uv并存入redis集群

2020年07月17日  | 移动技术网IT编程  | 我要评论
使用sparkstreaming计算uv并存入redis集群

首先这是我存入kafka的待分析数据(\t隔开):

192.168.101.2	-	-	200	3717	97	1594541195000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.2	-	-	200	3717	117	1594541196000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.2	-	-	200	3161	96	1594541386000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.222	-	-	200	19	127	1594541387000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.22	-	-	200	10	293	1594541399000	POST	/ibikeSeries/recharge	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.21	-	-	200	10	116	1594541400000	POST	/ibikeSeries/log/addPayLog	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.32	-	-	200	10	312	1594541401000	POST	/ibikeSeries/log/savelog	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.22	-	-	200	3711	481	1594541401000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.23	-	-	200	3715	172	1594541594000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.32	-	-	200	3419	154	1594541594000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari
192.168.101.23	-	-	200	3412	88	1594541596000	POST	/ibikeSeries/findNearAll	HTTP/1.1	https://servicewechat.com/wx7986c9c17e13557a/devtools/page-frame.html	Mac OS X (iPhone)	Mobile Safari

需求就是记录不同ip访问次数,相同的ip不累加,简单来说就是记录不同用户的访问量uv

kafka和redis集群的配置
application.conf

kafka.group.id = "ibike_streaming_analysis"
kafka.topic = "accesslog"
kafka.broker.list = "node1:9092,node2:9092,node3:9092"

# redis
# host做了本地域名映射
redis.host="node1,node2,node3"
redis.db.index=1

util类

package com.yc.ibike.analysis.util

import com.typesafe.config.{Config, ConfigFactory}
import org.apache.kafka.common.serialization.StringDeserializer

object ConfUtil {
  //解析配置文件
  private lazy val config: Config = ConfigFactory.load()
  val topic = config.getString("kafka.topic")
  val groupId: String = config.getString("kafka.group.id")
  val redisHost: String = config.getString("redis.host")
  val selectDBIndex = config.getInt("redis.db.index")
  val broker: String = config.getString("kafka.broker.list")

  import scala.collection.JavaConversions._

  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> broker,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> "true"
  )
}

package com.yc.ibike.analysis.util


import org.apache.commons.pool2.impl.GenericObjectPoolConfig
import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPool}

object RedisPoolUtil {

  private val poolConfig = new GenericObjectPoolConfig()
  poolConfig.setMaxIdle(5) //最大的空闲连接数,连接池中最大的空闲连接数,默认是8
  poolConfig.setMaxTotal(2000) //只支持最大的连接数,连接池中最大的连接数,默认是8

  //连接池是私有的不能对外公开访问
  private lazy val jedisPool = new JedisPool(poolConfig, ConfUtil.redisHost)   //单节点的redis使用

  //redis集群
  val hosts = ConfUtil.redisHost.split(",")
  val jedisClusterNodes = new java.util.HashSet[HostAndPort]()
  for (host <- hosts) {
    jedisClusterNodes.add(new HostAndPort(host, 6379))
  }
  private lazy val jedisCluster = new JedisCluster(jedisClusterNodes)

  def getJedis() = {
    //jedisCluster.select(ConfUtil.selectDBIndex)
    jedisCluster

    //以下是单机redis带联接池
    // val jedis = jedisPool.getResource
    //jedis.select(ConfUtil.selectDBIndex)
    //jedis
  }
}

scala代码

import com.yc.ibike.analysis.util.{ConfUtil, RedisPoolUtil}
import com.yc.ibike.analysis.util.RedisPoolUtil.getJedis
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{HashPartitioner, SparkConf}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import redis.clients.jedis.JedisCluster

object AccessLogAnalysis {
  /**
   * String: 聚合的key
   * Seq[Int]:当前批次阁下生批次该单词在每一个分区出现的次数
   * Option:初始值或累加的中间结果
   */
  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {   //   ("a",[1,1,1,1,1],  5   )
    //方案一:  当成元组元素来操作
    //iter.map(  t=>(t._1,t._2.sum+t._3.getOrElse(0)))
    iter.map { case (x, y, z) => (x, y.sum + z.getOrElse(0)) }
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR) //配置日志
    val conf = new SparkConf().setAppName("AccessLogAnalysis").setMaster("local[*]")
    val ssc = new StreamingContext(conf, Seconds(5))

    //  ssc.cache()
    //状态要更新的话,要将中间结果保存下来,
    ssc.checkpoint("./chpoint") //   也可以是hdfs

    //    val kafkaParams = Map[String, Object](
    //      "bootstrap.servers" -> "node1:9092,node2:9092,node3:9092",
    //      "key.deserializer" -> classOf[StringDeserializer],
    //      "value.deserializer" -> classOf[StringDeserializer],
    //      "group.id" -> "accesslogAnalysis",
    //      "auto.offset.reset" -> "latest",
    //      "enable.auto.commit" -> (true: java.lang.Boolean)
    //    )
    val topics = Array(   ConfUtil.topic )
    val stream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](topics, ConfUtil.kafkaParams) //订阅一组主题,以获取消息
    )
    //需求2: 计算总的UV
    val line:DStream[String]=stream.map(record=>(record.value()))
    val words:DStream[String]=line.map( _.split("\t")(0))

    words.foreachRDD { rdd =>
      //统计人数
      rdd.foreachPartition { partition =>
        //从分区所属executor的redis线程池获取一个连接.
        val jedisCluster1 :JedisCluster = getJedis()
        partition.foreach { case ( words) =>
          //统计当前userId
          //这里也可以按照每天的访问情况分别统计
  	      //jedisCluster1.pfadd(s"accesslog_analysis_total_uv:$date", words)
          jedisCluster1.pfadd("accesslog_analysis_total_uv", words)
        }
        //jedisCluster1.close()
      }
    }

    //启动sparkstreaming程序
    ssc.start()
    //优雅退出
    ssc.awaitTermination()
  }
}

这里的重点是我计算uv使用的是HyperLogLog方案,而不是sql语句或者bitmap方案
因为若要计算很多页面的UV,用bitmap还是比较费空间的,N个页面就得有N个500M.这时候HyperLogLog结构就是一个比较好的选择.
HyperLogLog是一种基数统计算法,计算结果是近似值, 12 KB 内存就可以计算2^64 个不同元素的基数.但是,因为 HyperLogLog 只会根据输入元素来计算基数,而不会储存输入元素本身,所以 HyperLogLog 不能像集合那样,返回输入的各个元素。

这样最后在redis里查到的数据就是这个样子的:
在这里插入图片描述

参考资料:
用Spark Streaming实时计算海量用户UV.
SparkSQL 实现UV & PV计算.
spark状态stream统计uv(updateStateByKey).

本文地址:https://blog.csdn.net/weixin_43761767/article/details/107395520

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网