当前位置: 移动技术网 > IT编程>开发语言>.net > .NetCore下ES查询驱动 PlainElastic .Net 升级官方驱动 Elasticsearch .Net

.NetCore下ES查询驱动 PlainElastic .Net 升级官方驱动 Elasticsearch .Net

2019年07月23日  | 移动技术网IT编程  | 我要评论

华珍卡宾达,泗洪高利贷,北京南站到北京北站

1.背景

由于历史原因,笔者所在的公司原有的es查询驱动采用的是 plainelastic.net, 经过询问原来是之前plainelastic.net在园子里文档较多,上手比较容易,所以最初作者选用了该驱动,而发布也由于历史原因都部署在 windows 服务器上,基于 .net framework开发。

后来由于迁移 .net core 平台的需要,对代码进行了升级,同时部署平台也迁移至 centos7 服务器,升级过程比较顺利,由于没有使用特殊api,所以几乎没有对业务代码做更多的修改,同时测试阶段由于没有多余的机器,仍然放在了原有windows服务器上做的测试,一切都没有问题,完美上线。

事发突然,某天接到运维部门反馈,部署查询服务的机器突然出现 tcp 连接数超高的问题,同时这台机器其他的tcp服务也无法建立新的连接,但已经建立的连接不受影响。联想到 elasticsearch 查询服务是基于http 请求的,脑子里马上联想到 .net core 下 httpclient 如果每次访问都创建新实例,则会每次都建立新的tcp连接,而 linux 对已释放端口回收的时间窗口,会导致在高并发情况下,客户端机器端口占用持续增加,同时被调用服务端连接数也会持续增加。

基于此猜测,立马去扒了一下plainelastic.net源代码:

源码地址:https://github.com/yegoroff/plainelastic.net/blob/master/src/plainelastic.net/connection/elasticconnection.cs
果然如猜测的那样,每次都创建了新的 httpwebrequest 实例,看了作者的最后维护时间也已经是3年前了,可能是后来官方驱动日趋完善,作者也便停止了维护。

既然如此,那么让我们看下官方最新驱动源码是否如我们想象,是基于httpclientfactory来解决这个问题的?

源码地址:https://github.com/elastic/elasticsearch-net/blob/master/src/elasticsearch.net/connection/httpconnection.cs

上述代码看来,官方驱动并非是采用微软官方建议的 httpclientfactory ,而是官方底层自己维护的一个线程安全的字典来管理 httpclient 实例池,虽是自己实现,但效果一样:相同地址的请求,是链接复用的,这样就解决不断开启 tcp 连接的问题。

问题找到,立马进行驱动升级:

2.驱动升级

说明: elasticsearch.net官方驱动地址:

官方驱动分为 low level client 和 nest(heigh level client),其中low level client 仅仅做了最基本的封装,几乎等价于http原生调用,带来了极大的灵活性的同时,也带来使用成本,而对于开发人员来说使用 nest 提供的更加高级的api,可以更加快速的进行开发工作,也同时可以利用到 .net 所提供的各种语法糖,比如 => 表达式。

话不多说,看示例代码:

实例创建

public elasticservice()
{
    var uris = new uri[] { new uri("http://172.17.78.111:9200"), new uri("http://172.17.78.112:9200") }; //支持多个节点
    var connectionpool = new sniffingconnectionpool(uris);
    var settings = new connectionsettings(connectionpool).defaultindex("testindex");//注意index不可以大写
    settings.basicauthentication("", ""); //设置账号密码,没有可以跳过
    this._client = new elasticclient(settings);
}

插入待测试数据

public class people 
{
    public guid id { get; set; }
    public string name { get; set; }
    public int age { get; set; }
    public datetime birthday { get; set; }
    public bool gender { get; set; }
    public string address { get; set; }
    public datetime createtime { get; set; } = datetime.now;
}

