当前位置: 移动技术网 > IT编程>数据库>MongoDB > 基于Morphia实现MongoDB按小时、按天聚合操作方法

基于Morphia实现MongoDB按小时、按天聚合操作方法

2019年09月06日  | 移动技术网IT编程  | 我要评论

mongodb按照天数或小时聚合

需求

最近接到需求,需要对用户账户下的设备状态,分别按照天以及小时进行聚合,以此为基础绘制设备状态趋势图.
实现思路是启动定时任务,对各用户的设备状态数据分别按照小时以及天进行聚合,并存储进数据库中供用户后续查询.
涉及到的技术栈分别为:spring boot,mongodb,morphia.

数据模型

@data
@builder
@entity(value = "rawdevstatus", noclassnamestored = true)
// 设备状态索引
@indexes({
    // 设置数据超时时间(ttl,mongodb根据ttl在后台进行数据删除操作)
    @index(fields = @field("time"), options = @indexoptions(expireafterseconds = 3600 * 24 * 72)),
    @index(fields = {@field("userid"), @field(value = "time", type = indextype.desc)})
})
public class rawdevstatus {
  @id
  @jsonproperty(access = jsonproperty.access.write_only)
  private objectid objectid;
  private string userid;
  private instant time;
  @embedded("points")
  list<point> protocolpoints;
  @data
  @allargsconstructor
  public static class point {
    /**
     * 协议类型
     */
    private protocol protocol;
    /**
     * 设备总数
     */
    private integer total;
    /**
     * 设备在线数目
     */
    private integer onlinenum;
    /**
     * 处于启用状态设备数目
     */
    private integer enablenum;
  }
}

上述代码是设备状态实体类,其中设备状态数据是按照设备所属协议进行区分的.

@data
@builder
@entity(value = "aggregationdevstatus", noclassnamestored = true)
@indexes({
    @index(fields = @field("expireat"), options = @indexoptions(expireafterseconds = 0)),
    @index(fields = {@field("userid"), @field(value = "time", type = indextype.desc)})
})
public class aggregationdevstatus {
  @id
  @jsonproperty(access = jsonproperty.access.write_only)
  private objectid objectid;
  /**
   * 用户id
   */
  private string userid;
  /**
   * 设备总数
   */
  private double total;
  /**
   * 设备在线数目
   */
  private double onlinenum;
  /**
   * 处于启用状态设备数目
   */
  private double enablenum;
  /**
   * 聚合类型(按照小时还是按照天聚合)
   */
  @property("aggduration")
  private aggregationduration aggregationduration;
  private instant time;
  /**
   * 动态设置文档过期时间
   */
  private instant expireat;
}

上述代码是期待的聚合结果,其中构建两个索引:(1)超时索引;(2)复合索引,程序会根据用户名以及时间查询设备状态聚合结果.

聚合操作符介绍

聚合操作类似于管道,管道中的每一步操作产生的中间结果作为下一步的输入源,最终输出聚合结果.

此次聚合主要涉及以下操作:

•$project:指定输出文档中的字段.
•$unwind:拆分数据中的数组;
•match:选择要处理的文档数据;
•group:根据key分组聚合结果.

原始聚合语句

db.getcollection('raw_dev_status').aggregate([
  {$match:
    {
      time:{$gte: isodate("2019-06-27t00:00:00z")},
    }
  },
  {$unwind: "$points"},
  {$project:
    {
      userid:1,points:1,
      tmp: {$datetostring: { format: "%y:%m:%dt%h:00:00z", date: "$time" } }
    }
  },
  {$project:
    {
      userid:1,points:1,
      grouptime: {$datefromstring: { datestring: "$tmp", format: "%y:%m:%dt%h:%m:%sz", } }
    }
  },
  {$group:
    {
      _id:{user_id:'$userid', cal_time:'$grouptime'},
      devtotal:{'$avg':'$points.total'},
      onlinetotal:{'$avg':'$points.onlinenum'},
      enabletotal:{'$avg':'$points.enablenum'}
    }
  },
])

上述代码是按小时聚合数据,以下来逐步介绍处理思路:

(1) $match

根据小时聚合数据,因为只需要获取近24小时的聚合结果,所以对数据进行初步筛选.

