当前位置: 移动技术网 > IT编程>开发语言>Java > Spring Boot整合FTPClient线程池的实现示例

Spring Boot整合FTPClient线程池的实现示例

2019年07月19日  | 移动技术网IT编程  | 我要评论

最近在写一个ftp上传工具,用到了apache的ftpclient,但是每个线程频繁的创建和销毁ftpclient对象对服务器的压力很大,因此,此处最好使用一个ftpclient连接池。仔细翻了一下apache的api,发现它并没有一个ftpclientpool的实现,所以,不得不自己写一个ftpclientpool。下面就大体介绍一下开发连接池的整个过程,供大家参考。

我们可以利用apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool 包中的objectpool和poolableobjectfactory两个接口即可。

线程池的意义

为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。

pom引入依赖

 <!-- ftpclient依赖包-->
    <dependency>
      <groupid>commons-net</groupid>
      <artifactid>commons-net</artifactid>
      <version>3.5</version>
    </dependency>

    <!-- 线程池-->
    <dependency>
      <groupid>commons-pool</groupid>
      <artifactid>commons-pool</artifactid>
      <version>1.6</version>
    </dependency>

    <dependency>
      <groupid>org.apache.commons</groupid>
      <artifactid>commons-pool2</artifactid>
      <version>2.0</version>
    </dependency>

创建ftp配置信息

在resources目录下创建ftp.properties配置文件,目录结构如下:

添加如下的配置信息:

########### ftp用户名称 ###########
ftp.username=hrabbit
########### ftp用户密码 ###########
ftp.password=123456
########### ftp主机ip ###########
ftp.host=127.0.0.1
########### ftp主机端口号 ###########
ftp.port=21
########### 保存根路径 ###########
ftp.baseurl=/

创建ftpproperties.java配置文件

加载配置内容到spring中,配置信息基本延用我的就可以。

/**
 * ftp的配置信息
 * @auther: hrabbit
 * @date: 2018-12-03 2:06 pm
 * @description:
 */
@data
@component
@propertysource("classpath:ftp.properties")
@configurationproperties(prefix = "ftp")
public class ftpproperties {

  private string username;

  private string password;

  private string host;

  private integer port;

  private string baseurl;

  private integer passivemode = ftp.binary_file_type;

  private string encoding="utf-8";

  private int clienttimeout=120000;

  private int buffersize;

  private int transferfiletype=ftp.binary_file_type;

  private boolean renameuploaded;

  private int retrytime;
}

创建ftpclientpool线程池

/**
 * 自定义实现ftp连接池
 * @auther: hrabbit
 * @date: 2018-12-03 3:40 pm
 * @description:
 */
@slf4j
@suppresswarnings("all")
public class ftpclientpool implements objectpool<ftpclient> {

  private static final int default_pool_size = 10;

  public blockingqueue<ftpclient> blockingqueue;

  private ftpclientfactory factory;

  public ftpclientpool(ftpclientfactory factory) throws exception {
    this(default_pool_size, factory);
  }

  public ftpclientpool(int poolsize, ftpclientfactory factory) throws exception {
    this.factory = factory;
    this.blockingqueue = new arrayblockingqueue<ftpclient>(poolsize);
    initpool(poolsize);
  }

  /**
   * 初始化连接池
   * @param maxpoolsize
   *         最大连接数
   * @throws exception
   */
  private void initpool(int maxpoolsize) throws exception {
    int count = 0;
    while(count < maxpoolsize) {
      this.addobject();
      count++;
    }
  }

  /**
   * 从连接池中获取对象
   */
  @override
  public ftpclient borrowobject() throws exception {
    ftpclient client = blockingqueue.take();
    if(client == null) {
      client = factory.makeobject();
    } else if(!factory.validateobject(client)) {
      invalidateobject(client);
      client = factory.makeobject();
    }
    return client;
  }

  /**
   * 返还一个对象(链接)
   */
  @override
  public void returnobject(ftpclient client) throws exception {
    if ((client != null) && !blockingqueue.offer(client,2,timeunit.minutes)) {
      try {
        factory.destroyobject(client);
      } catch (exception e) {
        throw e;
      }
    }
  }

  /**
   * 移除无效的对象(ftp客户端)
   */
  @override
  public void invalidateobject(ftpclient client) throws exception {
    blockingqueue.remove(client);
  }

  /**
   * 增加一个新的链接,超时失效
   */
  @override
  public void addobject() throws exception {
    blockingqueue.offer(factory.makeobject(), 2, timeunit.minutes);
  }

  /**
   * 重新连接
   */
  public ftpclient reconnect() throws exception {
    return factory.makeobject();
  }

  /**
   * 获取空闲链接数(这里暂不实现)
   */
  @override
  public int getnumidle() {
    return blockingqueue.size();
  }

  /**
   * 获取正在被使用的链接数
   */
  @override
  public int getnumactive() {
    return default_pool_size - getnumidle();
  }

  @override
  public void clear() throws exception {

  }

