当前位置: 移动技术网 > IT编程>开发语言>Java > spring异步service中处理线程数限制详解

spring异步service中处理线程数限制详解

2019年09月09日  | 移动技术网IT编程  | 我要评论

情况简介

spring项目,controller异步调用service的方法,产生大量并发。

具体业务:

前台同时传入大量待翻译的单词,后台业务接收单词,并调用百度翻译接口翻译接收单词并将翻译结果保存到数据库,前台不需要实时返回翻译结果。

处理方式:

controller接收文本调用service中的异步方法,将单词先保存到队列中,再启动2个新线程,从缓存队列中取单词,并调用百度翻译接口获取翻译结果并将翻译结果保存到数据库。

本文主要知识点:

多线程同时(异步)调用方法后,开启新线程,并限制线程数量。

代码如下:

@service
public class lgtsasyncserviceimpl {
 /** logger日志. */
 public static final logger logger = logger.getlogger(lgtsasyncserviceimpl2.class);

 private final blockingqueue<lgts> que = new linkedblockingqueue<>();// 待翻译的队列
 private final atomicinteger threadcnt = new atomicinteger(0);// 当前翻译中的线程数
 private final vector<string> existskey = new vector<>();// 保存已入队列的数据
 private final int maxthreadcnt = 2;// 允许同时执行的翻译线程数
 private static final int num_of_every_time = 50;// 每次提交的翻译条数
 private static final string translationfrom = "zh";

 @async
 public void saveasync(lgts t) {
  if (objects.isnull(t) || stringutils.isanyblank(t.getgco(), t.getcode())) {
   return;
  }
  offer(t);
  save();
  return;
 }

 private boolean offer(lgts t) {
  string key = t.getgco() + "-" + t.getcode();
  if (!existskey.contains(key)) {
   existskey.add(key);
   boolean result = que.offer(t);
   // logger.trace("待翻译文字[" + t.getgco() + ":" + t.getcode() + "]加入队列结果[" + result
   // + "],队列中数据总个数:" + que.size());
   return result;
  }
  return false;
 }

 @autowired
 private lgtsservice lgtsservice;

 private void save() {
  int cnt = threadcnt.incrementandget();// 当前线程数+1
  if (cnt > maxthreadcnt) {
   // 已启动的线程大于设置的最大线程数直接丢弃
   threadcnt.decrementandget();// +1的线程数再-回去
   return;
  }
  gwalluser user = userutils.getuser();
  thread thr = new thread() {
   public void run() {
    long sleeptime = 30000l;
    userutils.setuser(user);
    boolean continueflag = true;
    int maxcontinuecnt = 5;// 最大连续休眠次数,连续休眠次数超过最大休眠次数后,while循环退出,当前线程销毁
    int continuecnt = 0;// 连续休眠次数

    while (continueflag) {// 队列不为空时执行
     if (objects.isnull(que.peek())) {
      try {
       if (continuecnt > maxcontinuecnt) {
        // 连续休眠次数达到最大连续休眠次数,当前线程将销毁。
        continueflag = false;
        continue;
       }
       // 队列为空,准备休眠
       thread.sleep(sleeptime);
       continuecnt++;
       continue;
      } catch (interruptedexception e) {
       // 休眠失败,无需处理
       e.printstacktrace();
      }
     }
     continuecnt = 0;// 重置连续休眠次数为0

     list<lgts> params = new arraylist<>();
     int totalcnt = que.size();
     que.drainto(params, num_of_every_time);
     stringbuilder utf8q = new stringbuilder();
     string code = "";
     list<lgts> needremove = new arraylist<>();
     for (lgts lgts : params) {
      if (stringutils.isanyblank(code)) {
       code = lgts.getcode();
      }
      // 移除existskey中保存的key,以免下面翻译失败时再次加入队列时,加入不进去
      string key = lgts.getgco() + "-" + lgts.getcode();
      existskey.remove(key);

      if (!code.equalsignorecase(lgts.getcode())) {// 要翻译的目标语言与当前列表中的第一个不一致
       offer(lgts);// 重新将待翻译的语言放回队列
       needremove.add(lgts);
       continue;
      }
      utf8q.append(lgts.getgco()).append("\n");
     }
     params.removeall(needremove);
     logger.debug("队列中共" + totalcnt + " 个,获取" + params.size() + " 个符合条件的待翻译内容,编码:" + code);
     string to = "en";
     if (stringutils.isanyblank(utf8q, to)) {
      logger.warn("调用翻译出错,未找到[" + code + "]对应的百度编码。");
      continue;
     }
     map<string, string> result = getbaidutranslation(utf8q.tostring(), translationfrom, to);
     if (objects.isnull(result) || result.isempty()) {// 把没有获取到翻译结果的重新放回队列
      for (lgts lgts : params) {
       offer(lgts);
      }
      logger.debug("本次翻译结果为空。");
      continue;
     }
     int sucesscnt = 0, ignorecnt = 0;
     for (lgts lgts : params) {
      lgts.setbdcode(to);
      string gna = result.get(lgts.getgco());
      if (stringutils.isanyblank(gna)) {
       offer(lgts);// 重新将待翻译的语言放回队列
       continue;
      }
      lgts.setstat(1);
      lgts.setgna(gna);
      int saveresult = lgtsservice.saveignore(lgts);
      if (0 == saveresult) {
       ignorecnt++;
      } else {
       sucesscnt++;
      }
     }
     logger.debug("待翻译个数:" + params.size() + ",翻译成功个数:" + sucesscnt + ",已存在并忽略个数:" + ignorecnt);
    }
    threadcnt.decrementandget();// 运行中的线程数-1
    distory();// 清理数据,必须放在方法最后,否则distory中的判断需要修改
   }

   /**
    * 如果是最后一个线程,清空队列和existskey中的数据
    */
   private void distory() {
    if (0 == threadcnt.get()) {
     // 最后一个线程退出时,执行清理操作
     existskey.clear();
     que.clear();
    }
   }
  };
  thr.setdaemon(true);// 守护线程,如果主线程执行完毕,则此线程会自动销毁
  thr.setname("baidufanyi-" + randomutils.nextint(1000, 9999));
  thr.start();// 启动插入线程
 }

