当前位置: 移动技术网 > IT编程>开发语言>.net > EF Core 实现读写分离的最佳方案

EF Core 实现读写分离的最佳方案

2019年10月07日  | 移动技术网IT编程  | 我要评论

前言

公司之前使用ado.net和dapper进行数据访问层的操作, 进行读写分离也比较简单, 只要使用对应的数据库连接字符串即可. 而最近要迁移到新系统中,新系统使用.net core和ef core进行数据访问. 所以趁着国庆假期拿出一两天时间研究了一下如何ef core进行读写分离.

思路

根据园子里的jeffcky大神的博客, 参考
entityframework core进行读写分离最佳实践方式,了解一下(一)?
entityframework core进行读写分离最佳实践方式,了解一下(二)?

最简单的思路就是使用手动切换ef core上下文的连接, 即context.database.getdbconnection().connectionstring = "xxx", 但必须要先创建上下文, 再关闭之前的连接, 才能进行切换
另一种方式是通过监听diagnostic来将进行查询的sql切换到从库执行, 这种方式虽然可以实现无感知的切换操作, 但不能满足公司的业务需求. 在后台管理或其他对数据实时性要求比较高的项目里,查询操作也都应该走主库,而这种方式却会切换到从库去. 另一方面就是假若公司的库比较多,每种业务都对应了一个库, 每个库都对应了一种dbcontext, 这种情况下, 要实现自动切换就变得很复杂了.

上面的两种方式都是从切换数据库连接入手,但是频繁的切换数据库连接势必会对性能造成影响. 我认为最理想的方式是要避免数据库连接的切换, 且能够适应多dbcontext的情况, 在创建上下文实例时,就指定好是访问主库还是从库, 而不是在后期再进行数据库切换. 因此, 在上下文实例化时,就传入相应的数据库连接字符串, 这样一来dbcontext的创建就需要交由我们自己来进行, 就不是由di容器进行创建了. 同时仓储应该区分为只读和可读可写两种,以防止其他人对从库进行写操作.

实现

    public interface ireadonlyrepository<tentity, tkey>
        where tentity : class, ientity<tkey>
        where tkey : iequatable<tkey>
    {}

    public interface irepository<tentity, tkey> : ireadonlyrepository<tentity, tkey>
    where tentity : class, ientity<tkey>
    where tkey : iequatable<tkey>
    {}

ireadonlyrepository接口是只读仓储接口,提供查询相关方法,irepository接口是可读可写仓储接口,提供增删查改等方法, 接口的实现就那些东西这里就省略了.

    public interface irepositoryfactory
    {
        irepository<tentity, tkey> getrepository<tentity, tkey>(iunitofwork unitofwork)
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>;
         ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>(iunitofwork unitofwork)
                where tentity : class, ientity<tkey>
                where tkey : iequatable<tkey>;
    }
    public class repositoryfactory : irepositoryfactory
    {
        public repositoryfactory()
        {
        }

        public irepository<tentity, tkey> getrepository<tentity, tkey>(iunitofwork unitofwork)
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return new repository<tentity, tkey>(unitofwork);
        }

        public ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>(iunitofwork unitofwork)
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return new readonlyrepository<tentity, tkey>(unitofwork);
        }
    }

repositoryfactory提供仓储对象的实例化

    public interface iunitofwork : idisposable
    {
        public dbcontext dbcontext { get; }

        /// <summary>
        /// 获取只读仓储对象
        /// </summary>
        ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>;

        /// <summary>
        /// 获取仓储对象
        /// </summary>
        irepository<tentity, tkey> getrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>;
        int savechanges();
        task<int> savechangesasync(cancellationtoken canceltoken = default);
    }
    
    public class unitofwork : iunitofwork
    {
        private readonly iserviceprovider _serviceprovider;
        private readonly dbcontext _dbcontext;
        private readonly irepositoryfactory _repositoryfactory;
        private bool _disposed;

        public unitofwork(iserviceprovider serviceprovider, dbcontext context)
        {
            check.notnull(serviceprovider, nameof(serviceprovider));
            _serviceprovider = serviceprovider;
            _dbcontext = context;
            _repositoryfactory = serviceprovider.getrequiredservice<irepositoryfactory>();
        }
        public dbcontext dbcontext { get => _dbcontext; }
        public ireadonlyrepository<tentity, tkey> getreadonlyrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return _repositoryfactory.getreadonlyrepository<tentity, tkey>(this);
        }

        public irepository<tentity, tkey> getrepository<tentity, tkey>()
            where tentity : class, ientity<tkey>
            where tkey : iequatable<tkey>
        {
            return _repositoryfactory.getrepository<tentity, tkey>(this);
        }
        
        public void dispose()
        {
            if (_disposed)
            {
                return;
            }

            _dbcontext?.dispose();
            _disposed = true;
        }
        
        // 其他略
    }
    /// <summary>
    /// 数据库提供者接口
    /// </summary>
    public interface idbprovider : idisposable
    {
        /// <summary>
        /// 根据上下文类型及数据库名称获取unitofwork对象, dbname为null时默认为第一个数据库名称
        /// </summary>
        iunitofwork getunitofwork(type dbcontexttype, string dbname = null);
    }

