Time :
Java并发编程艺术
Java并发编程艺术知识点总结…
Java并发编程基础
Daemon线程
是一种支持型线程,当Java虚拟机中不存在Daemon
线程时,Java虚拟机将会退出,而不会管Daemon
线程时候执行完毕。通过Thread.setDaemon(true)
设置为Daemon
线程。
线程中断
当线程被阻塞时,此时被中断会抛出InterruptedException
异常,并使中断标识符复位false
,阻塞方法有Thread.sleep(), Object.wait(), Thread.join()….,其中IO阻塞是不会抛出异常,也不会提前返回。
没被阻塞的线程,其中断标识会变为ture
。
如何处理InterruptedException
:
-
通常通过throws 抛出异常
-
如果在抛出异常时,还要做些其他处理,可以
try-catch
捕获异常 -
也可以不抛出异常,在catch里面再次
Thread.currentThread().interrupt();
中断线程
线程间的通信
volatile
& synchronized
关键字
volatile
修饰一个字段
synchronized
修饰方法或者同步快
// 修饰一个代码块
synchronized(this) {
// do something
}
// 修饰一个方法
public synchronized void method1(){
// do something
}
// 修饰一个静态方法
public synchronized static void method2(){
// do something
}
// 修饰一个类
synchronized(ClassName.class){
// do something
}
方法名 | 描述 |
---|---|
notify() | 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是获取到了对象的锁 |
notifyAll() | 通知所有等待在该对象上的线程 |
wait() | 通过该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁。 |
wait(long) | 超时等待一段时间(毫秒),没有通知则超时返回 |
wait(long,int) | int 处为纳秒 |
注意:
- 使用
wait()
,notify()
和notifyAll()
时需要先对调用对象加锁 - 调用
wait()
方法后,线程状态由RUNNING
变成WAITING
,并将当前线程放置到对象等待队列。 notify()
或 ` notifyAll()方法调用后,等待线程依旧不会从
wait()返回,需要调用
notify()或
notifyAll()的线程释放锁以后,等待线程才有机会从
wait()`返回。notify()
只通知一个,使其状态由Waiting -> Blocked
wait()
方法返回的前提是获得调用对象的锁
public class WaitNotify {
static Object lock = new Object();
public static void main(String[] args) throws InterruptedException {
Thread thread = new Thread(new Wait());
thread.start();
TimeUnit.SECONDS.sleep(2);
synchronized (lock){
lock.notify();
}
}
static class Wait implements Runnable{
@Override
public void run() {
synchronized (lock){
try {
lock.wait(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("finally");
}
}
}
}
管道输入输出流PipedOutputStream、PipedInputStream、PipedReader、PipedWrite
主要用于线程之间的数据传输,传输媒介为内存。
public class Piped {
public static void main(String[] args) throws IOException {
PipedWriter out = new PipedWriter();
PipedReader in = new PipedReader();
out.connect(in); // 将输出和输入进行连接,否则会抛出IOException
Thread printThread = new Thread(new Print(in), "PrintWrite");
printThread.start();
int receive = 0;
try {
while ((receive = System.in.read()) != 10) {
out.write(receive);
System.out.println("receive: " + receive);
}
} finally {
out.close();
}
}
static class Print implements Runnable {
private PipedReader in;
public Print(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1) { // 会一直堵塞等待接收信息,知道对方输出关闭
System.out.print((char) receive);
}
} catch (IOException e) {}
}
}
}
join() 使用:
如果线程A
执行了thread.join()
语句,其含义是:当前线程A
等待thread
线程终止后才从thread.join()
返回,也就是说把当前线程加在thread
后面,等他执行完后,才可能轮到本线程。
ThreadLocal使用:
ThreadLocal
类用来提供线程内部的局部变量。这种变量在多线程环境下访问(通过get
或set
方法访问)时能保证各个线程里的变量相对独立于其他线程内的变量,也就是说仅共享初始值,之后相互独立,大概的实现就是以当前线程为键,任意对象为值。ThreadLocal
实例通常来说都是private static
类型的,用于关联线程和线程的上下文。
public class Profiler {
// 申明一个本地变量
private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
@Override
protected Long initialValue() {
return new Long(0);
}
};
public static void main(String[] args) throws InterruptedException {
Thread thread1 = new Thread(new Thread1(TIME_THREADLOCAL),"thread1");
Thread thread2 = new Thread(new Thread2(TIME_THREADLOCAL),"thread2");
thread1.start();
thread2.start();
}
static class Thread1 implements Runnable{
private ThreadLocal<Long> threadLocal = null;
public Thread1(ThreadLocal<Long> threadLocal) {
this.threadLocal = threadLocal;
}
@Override
public void run() {
threadLocal.set(new Long(111));
System.out.println(Thread.currentThread().getName() + " : " + threadLocal.get());
}
}
static class Thread2 implements Runnable{
private ThreadLocal<Long> threadLocal = null;
public Thread2(ThreadLocal<Long> threadLocal) {
this.threadLocal = threadLocal;
}
@Override
public void run() {
threadLocal.set(new Long(222));
System.out.println(Thread.currentThread().getName() + " : " + threadLocal.get());
}
}
}
线程应用实例
- 等待超时模式,通过wait(long)来实现
- 线程池技术 (需要维护两个容器就好,一个装job,一个装job消费者,有job来就通知消费者)
Java中的锁——更加细腻的同步操作
锁
锁是面向使用者的,他定义的使用者与锁交互的接口
队列同步器
同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待、唤醒底层操作。
-
getState()
:获取当前同步状态 -
setState()
:设置当前同步状态 -
compareAndSetState(int expect,int update)
:使用CAS设置当前状态,该方法可以保证状态设置的原子性以上三个方法用于访问或者修改同步状态。
同步器可以重写的方法:
tryAcquire(int arg)
,tryRelease()
,tryAcquireShared(int arg)
,tryReleaseShared(int arg)
,isHeldExclusively()
.同步器提供的模板方法:
// 独占式获取同步状态 public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); } private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; } final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
// 共享式获取同步状态 public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); } private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
独占锁示例
import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class Mutex implements Lock { // 面向开发者 private final Sync sync = new Sync(); // 静态内部类,自定义同步器 private static class Sync extends AbstractQueuedSynchronizer{ // 是否处于占有状态 protected boolean isHeldExclusive(){ return getState() == 1; // 有且只有一个获取锁 } // 尝试性获取锁 public boolean tryAcquire(int acquires){ if (compareAndSetState(0,1)){ return true; } return false; } // 释放锁,将状态设置为0 protected boolean tryRelease(int releases){ if (getState() == 0){ throw new IllegalMonitorStateException(null); } setExclusiveOwnerThread(null);// 存放一个线程,其他线程可以安全获取 setState(0); return true; } // 返回一个Condition, 每个condition 都包含了一个condition队列 Condition newCondition(){ return new ConditionObject(); } } // 面向用户 public void lock() { sync.acquire(1); } public boolean tryLock() { return sync.tryAcquire(1); } public void unlock() { sync.tryRelease(1); } public Condition newCondition() { return sync.newCondition(); } public void lockInterruptibly() throws InterruptedException { } public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } }
共享锁示例
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
public class TwinsLock implements Lock {
private final Sync sync = new Sync(2);
private static final class Sync extends AbstractQueuedSynchronizer{
Sync(int count){
if (count <= 0) {
throw new IllegalArgumentException("count must large than zero.");
}
setState(count);
}
public int tryAcquireShared(int reduceCount){
for (;;){
int curent = getState();
int newCount = curent - reduceCount;
if (newCount < 0 || compareAndSetState(curent,newCount)){
return newCount;
}
}
}
public boolean tryReleaseShared(int returnCount){
for (;;){ // 这是跟独占锁最大的区别,需要保证解锁线安全
int current = getState();
int newCount = current + returnCount;
if (compareAndSetState(current,newCount)){
return true;
}
}
}
}
@Override
public void lock() {
sync.acquireShared(1);
}
@Override
public void unlock() {
sync.tryReleaseShared(1);
}
}
重入锁
重入锁是在方法执行时,执行线程在获取锁之后仍能连续多次地获取该锁。这也就是所谓的非公平锁机制。(书上说一般公平锁没非公平锁效率高)
其实现关键代码如下:
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread()); //设置哪个线程,用于下比较
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
abstract static class Sync extends AbstractQueuedSynchronizer {
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 如果是当前线程,继续累加状态
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 保证重入线程完全被释放
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
}
读写锁
ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
Lock r = reentrantReadWriteLock.readLock();
Lock w = reentrantReadWriteLock.writeLock();
LockSupport
工具
定义了一组公共静态方法:
void park()
, void parkNanos(long nanos)
, void parkUntil(long deadline)
, void unpark(Thread thread)
.
Condition
接口
任意一个Java对象,都有一组监视器方法(定义在java.lang.Object
上),主要包括wait(), wait(long timeout),notify(), nofityAll(),
这些方法与synchronized
同步关键字配合使用。
Condition也
提供了类似Object
的监视器方法,与Lock
配合可以实现等待/通知模式。其实现通过一个等待队列,用于实现等待或者通知。
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
condition.await(); // 须使用在锁中
lock.unlock();
ConcurrentLinkedQueue
非阻塞式安全队列
Java里的阻塞队列
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
DelayQueue
SynchronousQueue
LinkedTransferQueue
LinkedBlockingQueue
Fork/Join框架
- 分割任务
ForkJoinTast
:他提供fork(), join()来操作,其中:RecursiveAction
用于没有返回结果的任务,RecursiveTask
:用于有返回结果的任务。 - 执行任务并合并结果
ForkJoinPool
: 执行ForkJoinTast
。ForkJoinPool
由ForkJoinTask
数组和ForKJoinWorkThread
数组组成,ForkJoinTask
数组负责将存放程序提交给ForkJoinPool
的任务,而ForkJoinWorkerThread
数组负责执行这些任务。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class CountTask extends RecursiveTask<Integer> {
private static final int THRESHOLD = 2; // 阈值
private int start;
private int end;
public CountTask(int start,int end){
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
int sum = 0;
// 如果任务足够小就计算任务
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute){
for (int i = start; i <= end; i++){
sum += i;
}
}else {
// 如果大于阈值就分割任务
int middle = (start + end) / 2;
CountTask leftTask = new CountTask(start, middle);
CountTask rightTask = new CountTask(middle + 1, end);
// 执行子任务
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成计算任务
CountTask task = new CountTask(1,4000);
// 执行一个任务
Future<Integer> result = forkJoinPool.submit(task);
try {
System.out.println(result.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
}
13个原子操作类
原子跟新基本类型类
AtomicBoolean
AtomicInteger
AtomicLong
原子跟新数组
AtomicIntegerArray
AtoimcLongArray
AtomicReferenceArray
AtomicReferenceArray
原子更新引用类型
AtomicReference
AtomicReferenceFieldUpdater
AtomicMarkableReference
原子更新字段类
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicStampedReference
Java中的并发工具类
等待多线程完成的
CountDownLatch
:
static CountDownLatch c = new CountDownLatch(2); // 初始N为2
c.await(); // 阻塞当前线程,直到N为0,才被唤醒
c.countDown(); // 使N减1
同步屏障
CyclicBarrier
:
static CyclicBarrier c = new CyclicBarrier(2); // 需要两个堵塞,才能唤醒被堵塞的线程
static CyclicBarrier cc = new CyclicBarrier(2,Runnable barrierAction); // 需要两个堵塞,才能唤醒被堵塞的线程,唤醒后优先执行barrierAction这个线程
c.await(); // 堵塞当前线程,只有达到堵塞屏障2,才能被唤醒
c.reset(); // 重置状态
c.isBroken(); // 判断线程中是否有被打断的
控制并发线程数的
Semaphore
:
static Semaphore s = new Semaphore(10); // 每次并发只能10个以内
// 调用方式
public void run(){
s.acquire();
// ........
s.release();
}
线程间交换数据的
Exchanger
:
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ExhangerTest {
private static final Exchanger<String> exgr = new Exchanger<>();
private static ExecutorService threadPool = Executors.newFixedThreadPool(2);
public static void main(String[] args) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try{
String A = "bank A";
System.out.println("1: " + exgr.exchange(A));;
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
threadPool.execute(new Runnable() {
@Override
public void run() {
try{
String B = "bank B";
System.out.println("2: " + exgr.exchange(B));;
}catch (InterruptedException e){
e.printStackTrace();
}
}
});
}
}
/**结果
2: bank A
1: bank B
*/
Executor框架
executor框架结构:
任务。包括任务需要实现的接口:Runnable , Callable
public class Run implements Runnable {
@Override
public void run() {
}
}
public class Call implements Callable<String> {
@Override
public String call() throws Exception {
return null;
}
}
任务执行
执行任务机制的核心接口Executor
,以及继承自Executor
的ExecutorService
接口。其两个关键实现类为:ThreadPoolExecutor
和 ScheduledThreadPoolExecutor
。
-
ThreadPoolExecutor
适用于满足资源管理要求,而需要限制当前线程数量的应用场景,他适用于负载交重的服务器。
-
SingleThreadExecutor
-
CachedThreadPool
适用于很多短期异步任务的小程序,或者负载较轻的服务器。
-
ScheduledThreadPoolExecutor
Future
接口和实现Future
接口的FutureTask
类,代表异步计算的结果。
Future<Object> future = Executors.newFixedThreadPool(1).submit(new Callable<Object>() {
@Override
public Object call() throws Exception {
return "hello";
}
});
future.get(); // 堵塞直到获取到结果
future.cancel(true); // 任务未启动false、true都不会被执行,已启动,true中断,false不中断,已完成,返回false
参考资源: