当前位置: 移动技术网 > IT编程>数据库>MongoDB > windows7下使用MongoDB实现仓储设计

windows7下使用MongoDB实现仓储设计

2017年12月01日  | 移动技术网IT编程  | 我要评论

简单的介绍一下,我使用mongodb的场景。

我们现在的物联网环境下,有部分数据,采样频率为2000条记录/分钟,这样下来一天24*60*2000=2880000约等于300万条数据,以后必然还会增加。之前数据库使用的是mssql,对于数据库的压力很大,同时又需要保证历史查询的响应速度,这种情况下,在单表中数据量大,同时存在读写操作。不得已采用mongodb来存储数据。如果使用mongodb,则至少需要三台机器,两台实现读写分离,一台作为仲裁(当然条件不允许也可以不用),每台机器的内存暂时配置在16g,公司小,没办法,据说,使用这个mongodb需要机器内存最少92g,我没有验证过,但是吃内存是公认的,所以内存绝对要保证,就算保证了,也不一定完全就没有意外发生。我们上面的这些特殊的数据是允许少量的丢失的,这些只是做分析使用的,几个月了,暂时还没出现数据丢失的情况,可能最新版本早就修复了吧,新手使用建议多看下官网上的说明。下面直接奔入主题:

一、安装部署和配置环境

1.安装部署mongo-server(v3.4)

参考

这个时候不要启动,接着配置config文件

2.配置config文件

dbpath=c:/program files/mongodb/server/3.4/bin/data/db 
logpath=c:/program files/mongodb/server/3.4/bin/data/log/master.log 
pidfilepath=c:/program files/mongodb/server/3.4/bin/master.pid 
directoryperdb=true 
logappend=true 
replset=testrs 
bind_ip=10.1.5.25
port=27016 
oplogsize=10000
noauth = true
 
storageengine = wiredtiger
wiredtigercachesizegb = 2
syncdelay = 30
wiredtigercollectionblockcompressor = snappy

以上是详细的配置参数,其中路径部分根据需要更改, 这里设置的oplogsize大小为10g,根据业务场景进行调整,另外auth权限为null,因为设置权限会增加服务开销,影响效率,最下面几行是内存引擎,可以控制副本集同步及内存限制,防止内存泄露。

3.启动mongo-server

4.添加副本集配置

conf=
{
  "_id" : "testrs",
  "members" : [
    { "_id" : 0, "host" : "10.1.5.25:27016" },
    { "_id" : 1, "host" : "10.1.5.26:27016" },
    { "_id" : 2, "host" : "10.1.5.27:27016" }
  ]
}

rs.initiate(conf)

此时副本集集群配置已经完成,然后在命令行中输入:rs.status(),查看副本集状态,需要查看同步情况,可以输入命令:db.serverstatus().

5.设置副本集可读写

rs.slaveok()

6..net操作mongo

连接设置,请参考个人封装unitoon.mongo代码所示。

7.性能对比

读写速度:redis>mongo>mssqlserver

可容纳数据量:mssqlserver~mongo>redis

存储数据类型:mongo>mssqlserver>redis

note:内存持续上升,内部没有内存回收机制,若限制内存 ,则可能出现查询速度变慢,数据丢失等问题,建议优化查询效率,建立索引

db.test.ensureindex({"username":1, "age":-1})

强制释放内存命令:db.runcommand({closealldatabases:1})

二、仓储设计

1.基类baseentity

namespace unitooniot.mongo
{
  /// <summary>
  /// 实体基类,方便生成objid
  /// </summary>
  [serializable]
  [protocontract(implicitfields = implicitfields.allpublic)]
  //[protoinclude(10, typeof(normalhistory))]
  public class baseentity
  {
    //[bsonrepresentation(bsontype.objectid)]
    public objectid id { get; set; }
 
    /// <summary>
    /// 数据库名称
    /// </summary>
    public string dbname { get; set; }
 
    /// <summary>
    /// 给对象初值
    /// </summary>
    public baseentity()
    {
      // this.objid = objectid.generatenewid().tostring();
      //this.id = objectid.newobjectid().tostring();
    }
  }
}


这里需要注意时间格式,mongodb默认时间格式为国际时间,所以在写入数据时和读取数据时,时间格式要一致,此例中没有对时间进行特殊处理,由传入的时间格式确定。

2.repository继承接口imongorepository

namespace unitooniot.mongo
{
  public interface imongorepository<tentity> where tentity : class
  {
  }
}


