当前位置: 移动技术网 > IT编程>开发语言>Java > java连接ElasticSearch集群操作

java连接ElasticSearch集群操作

2020年09月18日  | 移动技术网IT编程  | 我要评论
我就废话不多说了,大家还是直接看代码吧~/* *es配置类 * */ @configurationpublic class elasticsearchdatasourceconfigurer {

我就废话不多说了,大家还是直接看代码吧~

/*
 *es配置类
 *
 */
 
@configuration
public class elasticsearchdatasourceconfigurer {
 
  private static final logger log = logmanager.getlogger(elasticsearchdatasourceconfigurer.class);
  @bean
  public transportclient getesclient() {
    //设置集群名称
    settings settings = settings.builder().put("cluster.name", "bigdata-cluster").put("client.transport.sniff", true).build();
    //创建client
    transportclient client = null;
    try {
      client = new prebuilttransportclient(settings)
          .addtransportaddress(new inetsockettransportaddress(inetaddress.getbyname(""), 9300));//集群ip
      log.info("esclient连接建立成功");
    } catch (unknownhostexception e) {
      log.info("esclient连接建立失败");
      e.printstacktrace();
    }
    return client;
  } 
}
/**
 * simple to introduction
 *
 * @description: [添加类]
 */
@repository
public class userdaoimpl implements userdao {
 
	private static final string indexname = "user";//小写
	private static final string typename = "info";
 
	@resource
	transportclient transportclient;
 
	@override
	public int adduser(user[] user) {
		indexresponse indexresponse = null;
		int successnum = 0;
		for (int i = 0; i < user.length; i++) {
			uuid uuid = uuid.randomuuid();
			string str = uuid.tostring();
			string jsonvalue = null;
			try {
				jsonvalue = jsonutil.object2jsonstring(user[i]);
				if (jsonvalue != null) {
					indexresponse = transportclient.prepareindex(indexname, typename, str).setsource(jsonvalue)
							.execute().actionget();
					successnum++;
				}
			} catch (jsonprocessingexception e) {
				e.printstacktrace();
			}
 
		}
		return successnum;
	} 
}
 
/**
 *批量插入
 */
public static void bathadduser(transportclient client, list<user> users) {
 
		bulkrequestbuilder bulkrequest = transportclient.preparebulk();
		for (int i = 0; i < users.size(); i++) {
			uuid uuid = uuid.randomuuid();
			string str = uuid.tostring();
 
			string jsonvalue = null;
			try {
				jsonvalue = jsonutil.object2jsonstring(users.get(i));
			} catch (jsonprocessingexception e) {
				e.printstacktrace();
			}
			bulkrequest.add(client.prepareindex("user", "info", str).setsource(jsonvalue));
			// 一万条插入一次
			if (i % 10000 == 0) {
				bulkrequest.execute().actionget();
			}
			system.out.println("已经插入第" + i + "多少条");
		}
 
	}

补充知识:使用java创建es(elasticsearch)连接池

1.首先要有一个创建连接的工厂类

package com.aly.util; 
import org.apache.commons.pool2.pooledobject;
import org.apache.commons.pool2.pooledobjectfactory;
import org.apache.commons.pool2.impl.defaultpooledobject;
import org.apache.http.httphost;
import org.elasticsearch.client.restclient;
import org.elasticsearch.client.resthighlevelclient;
 
/**
 * eliasticsearch连接池工厂对象
 * @author 00000
 *
 */
public class esclientpoolfactory implements pooledobjectfactory<resthighlevelclient>{
 
	@override
	public void activateobject(pooledobject<resthighlevelclient> arg0) throws exception {
		system.out.println("activateobject");
		
	}
	
	/**
	 * 销毁对象
	 */
	@override
	public void destroyobject(pooledobject<resthighlevelclient> pooledobject) throws exception {
		resthighlevelclient highlevelclient = pooledobject.getobject();
		highlevelclient.close();
	}
	
	/**
	 * 生产对象
	 */
//	@suppresswarnings({ "resource" })
	@override
	public pooledobject<resthighlevelclient> makeobject() throws exception {
//		settings settings = settings.builder().put("cluster.name","elasticsearch").build();
		resthighlevelclient client = null;
		try {
			/*client = new prebuilttransportclient(settings)
          .addtransportaddress(new transportaddress(inetaddress.getbyname("localhost"),9300));*/
			client = new resthighlevelclient(restclient.builder(
					new httphost("192.168.1.121", 9200, "http"), new httphost("192.168.1.122", 9200, "http"),
					new httphost("192.168.1.123", 9200, "http"), new httphost("192.168.1.125", 9200, "http"),
					new httphost("192.168.1.126", 9200, "http"), new httphost("192.168.1.127", 9200, "http")));
 
		} catch (exception e) {
			e.printstacktrace();
		}
		return new defaultpooledobject<resthighlevelclient>(client);
	}
 
	@override
	public void passivateobject(pooledobject<resthighlevelclient> arg0) throws exception {
		system.out.println("passivateobject");
	}
 
	@override
	public boolean validateobject(pooledobject<resthighlevelclient> arg0) {
		return true;
	}	
}

2.然后再写我们的连接池工具类

package com.aly.util; 
import org.apache.commons.pool2.impl.genericobjectpool;
import org.apache.commons.pool2.impl.genericobjectpoolconfig;
import org.elasticsearch.client.resthighlevelclient;
 
/**
 * elasticsearch 连接池工具类
 * 
 * @author 00000
 *
 */
public class elasticsearchpoolutil {
	// 对象池配置类,不写也可以,采用默认配置
	private static genericobjectpoolconfig poolconfig = new genericobjectpoolconfig();
	// 采用默认配置maxtotal是8,池中有8个client
	static {
		poolconfig.setmaxtotal(8);
	}
	// 要池化的对象的工厂类,这个是我们要实现的类
	private static esclientpoolfactory esclientpoolfactory = new esclientpoolfactory();
	// 利用对象工厂类和配置类生成对象池
	private static genericobjectpool<resthighlevelclient> clientpool = new genericobjectpool<>(esclientpoolfactory,
			poolconfig);
 
	/**
	 * 获得对象
	 * 
	 * @return
	 * @throws exception
	 */
	public static resthighlevelclient getclient() throws exception {
		// 从池中取一个对象
		resthighlevelclient client = clientpool.borrowobject();
		return client;
	}
 
	/**
	 * 归还对象
	 * 
	 * @param client
	 */
	public static void returnclient(resthighlevelclient client) {
		// 使用完毕之后,归还对象
		clientpool.returnobject(client);
	}
}

以上这篇java连接elasticsearch集群操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持移动技术网。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网