Java并发编程艺术

Java并发编程艺术知识点总结…

Java并发编程基础

Daemon线程

是一种支持型线程,当Java虚拟机中不存在Daemon线程时,Java虚拟机将会退出,而不会管Daemon线程时候执行完毕。通过Thread.setDaemon(true)设置为Daemon线程。

线程中断

当线程被阻塞时,此时被中断会抛出InterruptedException异常,并使中断标识符复位false,阻塞方法有Thread.sleep(), Object.wait(), Thread.join()….,其中IO阻塞是不会抛出异常,也不会提前返回。

没被阻塞的线程,其中断标识会变为ture

如何处理InterruptedException:

  1. 通常通过throws 抛出异常

  2. 如果在抛出异常时,还要做些其他处理,可以try-catch捕获异常

  3. 也可以不抛出异常,在catch里面再次Thread.currentThread().interrupt();中断线程

线程间的通信

volatile & synchronized 关键字

volatile修饰一个字段

synchronized修饰方法或者同步快

// 修饰一个代码块
synchronized(this) {
         // do something
}
// 修饰一个方法
public synchronized void method1(){
    // do something
}
// 修饰一个静态方法
public synchronized static void method2(){
    // do something
}
// 修饰一个类
synchronized(ClassName.class){
    // do something
}
方法名 描述
notify() 通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提是获取到了对象的锁
notifyAll() 通知所有等待在该对象上的线程
wait() 通过该方法的线程进入WAITING状态,只有等待另外线程的通知或被中断才会返回,需要注意,调用wait()方法后,会释放对象的锁。
wait(long) 超时等待一段时间(毫秒),没有通知则超时返回
wait(long,int) int 处为纳秒

注意:

public class WaitNotify {
    static Object lock = new Object();
    public static void main(String[] args) throws InterruptedException {
        Thread thread = new Thread(new Wait());
        thread.start();
        TimeUnit.SECONDS.sleep(2);
        synchronized (lock){
            lock.notify();
        }
    }

    static class Wait implements Runnable{
        @Override
        public void run() {
            synchronized (lock){
                try {
                    lock.wait(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("finally");
            }
        }
    }
}

管道输入输出流PipedOutputStream、PipedInputStream、PipedReader、PipedWrite

主要用于线程之间的数据传输,传输媒介为内存。

public class Piped {
    public static void main(String[] args) throws IOException {
        PipedWriter out = new PipedWriter();
        PipedReader in = new PipedReader();
        out.connect(in);	// 将输出和输入进行连接,否则会抛出IOException
        Thread printThread = new Thread(new Print(in), "PrintWrite");
        printThread.start();
        int receive = 0;
        try {
            while ((receive = System.in.read()) != 10) {
                out.write(receive);
                System.out.println("receive: " + receive);
            }
        } finally {
            out.close();
        }
    }
    static class Print implements Runnable {
        private PipedReader in;

        public Print(PipedReader in) {
            this.in = in;
        }

        @Override
        public void run() {
            int receive = 0;
            try {
                while ((receive = in.read()) != -1) { 	// 会一直堵塞等待接收信息,知道对方输出关闭
                    System.out.print((char) receive);
                }
            } catch (IOException e) {}
        }
    }
}
join() 使用:

​ 如果线程A 执行了thread.join()语句,其含义是:当前线程A 等待thread线程终止后才从thread.join()返回,也就是说把当前线程加在thread后面,等他执行完后,才可能轮到本线程。

ThreadLocal使用:

ThreadLocal类用来提供线程内部的局部变量。这种变量在多线程环境下访问(通过getset方法访问)时能保证各个线程里的变量相对独立于其他线程内的变量,也就是说仅共享初始值,之后相互独立,大概的实现就是以当前线程为键,任意对象为值ThreadLocal实例通常来说都是private static类型的,用于关联线程和线程的上下文。

public class Profiler {
    // 申明一个本地变量
    private static final ThreadLocal<Long> TIME_THREADLOCAL = new ThreadLocal<Long>() {
        @Override
        protected Long initialValue() {
            return new Long(0);
        }
    };
    