idbprovider 接口, 根据上下文类型和配置文件中的数据库连接字符串名称创建iunitofwork, 在di中的生命周期是scoped,在销毁的同时会销毁数据库上下文对象, 下面是它的实现, 为了提高性能使用了expression来代替反射.

public class dbprovider : idbprovider
    {
        private readonly iserviceprovider _serviceprovider;
        private readonly concurrentdictionary<string, iunitofwork> _works = new concurrentdictionary<string, iunitofwork>();
        private static concurrentdictionary<type, func<iserviceprovider, dbcontextoptions, dbcontext>> _expressionfactorydict =
            new concurrentdictionary<type, func<iserviceprovider, dbcontextoptions, dbcontext>>();

        public dbprovider(iserviceprovider serviceprovider)
        {
            _serviceprovider = serviceprovider;
        }

        public iunitofwork getunitofwork(type dbcontexttype, string dbname = null)
        {
            var key = string.format("{0}${1}$", dbname, dbcontexttype.fullname);
            iunitofwork unitofwork;
            if (_works.trygetvalue(key, out unitofwork))
            {
                return unitofwork;
            }
            else
            {
                dbcontext dbcontext;
                var dbconnectionoptionsmap = _serviceprovider.getrequiredservice<ioptions<fxoptions>>().value.dbconnections;
                if (dbconnectionoptionsmap == null || dbconnectionoptionsmap.count <= 0)
                {
                    throw new exception("无法获取数据库配置");
                }

                dbconnectionoptions dbconnectionoptions = dbname == null ? dbconnectionoptionsmap.first().value : dbconnectionoptionsmap[dbname];

                var builderoptions = _serviceprovider.getservices<dbcontextoptionsbuilderoptions>()
                     ?.where(d => (d.dbname == null || d.dbname == dbname) && (d.dbcontexttype == null || d.dbcontexttype == dbcontexttype))
                     ?.orderbydescending(d => d.dbname)
                     ?.orderbydescending(d => d.dbcontexttype);
                if (builderoptions == null || !builderoptions.any())
                {
                    throw new exception("无法获取匹配的dbcontextoptionsbuilder");
                }

                var dbuser = _serviceprovider.getservices<idbcontextoptionsbuilderuser>()?.firstordefault(u => u.type == dbconnectionoptions.databasetype);
                if (dbuser == null)
                {
                    throw new exception($"无法解析类型为“{dbconnectionoptions.databasetype}”的 {typeof(idbcontextoptionsbuilderuser).fullname} 实例");
                }
                
                var dbcontextoptions = dbuser.use(builderoptions.first().builder, dbconnectionoptions.connectionstring).options;
                if (_expressionfactorydict.trygetvalue(dbcontexttype, out func<iserviceprovider, dbcontextoptions, dbcontext> factory))
                {
                    dbcontext = factory(_serviceprovider, dbcontextoptions);
                }
                else
                {
                    // 使用expression创建dbcontext
                    var constructormethod = dbcontexttype.getconstructors()
                        .where(c => c.ispublic && !c.isabstract && !c.isstatic)
                        .orderbydescending(c => c.getparameters().length)
                        .firstordefault();
                    if (constructormethod == null)
                    {
                        throw new exception("无法获取有效的上下文构造器");
                    }

                    var dbcontextoptionsbuildertype = typeof(dbcontextoptionsbuilder<>);
                    var dbcontextoptionstype = typeof(dbcontextoptions);
                    var dbcontextoptionsgenerictype = typeof(dbcontextoptions<>);
                    var serviceprovidertype = typeof(iserviceprovider);
                    var getservicemethod = serviceprovidertype.getmethod("getservice");
                    var lambdaparameterexpressions = new parameterexpression[2];
                    lambdaparameterexpressions[0] = (expression.parameter(serviceprovidertype, "serviceprovider"));
                    lambdaparameterexpressions[1] = (expression.parameter(dbcontextoptionstype, "dbcontextoptions"));
                    var paramtypes = constructormethod.getparameters();
                    var argumentexpressions = new expression[paramtypes.length];
                    for (int i = 0; i < paramtypes.length; i++)
                    {
                        var ptype = paramtypes[i];
                        if (ptype.parametertype == dbcontextoptionstype ||
                            (ptype.parametertype.isgenerictype && ptype.parametertype.getgenerictypedefinition() == dbcontextoptionsgenerictype))
                        {
                            argumentexpressions[i] = expression.convert(lambdaparameterexpressions[1], ptype.parametertype);
                        }
                        else if (ptype.parametertype == serviceprovidertype)
                        {
                            argumentexpressions[i] = lambdaparameterexpressions[0];
                        }
                        else
                        {
                            argumentexpressions[i] = expression.call(lambdaparameterexpressions[0], getservicemethod);
                        }
                    }

                    factory = expression
                        .lambda<func<iserviceprovider, dbcontextoptions, dbcontext>>(
                            expression.convert(expression.new(constructormethod, argumentexpressions), typeof(dbcontext)), lambdaparameterexpressions.asenumerable())
                        .compile();
                    _expressionfactorydict.tryadd(dbcontexttype, factory);

                    dbcontext = factory(_serviceprovider, dbcontextoptions);
                }

                var unitofworkfactory = _serviceprovider.getrequiredservice<iunitofworkfactory>();
                unitofwork = unitofworkfactory.getunitofwork(_serviceprovider, dbcontext);
                _works.tryadd(key, unitofwork);
                return unitofwork;
            }
        }

        public void dispose()
        {
            if (_works != null && _works.count > 0)
            {
                foreach (var unitofwork in _works.values)
                    unitofwork.dispose();
                _works.clear();
            }
        }
    }
    
    public static class dbproviderextensions
    {
        public static iunitofwork getunitofwork<tdbcontext>(this idbprovider provider, string dbname = null)
        {
            if (provider == null)
                return null;
            return provider.getunitofwork(typeof(tdbcontext), dbname);
        }
    }
    /// <summary>
    /// 业务系统配置选项
    /// </summary>
    public class fxoptions
    {
        public fxoptions()
        {
        }

        /// <summary>
        /// 默认数据库类型
        /// </summary>
        public databasetype defaultdatabasetype { get; set; } = databasetype.sqlserver;

        /// <summary>
        /// 数据库连接配置
        /// </summary>
        public idictionary<string, dbconnectionoptions> dbconnections { get; set; }

    }
    
    public class fxoptionssetup: iconfigureoptions<fxoptions>
    {
        private readonly iconfiguration _configuration;

        public fxoptionssetup(iconfiguration configuration)
        {
            _configuration = configuration;
        }

        /// <summary>
        /// 配置options各属性信息
        /// </summary>
        /// <param name="options"></param>
        public void configure(fxoptions options)
        {
            setdbconnectionsoptions(options);
            // ...
        }

        private void setdbconnectionsoptions(fxoptions options)
        {
            var dbconnectionmap = new dictionary<string, dbconnectionoptions>();
            options.dbconnections = dbconnectionmap;
            iconfiguration section = _configuration.getsection("fxcore:dbconnections");
            dictionary<string, dbconnectionoptions> dict = section.get<dictionary<string, dbconnectionoptions>>();
            if (dict == null || dict.count == 0)
            {
                string connectionstring = _configuration["connectionstrings:defaultdbcontext"];
                if (connectionstring == null)
                {
                    return;
                }
                dbconnectionmap.add("defaultdb", new dbconnectionoptions
                {
                    connectionstring = connectionstring,
                    databasetype = options.defaultdatabasetype
                });

                return;
            }

            var ambiguous = dict.keys.groupby(d => d).firstordefault(d => d.count() > 1);
            if (ambiguous != null)
            {
                throw new exception($"数据上下文配置中存在多个配置节点拥有同一个数据库连接名称,存在二义性:{ambiguous.first()}");
            }
            foreach (var db in dict)
            {
                dbconnectionmap.add(db.key, db.value);
            }
        }
    }
    
    /// <summary>
    /// dbcontextoptionsbuilder配置选项
    /// </summary>
    public class dbcontextoptionsbuilderoptions
    {
        /// <summary>
        /// 配置dbcontextoptionsbuilder, dbname指定数据库名称, 为null时表示所有数据库,默认为null
        /// </summary>
        /// <param name="build"></param>
        /// <param name="dbname"></param>
        /// <param name="dbcontexttype"></param>
        public dbcontextoptionsbuilderoptions(dbcontextoptionsbuilder build, string dbname = null, type dbcontexttype = null)
        {
            builder = build;
            dbname = dbname;
            dbcontexttype = dbcontexttype;
        }

        public dbcontextoptionsbuilder builder { get; }
        public string dbname { get; }
        public type dbcontexttype { get; }
    }

