CountDownLatch:允许一个或多个线程 等待其他线程完成操作
CyclicBarrier:当一组线程达到同步点时被阻塞,直到最后一个线程到达同步点时,所有被拦截的线程才会继续运行
Semaphore:信号量,用来控制同时访问特定资源的线程数量,它通过协调各线程以保证合理的使用公共资源
Exchanger:用于进行线程间的数据交换
CountDownLatch允许一个或多个线程等待其他线程执行完成。CountDownLatch的一个非常典型的应用场景是:有一个任务想要往下执行,但必须要等到其他的任务执行完毕后才可以继续往下执行。
构造函数:
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}
CountDownLatch的构造函数可以传入一个int类型的参数N,表示需要等待N个点完成。调用CountDownLatch的countDown()方法时,N会减1。CountDownLatch的await方法会阻塞当前线程,直到N变为0.
CountDownLatch实例:
/**
* 十个人分成两组比赛,每组5个,当发令响时,开始比赛
* @author Administrator
*
*/
public class CountDownLatchDemo {
private static final int GROUP_SIZE=5;
private static final Random RANDOM = new Random();
private static void processOneGroup(final String groupName){
//等待所以选手准备就绪
final CountDownLatch preCountDownLatch = new CountDownLatch(GROUP_SIZE);
//等待比赛开始
final CountDownLatch startCountDownLatch = new CountDownLatch(1);
//等待比赛结束
final CountDownLatch endCountDownLatch = new CountDownLatch(GROUP_SIZE);
System.out.println(groupName+",比赛开始:");
for (int i = 0; i < GROUP_SIZE; i++) {
new Thread(String.valueOf(i)){
public void run() {
preCountDownLatch.countDown();//准备完毕
System.out.println("第"+GROUP_SIZE+"组,第"+this.getName()+"号选手,已准备完毕!~");
try {
//等待裁判发出开始指令
startCountDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
try {
//运行一个随机时间,表示选手实际的比赛时间
Thread.sleep(Math.abs(RANDOM.nextInt())%1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//完成比赛
endCountDownLatch.countDown();
};
}.start();
}
try {
preCountDownLatch.await();//等待所有的选手准备完毕
} catch (InterruptedException e) {
e.printStackTrace();
}
startCountDownLatch.countDown();//开始比赛
try {
endCountDownLatch.await();//等待所有的选手结束比赛
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(groupName+" 比赛结束!");
}
public static void main(String[] args) {
processOneGroup("分组1");
processOneGroup("分组2");
}
}
输出结果:
countDown方法的实现如下:
/**
* 减少锁存器的计数,如果计数达到零,则释放所有等待的线程。
* 如果当前计数大于零,那么它被递减。如果新计数为零,则所有等待的线程被重新启用用于线程调度目的。如果当前计数等于零,则什么也不发生。
*/
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
protected boolean tryReleaseShared(int releases) {
// 递减计数; 直到信号转换为零时
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
/**
* 确保释放传播,即使有其他进行中的获取/释放。 如果它需要信号,则以通常的方式尝试解除头的处理器。 但如果不是,状态设置为PROPAGATE以确保在释放后,传播继续。 另外,我们必须循环,以便在我们这样做的时候添加一个新的节点。 此外,与unparkSuccessor的其他用法不同,我们需要知道CAS是否重置状态失败,如果这样重新检查。
*/
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
CountDownLatch与Thread.join()的区别;
调用thread.join() 方法必须等thread 执行完毕,当前线程才能继续往下执行,而CountDownLatch通过计数器提供了更灵活的控制,只要检测到计数器为0当前线程就可以往下执行而不用管相应的thread是否执行完毕.
CyclicBarrier可以让一组线程到达一个同步点时被阻塞,直到最后一个线程到达同步点时,所有被阻塞的线程才会继续运行。
默认构造方法:parties表示阻塞在同步点的线程数。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;//表示仍在等待的线程数,只要一个线程到达同步点,count就减1,直到0为止
this.barrierCommand = barrierAction;//当线程全部到达同步点时,优先执行barrierAction
}
每个线程调用await方法,告诉CyclicBarrier已经到达同步点,然后当前线程被阻塞.
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
CyclicBarrier实例如下:
/**
* 用一个Excel保存用户所有银行流水,每个Sheet保存一个账号近一年的每笔银行
* 流水,现在需要统计用户的日军银行流水,先用多线程处理每个sheet,都执行完
* 之后,得到每个sheet的日均银行流水,最后用barrierAction技术整个Excel的
* 日均银行流水
* @author Administrator
*
*/
public class CyclicBarrierDemo implements Runnable{
//
private CyclicBarrier cyclicBarrier = new CyclicBarrier(4,this);
//假设只有4个sheet,启动4个线程
private Executor executor = Executors.newFixedThreadPool(4);
//保存每个sheet计算结果
private ConcurrentHashMap<String,Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count(){
for (int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
//业务操作,计算sheet的银行流水数据,假设结果为1,代码省略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
try {
cyclicBarrier.await();
} catch (Exception e) {
}
}
});
}
}
//汇总操作
@Override
public void run() {
int result = 0;
//汇总每个sheet计算的结果
for (Entry<String,Integer> entry : sheetBankWaterCount.entrySet()) {
result+=entry.getValue();
}
//将结果输出
System.out.println(result);
}
public static void main(String[] args) {
CyclicBarrierDemo demo = new CyclicBarrierDemo();
demo.count();
}
}
结果输出:
4
CyclicBarrier和CountDownLatch的区别:
CountDownLatch到达计算器只能使用一次,而CyclicBarrier的计数器可以使用reset()方法重置,所有CyclicBarrier可以使用更复杂的业务场景。
CyclicBarrier还提供了getNumberWaiting方法可以获得阻塞的线程数量。isBroken可以获得阻塞的线程是否被中断
需要注意的是,如果CyclicBarrier的parriers数比时间的线程数大时,那么这组线程将会被一直阻塞
Semaphore是用来控制同时访问特定资源 的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore可以用于做流量控制,特别是公有资源有限的应用场景,比如数据库链接。
Semaphore的构造方法Semaphore(int permits)接受一个整型的数组,表示最大的并发数。线程使用Semaphore的acquire()方法获得一个许可,使用之后调用release()方法归还许可。
Semaphore示例:
/**
* 假设有10个线程要访问某个资源,但是我们的并发量控制为3
* @author Administrator
*
*/
public class SemaphoreDemo {
private static final int THREAD_COUNT=10;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore semaphore = new Semaphore(3);
public static void main(String[] args) {
for (int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("线程"+Thread.currentThread().getName()+"获得访问资源的权利!");
Thread.sleep(TimeUnit.SECONDS.toSeconds(5));
System.out.println("线程"+Thread.currentThread().getName()+"访问结束!");
semaphore.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
threadPool.shutdown();
}
}
输出结果如下:
线程pool-1-thread-1获得访问资源的权利!
线程pool-1-thread-2获得访问资源的权利!
线程pool-1-thread-3获得访问资源的权利!
线程pool-1-thread-2访问结束!
线程pool-1-thread-3访问结束!
线程pool-1-thread-4获得访问资源的权利!
线程pool-1-thread-1访问结束!
线程pool-1-thread-5获得访问资源的权利!
线程pool-1-thread-6获得访问资源的权利!
线程pool-1-thread-5访问结束!
线程pool-1-thread-6访问结束!
线程pool-1-thread-4访问结束!
线程pool-1-thread-8获得访问资源的权利!
线程pool-1-thread-9获得访问资源的权利!
线程pool-1-thread-7获得访问资源的权利!
线程pool-1-thread-7访问结束!
线程pool-1-thread-10获得访问资源的权利!
线程pool-1-thread-8访问结束!
线程pool-1-thread-9访问结束!
线程pool-1-thread-10访问结束!
通过打印结果我们可以看出,当已经有三个线程获取了资源访问的权利后,后面的线程只能等这三个线程中的一个释放了访问权利,才能接着访问资源。
Exchanger用于线程间的协作,进行数据交互。它提供一个同步点,在这个同步点,两个线程可用交互彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,他会一直等待第二个线程也执行exchange方法,当两个线程都到达同步点时,这两个线程就可以交换数据,将生产出来的数据传递给对方。
Exchanger可以用于遗传算法,遗传算法里需要选出两个人作为交配对象,这时候会交换两人的数据,并使用交叉规则得出2个交配结果。Exchanger也可以用于校对工作,比如我们需要将纸质银行流水通过人工的方式录入成电子银行流水,为了避免错误,采用AB岗,两人进行录入,录入到Excel后,系统需要加载这两个Excel,并对两个Excel数据进行校对,看看是否一致。
代码实例:
public class ExchangerDemo {
/**
*
* 生产者
*
*/
static class Producer implements Runnable{
private List<String> producerBuffer;
private final Exchanger<List<String>> exchanger;
public Producer(List<String> producerBuffer,Exchanger<List<String>> exchanger){
this.producerBuffer=producerBuffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
producerBuffer.add("生产者"+i);
System.out.println("第"+i+"次生产者在等待与消费者交换数据");
try {
producerBuffer=exchanger.exchange(producerBuffer);
System.out.println("第"+i+"次生产者与消费者交换后的数据为:"+producerBuffer.get(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
/**
*消费者
*/
static class Consumer implements Runnable{
private List<String> consumerBuffer;
private final Exchanger<List<String>> exchanger;
public Consumer(List<String> consumerBuffer,Exchanger<List<String>> exchanger) {
this.consumerBuffer=consumerBuffer;
this.exchanger=exchanger;
}
@Override
public void run() {
for (int i = 0; i < 5; i++) {
consumerBuffer.add("消费者"+i);
try {
System.out.println("第"+i+"次消费者正在等待与生产者交换数据!");
consumerBuffer=exchanger.exchange(consumerBuffer);
System.out.println("第"+i+"次消费者交换后的数据为:"+consumerBuffer.get(i));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public static void main(String[] args) {
List<String> producerBuffer = new ArrayList<>();
List<String> consumerBuffer = new ArrayList<>();
Exchanger<List<String>> exchanger = new Exchanger<>();
ExchangerDemo.Producer producer = new Producer(producerBuffer, exchanger);
ExchangerDemo.Consumer consumer = new Consumer(consumerBuffer, exchanger);
Thread producerThread = new Thread(producer);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
}
}
输出结果为
第0次生产者在等待与消费者交换数据
第0次消费者正在等待与生产者交换数据!
第0次消费者交换后的数据为:生产者0
第0次生产者与消费者交换后的数据为:消费者0
第1次消费者正在等待与生产者交换数据!
第1次生产者在等待与消费者交换数据
第1次消费者交换后的数据为:生产者1
第1次生产者与消费者交换后的数据为:消费者1
第2次消费者正在等待与生产者交换数据!
第2次生产者在等待与消费者交换数据
第2次生产者与消费者交换后的数据为:消费者2
第2次消费者交换后的数据为:生产者2
第3次生产者在等待与消费者交换数据
第3次消费者正在等待与生产者交换数据!
第3次生产者与消费者交换后的数据为:消费者3
第3次消费者交换后的数据为:生产者3
第4次生产者在等待与消费者交换数据
第4次消费者正在等待与生产者交换数据!
第4次消费者交换后的数据为:生产者4
第4次生产者与消费者交换后的数据为:消费者4
如果两个线程,有一个线程没有调用exchange()方法,那么则会一直等待,因此,为避免一直等待,可以使用exchange(V x,long timeout,TimeUnit unit)设置等待时长
因篇幅问题不能全部显示,请点此查看更多更全内容
Copyright © 2019- 99spj.com 版权所有 湘ICP备2022005869号-5
违法及侵权请联系:TEL:199 18 7713 E-MAIL:2724546146@qq.com
本站由北京市万商天勤律师事务所王兴未律师提供法律服务