    public static void main(String[] args) throws InterruptedException {
        Thread thread1 = new Thread(new Thread1(TIME_THREADLOCAL),"thread1");
        Thread thread2 = new Thread(new Thread2(TIME_THREADLOCAL),"thread2");
        thread1.start();
        thread2.start();
    }
    
    static class Thread1 implements Runnable{
        private ThreadLocal<Long> threadLocal = null;

        public Thread1(ThreadLocal<Long> threadLocal) {
            this.threadLocal = threadLocal;
        }
        @Override
        public void run() {
            threadLocal.set(new Long(111));
            System.out.println(Thread.currentThread().getName() + " : " + threadLocal.get());
        }
    }

    static class Thread2 implements Runnable{
        private ThreadLocal<Long> threadLocal = null;
        public Thread2(ThreadLocal<Long> threadLocal) {
            this.threadLocal = threadLocal;
        }
        @Override
        public void run() {
            threadLocal.set(new Long(222));
            System.out.println(Thread.currentThread().getName() + " : " + threadLocal.get());
        }
    }
}
线程应用实例
  1. 等待超时模式,通过wait(long)来实现
  2. 线程池技术 (需要维护两个容器就好,一个装job,一个装job消费者,有job来就通知消费者)

Java中的锁——更加细腻的同步操作

锁是面向使用者的,他定义的使用者与锁交互的接口

队列同步器

同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待、唤醒底层操作。

  1. getState():获取当前同步状态

  2. setState():设置当前同步状态

  3. compareAndSetState(int expect,int update):使用CAS设置当前状态,该方法可以保证状态设置的原子性

    以上三个方法用于访问或者修改同步状态。

    同步器可以重写的方法:

    tryAcquire(int arg), tryRelease(), tryAcquireShared(int arg), tryReleaseShared(int arg), isHeldExclusively().

    同步器提供的模板方法:

    // 独占式获取同步状态
    public final void acquire(int arg) {
            if (!tryAcquire(arg) &&
                acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
                selfInterrupt();
        }
    private Node addWaiter(Node mode) {
            Node node = new Node(Thread.currentThread(), mode);
            // Try the fast path of enq; backup to full enq on failure
            Node pred = tail;
            if (pred != null) {
                node.prev = pred;
                if (compareAndSetTail(pred, node)) {
                    pred.next = node;
                    return node;
                }
            }
            enq(node);
            return node;
        }
    final boolean acquireQueued(final Node node, int arg) {
            boolean failed = true;
            try {
                boolean interrupted = false;
                for (;;) {
                    final Node p = node.predecessor();
                    if (p == head && tryAcquire(arg)) {
                        setHead(node);
                        p.next = null; // help GC
                        failed = false;
                        return interrupted;
                    }
                    if (shouldParkAfterFailedAcquire(p, node) &&
                        parkAndCheckInterrupt())
                        interrupted = true;
                }
            } finally {
                if (failed)
                    cancelAcquire(node);
            }
        }
    
    // 共享式获取同步状态
    public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)
                doAcquireShared(arg);
        }
    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }
    

    独占锁示例

    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.AbstractQueuedSynchronizer;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.Lock;
    
    public class Mutex implements Lock {
        // 面向开发者
        private final Sync sync = new Sync();
        // 静态内部类,自定义同步器
        private static class Sync extends AbstractQueuedSynchronizer{
            // 是否处于占有状态
            protected boolean isHeldExclusive(){
                return getState() == 1; // 有且只有一个获取锁
            }
            // 尝试性获取锁
            public boolean tryAcquire(int acquires){
                if (compareAndSetState(0,1)){
                    return true;
                }
                return false;
            }
            // 释放锁,将状态设置为0
            protected boolean tryRelease(int releases){
                if (getState() == 0){
                    throw new IllegalMonitorStateException(null);
                }
                setExclusiveOwnerThread(null);// 存放一个线程,其他线程可以安全获取
                setState(0);
                return true;
                   
            }
            // 返回一个Condition, 每个condition 都包含了一个condition队列
            Condition newCondition(){
                return new ConditionObject();
            }
               
        }    
        // 面向用户
        public void lock() {
            sync.acquire(1);
        }
    
        public boolean tryLock() {
            return sync.tryAcquire(1);
        }
    
        public void unlock() {
            sync.tryRelease(1);
        }
    
        public Condition newCondition() {
            return sync.newCondition();
        }
    
        public void lockInterruptibly() throws InterruptedException {
    
        }
    
        public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
            return false;
        }
    }
    

    共享锁示例

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class TwinsLock implements Lock {
    private final Sync sync = new Sync(2);
    private static final class Sync extends AbstractQueuedSynchronizer{
        Sync(int count){
            if (count <= 0) {
                throw new IllegalArgumentException("count must large than zero.");
            }
            setState(count);
        }
        public int tryAcquireShared(int reduceCount){
            for (;;){
                int curent = getState();
                int newCount = curent - reduceCount;
                if (newCount < 0 || compareAndSetState(curent,newCount)){
                    return newCount;
                }
            }
        }
        public boolean tryReleaseShared(int returnCount){
            for (;;){					// 这是跟独占锁最大的区别,需要保证解锁线安全
                int current = getState();
                int newCount = current + returnCount;
                if (compareAndSetState(current,newCount)){
                    return true;
                }
            }
        }
    }
    @Override
    public void lock() {
        sync.acquireShared(1);
    }

    @Override
    public void unlock() {
        sync.tryReleaseShared(1);
    }
}

重入锁

重入锁是在方法执行时,执行线程在获取锁之后仍能连续多次地获取该锁。这也就是所谓的非公平锁机制。(书上说一般公平锁没非公平锁效率高)

其实现关键代码如下:

static final class NonfairSync extends Sync {
        private static final long serialVersionUID = 7316153563782823691L;
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread()); //设置哪个线程,用于下比较
            else
                acquire(1);
        }
        protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
        }
    }