//批量插入
public async task<ibulkresponse> addpeopleasync(people[] peoples)
{
    var descriptor = new bulkdescriptor();
    foreach (var p in peoples)
    {
        var response = await _client.indexdocumentasync(p);
        descriptor.index<people>(op => op.document(p));
    }
    return await _client.bulkasync(descriptor);//批量插入
}

多查询条件拼接

public querycontainer buildquerycontainer(searchcondition condition)
{
    var querycombin = new list<func<querycontainerdescriptor<people>, querycontainer>>();
    if (!string.isnullorempty(condition.name))
        querycombin.add(mt => mt.match(m => m.field(t => t.name).query(condition.name))); //字符串匹配

    if (condition.age.hasvalue)
        querycombin.add(mt => mt.range(m => m.field(t => t.address).greaterthanorequals(condition.age))); //数值区间匹配

    if (!string.isnullorempty(condition.address))
        querycombin.add(mt => mt.matchphrase(m => m.field(t => t.address).query(condition.address))); //短语匹配

    if (!condition.gender.hasvalue)
        querycombin.add(mt => mt.term(m => m.field(t => t.gender).value(condition.gender)));//精确匹配

    return query<people>.bool(b => b
        .must(querycombin)
        .filter(f => f
            .daterange(dr => dr.field(t => t.createtime) //时间范围匹配
                .greaterthanorequals(datemath.anchored(condition.begincreatetime.tostring("yyyy-mm-ddthh:mm:ss")))
                .lessthanorequals(datemath.anchored(condition.endcreatetime.tostring("yyyy-mm-ddthh:mm:ss"))))));
}

提示:match 和 matchphrase 的区别,例如对于"长宁区"

  1. match 会将"长宁区"进行分词匹配,例如只要包含"区"的数据(比如静安区),也会被查询命中
  2. matchphrase 则可以理解为短语匹配,只有当数据包含“长宁区”完整短语的数据,才会被查询命中

增加分页查询接口

public async task<pagedresult<people[]>> querypeopleasync(searchcondition condition, int pageindex, int pagesize)
{
    var query = this.buildquerycontainer(condition);
    var response = await this._client.searchasync<people>(s => s
            .index("testindex")
            .from(pageindex * pagesize)
            .size(pagesize)
            .query(q => query)
            .sort(st => st.descending(d => d.createtime)));

    if (response.apicall.success)
    {
        return new pagedresult<people[]>
        {
            pageindex = pageindex,
            pagesize = pagesize,
            total = response.total,
            returnobj = response.hits.select(s => s.source).toarray()
        };
    }

    return new pagedresult<people[]> { issuccess = false };
}

编写单元测试

[testmethod]
public async task querypeopletest()
{
    var condition = new searchcondition
    {
        address="长宁区",
        begincreatetime = datetime.now.adddays(-1),
        endcreatetime = datetime.now
    };

    var result = await this._elasticservice.querypeopleasync(condition, 0, 3);
    assert.istrue(result.issuccess);
}

利用 wireshark 抓包分析http调用细节

将抓包的数据转换为http流,查看请求细节:

提示:通过wireshark抓包是排查错误很有效的方式,有时候通过查询文档进行分析,还不如先抓包查看请求数据来得直接,同时可以将抓包数据放在kabana所提供的 dev tools中验证自己的想法。

利用 kibana 提供的 dev tools 验证/测试 查询条件

3.总结

从.net framework 平台转向 .net core 平台,其实不仅仅是开发框架的升级,或者从 windows 转向 linux 的迁移,而是需要我们有更多的开源思维,即:

  1. 由于会使用到更多的三方组件,开发人员需要更多关注社区的变化
  2. 开源代码,意味着开发人员可以并且需要更多关注源代码的底层实现

本文示例代码地址:https://github.com/xboo/articles/tree/master/src/elasticsearchnetdemo

如对本文有疑问,请在下面进行留言讨论,广大热心网友会与你互动!! 点击进行留言回复

相关文章:

验证码:
移动技术网