当前位置: 移动技术网 > IT编程>开发语言>Java > java多线程处理执行solr创建索引示例

java多线程处理执行solr创建索引示例

2019年07月22日  | 移动技术网IT编程  | 我要评论

复制代码 代码如下:

public class solrindexer implements indexer, searcher, disposablebean {
 //~ static fields/initializers =============================================

 static final logger logger = loggerfactory.getlogger(solrindexer.class);

 private static final long shutdown_timeout    = 5 * 60 * 1000l; // long enough

 private static final int  input_queue_length  = 16384;

 //~ instance fields ========================================================

 private commonshttpsolrserver server;

 private blockingqueue<operation> inputqueue;

 private thread updatethread;
 volatile boolean running = true;
 volatile boolean shuttingdown = false;

 //~ constructors ===========================================================

 public solrindexer(string url) throws malformedurlexception {
  server = new commonshttpsolrserver(url);

  inputqueue = new arrayblockingqueue<operation>(input_queue_length);

  updatethread = new thread(new updatetask());
  updatethread.setname("solrindexer");
  updatethread.start();
 }

 //~ methods ================================================================

 public void setsotimeout(int timeout) {
  server.setsotimeout(timeout);
 }

 public void setconnectiontimeout(int timeout) {
  server.setconnectiontimeout(timeout);
 }

 public void setallowcompression(boolean allowcompression) {
  server.setallowcompression(allowcompression);
 }


 public void addindex(indexable indexable) throws indexingexception {
  if (shuttingdown) {
   throw new illegalstateexception("solrindexer is shutting down");
  }
  inputqueue.offer(new operation(indexable, operationtype.update));
 }
 

 public void delindex(indexable indexable) throws indexingexception {
  if (shuttingdown) {
   throw new illegalstateexception("solrindexer is shutting down");
  }
  inputqueue.offer(new operation(indexable, operationtype.delete));
 }

 
 private void updateindices(string type, list<indexable> indices) throws indexingexception {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("updating {} indices", indices.size());

  updaterequest req = new updaterequest("/" + type + "/update");
  req.setaction(updaterequest.action.commit, false, false);

  for (indexable idx : indices) {
   doc doc = idx.getdoc();

   solrinputdocument solrdoc = new solrinputdocument();
   solrdoc.setdocumentboost(doc.getdocumentboost());
   for (iterator<field> i = doc.iterator(); i.hasnext();) {
    field field = i.next();
    solrdoc.addfield(field.getname(), field.getvalue(), field.getboost());
   }

   req.add(solrdoc);   
  }

  try {
   req.process(server);   
  } catch (solrserverexception e) {
   logger.error("solrserverexception occurred", e);
   throw new indexingexception(e);
  } catch (ioexception e) {
   logger.error("ioexception occurred", e);
   throw new indexingexception(e);
  }
 }

 
 private void delindices(string type, list<indexable> indices) throws indexingexception {
  if (indices == null || indices.size() == 0) {
   return;
  }

  logger.debug("deleting {} indices", indices.size());

  updaterequest req = new updaterequest("/" + type + "/update");
  req.setaction(updaterequest.action.commit, false, false);
  for (indexable indexable : indices) {   
   req.deletebyid(indexable.getdocid());
  }

  try {
   req.process(server);
  } catch (solrserverexception e) {
   logger.error("solrserverexception occurred", e);
   throw new indexingexception(e);
  } catch (ioexception e) {
   logger.error("ioexception occurred", e);
   throw new indexingexception(e);
  }
 }

 
 public queryresult search(query query) throws indexingexception {
  solrquery sq = new solrquery();
  sq.setquery(query.getquery());
  if (query.getfilter() != null) {
   sq.addfilterquery(query.getfilter());
  }
  if (query.getorderfield() != null) {
   sq.addsortfield(query.getorderfield(), query.getorder() == query.order.desc ? solrquery.order.desc : solrquery.order.asc);
  }
  sq.setstart(query.getoffset());
  sq.setrows(query.getlimit());

  queryrequest req = new queryrequest(sq);
  req.setpath("/" + query.gettype() + "/select");

  try {
   queryresponse rsp = req.process(server);
   solrdocumentlist docs = rsp.getresults();

   queryresult result = new queryresult();
   result.setoffset(docs.getstart());
   result.settotal(docs.getnumfound());
   result.setsize(sq.getrows());

   list<doc> resultdocs = new arraylist<doc>(result.getsize());
   for (iterator<solrdocument> i = docs.iterator(); i.hasnext();) {
    solrdocument solrdocument = i.next();

    doc doc = new doc();
    for (iterator<map.entry<string, object>> iter = solrdocument.iterator(); iter.hasnext();) {
     map.entry<string, object> field = iter.next();
     doc.addfield(field.getkey(), field.getvalue());
    }

    resultdocs.add(doc);
   }

   result.setdocs(resultdocs);
   return result;

  } catch (solrserverexception e) {
   logger.error("solrserverexception occurred", e);
   throw new indexingexception(e);
  }
 }
 

