Java Ftp 连接池(支持多线程)

环境及版本:

  • 框架:SpringMVC 5.1.7.RELEASE
    commons-net 3.6
    commons-pool 2 2.7.0

1. pom.xml中添加依赖

 <!-- ftp -->
 <dependency>
     <groupId>commons-net</groupId>
     <artifactId>commons-net</artifactId>
     <version>3.6</version>
 </dependency>
 <!-- 使用commons-pool2 实现ftp连接池 -->
 <dependency>
     <groupId>org.apache.commons</groupId>
     <artifactId>commons-pool2</artifactId>
     <version>2.7.0</version>
 </dependency>

2. 配置文件

#ftp服务器配置
ftp.host=192.168.241.128
ftp.port=21
ftp.username=ftp_user
ftp.password=123
#超时时间(0表示一直连接)
ftp.clientTimeout=0
ftp.connectTimeout=0
#编码格式
ftp.encoding=UTF-8
#缓冲器大小
ftp.bufferSize=1024
#每次数据连接之前,ftp client告诉ftp server开通一个端口来传输数据
ftp.passiveMode=true
#连接池数量
ftp.defaultpoolsize=10


#FTP连接池配置
#最大数
ftpPool.maxTotal=50
#最小空闲
ftpPool.minIdle=0
#最大空闲
ftpPool.maxIdle=50
#最大等待时间
ftpPool.maxWait=-1
#池对象耗尽之后是否阻塞,maxWait<0时一直等待
ftpPool.blockWhenExhausted=true
#取对象是验证
ftpPool.testOnBorrow=true
#回收验证
ftpPool.testOnReturn=true
#创建时验证
ftpPool.testOnCreate=true
#空闲验证
ftpPool.testWhileIdle=false
#后进先出
ftpPool.lifo=false

3. FtpClient工厂类

package com.longway.busi.component;

import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.pool2.BasePooledObjectFactory;
import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
 * @description:
 * @title: FTP 工厂
 * @author: lrxc
 * @date: 2019/11/18 19:12
 */
@Component
public class FtpClientFactory extends BasePooledObjectFactory<FTPClient> {

    @Value("${ftp.host}")
    private String host;
    @Value("${ftp.port}")
    private int port;
    @Value("${ftp.username}")
    private String username;
    @Value("${ftp.password}")
    private String password;
    @Value("${ftp.clientTimeout}")
    private int clientTimeout;
    @Value("${ftp.connectTimeout}")
    private int connectTimeout;
    @Value("${ftp.encoding}")
    private String encoding;
    @Value("${ftp.bufferSize}")
    private int bufferSize;
    @Value("${ftp.passiveMode}")
    private boolean passiveMode;

    private final static Logger log = Logger.getLogger(FtpClientFactory.class.getName());

    /**
     * 创建FtpClient对象
     */
    @Override
    public FTPClient create() {
        FTPClient ftpClient = new FTPClient();
        ftpClient.setConnectTimeout(connectTimeout);
        try {
            ftpClient.connect(host, port);
            int replyCode = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(replyCode)) {
                ftpClient.disconnect();
                log.warn("FTPServer 连接失败,replyCode: " + replyCode);
                return null;
            }

            if (!ftpClient.login(username, password)) {
                log.warn("ftpClient 登录失败: " + username + password);
                return null;
            }
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);//文件类型
            ftpClient.setControlEncoding(encoding);
            ftpClient.setBufferSize(bufferSize);
            if (passiveMode) {
                //这个方法的意思就是每次数据连接之前,ftp client告诉ftp server开通一个端口来传输数据
                ftpClient.enterLocalPassiveMode();
            }
            ftpClient.setSoTimeout(clientTimeout);
        } catch (IOException e) {
            log.error("FtpClient 创建错误: " + e.toString());
        }
        return ftpClient;
    }

    /**
     * 用PooledObject封装对象放入池中
     */
    @Override
    public PooledObject<FTPClient> wrap(FTPClient ftpClient) {
        return new DefaultPooledObject<>(ftpClient);
    }

    /**
     * 销毁FtpClient对象
     */
    @Override
    public void destroyObject(PooledObject<FTPClient> ftpPooled) {
        if (ftpPooled == null) {
            return;
        }

        FTPClient ftpClient = ftpPooled.getObject();
        try {
            if (ftpClient.isConnected()) {
                ftpClient.logout();
            }
        } catch (Exception io) {
            log.error("销毁FtpClient错误..." + io.toString());
        } finally {
            try {
                ftpClient.disconnect();
            } catch (IOException io) {
                log.error("销毁FtpClient错误..." + io.toString());
            }
        }
    }

    /**
     * 验证FtpClient对象
     */
    @Override
    public boolean validateObject(PooledObject<FTPClient> ftpPooled) {
        try {
            FTPClient ftpClient = ftpPooled.getObject();
            return ftpClient.sendNoOp();
        } catch (IOException e) {
            log.error("验证FtpClient对象错误: " + e.toString());
        }
        return false;
    }
}