3.mongorepository

using mongodb.driver;
using mongodb.bson;
using system;
using system.collections.generic;
using system.linq;
using system.linq.expressions;
using system.text;
using system.threading.tasks;
using mongodb.bson.serialization.attributes;
using mongodb.driver.linq;
using system.configuration;
using system.io;
using unitooniot.appsetting;
 
namespace unitooniot.mongo
{
 
  public class mongodb
  {
    private static string connectionstringhost ;
    private static string username ;
    private static string password;
    private static imongodatabase _db = null;
    private static readonly object lockhelper = new object();
    /// <summary>
    /// mongodb初始化
    /// </summary>
    public static void init()
    {
      connectionstringhost = "10.1.5.24:27016,10.1.5.24:27016,10.1.5.26:27017";
      //appsettings.getconfigvalue("mongohost");//"10.1.5.24:27016";
      username = appsettings.getconfigvalue("mongousername");
      password = appsettings.getconfigvalue("mongopwd");
    }
    static mongodb()
    {
     
    }
    public static imongodatabase getdb(string dbname,string options=null)
    {
     
      if (_db != null) return _db;
      lock (lockhelper)
      {
 
        if (_db != null) return _db;
        var database = dbname;
        var username = username;
        var password = password;
        var authentication = string.empty;
        var host = string.empty;
        if (!string.isnullorwhitespace(username))
        {
          authentication = string.concat(username, ':', password, '@');
        }
        if (!string.isnullorempty(options) && !options.startswith("?"))
        {
          options = string.concat('?', options);
        }
 
 
 
        host = string.isnullorempty(connectionstringhost) ? "localhost" : connectionstringhost;
        database = database ?? "testdb";
        //mongodb://[username:password@]host1[:port1][,host2[:port2],…[,hostn[:portn]]][/[database][?options]]
 
        var constring = options!=null? $"mongodb://{authentication}{host}/{database}{options}"
          : $"mongodb://{authentication}{host}/{database}";
 
        var url = new mongourl(constring);
        var mcs = mongoclientsettings.fromurl(url);
        mcs.maxconnectionlifetime = timespan.frommilliseconds(1000);
        var client = new mongoclient(mcs);
               
        _db = client.getdatabase(url.databasename);
      }
      return _db;
    }
  }
  /// <summary>
  /// mongodb 数据库操作类
  /// </summary>
  public class mongorepository<t>: imongorepository<t> where t : baseentity
  {
    #region readonly field
    /// <summary>
    /// 表名
    /// </summary>
    private readonly imongocollection<t> _collection = null;
    /// <summary>
    /// 数据库对象
    /// </summary>
    private readonly imongodatabase _database;
    #endregion
 
    /// <summary>
    /// 构造函数
    /// </summary>
    public mongorepository()
    {
      this._database = mongodb.getdb(activator.createinstance<t>().dbname, "readpreference =secondarypreferred ");//primarypreferred/secondarypreferred/nearest
      _collection = _database.getcollection<t>(typeof(t).name);
    }
   
 
    #region 增加
    /// <summary>
    /// 插入对象
    /// </summary>
    /// <param name="t">插入的对象</param>
    public virtual t insert(t t)
    {
      // var flag = objectid.generatenewid();
      // t.gettype().getproperty("id").setvalue(t, flag);  
      //t.time = datetime.now;
 
      _collection.insertone(t);
      return t;
    }
    /// <summary>
    /// 批量插入
    /// </summary>
    /// <param name="ts">要插入的对象集合</param>
    public virtual ienumerable<t> insertbatch(ienumerable<t> ts)
    {
      _collection.insertmany(ts);
      return ts;
    }
 
    /// <summary>
    /// 插入对象
    /// </summary>
    /// <param name="t">插入的对象</param>
    public virtual void insertasync(t t)
    {
      //var flag = objectid.generatenewid();
      // t.gettype().getproperty("id").setvalue(t, flag);
      // t.time = datetime.now;
       _collection.insertoneasync(t);
    }
    /// <summary>
    /// 批量插入
    /// </summary>
    /// <param name="ts">要插入的对象集合</param>
    public virtual void insertbatchasync(ienumerable<t> ts)
    {
       _collection.insertmanyasync(ts);
    }
    #endregion
 
