定义
线程是现代操作系统调度的最小单元,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。
处理器在这些线程上高速切换,让使用者感觉这些线程在同时执行。
1、优先级
线程可以通过一个整型变量priority
来控制优先级,范围从 1~10,默认为5,优先级高的线程分配的时间片的数量要多于优先级低的线程,但很多操作系统忽略掉了Java的线程优先级,所以基本上用不到。
2、状态
状态名称 | 说明 |
---|---|
NEW | 初始状态,线程被构建,但是没有调用start() 方法 |
RUNNABLE | 运行状态,java线程将操作系统中的就绪和运行两种状态并称为“运行中” |
BLOCKED | 阻塞状态,表示线程阻塞于锁 |
WAITING | 等待状态,改状态表示当前线程需要等待其他线程做出一些特定动作 |
TIME_WAITING | 超时等待状态,该状态不同于WAITING,它是可以在指定的时间自行返回的 |
TERMINATED | 终止状态,表示当前线程已经执行完毕 |
3、中断
中断可以理解为线程的一个标识位属性,它标识一个运行中的线程是否被其他线程进行了中断操作。
其他线程调用该线程的interrupt()
方法中断该线程,此时通过isInterrupted()
方法来判断该线程的中断状态时,返回的是true
,若通过静态方法Thread.interrupted()
方法对当前线程的中断标识位进行复位,则isInterrupted()
方法返回的是false
。
同时,对于睡眠状态的线程执行interrupt()
方法,中断标志位不会被设置,只有正在执行的线程,被中断时才会设置。
4、等待/通知
方法名称 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait() 方法上返回,前提是该线程获取到了对象的锁 |
notifyAll() | 通知所有等待在对象上的线程 |
wait() | 调用该方法的线程会进入WAITING 状态,只有等待另外线程的通知或被中断才会返回,调用此方法后,会释放对象的锁 |
wait(long) | 超时等待一段时间,这里的参数是毫秒,如果没有通知就返回 |
wait(long,int) | 对于超时时间更细粒度的控制 |
涉及到的API:
方法名称 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait() 方法上返回,前提是该线程获取到了对象的锁 |
notifyAll() | 通知所有等待在对象上的线程 |
wait() | 调用该方法的线程会进入WAITING 状态,只有等待另外线程的通知或被中断才会返回,调用此方法后,会释放对象的锁 |
wait(long) | 超时等待一段时间,这里的参数是毫秒,如果没有通知就返回 |
wait(long,int) | 对于超时时间更细粒度的控制 |
下面通过一段代码 ,来了解加等待\唤醒机制。
public class WaitNotify {
static boolean flag = true;
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Runnable wait = () -> {
synchronized (lock) {
while (flag) {
System.out.println(Thread.currentThread() + " flag is true. wait @ " + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME) );
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread() + " flag is false. running @ "+ LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
}
};
Runnable notify = () -> {
synchronized (lock) {
System.out.println(Thread.currentThread() + "hold lock. notify @ " + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
lock.notifyAll();
flag = false;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
synchronized (lock) {
System.out.println(Thread.currentThread() + " hold lock again. sleep @ " + LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Thread waitThread = new Thread(wait, "WaitThread" );
waitThread.start();
TimeUnit.SECONDS.sleep(1);
Thread notifyThread = new Thread(notify, "NotifyThread" );
notifyThread.start();
}
}
代码中定义了一个锁对象,一个状态标识,具体的执行过程如下:
- waitThread线程获取到
lock
对象的锁,然后执行wait()
方法,并释放锁; - notifyThread线程获取到
lock
对象的锁,然后执行notifyAll()
方法,此时由于还没释放锁,所以waitThread并不会继续执行,当同步代码块执行结束后,notifyThread的第二个同步代码块与waitThread继续“争抢”lock
对象的锁; - 抢到锁的对象继续执行,整个程序执行完毕。
按照以上内容,会输出如下日志,其中第三、四条由于锁争抢的原因,位置可能调换:
Thread[WaitThread,5,main] flag is true. wait @ 2017-12-07T20:28:38.962
Thread[NotifyThread,5,main]hold lock. notify @ 2017-12-07T20:28:39.912
Thread[WaitThread,5,main] flag is false. running @ 2017-12-07T20:28:44.913
Thread[NotifyThread,5,main] hold lock again. sleep @ 2017-12-07T20:28:45.913
通知唤醒机制的经典使用场景是消费者与生产者。
当消费者获取到锁,并判断条件不满足时,便调用锁的wait()
方法,当生产者改变了条件时,就通过notifyAll
来进行通知。
消费者遵循如下原则:
- 获取对象的锁;
- 如果条件不满足,则调用
wait()
方法; - 如果条件满足,则执行对应的逻辑。
生产者遵循如下逻辑:
- 获取对象的锁;
- 改变条件;
- 通知所有等待在对象上的线程。
还可以将wait()
方法替换为wait(long)
方法,增加超时控制,以免造成线程永久阻塞调用者的情况。
5、synchronized与volatile
最简单的区分
synchronized:原子性,线程安全
volatile:可见性,不保证原子性,线程不安全
所以,如果有数据仅要求多线程共享,而不涉及原子性改动时,可以考虑添加volatile
关键字,但并不建议大量添加,以免影响性能。
示例
通过以上内容,应该对Java线程有了基本的了解,下面通过超时等待\唤醒机制来编写一个建议的数据库连接池。
思路:
- 能定义连接线程池容量;
- 可以获取和释放连接;
- 具备超时机制,以免永久阻塞。
线程池代码:
public class ConnectionPool {
private LinkedList<Connection> pool = new LinkedList<>();;
//创建时设定容量
public ConnectionPool(int initialSize){
for(int i=0;i<initialSize;i++){
pool.addLast(ConnectionDriver.createConnection());
}
}
public void releaseConnection(Connection connection){
if(connection != null){
synchronized (pool) {
pool.addLast(connection);
pool.notifyAll();
}
}
}
//若拿不到连接,则进入超时等待
public Connection fetchConnection(long mills) throws InterruptedException {
synchronized (pool) {
if(mills < 0) {
while(pool.isEmpty()){
pool.wait();
}
return pool.removeFirst();
}
long future = System.currentTimeMillis() + mills;
long remaining = mills;
while(pool.isEmpty() && remaining > 0){
pool.wait(remaining);
remaining = future - System.currentTimeMillis();
}
Connection result = null;
if(!pool.isEmpty()){
result = pool.removeFirst();
}
return result;
}
}
}
下面创建一个代理类来模拟数据库连接:
public class ConnectionDriver {
static class ConnectionHandler implements InvocationHandler {
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
if(method.getName().equals("commit")){
TimeUnit.MILLISECONDS.sleep(100);
}
return null;
}
}
public static final Connection createConnection(){
return (Connection) Proxy.newProxyInstance(ConnectionDriver.class.getClassLoader()
,new Class<?>[]{Connection.class}, new ConnectionHandler());
}
}
最后,实际测试一下:
public class ConnectionPoolTest {
static ConnectionPool pool = new ConnectionPool(10);
static CountDownLatch start = new CountDownLatch(1);
static CountDownLatch end;
static class ConnectionRunner implements Runnable {
int count;
AtomicInteger got;
AtomicInteger notGot;
public ConnectionRunner(int count, AtomicInteger got, AtomicInteger notGot) {
this.count = count;
this.got = got;
this.notGot = notGot;
}
public static void main(String[] args) throws InterruptedException {
int threadCount = 10;
end = new CountDownLatch(5);
int count = 20;
AtomicInteger got = new AtomicInteger();
AtomicInteger notGot = new AtomicInteger();
for (int i=0; i< threadCount ; i++) {
Thread thread = new Thread(new ConnectionRunner(count,got,notGot),"ConnecntionRunnerThread");
thread.start();
}
start.countDown();
end.await();
System.out.println("total invoke:" + threadCount * count);
System.out.println("got connection:" + got.get());
System.out.println("not got connection:" + notGot.get());
}
@Override
public void run() {
try {
start.await();
} catch (InterruptedException e) {
}
while (count > 0){
try {
Connection connection = pool.fetchConnection(1000);
if(connection != null){
try {
connection.createStatement();
connection.commit();
} finally {
pool.releaseConnection(connection);
got.incrementAndGet();
}
} else {
notGot.incrementAndGet();
}
} catch (Exception e) {
e.printStackTrace();
} finally {
count--;
}
}
end.countDown();
}
}
}
注
以上内容整理自《Java 并发编程的艺术》一书,源码使用Java 8编写,与原文略有差异。