fxoptions是业务系统的配置选项(随便取得), 在通过service.getservice<ioptions>()时会调用iconfigureoptions完成fxoptions的初始化. dbcontextoptionsbuilderoptions用来提供dbcontextoptionsbuilder的相关配置

    public interface idbcontextoptionsbuilderuser
    {
        /// <summary>
        /// 获取 数据库类型名称,如 sqlserver,mysql,sqlite等
        /// </summary>
        databasetype type { get; }

        /// <summary>
        /// 使用数据库
        /// </summary>
        /// <param name="builder">创建器</param>
        /// <param name="connectionstring">连接字符串</param>
        /// <returns></returns>
        dbcontextoptionsbuilder use(dbcontextoptionsbuilder builder, string connectionstring);
    }
    
    public class sqlserverdbcontextoptionsbuilderuser : idbcontextoptionsbuilderuser
    {
        public databasetype type => databasetype.sqlserver;

        public dbcontextoptionsbuilder use(dbcontextoptionsbuilder builder, string connectionstring)
        {
            return builder.usesqlserver(connectionstring);
        }
    }

idbcontextoptionsbuilderuser接口用来适配不同的数据库来源

使用

{
    "fxcore": {
        "dbconnections": {
            "testdb": {
                "connectionstring": "xxx",
                "databasetype": "sqlserver"
            },
            "testdb_read": {
                "connectionstring": "xxx",
                "databasetype": "sqlserver"
            }
        }
    }
}
    class program
    {
        static void main(string[] args)
        {
            var config = new configurationbuilder()
                 .addjsonfile("appsettings.json")
                 .build();
            var services = new servicecollection()
                .addsingleton<iconfiguration>(config)
                .addoptions()
                .addsingleton<iconfigureoptions<fxoptions>, fxoptionssetup>()
                .addscoped<idbprovider, dbprovider>()
                .addsingleton<iunitofworkfactory, unitofworkfactory>()
                .addsingleton<irepositoryfactory, repositoryfactory>()
                .addsingleton<idbcontextoptionsbuilderuser, sqlserverdbcontextoptionsbuilderuser>()
                .addsingleton<dbcontextoptionsbuilderoptions>(new dbcontextoptionsbuilderoptions(new dbcontextoptionsbuilder<testdbcontext>(), null, typeof(testdbcontext)));

            var serviceprovider = services.buildserviceprovider();

            var dbprovider = serviceprovider.getrequiredservice<idbprovider>();
            var uow = dbprovider.getunitofwork<testdbcontext>("testdb"); // 访问主库

            var repodbtest = uow.getrepository<dbtest, int>();
            var obj = new dbtest { name = "123", date = datetime.now.date };
            repodbtest.insert(obj);
            uow.savechanges();
            
            console.readkey();
            
            var uow2 = dbprovider.getunitofwork<testdbcontext>("testdb_read");

             var uow2 = dbprovider.getunitofwork<testdbcontext>("testdb_read"); // 访问从库
            var repodbtest2 = uow2.getreadonlyrepository<dbtest, int>();
            var data2 = repodbtest2.getfirstordefault();
            console.writeline($"id: {data2.id} name: {data2.name}");
            console.readkey();
        }
    }

这里直接用控制台来做一个例子,中间多了一个console.readkey()是因为我本地没有配置主从模式,所以实际上我是先插入数据,然后复制到另一个数据库里,再进行读取的.

总结

本文给出的解决方案适用于系统中存在多个不同的上下文,能够适应复杂的业务场景.但对已有代码的侵入性比较大,不知道有没有更好的方案,欢迎一起探讨.

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网