 public void destroy() throws exception {
  shutdown(shutdown_timeout, timeunit.milliseconds);  
 }

 public boolean shutdown(long timeout, timeunit unit) {
  if (shuttingdown) {
   logger.info("suppressing duplicate attempt to shut down");
   return false;
  }
  shuttingdown = true;
  string basename = updatethread.getname();
  updatethread.setname(basename + " - shutting down");
  boolean rv = false;
  try {
   // conditionally wait
   if (timeout > 0) {
    updatethread.setname(basename + " - shutting down (waiting)");
    rv = waitforqueue(timeout, unit);
   }
  } finally {
   // but always begin the shutdown sequence
   running = false;
   updatethread.setname(basename + " - shutting down (informed client)");
  }
  return rv;
 }

 /**
  * @param timeout
  * @param unit
  * @return
  */
 private boolean waitforqueue(long timeout, timeunit unit) {
  countdownlatch latch = new countdownlatch(1);
  inputqueue.add(new stopoperation(latch));
  try {
   return latch.await(timeout, unit);
  } catch (interruptedexception e) {
   throw new runtimeexception("interrupted waiting for queues", e);
  }
 }

 

 class updatetask implements runnable {
  public void run() {
   while (running) {
    try {
     syncindices();
    } catch (throwable e) {
     if (shuttingdown) {
      logger.warn("exception occurred during shutdown", e);
     } else {
      logger.error("problem handling solr indexing updating", e);
     }
    }
   }
   logger.info("shut down solrindexer");
  }
 }

 void syncindices() throws interruptedexception {
  operation op = inputqueue.poll(1000l, timeunit.milliseconds);

  if (op == null) {
   return;
  }

  if (op instanceof stopoperation) {
   ((stopoperation) op).stop();
   return;
  }

  // wait 1 second
  try {
   thread.sleep(1000);
  } catch (interruptedexception e) {

  }

  list<operation> ops = new arraylist<operation>(inputqueue.size() + 1);
  ops.add(op);
  inputqueue.drainto(ops);

  map<string, list<indexable>> deletemap = new hashmap<string, list<indexable>>(4);
  map<string, list<indexable>> updatemap = new hashmap<string, list<indexable>>(4);

  for (operation o : ops) {
   if (o instanceof stopoperation) {
    ((stopoperation) o).stop();
   } else {
    indexable indexable = o.indexable;
    if (o.type == operationtype.delete) {
     list<indexable> docs = deletemap.get(indexable.gettype());
     if (docs == null) {
      docs = new linkedlist<indexable>();
      deletemap.put(indexable.gettype(), docs);
     }
     docs.add(indexable);
    } else {
     list<indexable> docs = updatemap.get(indexable.gettype());
     if (docs == null) {
      docs = new linkedlist<indexable>();
      updatemap.put(indexable.gettype(), docs);
     }
     docs.add(indexable);
    }
   }
  }

  for (iterator<map.entry<string, list<indexable>>> i = deletemap.entryset().iterator(); i.hasnext();) {
   map.entry<string, list<indexable>> entry = i.next();
   delindices(entry.getkey(), entry.getvalue());
  }

  for (iterator<map.entry<string, list<indexable>>> i = updatemap.entryset().iterator(); i.hasnext();) {
   map.entry<string, list<indexable>> entry = i.next();
   updateindices(entry.getkey(), entry.getvalue());
  }
 }

 enum operationtype { delete, update, shutdown }

 static class operation {
  operationtype type;
  indexable indexable;

  operation() {}

  operation(indexable indexable, operationtype type) {
   this.indexable = indexable;
   this.type = type;
  }
 }

 static class stopoperation extends operation {
  countdownlatch latch;

  stopoperation(countdownlatch latch) {
   this.latch = latch;
   this.type = operationtype.shutdown;
  }

  public void stop() {
   latch.countdown();
  }
 }

 //~ accessors ===============

}

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网