Java多线程编程(三)

本文介绍了定时器Timer和线程的其他特性。
Timer是一个定时器,可以定时来做一次或重复任务。学习了Timer的源码,但是后来在effective java上看到Timer有一些问题,推荐DelayQueue来取代。
后面还介绍了线程的特性,比如线程的状态及其变化,还有线程组及对异常的处理等。

定时器Timer

定时/计划功能在移动开发领域使用较多,而这个功能主要就是由Timer对象实现的。它在内部使用多线程方式进行处理。下面就来看Timer是如何工作的。

定时器Timer的使用

Timer是一个调度器,里面调度的方法TimeTask由自己实现,一个简单的例子如下:

1
2
3
4
5
6
Timer timer = new Timer();  
timer.schedule(new TimerTask() {
public void run() {
System.out.println("abc");
}
}, 10000 , 1000);

上面的代码意思是在10s后每隔1s执行一次run()方法。

Timer的构造方法:

从源码的角度来学习几个构造方法。

  1. 构造方法1:无参构造方法:
    1
    2
    3
    public Timer() {
    this("Timer-" + serialNumber());
    }

内部实际上调用Timer(String name)这个构造方法。

  1. 构造方法2:设置守护线程:

    1
    2
    3
    4
    public Timer(boolean isDaemon) {
    this("Timer-" + serialNumber(), isDaemon);
    }
    如果设置为守护线程,则主线程结束后,Timer自动结束,无须使用cancel来结束。
  2. 构造方法3:带参数String的构造方法,传入Timer线程的名字

    1
    2
    3
    4
    public Timer(String name) {
    thread.setName(name);
    thread.start();
    }

这个构造方法有两个功能,一个是设置线程名字,另一个是启动线程。thread这个变量的定义是:

1
private final TimerThread thread = new TimerThread(queue);

而TimerThread这个类:

1
class TimerThread extends Thread{

从以上构造方法可知,Timer内部包装了一个线程,独立于外部线程,进行调度以及管理。TimerThread类是default类型,基本上是Timer类专用的类。

Timer常用方法

下面来看一看Timer内常用方法:

1
2
3
4
5
6
public void schedule(TimerTask task, long delay)	//经过delay ms后进行调度一次task
public void schedule(TimerTask task, Date time) //在time时间点调度一次task
public void schedule(TimerTask task, long delay, long period)//在time时间点以period的时间间隔调度task
public void schedule(TimerTask task, Date firstTime, long period)//经过delay ms后以period的时间间隔调度task
public void scheduleAtFixedRate(TimerTask task, long delay, long period)//与schedule类似,后文介绍其区别
public void scheduleAtFixedRate(TimerTask task, Date firstTime, long period)//方法同上

schedule的源码如下:

1
2
3
4
5
public void schedule(TimerTask task, long delay) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
sched(task, System.currentTimeMillis()+delay, 0);
}

这里调用了sched这个private方法,将task传入。看传入的形参列表可以发现,第二个参数是时间,那么传入Date时直接把这个Date传入就可以了。
下面来看一看重载方法schedule(TimerTask task, long delay, long period):

1
2
3
4
5
6
7
public void schedule(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, -period);
}

发现这个方法其实也是对sched方法的包装。第三个参数-period就是重复的时间。
再来看scheduleAtFixedRate这个方法:

1
2
3
4
5
6
7
public void scheduleAtFixedRate(TimerTask task, long delay, long period) {
if (delay < 0)
throw new IllegalArgumentException("Negative delay.");
if (period <= 0)
throw new IllegalArgumentException("Non-positive period.");
sched(task, System.currentTimeMillis()+delay, period);
}

发现与上一个方法基本相同,只有period的符号不同。可能是因为不愿意增加一个参数,直接用period的符号来表示那个方法。
下面来讨论Timer类的 核心:sched方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
private void sched(TimerTask task, long time, long period) {
if (time < 0)
throw new IllegalArgumentException("Illegal execution time.");

// Constrain value of period sufficiently to prevent numeric
// overflow while still being effectively infinitely large.
if (Math.abs(period) > (Long.MAX_VALUE >> 1))
period >>= 1;

synchronized(queue) {
if (!thread.newTasksMayBeScheduled)
throw new IllegalStateException("Timer already cancelled.");

synchronized(task.lock) {
if (task.state != TimerTask.VIRGIN)
throw new IllegalStateException(
"Task already scheduled or cancelled");
task.nextExecutionTime = time;
task.period = period;
task.state = TimerTask.SCHEDULED;
}

queue.add(task);
if (queue.getMin() == task)
queue.notify();
}
}