4. 创建连接池

package com.longway.busi.component;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;

/**
 * @description:
 * @title: 官方FTP连接池
 * @author: lrxc
 * @date: 2019/12/03 19:09
 */
@Component
public class FtpClientPool {

    @Value("${ftpPool.maxTotal}")
    private int maxTotal;
    @Value("${ftpPool.minIdle}")
    private int minIdle;
    @Value("${ftpPool.maxIdle}")
    private int maxIdle;
    @Value("${ftpPool.maxWait}")
    private long maxWait;
    @Value("${ftpPool.blockWhenExhausted}")
    private boolean blockWhenExhausted;
    @Value("${ftpPool.testOnBorrow}")
    private boolean testOnBorrow;
    @Value("${ftpPool.testOnReturn}")
    private boolean testOnReturn;
    @Value("${ftpPool.testOnCreate}")
    private boolean testOnCreate;
    @Value("${ftpPool.testWhileIdle}")
    private boolean testWhileIdle;
    @Value("${ftpPool.lifo}")
    private boolean lifo;

    //连接池
    private GenericObjectPool<FTPClient> ftpClientPool;

    @Autowired
    private FtpClientFactory ftpClientFactory;

    /**
     * 初始化连接池
     */
    @PostConstruct //加上该注解表明该方法会在bean初始化后调用
    public void init() {
        // 初始化对象池配置
        GenericObjectPoolConfig<FTPClient> poolConfig = new GenericObjectPoolConfig<FTPClient>();
        poolConfig.setBlockWhenExhausted(blockWhenExhausted);
        poolConfig.setMaxWaitMillis(maxWait);
        poolConfig.setMinIdle(minIdle);
        poolConfig.setMaxIdle(maxIdle);
        poolConfig.setMaxTotal(maxTotal);
        poolConfig.setTestOnBorrow(testOnBorrow);
        poolConfig.setTestOnReturn(testOnReturn);
        poolConfig.setTestOnCreate(testOnCreate);
        poolConfig.setTestWhileIdle(testWhileIdle);
        poolConfig.setLifo(lifo);

        // 初始化对象池
        ftpClientPool = new GenericObjectPool<FTPClient>(ftpClientFactory, poolConfig);
    }

    public FTPClient borrowObject() throws Exception {
        return ftpClientPool.borrowObject();
    }

    public void returnObject(FTPClient ftpClient) {
        ftpClientPool.returnObject(ftpClient);
    }
}

5. 封装ftp上传下载工具类

package com.longway.busi.component;

import org.apache.commons.io.IOUtils;
import org.apache.commons.net.ftp.FTP;
import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;
import org.apache.commons.net.ftp.FTPReply;
import org.apache.commons.net.io.CopyStreamAdapter;
import org.apache.log4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

import java.io.*;

/**
 * @description:
 * @title: 实现文件上传下载
 * @author: lrxc
 * @date: 2019/11/19 14:09
 */
@Component
public class FtpClientTemplate {

    private final static Logger log = Logger.getLogger(FtpClientTemplate.class.getName());

    @Autowired
    private FtpClientPool ftpClientPool;

