Java并发编程之任务执行

串行执行任务.

在单个线程中串行执行各项任务,

class SingleThreadServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while(true) {
Socket connection = sockect.accept();
handleRequest(connection);
}
}
}

主线程在接受连接与处理相关请求操作间交替运行, 服务器在处理请求时, 新到来的连接必须等待直到请求处理完成. Web应用多为IO bound, 这种方式浪费了宝贵的CPU资源.

显式为请求创立线程

通过为每个请求创立新的线程, 实现更高的响应性.

class ThreadPerTaskWebServer {
public static void main(String[] args) throws IOException {
ServerSocket socket = new ServerSocket(80);
while (true) {
final Socket connection = socket.accept();
Runnable task = new Runnable() {
public void run() {
handleRequest(connection);
}
}
new Thread(task).start();
}
}
}

这个版本有如下几个特点:

  1. 任务从主线程分离出来, 主线程将处理连接的任务交给了子线程, 主循环不必等待任务处理结束, 从而主线程能够更快地响应新连接
  2. 任务可以并行, 从而能够同时服务多个请求, 如果有多个处理器, 能够更好地利用多处理器的优势.
  3. 任务代码必须是线程安全的, 因为有多个任务会并发调用这段代码.

无限制创建线程的不足

在生产环境中, “为每个任务分配一个线程”这种方法存在一些缺陷, 尤其是当需要创建大量线程时.

  1. 线程生命周期的开销非常高:

  2. 资源消耗: 活跃的线程会消耗系统资源, 尤其是内存, 如果可运行的线程数量多于可用的处理器数量,那么有些线程将闲置, 大量空闲的线程会占用许多内存, 给垃圾回收带来压力, 而且大量线程在竞争CPU资源时还将产生其他性能开销.

    如果已经拥有足够多的线程使CPU保持忙碌状态, 那么再创建更多的线程, 反而会降低性能.

  3. 稳定性: 可创建线程的数量存在一个限制. 这个闲置将随着平台的不同而不同. 并且受多个因素制约, 包括JVM的启动参数,

    Thread构造函数中请求的栈大小, 以及底层操作系统对线程的限制等。 如果破坏了这些限制, 那么很可能抛出OutOfMemoryError,

    要想从这种错误中恢复过来是非常危险的, 更简单的方法是通过构造程序来避免超出这些限制.

在一定范围内, 增加线程可以提高系统的吞吐率, 但如果超过欧这个范围, 再创建更多的线程只会降低程序的执行速度,

并且如果过多地创建一个线程, 整个应用程序将崩溃, 如果想避免这种危险,应该对应用程序创建的线程数量进行限制,

并且全面地测试应用程序,从而确保在线程数量达到限制时, 程序也不会耗尽资源.

Excutor 框架

串行执行的问题在于其糟糕的响应性和吞吐量, 而”为每个线程分配一个线程”的问题在于资源管理的复杂性.

java.util.concurrent提供了一种灵活的线程池实现作为Executor框架的一部分. 在Java类库中, 任务执行的主要抽象不是Thread, 而是Executor.

public interface Executor {
void execute(Runnable command);
}

Executor 基于生产者——消费者模式, 提交任务的操作相当于生产者, 执行任务的线程相当于消费者.

基于Executor的Web服务器:

class TaskExecutionWebServer {
private static final int NTHREADS = 100;
private static final Executor exec = Excutors.newFixedThreadPool(NTHREADS);

public static void main(String[] args) {
ServerSocket socket = new ServerSocket(80);
while(true) {
final Socket connection = socket.accept();
Runable task = new Runable() {
public void run() {
handelRequest(connection);
}
};
exec.execute()
}
}
}

通过Executor,可以将任务提交与执行解耦开来, 从而无需太大的困难就可以为某种类型的任务指定和修改执行策略.

  • 在什么线程执行任务?
  • 按照什么顺序执行任务 (FIFO, LIFO, 优先级)?
  • 有多少任务能并发执行
  • 在队列中有多少任务在等待执行
  • 如果系统由于过载需要拒绝一个任务, 那么应该选择哪一个任务? 另外, 如何通知应用程序有任务被拒绝.
  • 执行一个任务之前或之后需要进行哪些动作 ?

