当前位置: 移动技术网 > IT编程>开发语言>Java > Java负载均衡服务器实现上传文件同步

Java负载均衡服务器实现上传文件同步

2020年09月20日  | 移动技术网IT编程  | 我要评论
负载服务器z,应用服务器a 和b ,从a上传的附件,如何在b上下载下来?这个问题我的解决思路如下(后来被一个装逼的面试官给批评了这种做法,不过我瞧不起他)服务器a、b 上传附件的时候,将这个附件备份到

负载服务器z,应用服务器a 和b ,从a上传的附件,如何在b上下载下来?

这个问题我的解决思路如下(后来被一个装逼的面试官给批评了这种做法,不过我瞧不起他)

服务器a、b 上传附件的时候,将这个附件备份到服务器z ,当a、b下载文件的时候,首先会在自己服务器的目录下寻找,如果找不到,就会从服务器z 上下载一份到当前服务器。

服务器之间的文件备份通过sftp,参考:(下文中的sftpcustom 类就是这个链接里的 “sftp上传下载文件例子” 中的类)

这里主要介绍一下重写上传、下载的方法时应该添加的代码

上传文件,异步操作

 new thread(() -> {

  sftpcustom fu = new sftpcustom();
  fu.upload(file.getabsolutepath(), getfilename(filedescr));
  fu.closechannel();
}).start();

下载文件,先从当前服务器寻找

string tmppath = roots[0].getpath() + '/' + getfilename(filedescr);
file file2 = new file(tmppath);
if (file2.exists()) {
  return fileutils.openinputstream(file2);
}

sftpcustom fu = new sftpcustom();
fu.download(getfilename(filedescr), tmppath);
file2 = new file(tmppath);
inputstream = fileutils.openinputstream(file2);
fu.closechannel();
return inputstream;

cuba 框架中重写上传文件类filestorage.java 的代码如下:

package com.haulmont.cuba.core.app.custom;

import com.google.common.util.concurrent.threadfactorybuilder;
import com.haulmont.cuba.core.app.filestorageapi;
import com.haulmont.cuba.core.app.serverconfig;
import com.haulmont.cuba.core.entity.filedescriptor;
import com.haulmont.cuba.core.global.*;
import com.haulmont.cuba.core.sys.appcontext;
import com.haulmont.cuba.core.sys.securitycontext;
import org.apache.commons.io.fileutils;
import org.apache.commons.io.ioutils;
import org.apache.commons.lang.stringutils;
import org.slf4j.logger;
import org.slf4j.loggerfactory;

import javax.annotation.postconstruct;
import javax.annotation.predestroy;
import javax.inject.inject;
import java.io.*;
import java.nio.charset.standardcharsets;
import java.text.simpledateformat;
import java.util.arraylist;
import java.util.calendar;
import java.util.list;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;

import static com.haulmont.bali.util.preconditions.checknotnullargument;

public class filestorage implements filestorageapi {

  private final logger log = loggerfactory.getlogger(filestorage.class);

  @inject
  protected usersessionsource usersessionsource;

  @inject
  protected timesource timesource;

  @inject
  protected configuration configuration;

  protected boolean isimmutablefilestorage;

  protected executorservice writeexecutor = executors.newfixedthreadpool(5,
      new threadfactorybuilder().setnameformat("filestoragewriter-%d").build());

  protected volatile file[] storageroots;

  @postconstruct
  public void init() {
    this.isimmutablefilestorage = configuration.getconfig(serverconfig.class).getimmutablefilestorage();
  }

  /**
   * internal. don't use in application code.
   */
  public file[] getstorageroots() {
    if (storageroots == null) {
      string conf = configuration.getconfig(serverconfig.class).getfilestoragedir();
      if (stringutils.isblank(conf)) {
        string datadir = configuration.getconfig(globalconfig.class).getdatadir();
        file dir = new file(datadir, "filestorage");
        dir.mkdirs();
        storageroots = new file[]{dir};
      } else {
        list<file> list = new arraylist<>();
        for (string str : conf.split(",")) {
          str = str.trim();
          if (!stringutils.isempty(str)) {
            file file = new file(str);
            if (!list.contains(file))
              list.add(file);
          }
        }
        storageroots = list.toarray(new file[list.size()]);
      }
    }
    return storageroots;
  }