    /***
     * 上传Ftp文件
     *
     * @param localFile 本地文件路径
     * @param remotePath 上传服务器路径 - (/abc/1.txt)
     * @return true or false
     */
    public boolean uploadFile(File localFile, String remotePath) {
        FTPClient ftpClient = null;
        BufferedInputStream inStream = null;
        try {
            //从池中获取对象
            ftpClient = ftpClientPool.borrowObject();
            // 验证FTP服务器是否登录成功
            int replyCode = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(replyCode)) {
                log.warn("FTP服务器校验失败, 上传replyCode:{}" + replyCode+"   "+localFile);
                return false;
            }

            //切换到上传目录
            if (!ftpClient.changeWorkingDirectory(remotePath)) {
                //如果目录不存在创建目录
                String[] dirs = remotePath.split("/");
                String tempPath = "";
                for (String dir : dirs) {
                    if (null == dir || "".equals(dir)) continue;
                    tempPath += "/" + dir;
                    if (!ftpClient.changeWorkingDirectory(tempPath)) {
                        if (!ftpClient.makeDirectory(tempPath)) {
                            return false;
                        } else {
                            ftpClient.changeWorkingDirectory(tempPath);
                        }
                    }
                }
            }

            inStream = new BufferedInputStream(new FileInputStream(localFile));
            //设置上传文件的类型为二进制类型
            ftpClient.setFileType(FTP.BINARY_FILE_TYPE);

            //尝试上传三次
            for (int j = 0; j < 3; j++) {
                //避免进度回调过于频繁
                final int[] temp = {0};
                //上传进度监控
                ftpClient.setCopyStreamListener(new CopyStreamAdapter() {
                    @Override
                    public void bytesTransferred(long totalBytesTransferred, int bytesTransferred, long streamSize) {
                        int percent = (int) (totalBytesTransferred * 100 / localFile.length());
                        if (temp[0] < percent) {
                            temp[0] = percent;
                            log.info("↑↑   上传进度    " + percent + "     " + localFile.getAbsolutePath());
                        }
                    }
                });

                boolean success = ftpClient.storeFile(localFile.getName(), inStream);
                if (success) {
                    log.info("文件上传成功! " + localFile.getName());
                    return true;
                }
                log.info("文件上传失败" + localFile.getName() + "  重试 " + j);
            }
            log.info("文件上传多次仍失败" + localFile.getName());
        } catch (Exception e) {
            log.error("文件上传错误! " + localFile.getName(), e);
        } finally {
            IOUtils.closeQuietly(inStream);
            //将对象放回池中
            ftpClientPool.returnObject(ftpClient);
        }
        return false;
    }

    /**
     * 下载文件
     *
     * @param remotePath FTP服务器文件目录
     * @param fileName   需要下载的文件名称
     * @param localPath  下载后的文件路径
     * @return true or false
     */
    public boolean downloadFile(String remotePath, String fileName, String localPath) {
        FTPClient ftpClient = null;
        OutputStream outputStream = null;
        try {
            ftpClient = ftpClientPool.borrowObject();
            // 验证FTP服务器是否登录成功
            int replyCode = ftpClient.getReplyCode();
            if (!FTPReply.isPositiveCompletion(replyCode)) {
                log.warn("FTP服务器校验失败, 下载replyCode:{}" + replyCode + "  " + localPath + "/" + fileName);
                return false;
            }

            // 切换FTP目录
            ftpClient.changeWorkingDirectory(remotePath);
            FTPFile[] ftpFiles = ftpClient.listFiles();
            for (FTPFile file : ftpFiles) {
                if (fileName.equalsIgnoreCase(file.getName())) {
                    //保存至本地路径
                    File localFile = new File(localPath + "/" + file.getName());
                    //创建父级目录
                    if (!localFile.getParentFile().exists()) {
                        localFile.getParentFile().mkdirs();
                    }

                    //尝试下载三次
                    for (int i = 0; i < 3; i++) {
                        //避免进度回调过于频繁
                        final int[] temp = {0};
                        //下载进度监控
                        ftpClient.setCopyStreamListener(new CopyStreamAdapter() {
                            @Override
                            public void bytesTransferred(long totalBytesTransferred, int bytesTransferred, long streamSize) {
                                int percent = (int) (totalBytesTransferred * 100 / file.getSize());
                                if (temp[0] < percent) {
                                    temp[0] = percent;
                                    log.info("  ↓↓ 下载进度    " + percent + "     " + localFile.getAbsolutePath());
                                }
                            }
                        });

                        outputStream = new FileOutputStream(localFile);
                        boolean success = ftpClient.retrieveFile(file.getName(), outputStream);
                        outputStream.flush();
                        if (success) {
                            log.info("文件下载成功! " + localFile.getName());
                            return true;
                        }
                        log.info("文件下载失败" + localFile.getName() + "  重试 " + i);
                    }
                    log.info("文件下载多次仍失败" + localFile.getName());
                }
            }
        } catch (Exception e) {
            log.error("文件下载错误! " + remotePath + "/" + fileName, e);
        } finally {
            IOUtils.closeQuietly(outputStream);
            ftpClientPool.returnObject(ftpClient);
        }
        return false;
    }
}

6. 测试类

package com.bxlt.test;

import com.bxlt.component.FtpClientTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import java.io.File;

@RunWith(SpringJUnit4ClassRunner.class)//用于在JUnit环境下提供Spring Test框架的功能。
@ContextConfiguration(locations = {"classpath*:*.xml"})//用来加载配置文件
public class FtpTest {

    @Autowired
    private FtpClientTemplate ftpClientTemplate;

    @Test
    public void download() {
        ftpClientTemplate.downloadFile("/aaa", "gaofeng.tgz", "E:\\aaa");
    }

    @Test
    public void upload() {
        File file = new File("E:\\aaa\\gaofeng.tgz");
        ftpClientTemplate.uploadFile(file, "/abc");
    }
}
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,539评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,594评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 165,871评论 0 356
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,963评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,984评论 6 393
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,763评论 1 307
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,468评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,357评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,850评论 1 317
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,002评论 3 338
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,144评论 1 351
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,823评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,483评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,026评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,150评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,415评论 3 373
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,092评论 2 355