Last Modified: 2003.05.15 Util.concurrent工具包概述Doug
Lea
State University of New York at Oswego
http://gee.cs.oswego.edu 翻译: Cocia Lin([email protected]) Huihoo.org 原文 http://gee.cs.oswego.edu/dl/cpjslides/util.pdf 要点
--目标和结构 --主要的接口和实现 Sync:获得/释放(acquire/release)
协议 Channel:放置/取走(put/take)
协议 Executor:执行Runnable任务 --每一个部分都有一些关联的接口和支持类 --简单的涉及其他的类和特性 目标
--一些简单的接口 -但是覆盖大部分程序员需要小心处理代码的问题 -- 高质量实现 -正确的,保守的,有效率的,可移植的 --可能作为将来标准的基础 -获取经验和收集反馈信息 Sync
-- acquire/release协议的主要接口 -用来定制锁,资源管理,其他的同步用途 - 高层抽象接口 - 没有区分不同的加锁用法 --实现 -Mutex,
ReentrantLock, Latch, CountDown,Semaphore, WaiterPreferenceSemaphore,FIFOSemaphore,
PrioritySemaphore n
还有,有几个简单的实现,例如ObservableSync, LayeredSync 独占锁
try { lock.acquire(); try { action(); } finally { lock.release(); } } catch (InterruptedException
ie) { ... } -- Java同步块不适用的时候使用它 - 超时,回退(back-off) - 确保可中断 - 大量迅速锁定 - 创建Posix风格应用(condvar) 独占例子
class ParticleUsingMutex { int x; int y; final Random rng = new Random(); final Mutex mutex = new Mutex(); public void move() { try { mutex.acquire(); try { x +=
rng.nextInt(2)-1; y += rng.nextInt(2)-1; } finally { mutex.release(); } } catch (InterruptedException
ie) { Thread.currentThread().interrupt(); } } public void draw(Graphics g) { int lx, ly; try { mutex.acquire(); try { lx = x; ly = y;
} finally { mutex.release(); } } catch (InterruptedException
ie) { Thread.currentThread().interrupt(); return; } g.drawRect(lx, ly, 10, 10); } } 回退(Backoff)例子
class CellUsingBackoff { private long val; private final Mutex mutex = new Mutex(); void swapVal(CellUsingBackoff other) throws InterruptedException { if (this == other) return; // alias check for (;;) { mutex.acquire(); try { I f (other.mutex.attempt(0)) { try { long t = val; val = other.val; other.val = t; return; } finally { other.mutex.release();
} } } finally { mutex.release(); }; Thread.sleep(100); // heuristic retry
interval } } } 读写锁
interface ReadWriteLock { Sync readLock(); Sync writeLock(); } -- 管理一对锁 - 和普通的锁一样的使用习惯 --
对集合类很有用 -半自动的方式实现SyncSet, SyncMap, ... --
实现者使用不同的锁策略 - WriterPreference, ReentrantWriterPreference, ReaderPreference, FIFO ReadWriteLock例子
-- 示范在读写锁中执行任何Runnable的包装类 class WithRWLock { final ReadWriteLock rw; public WithRWLock(ReadWriteLock l) { rw = l;
} public void performRead(Runnable readCommand) throws InterruptedException { rw.readLock().acquire(); try { readCommand.run(); } finally { rw.readlock().release(); } } public void performWrite(...) // similar } 闭锁(Latch)
--
闭锁是开始时设置为false,但一旦被设置为true,他将永远保持true状态 - 初始化标志 - 流结束定位 - 线程中断 - 事件出发指示器 --
CountDown和他有点类似,不同的是,CountDown需要一定数量的触发设置,而不是一次 --
非常简单,但是广泛使用的类 - 替换容易犯错的开发代码 Latch
Example 闭锁例子
class Worker implements Runnable { Latch startSignal; Worker(Latch l) { startSignal = l; } public void run() { startSignal.acquire(); // ... doWork(); } } class Driver { // ... void main() { Latch ss = new Latch(); for (int i = 0; i < N; ++i) // make
threads new Thread(new Worker(ss)).start(); doSomethingElse(); // don’t let run yet ss.release(); // now let all
threads proceed } } 信号(Semaphores)
--
服务于数量有限的占有者 - 使用许可数量构造对象(通常是0) - 如果需要一个许可才能获取,等待,然后取走一个许可 - 释放的时候将许可添加回来 --
但是真正的许可并没有转移(But no actual permits change hands.) - 信号量仅仅保留当前的计数值 --
应用程序 - 锁:一个信号量可以被用作互斥体(mutex) - 一个独立的等待缓存或者资源控制的操作 - 设计系统是想忽略底层的系统信号 -- (phores ‘remember’ past signals)记住已经消失的信号量 信号量例子
class Pool { ArrayList items = new ArrayList(); HashSet busy = new HashSet(); final Semaphore available; public Pool(int n) { available = new Semaphore(n); // ... somehow initialize n items ...; } public Object getItem() throws InterruptedException
{ available.acquire(); return doGet(); } public void returnItem(Object x) { if (doReturn(x)) available.release(); } synchronized Object doGet() { Object x = items.remove(items.size()-1); busy.add(x); // put in set to check returns return x; } synchronized boolean doReturn(Object x) { return busy.remove(x); // true if was
present } } 屏障(Barrier)
-- 多部分同步接口 - 每一部分都必须等待其他的分不撞倒屏障 --
CyclicBarrier类 - CountDown的一个可以重新设置的版本 - 对于反复划分算法很有用(iterative partitioning algorithms) --
Rendezvous类 - 一个每部分都能够和其他部分交换信息的屏障 - 行为类似同时的在一个同步通道上put和take - 对于资源交换协议很有用(resource-exchange protocols) 通道(Channel)
--为缓冲,队列等服务的主接口 -- 具体实现 - LinkedQueue, BoundedLinkedQueue,BoundedBuffer,
BoundedPriorityQueue,SynchronousChannel, Slot 通道属性
--
被定义为Puttable和Takable的子接口 - 允许安装生产者/消费者模式执行 --
支持可超时的操作offer和poll - 当超时值是0时,可能会被阻塞 - 所有的方法能够抛出InterruptedException异常 --
没有接口需要size方法 - 但是一些实现定义了这个方法 - BoundedChannel有capacity方法 通道例子
class Service { // ... final Channel msgQ = new LinkedQueue(); public void serve() throws
InterruptedException { String status = doService(); msgQ.put(status); } public Service() { // start background thread Runnable logger = new Runnable() { public void run() { try { for(;;) System.out.println(msqQ.take()); } catch(InterruptedException ie) {} } }; new Thread(logger).start(); } } 运行器(Executor)
--
类似线程的类的主接口 - 线程池 - 轻量级运行框架 - 可以定制调度算法 --
只需要支持execute(Runnable r) - 同Thread.start类似 --
实现 - PooledExecutor, ThreadedExecutor,QueuedExecutor,
FJTaskRunnerGroup - 相关的ThreadFactory类允许大多数的运行器通过定制属性使用线程 PooledExecutor
--
一个可调的工作者线程池,可修改得属性如下: - 任务队列的类型 - 最大线程数 - 最小线程数 - 预热(预分配)和立即(分配)线程 - 保持活跃直到工作线程结束 --
以后如果需要可能被一个新的代替 - 饱和(Saturation)协议 --
阻塞,丢弃,生产者运行,等等 PooledExecutor例子
class WebService { public static void main(String[] args) { PooledExecutor pool = new PooledExecutor(new BoundedBuffer(10), 20); pool.createThreads(4); try { ServerSocket socket = new ServerSocket(9999); for (;;) { final Socket connection = socket.accept(); pool.execute(new Runnable() { public void run() { new Handler().process(connection); }}); } } catch(Exception e) { } // die } } class Handler { void process(Socket s); } 前景(Future)和可调用(Callable)
-- Callabe是类似于Runnable的接口,用来作为参数和传递结果 interface Callable { Object call(Object arg) throws Exception; } -- FutureResult管理Callable的异步执行 class FutureResult { // ... // block caller until result is ready public Object get() throws InterruptedException,
InvocationTargetException; public void set(Object result); // unblocks
get // create Runnable that can be used with an
Executor public Runnable setter(Callable function); } FutureResult例子
class ImageRenderer { Image render(byte[]
raw); } class App { // ... Executor executor = ...; // any
executor ImageRenderer renderer = new ImageRenderer(); public void display(byte[] rawimage) { try { FutureResult futureImage = new FutureResult(); Runnable cmd = futureImage.setter(new Callable(){ public Object call() { return renderer.render(rawImage); }}); executor.execute(cmd); drawBorders(); // do other things while
executing drawCaption(); drawImage((Image)(futureImage.get())); // use
future } catch (Exception ex) { cleanup(); return; } } } 其他的类
--
CopyOnWriteArrayList - 支持整个集合复制时每一个修改的无锁访问 - 适合大多数的多路广播应用程序 --
工具包还包括了一个java.beans多路广播类的COW版本 --
SynchronizedDouble,
SynchronizedInt,SynchronizedRef, etc - 类似于java.lang.Double,提供可变操作的同步版本.例如,addTo,inc - 添加了一些象swap,commit这样的实用操作 未来计划
--
并发数据构架 - 一组繁重线程连接环境下有用的工具集合 --支持侧重I/O的程序 - 事件机制的IO系统 --
小版本的实现 - 例如SingleSourceQueue --小幅度的改善 - 使运行器更容易使用 --
替换 - JDK1.3 java.util.Timer 被ClockDaemon取代 |