  @override
  public long savestream(final filedescriptor filedescr, final inputstream inputstream) throws filestorageexception {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();

    // store to primary storage

    checkstoragedefined(roots, filedescr);
    checkprimarystorageaccessible(roots, filedescr);

    file dir = getstoragedir(roots[0], filedescr);
    dir.mkdirs();
    checkdirectoryexists(dir);

    final file file = new file(dir, getfilename(filedescr));
    checkfileexists(file);

    long size = 0;
    outputstream os = null;
    try {
      os = fileutils.openoutputstream(file);
      size = ioutils.copylarge(inputstream, os);
      os.flush();
      writelog(file, false);

      new thread(() -> {
        sftpcustom fu = new sftpcustom();
        fu.upload(file.getabsolutepath(), getfilename(filedescr));
        fu.closechannel();
      }).start();

    } catch (ioexception e) {
      ioutils.closequietly(os);
      fileutils.deletequietly(file);

      throw new filestorageexception(filestorageexception.type.io_exception, file.getabsolutepath(), e);
    } finally {
      ioutils.closequietly(os);
    }

    // copy file to secondary storages asynchronously

    final securitycontext securitycontext = appcontext.getsecuritycontext();
    for (int i = 1; i < roots.length; i++) {
      if (!roots[i].exists()) {
        log.error("error saving {} into {} : directory doesn't exist", filedescr, roots[i]);
        continue;
      }

      file copydir = getstoragedir(roots[i], filedescr);
      final file filecopy = new file(copydir, getfilename(filedescr));

      writeexecutor.submit(new runnable() {
        @override
        public void run() {
          try {
            appcontext.setsecuritycontext(securitycontext);
            fileutils.copyfile(file, filecopy, true);
            writelog(filecopy, false);
          } catch (exception e) {
            log.error("error saving {} into {} : {}", filedescr, filecopy.getabsolutepath(), e.getmessage());
          } finally {
            appcontext.setsecuritycontext(null);
          }
        }
      });
    }

    return size;
  }

  protected void checkfileexists(file file) throws filestorageexception {
    if (file.exists() && isimmutablefilestorage)
      throw new filestorageexception(filestorageexception.type.file_already_exists, file.getabsolutepath());
  }

  protected void checkdirectoryexists(file dir) throws filestorageexception {
    if (!dir.exists())
      throw new filestorageexception(filestorageexception.type.storage_inaccessible, dir.getabsolutepath());
  }

  protected void checkprimarystorageaccessible(file[] roots, filedescriptor filedescr) throws filestorageexception {
    if (!roots[0].exists()) {
      log.error("inaccessible primary storage at {}", roots[0]);
      throw new filestorageexception(filestorageexception.type.storage_inaccessible, filedescr.getid().tostring());
    }
  }

  protected void checkstoragedefined(file[] roots, filedescriptor filedescr) throws filestorageexception {
    if (roots.length == 0) {
      log.error("no storage directories defined");
      throw new filestorageexception(filestorageexception.type.storage_inaccessible, filedescr.getid().tostring());
    }
  }

  @override
  public void savefile(final filedescriptor filedescr, final byte[] data) throws filestorageexception {
    checknotnullargument(data, "file content is null");
    savestream(filedescr, new bytearrayinputstream(data));
  }

  protected synchronized void writelog(file file, boolean remove) {
    simpledateformat df = new simpledateformat("yyyy-mm-dd hh:mm:ss.sss");

    stringbuilder sb = new stringbuilder();
    sb.append(df.format(timesource.currenttimestamp())).append(" ");
    sb.append("[").append(usersessionsource.getusersession().getuser()).append("] ");
    sb.append(remove ? "remove" : "create").append(" ");
    sb.append("\"").append(file.getabsolutepath()).append("\"\n");

    file rootdir;
    try {
      rootdir = file.getparentfile().getparentfile().getparentfile().getparentfile();
    } catch (nullpointerexception e) {
      log.error("unable to write log: invalid file storage structure", e);
      return;
    }
    file logfile = new file(rootdir, "storage.log");
    try {
      try (fileoutputstream fos = new fileoutputstream(logfile, true)) {
        ioutils.write(sb.tostring(), fos, standardcharsets.utf_8.name());
      }
    } catch (ioexception e) {
      log.error("unable to write log", e);
    }
  }

