控制并发流程

什么是控制并发流程

定义

  1. 控制并发流程的工具类,作用就是帮助我们程序员更容易的让线程间合作
  2. 让线程间相互配合,来满足业务逻辑(如线程A等待线程B执行完毕再执行)

控制并发流程的工具类

作用 说明
Semaphore 信号量,可以通过控制“许可证”的数量,来保证线程间的配合 线程只有在拿到“许可证”后才能继续运行。相比于其他的同步器,更灵活
CyclicBarrier 线程会等待,直到足够多线程达到了实现规定的数目,一旦达到了触发条件,就可以进行下一步操作 适用于线程之间相互等待处理结果就绪的场景
Phaser 和CyclicBarrier类似,但是计数可变 Java7加入的
CountDownLatch 和CyclicBarrier类似,数量递减到0时,触发动作 不可重复使用
Exchanger 让两个线程在合适时交换对象 适用场景:当两个线程工作在同一个类的不同实例上时,用于交换数据
Condition 可以控制线程的“等待”和唤醒 是Object.wait()的升级版

为什么要控制并发流程

在不控制并发流程的情况下,所有的线程是尽可能的跑,并且受线程调度器控制,有的时候,我们要求有些任务必须先执行,有些任务必须在其他任务执行结束以后在执行,那就必须控制并发流程

CountDownLatch倒计时门闩

作用

倒数结束之前,一直处于等待状态,直到倒计时结束了,此流程才继续工作

主要方法介绍

  1. CountDownLatch(int count):仅有一个构造函数,参数count为需要倒数的数值
  2. await();调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
  3. countDown():将count值减1,直到为0,等待的线程会被唤醒

典型用法1

一个线程等待多个线程都执行完毕,再继续自己的工作

  /**
     * 描述: 工厂中,质检5个工人检查,所有人都认为通过,才通过
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            service.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep((long)(Math.random() * 10000));
                        System.out.println("No" + no + "完成了检查");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        latch.countDown();
                    }
                }
            });
        }
        System.out.println("等待5个人检查完毕");
        latch.await();
        System.out.println("5个人检查完毕");
        service.shutdown();
    }

结果

等待5个人检查完毕
No2完成了检查
No1完成了检查
No3完成了检查
No4完成了检查
No5完成了检查
5个人检查完毕

典型用法2

多个线程等待某一个线程的信号,同时开始执行

 /**
     * 描述: 模拟100米跑步。5名选手准备完成,只等裁判员一声令下,所有人哦同时开始跑步
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            service.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("No." + no + "准备完毕,等待发令枪");
                    try {
                        begin.await();
                        System.out.println("No." + no + "开始跑步");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

                }
            });
        }
        Thread.sleep(5000);
        begin.countDown();
        System.out.println("发令枪响,比赛开始!");
        service.shutdown();
    }

结果

No.3准备完毕,等待发令枪
No.5准备完毕,等待发令枪
No.1准备完毕,等待发令枪
No.2准备完毕,等待发令枪
No.4准备完毕,等待发令枪
发令枪响,比赛开始!
No.5开始跑步
No.3开始跑步
No.2开始跑步
No.1开始跑步
No.4开始跑步

组合使用

 /**
     * 描述: 模拟100米跑步。5名选手准备完成,只等裁判员一声令下,所有人同时开始跑步,等待所有人到达终点后,比赛结束
     */
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch begin = new CountDownLatch(1);
        CountDownLatch end = new CountDownLatch(5);
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            final int no = i + 1;
            service.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        System.out.println("No." + no + "准备完毕");
                        begin.await();
                        System.out.println("No." + no + "开始跑步了");
                        Thread.sleep((long) (Math.random() * 10000));
                        System.out.println("No." + no + "到达终点");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        end.countDown();
                    }

                }
            });
        }
        Thread.sleep(5000);
        begin.countDown();
        System.out.println("发令枪响,比赛开始!");
        end.await();
        System.out.println("所有运动员都到达终点,比赛结束");
        service.shutdown();
    }

结果

No.3准备完毕
No.5准备完毕
No.4准备完毕
No.1准备完毕
No.2准备完毕
发令枪响,比赛开始!
No.4开始跑步了
No.5开始跑步了
No.3开始跑步了
No.2开始跑步了
No.1开始跑步了
No.1到达终点
No.4到达终点
No.3到达终点
No.2到达终点
No.5到达终点
所有运动员都到达终点,比赛结束

注意点

CountDownLatch是不能够重用的,如果需要重新计数,可以考虑使用CyclicBarrier或者创建新的CountDownLatch实例。

总结

两个典型用法:一等多和多等一
CountDownLatch类在创建实例的时候,需要传递倒数次数。倒数到0的时候,之前等待的线程会继续运行

Semaphore信号量

Semaphore可以用来限制或管理数量有限的资源的使用情况
信号量的作用是维护一个“许可证”的计数,线程可以获取许可证,那信号量剩余的许可证就减一,线程也可以释放一个许可证,那么信号量的许可证就加一,当信号量所拥有的许可证数量为0,那么下一个还想要获取许可证的线程,就需要等待,直到有另外的线程释放了许可证