 /**
  * 百度翻译
  * 
  * @param utf8q
  *   待翻译的字符串,需要utf8格式的
  * @param from
  *   百度翻译语言列表中的代码
  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languagelist
  * @param to
  *   百度翻译语言列表中的代码
  *   参见:http://api.fanyi.baidu.com/api/trans/product/apidoc#languagelist
  * @return 翻译结果
  */
 private map<string, string> getbaidutranslation(string utf8q, string from, string to) {
  map<string, string> result = new hashmap<>();
  string baiduurlstr = "http://api.fanyi.baidu.com/api/trans/vip/translate";
  if (stringutils.isanyblank(baiduurlstr)) {
   logger.warn("百度翻译api接口url相关参数为空!");
   return result;
  }
  map<string, string> params = buildparams(utf8q, from, to);
  if (params.isempty()) {
   return result;
  }

  string sendurl = geturlwithquerystring(baiduurlstr, params);
  try {
   httpclient httpclient = new httpclient();
   httpclient.setmethod("get");
   string remoteresult = httpclient.pub(sendurl, "");
   result = convertremote(remoteresult);
  } catch (exception e) {
   logger.info("百度翻译api返回结果异常!", e);
  }
  return result;
 }

 private map<string, string> convertremote(string remoteresult) {
  map<string, string> result = new hashmap<>();
  if (stringutils.isblank(remoteresult)) {
   return result;
  }
  jsonobject jsonobject = jsonobject.parseobject(remoteresult);
  jsonarray trans_result = jsonobject.getjsonarray("trans_result");
  if (objects.isnull(trans_result) || trans_result.isempty()) {
   return result;
  }
  for (object object : trans_result) {
   jsonobject trans = (jsonobject) object;
   result.put(trans.getstring("src"), trans.getstring("dst"));
  }
  return result;
 }

 private map<string, string> buildparams(string utf8q, string from, string to) {
  if (stringutils.isblank(from)) {
   from = "auto";
  }
  map<string, string> params = new hashmap<string, string>();
  string skstr = "sk";
  string appidstr = "appid";
  if (stringutils.isanyblank(skstr, appidstr)) {
   logger.warn("百度翻译api接口相关参数为空!");
   return params;
  }

  params.put("q", utf8q);
  params.put("from", from);
  params.put("to", to);

  params.put("appid", appidstr);

  // 随机数
  string salt = string.valueof(system.currenttimemillis());
  params.put("salt", salt);

  // 签名
  string src = appidstr + utf8q + salt + skstr; // 加密前的原文
  params.put("sign", md5util.md5encrypt(src).tolowercase());
  return params;
 }

 public static string geturlwithquerystring(string url, map<string, string> params) {
  if (params == null) {
   return url;
  }

  stringbuilder builder = new stringbuilder(url);
  if (url.contains("?")) {
   builder.append("&");
  } else {
   builder.append("?");
  }

  int i = 0;
  for (string key : params.keyset()) {
   string value = params.get(key);
   if (value == null) { // 过滤空的key
    continue;
   }

   if (i != 0) {
    builder.append('&');
   }

   builder.append(key);
   builder.append('=');
   builder.append(encode(value));

   i++;
  }

  return builder.tostring();
 }

 /**
  * 对输入的字符串进行url编码, 即转换为%20这种形式
  * 
  * @param input
  *   原文
  * @return url编码. 如果编码失败, 则返回原文
  */
 public static string encode(string input) {
  if (input == null) {
   return "";
  }

  try {
   return urlencoder.encode(input, "utf-8");
  } catch (unsupportedencodingexception e) {
   e.printstacktrace();
  }

  return input;
 }
}

总结

以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,谢谢大家对移动技术网的支持。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网