  @override
  public void removefile(filedescriptor filedescr) throws filestorageexception {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();
    if (roots.length == 0) {
      log.error("no storage directories defined");
      return;
    }

    for (file root : roots) {
      file dir = getstoragedir(root, filedescr);
      file file = new file(dir, getfilename(filedescr));
      if (file.exists()) {
        if (!file.delete()) {
          throw new filestorageexception(filestorageexception.type.io_exception, "unable to delete file " + file.getabsolutepath());
        } else {
          writelog(file, true);
        }
      }
    }
  }

  protected void checkfiledescriptor(filedescriptor fd) {
    if (fd == null || fd.getcreatedate() == null) {
      throw new illegalargumentexception("a filedescriptor instance with populated 'createdate' attribute must be provided");
    }
  }

  @override
  public inputstream openstream(filedescriptor filedescr) throws filestorageexception {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();
    if (roots.length == 0) {
      log.error("no storage directories available");
      throw new filestorageexception(filestorageexception.type.file_not_found, filedescr.getid().tostring());
    }

    inputstream inputstream = null;
    for (file root : roots) {
      file dir = getstoragedir(root, filedescr);

      file file = new file(dir, getfilename(filedescr));
      if (!file.exists()) {
        log.error("file " + file + " not found");
        continue;
      }

      try {
        inputstream = fileutils.openinputstream(file);
        break;
      } catch (ioexception e) {
        log.error("error opening input stream for " + file, e);
      }
    }
    if (inputstream != null) {
      return inputstream;
    } else {
      try {
        string tmppath = roots[0].getpath() + '/' + getfilename(filedescr);
        file file2 = new file(tmppath);
        if (file2.exists()) {
          return fileutils.openinputstream(file2);
        }

        sftpcustom fu = new sftpcustom();
        fu.download(getfilename(filedescr), tmppath);
        file2 = new file(tmppath);
        inputstream = fileutils.openinputstream(file2);
        fu.closechannel();
        return inputstream;
      } catch (exception e) {
        throw new filestorageexception(filestorageexception.type.file_not_found, filedescr.getid().tostring());
      }
    }
  }

  @override
  public byte[] loadfile(filedescriptor filedescr) throws filestorageexception {
    inputstream inputstream = openstream(filedescr);
    try {
      return ioutils.tobytearray(inputstream);
    } catch (ioexception e) {
      throw new filestorageexception(filestorageexception.type.io_exception, filedescr.getid().tostring(), e);
    } finally {
      ioutils.closequietly(inputstream);
    }
  }

  @override
  public boolean fileexists(filedescriptor filedescr) {
    checkfiledescriptor(filedescr);

    file[] roots = getstorageroots();
    for (file root : roots) {
      file dir = getstoragedir(root, filedescr);
      file file = new file(dir, getfilename(filedescr));
      if (file.exists()) {
        return true;
      }
    }
    return false;
  }

  /**
   * internal. don't use in application code.
   */
  public file getstoragedir(file rootdir, filedescriptor filedescriptor) {
    checknotnullargument(rootdir);
    checknotnullargument(filedescriptor);

    calendar cal = calendar.getinstance();
    cal.settime(filedescriptor.getcreatedate());
    int year = cal.get(calendar.year);
    int month = cal.get(calendar.month) + 1;
    int day = cal.get(calendar.day_of_month);

    return new file(rootdir, year + "/"
        + stringutils.leftpad(string.valueof(month), 2, '0') + "/"
        + stringutils.leftpad(string.valueof(day), 2, '0'));
  }

  public static string getfilename(filedescriptor filedescriptor) {
    return filedescriptor.getid().tostring() + "." + filedescriptor.getextension();
  }

  @predestroy
  protected void stopwriteexecutor() {
    writeexecutor.shutdown();
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持移动技术网。

如您对本文有疑问或者有任何想说的,请点击进行留言回复,万千网友为您解惑!

相关文章:

验证码:
移动技术网