可以通过Executors中的静态工厂方法之一来创建一个线程池:

  • newFixedThreadPool: 创建一个固定长度的线程池, 每当提交一个任务是就创建一个线程, 直到达到达到线程池的最大数量, 这时线程池的规模将不再变化.

    如果某一线程由于发生了未预期的Exception而结束, 那么线程池会补充一个新的线程.

  • newCachedThreadPoll: 创建一个可缓存的线程池, 如果线程池的规模超过了处理需求时, 那么将回收空闲的线程, 而当需求增加时, 可以添加新的线程池, 线程池的规模不存在任务和限制.

  • newSingleThreadExecutor: 创建单个工作者线程来执行任务, 如果这个线程异常结束, 会创建另一个线程来替代.

  • newScheduledThreadPoll: 创建固定长度的线程池, 而且以延迟或者定时的方式执行任务, 类似于Timer

线程池解决了服务器因为创建过多线程而失败的问题, 但在足够长时间内, 如果任务到达的速度总是超过任务执行的速度, 那么服务器仍有可能耗尽内存,

因为等待执行的Runnable队列将不断增长, 可以通过使用一个有界工作队列在Excutor内部解决这个问题.

Executor生命周期

Executor采用异步方式执行任务,因此在任何时刻,之前提交的任务状态不是立即可见的. 有些可能已经完成,有些正在运行, 而其他任务可能在队列中等待执行.

为解决执行服务的生命周期问题, Executor扩展了ExecutorService接口, 添加了一些用于生命周期管理的方法(同时还有一些用于提交任务的便利方法).

pubic interface ExecutorService extends Exectuor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
Future<T> <T> submit(Callable<T>);
Future<T> <T> submit(Runnable, T);
Future<?> submit(Runnable);
List<Future<T>> <T> invokeAll(Collection<? extends Callable<T>>);
...
}

ExecutorService的生命周期有3种状态:运行,关闭和已终止. ExecutorService初始创建处于运行状态, shutdown方法将执行平缓的关闭过程: 不再接受新的任务, 同时等待已经提交的任务执行完成——包括哪些还未开始执行的任务. shutdownNow方法将执行粗暴的关闭过程: 它将尝试取消所有运行中的任务,并且不再启动队列中尚未开始的任务.

Executor框架中, 已提交尚未开始的任务可以取消, 但是对那些已经开始执行的任务, 只有它们能够响应中断时, 才能取消.

携带结果的任务Callable与Future

Executor框架使用Runnable作为其基本的任务表达形式. Runnable是一种有很大局限的抽象,

虽然run能写入到日志文件或者将结果放入某个共享的数据结构中, 但是不能返回一个值, 或者抛出一个受检查的异常.

CompletionService

CompletionService将Executor和BlockingQueue的功能结合起来, 可以将 Callable或者Runnable任务提交(submit)给它,

使用类似于队列操作的take和poll等方法获取已完成的结果.

public class Renderer {
private final ExecutorService executor;
Renderer (ExecutorService executor) { this.executor = executor; }
void renderPage(CharSequence source) {
List<imageInfo> info = scanForImageInfo(source);
CompletionService<ImageData> completionsService = new ExecutorCompletionService<ImageData>(executor);

for (final ImageInfo imageInfo: info) {
completionService.submit(new Callable<ImageData>() {
public ImageData call() { // 下载图片任务
return imageInfo.downloadImage();
}
});
}
renderText(source);
try {
for (int t = 0, n = info.size(); t < n; t++) {
Future<ImageData> f = completionService.take(); // 阻塞操作, 可抛异常
ImageData imageData = f.get();
renderImage(ImgData);
}
} catch (InterruptedException e) { // 响应中断
Thread.currentThread.interrupt();
} catch (ExecutionException e) { // 运行错误处理
throw launderThrowable(e.getCause());
}
}
}

为任务设置时限

Future.get 支持限时, 如果在限定时间内没有得到计算结果, 将抛出 TimeoutException。 在使用限时任务时应当注意, 当这些任务超时后应当立即停止, 从而避免继续计算一个不在使用的结果而浪费资源. 为此, 可以再次使用Future, 如果限时的get方法抛出了TimeoutException, 那么可以通过Future来取消任务, 如果编写的任务是可取消的, 那么就可以提前终止它, 以免消耗过多的资源.

在指定时间获取广告信息的例子

Page renderPageWithAd() throws InterruptedException {
long endNanos = System.nonoTime() + TIME+BUDGET;
Future<Ad> f = exec.submit(new FetchAdTask{});
Page page = renderPageBody();
Ad ad;
try {
long timeLeft = endNanos - System.nanoTime();
ad = f.get(timeLeft, NANOSECONDS);
} catch (ExecutionException e) {
ad = DEFALUT_AD:
} catch (TimeoutException e) {
ad = DEFALUT_AD;
f.cancel(true); // 超时,取消任务
}
page.sendAd(ad);
return page;
}

摘自: Java Concurrency In Practice