  /**
   * 关闭连接池
   */
  @override
  public void close() {
    try {
      while(blockingqueue.iterator().hasnext()) {
        ftpclient client = blockingqueue.take();
        factory.destroyobject(client);
      }
    } catch(exception e) {
      log.error("close ftp client pool failed...{}", e);
    }
  }

  /**
   * 增加一个新的链接,超时失效
   */
  public void addobject(ftpclient ftpclient) throws exception {
    blockingqueue.put(ftpclient);
  }
}

创建一个ftpclientfactory工厂类

创建ftpclientfactory实现poolableobjectfactory的接口,ftpclient工厂类,通过ftpclient工厂提供ftpclient实例的创建和销毁

/**
 * ftpclient 工厂
 * @auther: hrabbit
 * @date: 2018-12-03 3:41 pm
 * @description:
 */
@slf4j
@suppresswarnings("all")
public class ftpclientfactory implements poolableobjectfactory<ftpclient> {

  private ftpproperties ftpproperties;

  public ftpclientfactory(ftpproperties ftpproperties) {
    this.ftpproperties = ftpproperties;
  }

  @override
  public ftpclient makeobject() throws exception {
    ftpclient ftpclient = new ftpclient();
    ftpclient.setcontrolencoding(ftpproperties.getencoding());
    ftpclient.setconnecttimeout(ftpproperties.getclienttimeout());
    try {
      ftpclient.connect(ftpproperties.gethost(), ftpproperties.getport());
      int reply = ftpclient.getreplycode();
      if (!ftpreply.ispositivecompletion(reply)) {
        ftpclient.disconnect();
        log.warn("ftpserver refused connection");
        return null;
      }
      boolean result = ftpclient.login(ftpproperties.getusername(), ftpproperties.getpassword());
      ftpclient.setfiletype(ftpproperties.gettransferfiletype());
      if (!result) {
        log.warn("ftpclient login failed... username is {}", ftpproperties.getusername());
      }
    } catch (exception e) {
      log.error("create ftp connection failed...{}", e);
      throw e;
    }

    return ftpclient;
  }

  @override
  public void destroyobject(ftpclient ftpclient) throws exception {
    try {
      if(ftpclient != null && ftpclient.isconnected()) {
        ftpclient.logout();
      }
    } catch (exception e) {
      log.error("ftp client logout failed...{}", e);
      throw e;
    } finally {
      if(ftpclient != null) {
        ftpclient.disconnect();
      }
    }

  }

  @override
  public boolean validateobject(ftpclient ftpclient) {
    try {
      return ftpclient.sendnoop();
    } catch (exception e) {
      log.error("failed to validate client: {}");
    }
    return false;
  }

  @override
  public void activateobject(ftpclient obj) throws exception {
    //do nothing

  }

  @override
  public void passivateobject(ftpclient obj) throws exception {
    //do nothing

  }
}

创建ftputils.java的工具类

ftputils.java中封装了上传、下载等方法,在项目启动的时候,在@postconstruct注解的作用下通过执行init()的方法,创建ftpclientfactory工厂中,并初始化了ftpclientpool线程池,这样每次调用方法的时候,都直接从ftpclientpool中取出一个ftpclient对象

/**
 * @auther: hrabbit
 * @date: 2018-12-03 3:47 pm
 * @description:
 */
@slf4j
@component
public class ftputils {

  /**
   * ftp的连接池
   */
  @autowired
  public static ftpclientpool ftpclientpool;
  /**
   * ftpclient对象
   */
  public static ftpclient ftpclient;


  private static ftputils ftputils;

  @autowired
  private ftpproperties ftpproperties;

  /**
   * 初始化设置
   * @return
   */
  @postconstruct
  public boolean init() {
    ftpclientfactory factory = new ftpclientfactory(ftpproperties);
    ftputils = this;
    try {
      ftpclientpool = new ftpclientpool(factory);
    } catch (exception e) {
      e.printstacktrace();
      return false;
    }
    return true;
  }


  /**
   * 获取连接对象
   * @return
   * @throws exception
   */
  public static ftpclient getftpclient() throws exception {
    //初始化的时候从队列中取出一个连接
    if (ftpclient==null) {
      synchronized (ftpclientpool) {
        ftpclient = ftpclientpool.borrowobject();
      }
    }
    return ftpclient;
  }


  /**
   * 当前命令执行完成命令完成
   * @throws ioexception
   */
  public void complete() throws ioexception {
    ftpclient.completependingcommand();
  }

  /**
   * 当前线程任务处理完成,加入到队列的最后
   * @return
   */
  public void disconnect() throws exception {
    ftpclientpool.addobject(ftpclient);
  }