abstract static class Sync extends AbstractQueuedSynchronizer {
    final boolean nonfairTryAcquire(int acquires) {		
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {	// 如果是当前线程,继续累加状态
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {			// 保证重入线程完全被释放
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }   
}

读写锁

ReentrantReadWriteLock reentrantReadWriteLock = new ReentrantReadWriteLock();
Lock r = reentrantReadWriteLock.readLock();
Lock w = reentrantReadWriteLock.writeLock();

LockSupport工具

定义了一组公共静态方法:

void park(), void parkNanos(long nanos), void parkUntil(long deadline), void unpark(Thread thread).

Condition接口

任意一个Java对象,都有一组监视器方法(定义在java.lang.Object上),主要包括wait(), wait(long timeout),notify(), nofityAll(), 这些方法与synchronized同步关键字配合使用。

Condition也提供了类似Object的监视器方法,与Lock配合可以实现等待/通知模式。其实现通过一个等待队列,用于实现等待或者通知。

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
lock.lock();
condition.await();   // 须使用在锁中
lock.unlock();

ConcurrentLinkedQueue非阻塞式安全队列

Java里的阻塞队列

  1. ArrayBlockingQueue
  2. LinkedBlockingQueue
  3. PriorityBlockingQueue
  4. DelayQueue
  5. SynchronousQueue
  6. LinkedTransferQueue
  7. LinkedBlockingQueue

Fork/Join框架

  1. 分割任务 ForkJoinTast:他提供fork(), join()来操作,其中:RecursiveAction用于没有返回结果的任务,RecursiveTask:用于有返回结果的任务。
  2. 执行任务并合并结果 ForkJoinPool: 执行ForkJoinTastForkJoinPoolForkJoinTask数组和ForKJoinWorkThread数组组成,ForkJoinTask数组负责将存放程序提交给ForkJoinPool的任务,而ForkJoinWorkerThread数组负责执行这些任务。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class CountTask extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 2; // 阈值
    private int start;
    private int end;
    public CountTask(int start,int end){
        this.start = start;
        this.end = end;
    }
    @Override
    protected Integer compute() {
        int sum = 0;
        // 如果任务足够小就计算任务
        boolean canCompute = (end - start) <= THRESHOLD;
        if (canCompute){
            for (int i = start; i <= end; i++){
                sum += i;
            }
        }else {
            // 如果大于阈值就分割任务
            int middle = (start + end) / 2;
            CountTask leftTask = new CountTask(start, middle);
            CountTask rightTask = new CountTask(middle + 1, end);
            // 执行子任务
            leftTask.fork();
            rightTask.fork();
            // 等待子任务执行完,并得到其结果
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();

            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        // 生成计算任务
        CountTask task = new CountTask(1,4000);
        // 执行一个任务
        Future<Integer> result = forkJoinPool.submit(task);
        try {
            System.out.println(result.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

13个原子操作类

原子跟新基本类型类
原子跟新数组
原子更新引用类型
原子更新字段类

Java中的并发工具类

等待多线程完成的CountDownLatch:

static CountDownLatch c = new CountDownLatch(2); // 初始N为2
c.await();	// 阻塞当前线程,直到N为0,才被唤醒
c.countDown(); // 使N减1

同步屏障CyclicBarrier:

static CyclicBarrier c = new CyclicBarrier(2); // 需要两个堵塞,才能唤醒被堵塞的线程
static CyclicBarrier cc = new CyclicBarrier(2,Runnable barrierAction); // 需要两个堵塞,才能唤醒被堵塞的线程,唤醒后优先执行barrierAction这个线程
c.await(); // 堵塞当前线程,只有达到堵塞屏障2,才能被唤醒
c.reset(); // 重置状态
c.isBroken(); // 判断线程中是否有被打断的

控制并发线程数的Semaphore:

static Semaphore s = new Semaphore(10);  // 每次并发只能10个以内
// 调用方式
public void run(){
    s.acquire();
   // ........
   s.release();
}

线程间交换数据的Exchanger:

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExhangerTest {
    private static final Exchanger<String> exgr = new Exchanger<>();
    private static ExecutorService threadPool = Executors.newFixedThreadPool(2);

    public static void main(String[] args) {
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    String A = "bank A";
                    System.out.println("1: " + exgr.exchange(A));;
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        });
        threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try{
                    String B = "bank B";
                    System.out.println("2: " + exgr.exchange(B));;
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            }
        });
    }
}
/**结果
2: bank A
1: bank B
*/

Executor框架

executor框架结构:

任务。包括任务需要实现的接口:Runnable , Callable

public class Run implements Runnable {
    @Override
    public void run() {
    }
}
public class Call implements Callable<String> {
    @Override
    public String call() throws Exception {
        return null;
    }
}

任务执行

执行任务机制的核心接口Executor,以及继承自ExecutorExecutorService接口。其两个关键实现类为:ThreadPoolExecutorScheduledThreadPoolExecutor

Future接口和实现Future接口的FutureTask类,代表异步计算的结果。

Future<Object> future = Executors.newFixedThreadPool(1).submit(new Callable<Object>() {
            @Override
            public Object call() throws Exception {
                return "hello";
            }
        });
        future.get();   // 堵塞直到获取到结果
        future.cancel(true);    // 任务未启动false、true都不会被执行,已启动,true中断,false不中断,已完成,返回false

参考资源:

Java 理论与实践: 处理 InterruptedException

《Java并发编程的艺术》