在这个操作中,对queue做了同步,可以认为是线程安全的。从以上源码可以看出,schd主要任务是把nextExecutionTime、period、state这三个参数传入task中,并将task添加到queue中,并对queue做一次notify()操作。
简而言之,就是将参数传入task并将其放入queue中。我们现在开始看看queue的实现:

1
private final TaskQueue queue = new TaskQueue();

继续来看一看TaskQueue()的实现:

1
2
3
4
class TaskQueue {  
private TimerTask[] queue = new TimerTask[128];
private int size = 0;
...

可见,TaskQueue的结构非常简单,内部是一个数组,只要task的数目低于128是不会扩容的。其中还有很多方法,提供了类似于ArrayList的方法来管理,内部有很多排序之类的处理。
好了,此时可以得到这样的关系:一个Timer内部包装了“一个Thread”和“一个Task”队列,这个队列按照一定的方式将任务排队处理,包含的线程在Timer的构造方法调用时被启动,这个Thread的run方法无限循环这个Task队列,若队列为空且没发生cancel操作,此时会一直等待,如果等待完成后,队列还是为空,则认为发生了cancel从而跳出死循环,结束任务;循环中如果发现任务需要执行的时间小于系统时间,则需要执行,那么根据任务的时间片从新计算下次执行时间,若时间片为0代表只执行一次,则直接移除队列即可。

Timer已经被取代

太坑爹了。。学习了Timer后才在java并发编程实践中看到Timer这一章。书上介绍,TimerTask抛出异常时Timer并不会捕获,这就导致Timer异常结束,而使用者根本就不知道这个事情,使得已经被调度的线程和还未执行的线程都不会执行。这就是所谓的“线程泄漏”。在JDK5.0以后的版本中,使用DelayQueue来代替Timer。DelayQueue实现了BlockingQueue,管理一组Delay对象,可以提供安全的操作。

线程池扩展

  • Executors.newScheduledThreadPool
    来完成对调度队列中的线程池的处理,内部通过new ScheduledThreadPoolExecutor来创建线程池的Executor的创建,当然也可以调用:
    *Executors.unconfigurableScheduledExecutorService
    方法来创建一个DelegatedScheduledExecutorService其实这个类就是包装了下下scheduleExecutor,也就是这只是一个壳,英文理解就是被委派的意思,被托管的意思。

参考Timer与TimerTask的真正原理&使用介绍

线程的其他知识

按照书上第七章来进行补漏。

线程的状态

线程的状态储存在Thread的State枚举内部类当中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public enum State {
/**
* Thread state for a thread which has not yet started.
*/

NEW,

/**
* Thread state for a runnable thread. A thread in the runnable
* state is executing in the Java virtual machine but it may
* be waiting for other resources from the operating system
* such as processor.
*/

RUNNABLE,

/**
* Thread state for a thread blocked waiting for a monitor lock.
* A thread in the blocked state is waiting for a monitor lock
* to enter a synchronized block/method or
* reenter a synchronized block/method after calling
*/

BLOCKED,

/**
* Thread state for a waiting thread.
* A thread is in the waiting state due to calling one of the
* following methods:
*
* <p>A thread in the waiting state is waiting for another thread to
* perform a particular action.
*
* For example, a thread that has called <tt>Object.wait()</tt>
* on an object is waiting for another thread to call
* <tt>Object.notify()</tt> or <tt>Object.notifyAll()</tt> on
* that object. A thread that has called <tt>Thread.join()</tt>
* is waiting for a specified thread to terminate.
*/

WAITING,

/**
* Thread state for a waiting thread with a specified waiting time.
* A thread is in the timed waiting state due to calling one of
* the following methods with a specified positive waiting time:
*/

TIMED_WAITING,

/**
* Thread state for a terminated thread.
* The thread has completed execution.
*/

TERMINATED;
}

java的线程状态与经典的线程状态略有不同。经典线程状态是创建、就绪、运行、阻塞、挂起、终止。java线程模型是这样的:NEW是新建的状态;RUNNABLE状态包括了就绪和运行态,这种状态下线程即使暂停也是系统的原因;BLOCKED表示阻塞态,等待其他线程释放锁;WAITING指的是当前线程已经持有锁,调用wait()方法,等待其他线程将其唤醒;TIMED_WAITING是有时间限制的等待,即调用wait(long)方法后线程的状态;TERMINATED则表示线程的终止状态。

进入不同的线程状态

NEW

新建线程在启动之前的状态就是NEW:

1
2
3
4
public static void newState() {
Thread t = new Thread();
System.out.println(t.getState());
}

RUNNABLE

线程启动,未停止的状态就是RUNNABLE:

1
2
3
4
5
6
7
8
9
public static void runnableState() {
Thread t = new Thread() {
public void run() {
while (true);
}
};
t.start();
System.out.println(t.getState());
}

BLOCKED

当一个线程获取锁,另一个线程等待时就是BLOCKED状态:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void blockedState() {
final Object lock = new Object();
Runnable run = new Runnable() {
@Override
public void run() {
for(int i=0; i<100; i++){
synchronized (lock) {
System.out.println(Thread.currentThread().getName() + i);
}
}
}
};
Thread t1 = new Thread(run);
t1.setName( "t1");
Thread t2 = new Thread(run);
t2.setName( "t2");
t1.start();
t2.start();
System.out.println(t1.getState());
System.out.println(t2.getState());
}

WAITING

线程主动等待的状态就是WAITING

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void waitingState() throws InterruptedException {
final Object lock = new Object();
Thread t = new Thread() {
@Override
public void run() {
synchronized (lock) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
};
t.start();
Thread.sleep(1000);
System.out.println(t.getState());
}

TIMED_WAITING

只需将上一段代码改为lock.wait(2000);即可显示这个状态。

TERMINATED

线程结束后就是终止状态。

线程状态的作用

了解线程状态又有什么用呢?这是JVM进阶必备的知识,在系统中找到性能瓶颈、调优时作用巨大。在工具jstack中,可以观察线程的执行状态。如果某个线程经常wait,那么可能是notify有一定的问题,需要进行调优。当然,这属于相当高级的知识了,具体的实施以后在工作中慢慢来学习~

参考java线程的几种状态

线程组

线程组是java线程所特有的概念。java中,线程组指的是java.lang.ThreadGroup类的对象,每个线程隶属于唯一一个线程组。这个线程组在线程创建时指定,并在线程生命周期内不可以更改。可以通过调用包含ThreadGroup类型参数的Thread类构造方法来指定线程所述线程组。如果没有指定, 线程默认属于main的系统线程组 。除了系统线程,所有线程组都必须显示创建。
java中除了系统线程外的每个线程组又隶属于另一个线程组,可以在创建线程组时指定隶属的线程组。缺省为main。这样,所有的线程组组成了一个以main为根的线程树。
线程组管理一组线程,包括启动、中断、枚举等方法。用法如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public class _2ThreadGroup {
public static void main(String[] args) {
ThreadGroup threadGroup = new ThreadGroup("Searcher");

Result result = new Result();
SearchTask searchTask = new SearchTask(result);

for (int i = 0; i < 5; i++) {
Thread thread = new Thread(threadGroup, searchTask);
thread.start();
}

//get message
System.out.println("Number of Thread is: " + threadGroup.activeCount());
System.out.println("Information about the ThreadGroup");
threadGroup.list();

//get the state of thread
Thread[] threads = new Thread[threadGroup.activeCount()];
threadGroup.enumerate(threads);
for (int i = 0; i < threadGroup.activeCount(); i++)
System.out.println("Thread " + threads[i].getName()
+ ":" + threads[i].getState());

waitFinish(threadGroup);
threadGroup.interrupt();
}

private static void waitFinish(ThreadGroup threadGroup) {
while (threadGroup.activeCount() > 9) {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

class Result {
private String name;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
}
// 线程,随机进行睡眠
class SearchTask implements Runnable {
private Result result;
public SearchTask(Result result) {
super();
this.result = result;
}

private void doTask() {
Random random = new Random();
int value = random.nextInt(10);
System.out.printf("Thread %s sleep %d\n", Thread.currentThread().getName(), value);
try {
TimeUnit.SECONDS.sleep(value);
} catch (InterruptedException e) {
System.out.println(Thread.currentThread().getName() + " is interrupted");
}
}

@Override
public void run() {
String name = Thread.currentThread().getName();
System.out.printf("Thread %s: Start\n", name);
doTask();
result.setName(name);
System.out.printf("Thread %s: End\n",name);
}
}

对线程异常的处理

线程中出现的异常可以用专门的线程处理,只需要调用Thread的setDefaultUncaughtExceptionHandler方法进行处理。示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class _3Exception {
public static void main(String[] args) {
Thread t = new Thread() {
@Override
public void run() {
int i = 8 / 0;
}
};
Thread.setDefaultUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override
public void uncaughtException(Thread t, Throwable e) {
System.out.println("Thread " + t.getName() + " has exception");
e.printStackTrace();
}
});
t.start();
}
}

概念参考于第六部分 线程组