(2) $unwind

raw_dev_status中的设备状态是按照协议区分的数组,因此需要对其进行展开,以便下一步进行筛选;

(3) $project

  {$project:
    {
      userid:1,points:1,
      tmp: {$datetostring: { format: "%y:%m:%dt%h:00:00z", date: "$time" } }
    }
  }

选择需要输出的数据,分别为:userid,points以及tmp.

需要注意,为了按照时间聚合,对$time属性进行操作,提取%y:%m:%dt%h时信息至$tmp作为下一步的聚合依据.

如果需要按天聚合,则format数据可修改为:%y:%m:%dt00:00:00z即可满足要求.

(4) $project

  {$project:
    {
      userid:1,points:1,
      grouptime: {$datefromstring: { datestring: "$tmp", format: "%y:%m:%dt%h:%m:%sz", } }
    }
  }

因为上一步project操作中,tmp为字符串数据,最终的聚合结果需要时间戳(主要懒,不想在程序中进行转换操作).
因此,此处对$tmp进行操作,转换为时间类型数据,即grouptime.

(5) $group

对聚合结果进行分类操作,并生成最终输出结果.

 {$group:
    {
      # 根据_id进行分组操作,依据是`user_id`以及`$grouptime`
      _id:{user_id:'$userid', cal_time:'$grouptime'},
      # 求设备总数平均值
      devtotal:{'$avg':'$points.total'},
      # 求设备在线数平均值
      onlinetotal:{'$avg':'$points.onlinenum'},
      # ...
      enabletotal:{'$avg':'$points.enablenum'}
    }
  }

代码编写

此处odm选择morphia,亦可以使用mongotemplate,原理类似.

 /**
   * 创建聚合条件
   *
   * @param pasttime   过去时间段
   * @param datetostring 格式化字符串(%y:%m:%dt%h:00:00z或%y:%m:%dt00:00:00z)
   * @return 聚合条件
   */
  private aggregationpipeline createaggregationpipeline(instant pasttime, string datetostring, string stringtodate) {
    query<rawdevstatus> query = datastore.createquery(rawdevstatus.class);
    return datastore.createaggregation(rawdevstatus.class)
        .match(query.field("time").greaterthanoreq(pasttime))
        .unwind("points", new unwindoptions().preservenullandemptyarrays(false))
        .match(query.field("points.protocol").equal("all"))
        .project(projection.projection("userid"),
            projection.projection("points"),
            projection.projection("converttime",
                projection.expression("$datetostring",
                    new basicdbobject("format", datetostring)
                        .append("date", "$time"))
            )
        )
        .project(projection.projection("userid"),
            projection.projection("points"),
            projection.projection("converttime",
                projection.expression("$datefromstring",
                    new basicdbobject("format", stringtodate)
                        .append("datestring", "$converttime"))
            )
        )
        .group(
            group.id(group.grouping("userid"), group.grouping("converttime")),
            group.grouping("total", group.average("points.total")),
            group.grouping("onlinenum", group.average("points.onlinenum")),
            group.grouping("enablenum", group.average("points.enablenum"))
        );
  }
  /**
   * 获取聚合结果
   *
   * @param pipeline 聚合条件
   * @return 聚合结果
   */
  private list<aggregationmiddevstatus> getaggregationresult(aggregationpipeline pipeline) {
    list<aggregationmiddevstatus> statuses = new arraylist<>();
    iterator<aggregationmiddevstatus> resultiterator = pipeline.aggregate(
        aggregationmiddevstatus.class, aggregationoptions.builder().allowdiskuse(true).build());
    while (resultiterator.hasnext()) {
      statuses.add(resultiterator.next());
    }
    return statuses;
  }
  //......................................................................................
  // 获取聚合结果(省略若干代码)
  aggregationpipeline pipeline = createaggregationpipeline(pasttime, datetostring, stringtodate);
  list<aggregationmiddevstatus> midstatuses = getaggregationresult(pipeline);
  if (collectionutils.isempty(midstatuses)) {
    log.warn("can not get dev status aggregation result.");
    return;
  }

总结

以上所述是小编给大家介绍的基于morphia实现mongodb按小时、按天聚合操作方法,希望对大家有所帮助

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网