信号量使用流程

  1. 初始化Semaphore并指定许可证的数量
  2. 在需要被现在的代码前加acquire()或者acquireUninteerruptibly()方法
  3. 在任务执行结束后,调用release()来释放许可证

主要方法结束

  • new Semaphore(int permits,boolean fair):这里可以设置是否要使用公平策略,如果传入true,那么Semaphore会把之前等待的线程放到FIFO的队列中,以便于当有了新的许可证,可以分发给之前等了最长时间的线程。
  • acquire():获取一个“许可证”,可以响应中断
  • acquireUninteerruptibly():获取一个“许可证”,不响应中断
  • tryAcquire():看看现在有没有空闲的许可证,如果有的话就获取,如果没有的话也没有关系,我不必陷入阻塞,我可以去做别的事,过一会儿再来查看许可证的空闲情况
  • tryAcquire(timeout):和tryAcquire()一样,但是多了一个超时时间,比如“在3秒内获取不到许可证,我就去做别的事”。
  • release():归还许可证

代码

public class SemaphoreDemo {

    static Semaphore semaphore = new Semaphore(2, true);

    public static void main(String[] args) {
        ExecutorService service = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 5; i++) {
            service.submit(new Task());
        }
        service.shutdown();
    }
    static class Task implements Runnable {
        @Override
        public void run() {
            try {
                // semaphore.acquire(3);  可以一次拿多个信号量,信号量不够就等待
                semaphore.acquire();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "拿到了许可证");
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() + "释放了许可证");
            semaphore.release();
            //semaphore.release(3);可以一次释放多个信号量,允许超过初始值
        }
    }
}

结果

pool-1-thread-1拿到了许可证
pool-1-thread-2拿到了许可证
pool-1-thread-2释放了许可证
pool-1-thread-1释放了许可证
pool-1-thread-5拿到了许可证
pool-1-thread-3拿到了许可证
pool-1-thread-3释放了许可证
pool-1-thread-5释放了许可证
pool-1-thread-4拿到了许可证
pool-1-thread-4释放了许可证

特殊用法

一次性获取或释放多个许可证

比如TaskA会调用很消耗资源的method1(),而TaskB调用的是不太消耗资源的method2(),假设我们一共有5个许可证。那么我们就可以要求TaskA获取5个许可证才能执行,而TaskB只需要获取到一个许可证就能执行,这样就避免了A和B同时运行的情况,我们可以根据自己的需求合理分配资源

注意点

  1. 获取和释放的许可证数量必须一致,否则比如每次都获取两个但是只是放一个甚至不释放,随着时间的推移,到最后许可证数量不够用,会导致程序卡死(虽然信号量类并不对是否和获取的数量做规定,但是这是我们的编码规范,否则容易出错)

  2. 注意在初始化Semaphore的时候设置公平性,一般设置为true会更合理

  3. 并不是必须由获取许可证的线程释放那个许可证,事实上,获取和释放许可证对线程并无要求,也许是A获取了,然后B释放,只要逻辑合理即可

  4. 信号量的作用,除了控制临界区最多同时又N个线程访问外,另一个作用是可以实现“条件等待”,例如线程1需要在线程2完成准备工作以后才能开始工作,那么线程1acquire(),而线程2完成任务后release(),这样的话,相当于是轻量级的CountDownLatch。

Condition(又称为条件对象)

作用

当线程1需要等待某个条件的时候,他就去执行condition.await(),一旦执行了await()方法,线程就会进入阻塞状态
然后通常会又另外一个线程,假设是线程2,去执行对应的条件,直到这个条件达成的时候,线程2就回去执行condition.signal()方法,这是JVM就会总被阻塞的线程里找,找到哪些等待改condition的线程,当线程1收到可执行信号的时候,他的线程状态就会变成Runnable可执行状态

signal()和signalAll()区别

  • signalAll()会唤起所有的正在等待的线程
  • 但是signal()是公平的,只会唤起那个等待时间最长的线程

普通用法

public class ConditionDemo1 {
    private ReentrantLock lock = new ReentrantLock();
    private Condition condition = lock.newCondition();
    void method1() throws InterruptedException {
        lock.lock();
        try {
            System.out.println("条件不满足,开始休眠");
            condition.await();
            System.out.println("条件满足,开始执行");
        } finally {
            lock.unlock();
        }
    }

    void method2() {
        lock.lock();
        try {
            System.out.println("准备工作完成,唤醒任务");
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ConditionDemo1 demo1 = new ConditionDemo1();
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(1000);
                    demo1.method2();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
        demo1.method1();
    }
}

结果

条件不满足,开始休眠
准备工作完成,唤醒任务
条件满足,开始执行

使用Condition实现生产者消费者模式

public class ConditionDemo2 {

    private int queueSize = 5;
    private PriorityQueue queue = new PriorityQueue<>(queueSize);
    private Lock lock = new ReentrantLock();
    private Condition notFull = lock.newCondition();
    private Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ConditionDemo2 demo2 = new ConditionDemo2();