  /**
   * description: 向ftp服务器上传文件
   *
   * @version1.0
   * @param remotefile
   *      上传到ftp服务器上的文件名
   * @param input
   *      本地文件流
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadfile(string remotefile, inputstream input) {
    boolean result = false;
    try {
      getftpclient();
      ftpclient.enterlocalpassivemode();
      result = ftpclient.storefile(remotefile, input);
      input.close();
      ftpclient.disconnect();
    } catch (exception e) {
      e.printstacktrace();
    }
    return result;
  }

  /**
   * description: 向ftp服务器上传文件
   *
   * @version1.0
   * @param remotefile
   *      上传到ftp服务器上的文件名
   * @param localfile
   *      本地文件
   * @return 成功返回true,否则返回false
   */
  public static boolean uploadfile(string remotefile, string localfile){
    fileinputstream input = null;
    try {
      input = new fileinputstream(new file(localfile));
    } catch (filenotfoundexception e) {
      e.printstacktrace();
    }
    return uploadfile(remotefile, input);
  }

  /**
   * 拷贝文件
   * @param fromfile
   * @param tofile
   * @return
   * @throws exception
   */
  public boolean copyfile(string fromfile, string tofile) throws exception {
    inputstream in=getfileinputstream(fromfile);
    getftpclient();
    boolean flag = ftpclient.storefile(tofile, in);
    in.close();
    return flag;
  }

  /**
   * 获取文件输入流
   * @param filename
   * @return
   * @throws ioexception
   */
  public static inputstream getfileinputstream(string filename) throws exception {
    bytearrayoutputstream fos=new bytearrayoutputstream();
    getftpclient();
    ftpclient.retrievefile(filename, fos);
    bytearrayinputstream in=new bytearrayinputstream(fos.tobytearray());
    fos.close();
    return in;
  }

  /**
   * description: 从ftp服务器下载文件
   *
   * @version1.0
   * @return
   */
  public static boolean downfile(string remotefile, string localfile){
    boolean result = false;
    try {
      getftpclient();
      outputstream os = new fileoutputstream(localfile);
      ftpclient.retrievefile(remotefile, os);
      ftpclient.logout();
      ftpclient.disconnect();
      result = true;
    } catch (exception e) {
      e.printstacktrace();
    } finally {
      try {
      } catch (exception e) {
        e.printstacktrace();
      }
    }
    return result;
  }

  /**
   * 从ftp中获取文件流
   * @param filepath
   * @return
   * @throws exception
   */
  public static inputstream getinputstream(string filepath) throws exception {
    getftpclient();
    inputstream inputstream = ftpclient.retrievefilestream(filepath);
    return inputstream;
  }

  /**
   * ftp中文件重命名
   * @param fromfile
   * @param tofile
   * @return
   * @throws exception
   */
  public boolean rename(string fromfile,string tofile) throws exception {
    getftpclient();
    boolean result = ftpclient.rename(fromfile,tofile);
    return result;
  }

  /**
   * 获取ftp目录下的所有文件
   * @param dir
   * @return
   */
  public ftpfile[] getfiles(string dir) throws exception {
    getftpclient();
    ftpfile[] files = new ftpfile[0];
    try {
      files = ftpclient.listfiles(dir);
    }catch (throwable thr){
      thr.printstacktrace();
    }
    return files;
  }

  /**
   * 获取ftp目录下的某种类型的文件
   * @param dir
   * @param filter
   * @return
   */
  public ftpfile[] getfiles(string dir, ftpfilefilter filter) throws exception {
    getftpclient();
    ftpfile[] files = new ftpfile[0];
    try {
      files = ftpclient.listfiles(dir, filter);
    }catch (throwable thr){
      thr.printstacktrace();
    }
    return files;
  }

  /**
   * 创建文件夹
   * @param remotedir
   * @return 如果已经有这个文件夹返回false
   */
  public boolean makedirectory(string remotedir) throws exception {
    getftpclient();
    boolean result = false;
    try {
      result = ftpclient.makedirectory(remotedir);
    } catch (ioexception e) {
      e.printstacktrace();
    }
    return result;
  }

  public boolean mkdirs(string dir) throws exception {
    boolean result = false;
    if (null == dir) {
      return result;
    }
    getftpclient();
    ftpclient.changeworkingdirectory("/");
    stringtokenizer dirs = new stringtokenizer(dir, "/");
    string temp = null;
    while (dirs.hasmoreelements()) {
      temp = dirs.nextelement().tostring();
      //创建目录
      ftpclient.makedirectory(temp);
      //进入目录
      ftpclient.changeworkingdirectory(temp);
      result = true;
    }
    ftpclient.changeworkingdirectory("/");
    return result;
  }
}

创建ftpclienttest.java测试类

上传一张图片到ftp服务器,并将文件重新命名为hrabbit.jpg,代码如下:

/**
 * ftpclient测试
 * @auther: hrabbit
 * @date: 2018-12-21 9:14 pm
 * @description:
 */
@runwith(springrunner.class)
@springboottest
public class ftpclienttest {

  /**
   * 测试上传
   */
  @test
  public void uploadfile(){
    boolean flag = ftputils.uploadfile("hrabbit.jpg", "/users/mrotaku/downloads/klklklkl_4x.jpg");
    assert.assertequals(true, flag);
  }

}

程序完美运行,这时候我们查看我们的ftp服务器,http://localhost:8866/hrabbit.jpg

码云地址:

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如对本文有疑问, 点击进行留言回复!!

相关文章:

验证码:
移动技术网