自己以前都走了弯路,以为学习战术设计就会ddd了,其实ddd的精华在战略设计,但是对于我们菜鸟来说,学习一些技术概念也是挺好的
经常看到这些术语,概念太多,也想简单学习一下,记忆力比较差记录一下实现的细节
/// <summary> /// 存储聚合根中的事件到eventstorage 发布事件 /// </summary> /// <typeparam name="taggregationroot"></typeparam> /// <param name="event"></param> /// <returns></returns> public async task appendeventstoragepublisheventasync<taggregationroot>(taggregationroot @event) where taggregationroot : iaggregationroot { var domaineventlist = @event.uncommittedevents.tolist(); if (domaineventlist.count == 0) { throw new exception("请添加事件!"); } await tryappendeventstorageasync(domaineventlist).continuewith(async e => { if (e.result == (int)eventstoragestatus.success) { await trypublishdomaineventasync(domaineventlist).configureawait(false); @event.clearevents(); } }); } /// <summary> /// 发布领域事件 /// </summary> /// <returns></returns> public async task publishdomaineventasync(list<idomainevent> domaineventlist) { using (var connection = new sqlconnection(connectionstr)) { if (connection.state == connectionstate.closed) { await connection.openasync().configureawait(false); } using (var transaction = await connection.begintransactionasync().configureawait(false)) { try { if (domaineventlist.count > 0) { foreach (var domainevent in domaineventlist) { await _cappublisher.publishasync(domainevent.getroutingkey(), domainevent).configureawait(false); } } await transaction.commitasync().configureawait(false); } catch (exception e) { await transaction.rollbackasync().configureawait(false); throw; } } } } /// <summary> /// 发布领域事件重试 /// </summary> /// <param name="domaineventlist"></param> /// <returns></returns> public async task trypublishdomaineventasync(list<idomainevent> domaineventlist) { var policy = policy.handle<socketexception>().or<ioexception>().or<exception>() .retryforeverasync(onretry: exception => { task.factory.startnew(() => { //记录重试的信息 _loggerhelper.loginfo("发布领域事件异常", exception.message); }); }); await policy.executeasync(async () => { await publishdomaineventasync(domaineventlist).configureawait(false); }); } /// <summary> /// 存储聚合根中的事件到eventstorage中 /// </summary> /// <returns></returns> public async task<int> appendeventstorageasync(list<idomainevent> domaineventlist) { if (domaineventlist.count == 0) { throw new exception("请添加事件!"); } var status = (int)eventstoragestatus.failure; using (var connection = new sqlconnection(connectionstr)) { try { if (connection.state == connectionstate.closed) { await connection.openasync().configureawait(false); } using (var transaction = await connection.begintransactionasync().configureawait(false)) { try { if (domaineventlist.count > 0) { foreach (var domainevent in domaineventlist) { eventstorage eventstorage = new eventstorage { id = guid.newguid(), aggregaterootid = domainevent.aggregaterootid, aggregateroottype = domainevent.aggregateroottype, createdatetime = domainevent.createdatetime, version = domainevent.version, eventdata = events(domainevent) }; var eventstoragesql = $"insert into eventstorageinfo(id,aggregaterootid,aggregateroottype,createdatetime,version,eventdata) values (@id,@aggregaterootid,@aggregateroottype,@createdatetime,@version,@eventdata)"; await connection.executeasync(eventstoragesql, eventstorage, transaction).configureawait(false); } } await transaction.commitasync().configureawait(false); status = (int)eventstoragestatus.success; } catch (exception e) { await transaction.rollbackasync().configureawait(false); throw; } } } catch (exception e) { connection.close(); throw; } } return status; } /// <summary> /// appendeventstorageasync异常重试 /// </summary> public async task<int> tryappendeventstorageasync(list<idomainevent> domaineventlist) { var policy = policy.handle<socketexception>().or<ioexception>().or<exception>() .retryforeverasync(onretry: exception => { task.factory.startnew(() => { //记录重试的信息 _loggerhelper.loginfo("存储事件异常", exception.message); }); }); var result = await policy.executeasync(async () => { var resulted = await appendeventstorageasync(domaineventlist).configureawait(false); return resulted; }); return result; } /// <summary> /// 根据domainevent序列化事件json /// </summary> /// <param name="domainevent"></param> /// <returns></returns> public string events(idomainevent domainevent) { concurrentdictionary<string, string> dictionary = new concurrentdictionary<string, string>(); //获取领域事件的类型(方便解析json) var domaineventtypename = domainevent.gettype().name; var domaineventstr = jsonconvert.serializeobject(domainevent); dictionary.getoradd(domaineventtypename, domaineventstr); var eventdata = jsonconvert.serializeobject(dictionary); return eventdata; }
解析eventstorage中存储的事件
public async task<list<idomainevent>> getaggregaterooteventstoragebyid(guid aggregaterootid) { try { using (var connection = new sqlconnection(connectionstr)) { var eventstoragelist = await connection.queryasync<eventstorage>($"select * from dbo.eventstorageinfo where aggregaterootid='{aggregaterootid}'"); list<idomainevent> domaineventlist = new list<idomainevent>(); foreach (var item in eventstoragelist) { var dictionarydomainevent = jsonconvert.deserializeobject<dictionary<string, string>>(item.eventdata); foreach (var entry in dictionarydomainevent) { var domaineventtype = typenameprovider.gettype(entry.key); if (domaineventtype != null) { var domainevent = jsonconvert.deserializeobject(entry.value, domaineventtype) as idomainevent; domaineventlist.add(domainevent); } } } return domaineventlist; } } catch (exception ex) { throw; }
1.事件没持久化就代表事件还没发生成功,事件存储可能失败,必须先存储事件,在发布事件,保证存储事件与发布事件一致性
1.使用事件驱动,必须要做好冥等的处理
2.如果业务场景中有状态时:通过状态来控制
3.新建一张表,用来记录消费的信息,消费端的代码里面,根据唯一的标识,判断是否处理过该事件
4.q端的任何更新都应该把聚合根id和事件版本号作为条件,q端的更新不用遵循聚合的原则,可以使用最简单的方式处理
5.仓储是用来重建聚合的,它的行为和集合一样只有get ,add ,delete
6.ddd不是技术,是思想,核心在战略模块,战术设计是实现的一种选择,战略设计,需要面向对象的分析能力,职责分配,深层次的分析业务
如对本文有疑问, 点击进行留言回复!!
Asp.Net Core Identity 骚断腿的究极魔改实体类
你一定看得懂的 DDD+CQRS+EDA+ES 核心思想与极简可运行代码示例
网友评论