    #region 删除
    /// <summary>
    /// 删除
    /// </summary>
    /// <returns></returns>
    public virtual long delete(t t)
    {
      var filter = builders<t>.filter.eq("id", t.id);
      var result = _collection.deleteone(filter);
      return result.deletedcount;     
    }
    /// <summary>
    /// 删除
    /// </summary>
    /// <returns></returns>
    public virtual void deleteasync(t t)
    {
      var filter = builders<t>.filter.eq("id", t.id);
      _collection.deleteoneasync(filter);
    }
 
    /// <summary>
    /// 按条件表达式删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual long delete(expression<func<t, bool>> predicate)
    {
      var result = _collection.deleteone(predicate);
      return result.deletedcount;
    }
    /// <summary>
    /// 按条件表达式删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual void deleteasync(expression<func<t, bool>> predicate)
    {
      _collection.deleteoneasync(predicate);
    }
 
 
    /// <summary>
    /// 按条件表达式批量删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual long deletebatch(expression<func<t, bool>> predicate)
    {
      var result = _collection.deletemany(predicate);
      return result.deletedcount;
    }
    /// <summary>
    /// 按条件表达式批量删除
    /// </summary>
    /// <param name="predicate">条件表达式</param>
    /// <returns></returns>
    public virtual void deletebatchasync(expression<func<t, bool>> predicate)
    {
       _collection.deletemanyasync(predicate);
    }
 
    /// <summary>
    /// 按检索条件删除
    /// 建议用builders<t>构建复杂的查询条件
    /// </summary>
    /// <param name="filter">条件</param>
    /// <returns></returns>
    public virtual long delete(filterdefinition<t> filter)
    {
      var result = _collection.deleteone(filter);
      return result.deletedcount;
    }
 
    /// <summary>
    /// 按检索条件删除
    /// 建议用builders<t>构建复杂的查询条件
    /// </summary>
    /// <param name="filter">条件</param>
    /// <returns></returns>
    public virtual void deleteasync(filterdefinition<t> filter)
    {
       _collection.deleteoneasync(filter);
    }
    #endregion
 