        Produce produce = demo2.new Produce();
        Consumer consumer = demo2.new Consumer();
        produce.start();
        consumer.start();
    }

    class Consumer extends Thread {

        @Override
        public void run() {
            consumer();
        }

        private void consumer() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == 0) {
                        try {
                            notEmpty.await();//队列为空
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("队列空,等待数据进来");
                    }
                    queue.poll();
                    // 消费了一个数据,队列中空闲了一个位置,可以继续添加了
                    notFull.signalAll();
                    System.out.println("从队列中取走了一个元素,队列中剩余" + queue.size() + "个未消费");
                } finally {
                    lock.unlock();
                }
            }
        }
    }

    class Produce extends Thread {

        @Override
        public void run() {
            produce();
        }

        private void produce() {
            while (true) {
                lock.lock();
                try {
                    while (queue.size() == queueSize) {
                        try {
                            notFull.await();//队列满
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("队列满,等待位置空闲");
                    }
                    queue.add(1);
                    // 添加了一个数据,消费者可以消费了
                    notEmpty.signalAll();
                    System.out.println("向队列插入了一个元素,队列中剩余空间" + (queueSize - queue.size()));
                } finally {
                    lock.unlock();
                }
            }
        }
    }
}

结果

从队列中取走了一个元素,队列中剩余0个未消费
向队列插入了一个元素,队列中剩余空间4
队列空,等待数据进来
从队列中取走了一个元素,队列中剩余0个未消费
向队列插入了一个元素,队列中剩余空间4
队列空,等待数据进来
从队列中取走了一个元素,队列中剩余0个未消费
向队列插入了一个元素,队列中剩余空间4
向队列插入了一个元素,队列中剩余空间3
向队列插入了一个元素,队列中剩余空间2
向队列插入了一个元素,队列中剩余空间1
向队列插入了一个元素,队列中剩余空间0
队列空,等待数据进来
从队列中取走了一个元素,队列中剩余4个未消费
从队列中取走了一个元素,队列中剩余3个未消费
从队列中取走了一个元素,队列中剩余2个未消费
从队列中取走了一个元素,队列中剩余1个未消费
从队列中取走了一个元素,队列中剩余0个未消费
队列满,等待位置空闲
... ...

线程交替运行,直到终止程序

Condition注意点

  • 实际上,如果说Lock用来代替synchronized,那么Condition就是用来代替相应的Object.wait/notigy的,所以在用法和性质上,几乎都一样
  • await方法会自动释放持有的Lock锁,和Object.wait一样,不需要自己手动释放锁
  • 调用await的时候,必须持有锁,否则会抛出异常,和Object.wait一样

CyclicBarrier循环栅栏

CyclicBarrier循环栅栏和countDownLatch很类似,都能阻塞一组线程
当有大量线程相互配合,分别计算不同任务,并且最后统一汇总的时候,我们可以使用CyclicBarrier,CyclicBarrier可以构造一个集结点,当某一个线程执行完毕,他就会到集结点等待,直到所有线程都到了集结点,那么该栅栏就被撤销,所有线程再统一出发,继续执行剩下的任务

public class CyclicBarrierDemo {

    public static void main(String[] args) {
        // 需要等待的线程数量,等待数量足够了以后,需要干什么
        CyclicBarrier cyclicBarrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("所有人都到场了,大家统一出发");
            }
        });
        for (int i = 0; i < 5; i++) {
            new Thread(new Task(i, cyclicBarrier)).start();
        }

    }

    static class Task implements Runnable {
        int id;
        CyclicBarrier cyclicBarrier;

        public Task(int id, CyclicBarrier cyclicBarrier) {
            this.id = id;
            this.cyclicBarrier = cyclicBarrier;
        }

        @Override
        public void run() {
            System.out.println("线程" + id + "开始出发前往集合地点");
            try {
                Thread.sleep((long) (Math.random() * 10000));
                System.out.println("线程" + id + "到达集合地点");
                cyclicBarrier.await();
                System.out.println("线程" + id + "出发了");
            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

        }
    }

}

结果

线程1开始出发前往集合地点
线程2开始出发前往集合地点
线程0开始出发前往集合地点
线程3开始出发前往集合地点
线程4开始出发前往集合地点
线程3到达集合地点
线程2到达集合地点
线程1到达集合地点
线程0到达集合地点
线程4到达集合地点
所有人都到场了,大家统一出发
线程4出发了
线程3出发了
线程1出发了
线程2出发了
线程0出发了

CyclicBarrier和CountDownLatch的区别

  • 作用不同:CyclicBarrier要等固定数量的线程都达到了栅栏位置才能继续执行,而CountDownLatch只需要等待数字到0,也就是说CountDownLatch用于事件,但是CyclicBarrier是用于线程的
  • 可重用性不同:CountDownLatch在倒数到0并触发门闩打开后,就不能再次使用了,除非新建新的实例,而CyclicBarrier可以重复使用

发表回复

您的电子邮箱地址不会被公开。 必填项已用*标注