介绍
什么是线程池?
线程的创建开销很大,如果每个线程用完就丢弃、下次需要时再重新创建,会造成严重的浪费。线程池解决了线程复用的问题:线程执行完任务后并不立即销毁,而是保留下来,继续执行后续的任务。
概览
实践1
异步读写文件 使用singleThreadPool
任务描述: 现在有一个很大的csv文件(21G), 内容是1990-2016年的专利数据,希望将2012年及以后的数据提取出来,写入一个新的csv文件中。关键点是:
- 如果读一行写一行,面对总计1000万行以上的数据,耗时会很长。解决办法:把读取到的行先放入cache;cache满了以后,把其中的数据交给写文件线程,然后清空cache,继续读取文件。
- write线程虽然独立于read线程,但write线程必须始终只有一个:如果有多个,写入文件的顺序会乱掉;而且多个线程竞争同一块磁盘的IO,也未必能带来性能提升。(由此引出的问题:多线程写同一个文件时的加锁问题)
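- 用singleThreadPool(`Executors.newSingleThreadExecutor()`)控制写文件操作,能够保证任意时刻只有一个写文件任务在执行,并且任务按提交顺序依次执行(如果工作线程因异常退出,线程池会创建新线程顶替,所以严格来说"同一时刻只有一个"而不是"始终是同一个线程")。如果提交新任务时旧任务还没有完成,提交方并不会被阻塞:新任务会被放入线程池内部的无界队列(`LinkedBlockingQueue`)中排队,由唯一的工作线程依次取出执行。需要注意的是,如果写文件比读文件慢,队列中积压的批次会一直占用内存。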
master, 核心部分
package Thread.multi_rw;
import java.io.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
/**
 * Filters a large CSV file (21 GB in the original use case): every line whose year field is
 * 2012 or later is copied to a new file.
 *
 * <p>The calling thread reads and filters lines into a {@link LineCache}. Each full batch is
 * handed to a single-thread executor that appends it to the output file. Reading and writing
 * therefore overlap, while batches are still written one at a time, in submission order.
 */
public class WriteAndReadMaster {
    /** Splits a line on the two-character delimiter {@code ,"}; compiled once, not per line. */
    private static final Pattern FIELD_SEPARATOR = Pattern.compile(",\"");
    /** Index of the field whose first four characters are parsed as the year. */
    private static final int YEAR_FIELD_INDEX = 7;
    /** Lines whose year is at or after this value are kept. */
    private static final int MIN_YEAR = 2012;

    private final File inputFile;
    private final String outputFileName;
    private final LineCache cache;
    private int readExceptionCount = 0;

    /**
     * @param sourcePath  path of the CSV file to read
     * @param outfileName path of the file the matching lines are written to
     */
    public WriteAndReadMaster(String sourcePath, String outfileName) {
        inputFile = new File(sourcePath);
        outputFileName = outfileName;
        this.cache = new LineCache();
    }

    /**
     * Reads the whole input file, filters it, and writes the matching lines.
     *
     * <p>Blocks until every submitted batch has been written (or the waiting thread is
     * interrupted), then shuts the writer thread down so the JVM can exit afterwards.
     * I/O errors are reported on stderr rather than thrown. Malformed lines are skipped and
     * counted.
     */
    public void execute() {
        System.out.println("Start to execute...");
        long startTime = System.currentTimeMillis();
        ExecutorService singleThreadPool = Executors.newSingleThreadExecutor();
        // try-with-resources closes the reader; the original never closed it.
        try (BufferedReader reader = new BufferedReader(new FileReader(inputFile))) {
            String line;
            // readLine() == null is the EOF signal. reader.ready() only tells whether the next
            // read is guaranteed not to block; it may return false before the end of the file.
            while ((line = reader.readLine()) != null) {
                if (isWanted(line)) {
                    cache.push(line);
                    if (cache.isCacheFull()) {
                        flushCache(singleThreadPool);
                    }
                }
            }
            // Write the final, partially filled batch, unless nothing is left.
            if (!cache.getData().isEmpty()) {
                flushCache(singleThreadPool);
            }
        } catch (FileNotFoundException e) {
            System.err.println("file not found error");
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            // Without shutdown() the executor's non-daemon worker thread would keep the JVM alive.
            shutdownAndWait(singleThreadPool);
            System.err.println("total read exception:" + this.readExceptionCount);
            System.out.println("Finished in " + (System.currentTimeMillis() - startTime) + " ms");
        }
    }

    /**
     * Returns whether {@code line} should be kept, i.e. whether its year field starts with a
     * year {@code >= MIN_YEAR}. Malformed lines are reported on stderr, counted in
     * {@code readExceptionCount}, and rejected.
     */
    private boolean isWanted(String line) {
        String[] contents = FIELD_SEPARATOR.split(line);
        if (contents.length <= YEAR_FIELD_INDEX) {
            System.err.println("index exception, ignore it");
            this.readExceptionCount += 1;
            return false;
        }
        try {
            int year = Integer.parseInt(contents[YEAR_FIELD_INDEX].substring(0, 4));
            return year >= MIN_YEAR;
        } catch (NumberFormatException | StringIndexOutOfBoundsException e) {
            // The field is shorter than four characters, or its first four are not a number.
            System.err.println("line error, ignore it..");
            this.readExceptionCount += 1;
            return false;
        }
    }

    /**
     * Hands the current batch to the writer thread and starts a new, empty batch.
     * LineCache.cleanUp() installs a fresh list rather than clearing the old one, so the reader
     * never modifies the list passed to the task afterwards.
     */
    private void flushCache(ExecutorService writerPool) {
        // execute() rather than submit(): an unexpected RuntimeException in the task then reaches
        // the thread's uncaught-exception handler instead of sitting in a Future nobody reads.
        writerPool.execute(new WriteFileTask(outputFileName, cache.getData()));
        cache.cleanUp();
    }

    /** Stops accepting new tasks and waits for the already-queued writes to finish. */
    private static void shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            pool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller; queued writes keep running in the background.
            Thread.currentThread().interrupt();
        }
    }
}
cache 缓存类
package Thread.multi_rw;
import java.util.ArrayList;
/**
 * Accumulates lines in memory until a fixed capacity is reached, so they can be handed off
 * as one batch.
 *
 * <p>Not thread-safe: none of the methods synchronize, so an instance should be used from a
 * single thread. {@link #cleanUp()} installs a fresh list instead of clearing the old one.
 * A list previously returned by {@link #getData()} is therefore never modified by this cache
 * again and can be handed off (e.g. to an executor task).
 */
public class LineCache {
    /** Default number of lines per batch. */
    private static final int MAX_CACHE_LINE = 10000;

    private final int capacity;
    private ArrayList<String> data;

    /** Creates a cache that reports full once it holds {@value #MAX_CACHE_LINE} lines. */
    public LineCache() {
        this(MAX_CACHE_LINE);
    }

    /**
     * Creates a cache that reports full once it holds {@code capacity} lines.
     *
     * @param capacity number of lines per batch
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public LineCache(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive: " + capacity);
        }
        this.capacity = capacity;
        this.data = new ArrayList<>(capacity);
    }

    /**
     * Starts a new, empty batch. The previous list is replaced, not cleared.
     *
     * @return this cache, for chaining
     */
    public LineCache cleanUp() {
        this.data = new ArrayList<>(capacity);
        return this;
    }

    /** Returns {@code true} once the current batch holds at least {@code capacity} lines. */
    public boolean isCacheFull() {
        return data.size() >= capacity;
    }

    /** Appends a line to the current batch. */
    public void push(String line) {
        data.add(line);
    }

    /** Returns the current batch itself (not a copy). */
    public ArrayList<String> getData() {
        return data;
    }
}
负责写文件的 Runnable 对象
package Thread.multi_rw;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
/**
 * Appends a batch of lines to a file, writing {@code '\n'} after each line.
 *
 * <p>The static counters are {@code volatile} for visibility only. {@code +=} on them is not
 * atomic, so they are exact only while at most one thread runs these tasks at a time (e.g. on
 * a single-thread executor).
 */
public class WriteFileTask implements Runnable {
    private final String fileName;
    /** Lines written by all tasks so far. */
    private static volatile int totalWriteLines = 0;
    private final ArrayList<String> writeData;
    /** Number of task runs that ended with an IOException, across all instances. */
    public volatile static int exceptionCount = 0;

    /**
     * @param fileName path of the file to append to; created if it does not exist
     * @param data     lines to write; the list is kept by reference, not copied
     */
    public WriteFileTask(String fileName, ArrayList<String> data) {
        this.fileName = fileName;
        this.writeData = data;
        System.err.println("data size: " + data.size());
    }

    /** Appends every line of the batch to the file, then reports the counts. */
    @Override
    public void run() {
        int cnt = 0;
        // try-with-resources closes, and therefore flushes, the writer. The original never
        // closed it, so the last buffered chunk of every batch was lost and a file handle leaked.
        try (BufferedWriter writer = new BufferedWriter(new FileWriter(fileName, true))) {
            for (String line : writeData) {
                writer.write(line);
                writer.write('\n');
                totalWriteLines += 1;
                cnt += 1;
            }
        } catch (IOException e) {
            System.err.println("Write Exception");
            exceptionCount += 1;
            e.printStackTrace();
        } finally {
            System.out.println("线程: " + Thread.currentThread() +
                    " 写入: " + cnt + "行,总计写入: " + totalWriteLines + "行");
            System.err.println("总计写入错误: " + exceptionCount);
        }
    }
}