    #region 修改
    /// <summary>
    /// 修改(id不变)
    /// </summary>  
    /// <returns></returns>
    public virtual long update(t t)
    {     
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id",t.id);
      var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true });
      return update.modifiedcount;
    }
    /// <summary>
    /// 修改(id不变)
    /// </summary>  
    /// <returns></returns>
    public virtual void updateasync(t t)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", t.id);
       _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true });
       
    }
 
 
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(expression<func<t, bool>> filter, t t)
    {
      var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true });
      return update.modifiedcount;
    }
 
 
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(filterdefinition<t> filter, t t)
    {
      var update = _collection.replaceone(filter, t, new updateoptions() { isupsert = true });
      return update.modifiedcount;
    }
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(expression<func<t, bool>> filter, t t)
    {
      _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true });
      
    }
    /// <summary>
    /// 用新对象替换新文档
    /// </summary>
    /// <param name="filter">查询条件</param>
    /// <param name="t">对象</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(filterdefinition<t> filter, t t)
    {
       _collection.replaceoneasync(filter, t, new updateoptions() { isupsert = true });
       
    }
    /// <summary>
    /// 根据id和条件文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="id">对象id</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(string id, updatedefinition<t> update)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      var result = _collection.updateone(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
    /// <summary>
    /// 根据id和条件文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="id">对象id</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(string id, updatedefinition<t> update)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true });    
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void update(updatedefinition<t> update,expression<func<t, bool>> filter)
    {
      _collection.updateone(filter, update, new updateoptions() { isupsert = true });
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual long update(updatedefinition<t> update, filterdefinition<t> filter)
    {
      var result = _collection.updateone(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(updatedefinition<t> update, expression<func<t, bool>> filter)
    {
      _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true });
    }
    /// <summary>
    /// 根据条件修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updateasync(updatedefinition<t> update, filterdefinition<t> filter)
    {
       _collection.updateoneasync(filter, update, new updateoptions() { isupsert = true });
    }
 
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual long updatebatch(updatedefinition<t> update, expression<func<t, bool>> filter)
    {
      var result = _collection.updatemany(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
 
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual long updatebatch(updatedefinition<t> update, filterdefinition<t> filter)
    {
      var result = _collection.updatemany(filter, update, new updateoptions() { isupsert = true });
      return result.modifiedcount;
    }
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updatebatchasync(updatedefinition<t> update, expression<func<t, bool>> filter)
    {
       _collection.updatemanyasync(filter, update, new updateoptions() { isupsert = true });
    }
 
    /// <summary>
    /// 根据条件批量修改文档
    /// </summary>
    /// <param name="update">修改条件-形如:builders/<t/>.update.set(filed, value)</param>
    /// <param name="filter">查询条件builders/<t/>.filter.eq(filed, value)</param>
    /// <returns>修改影响文档数</returns>
    public virtual void updatebatchasync(updatedefinition<t> update, filterdefinition<t> filter)
    {
      _collection.updatemanyasync(filter, update, new updateoptions() { isupsert = true });
    }
    #endregion
 
    #region 查询 
 
    #region getcollection
 
    /// <summary>
    /// 获取操作对象的imongocollection集合,强类型对象集合
    /// </summary>
    /// <returns></returns>
    public virtual imongocollection<t> getcollection()
    {
      return _database.getcollection<t>(typeof(t).name);
    }
 
    #endregion
 
    #region getsingle
    /// <summary>
    /// 查询数据库,检查是否存在指定id的对象
    /// </summary>
    /// <param name="id">对象的id值</param>
    /// <returns>存在则返回指定的对象,否则返回null</returns>
    public virtual t getbyid(string id)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      var data = _collection.find(filter).firstordefault();
      return data;
    }
    /// <summary>
    /// 查询数据库,检查是否存在指定id的对象
    /// </summary>
    /// <param name="id">对象的id值</param>
    /// <returns>存在则返回指定的对象,否则返回null</returns>
    public virtual async task<t> getasyncbyid(string id)
    {
      var filterbuilder = builders<t>.filter;
      var filter = filterbuilder.eq("id", new objectid(id));
      var data = await _collection.findasync(filter);
      return await data.singleordefaultasync();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual t get(filterdefinition<t> filter)
    {
      return _collection.find(filter).firstordefault();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">条件表达式</param>
    /// <returns></returns>
    public virtual t get(expression<func<t,bool>> filter)
    {
      return _collection.find(filter).firstordefault();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual async task<t> getasync(filterdefinition<t> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.singleordefaultasync();
    }
    /// <summary>
    /// 查询数据
    /// </summary>
    /// <param name="filter">条件表达式</param>
    /// <returns></returns>
    public virtual async task<t> getasync(expression<func<t, bool>> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.singleordefaultasync();
    }
 
 
    #endregion
 
    #region getmany
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual ienumerable<t> getmany(filterdefinition<t> filter)
    {
      return _collection.find(filter).toenumerable();
    }
 
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">条件表达式</param>
    /// <returns></returns>
    public virtual ienumerable<t> getmany(expression<func<t,bool>> filter)
    {
      //return _collection.asqueryable().where(filter).tolist();
      //return _collection.asqueryable().where(filter);
      return _collection.find(filter).toenumerable(); //.toenumerable(); 
    }
 
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual async task<ienumerable<t>> getmanyasync(filterdefinition<t> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.tolistasync();
    }
 
    /// <summary>
    /// 查询部分数据
    /// </summary>
    /// <param name="filter">过滤条件</param>
    /// <returns></returns>
    public virtual async task<ienumerable<t>> getmanyasync(expression<func<t, bool>> filter)
    {
      var data = await _collection.findasync(filter);
      return await data.tolistasync();
    }
     
    #endregion
 
    #region getall
 
    /// <summary>
    /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描)
    /// </summary>
    /// <returns>要查询的对象</returns>
    public virtual ienumerable<t> getall()
    {
      var data = _collection.asqueryable();
      return data.toenumerable();
    }
    /// <summary>
    /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描)
    /// </summary>
    /// <returns>要查询的对象</returns>
    public virtual async task<ienumerable<t>> getallasync()
    {
      var data = _collection.asqueryable();
      return await data.tolistasync();
    }
 
    /// <summary>
    /// 查询所有记录,复杂查询直接用linq处理(避免全表扫描)
    /// </summary>
    /// <returns>要查询的对象</returns>
    public virtual iqueryable<t> getallqueryable()
    {
      return _collection.asqueryable();
    }
 
    #endregion
 
    #region mapreduce
    /// <summary> 
    /// mapreduce
    /// </summary>   
    /// <returns>返回一个list列表数据</returns> 
    public ienumerable<t> getmap(bsonjavascript map,bsonjavascript reduce)
    {
      return _collection.mapreduce<t>(map,reduce).tolist();
    }
 
 
    #endregion
 
    #endregion
  }
 
}


好了,就介绍到这里。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网