金色坐标

关注互联网应用和搜索引擎技术

« JDK5引入的线程并发开发concurrent包(二)JavaScript学习笔记1 »

JDK5引入的线程并发开发concurrent包(三)

Java 5中引入了并发包(java.util.concurrent),大大简化了多线程并发程序的开发。接下来我们分析一下引入concurrent包为多线程并发程序带来的变化。为清晰期间,进行对比说明。

第三,同步化器(Synchronizers)

在没有JDK 5引入的同步化器之前,线程之间的协作基本依赖于低层次的wait和notfiy(notifyAll)这样的操作,一方面这些操作对实现多个线程复杂流水线(执行顺序和时刻受条件变化)有很大难度,另外一方面即便写出相应实现,也很容易充满各种bugs。同步化器则大大减轻了这种负担和压力。

1,信号量(Semaphores)

对操作系统中学习的经典PV原语的支持。

2,栅栏(Barriers)

一些线程互相等待到达一个公共的栅栏点(barrier point)。Java中对应的类为 CyclicBarrier。

方法 用途
CyclicBarrier(int parties, Runnable barrierAction) 最常用的构造方法,它有两个参数,一个是参与者的个数,另外一个是所有参与者都达到栅栏点(barrier point)后执行的任务
int await()  等待知道所有的参与者都已经调用了该方法(即都到达一个公共的栅栏点(barrier point)),返回的整数表明当前参与者是第几(从0开始)个到达的

假设有3个人一起写一本书,每人各写一章,另外请人负责把大家写的章节校对并且合并为一本书, 写章节的人在其他写章节的人没有全部完成前不许睡觉。

public class BookWriters {

final int N = 3; //3个写书章节的
final CyclicBarrier barrier;

public BookWriters(){
  //创建一个章节合并者
  Runnable merger = new Runnable() {
                                   public void run() {
                                      //合并章节
                                      ....
                                   }
                                 };
  barrier =  new CyclicBarrier(N, merger);
  for (int i = 0; i < N; ++i) {
       new Thread(new ChapterWriter(i+1)).start();
  }
}

public static class ChapterWriter implements Runnable {
  int c;
  CyclicBarrier barrier;
  public ChapterWriter(int c, CyclicBarrier barrier){
    this.c = c;
    this.barrier = barrier;
  }
 
  public void run() {
     //写完第c章
     completeCharpter();
     barrier.await(); //等大家都写完了
     System.out.println("写第"+c + "章的人可以去睡觉了")
  }
}
}


3, 倒计时闩(CountDown Latch)

public class TestHarness {
    public long timeTasks(int nThreads, final Runnable task)
            throws InterruptedException {
        final CountDownLatch startGate = new CountDownLatch(1);
        final CountDownLatch endGate = new CountDownLatch(nThreads);

        for (int i = 0; i < nThreads; i++) {
            Thread t = new Thread() {
                public void run() {
                    try {
                        startGate.await();//等主线程准备好
                        try {
                            task.run();
                        } finally {
                            endGate.countDown();
                        }
                    } catch (InterruptedException ignored) { }
                }
            };
            t.start();
        }

        long start = System.nanoTime();
        startGate.countDown();//让所有线程都可以开始了
        endGate.await();//等待所有线程完成
        long end = System.nanoTime();
        return end-start;
    }
}


4,交换器(Exchanger)

是Barriers的特殊形式,相当于是两个参与者各自等待对方的栅栏到达对应的栅栏点,典型的应用为一个线程生成数据放入buffer,另外一个线程消费数据将buffer清空。

public V exchange(V x)  throws InterruptedException
等待另外一个线程到达exchange point(即也调用同一个exchager的exchange方法(除非中间被interrupt),这样两个线程互相得到对方传入的数据)


下面就是一个使用双缓存轮换的示例,生存者和消费者各使用一个缓存,等生存者缓存满了而消费者缓存空了,就进行一次缓存交换,假定buffer的类型为DataBuffer:

class FillAndEmpty {
   Exchanger<DataBuffer> exchanger = new Exchanger<DataBuffer>();
   //给生成数据方用的初始缓存
   DataBuffer initialEmptyBuffer = ... a made-up type
   //给消费数据方用的初始缓存,可能为empty
   DataBuffer initialFullBuffer = ...

   //不停地生产数据的线程
   class FillingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialEmptyBuffer;
       try {
         while (currentBuffer != null) {
           //把当前生产的数据添加到缓存中
           addToBuffer(currentBuffer);
           //如果缓存满了或者缓存不空并且已经超时,就等待和消费方交换缓存,这里等待知道对方也调用exchanger.exchange()为止,返回值为对方传入的参数。
           if (currentBuffer.isFull() || (!currentBuffer.isEmpty() && isTimeOut()))
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ... }
     }
   }

   //不停地消费数据的线程
   class EmptyingLoop implements Runnable {
     public void run() {
       DataBuffer currentBuffer = initialFullBuffer;
       try {
         while (currentBuffer != null) {
           //从缓存中消费数据
           takeFromBuffer(currentBuffer);
           //如果缓存消费完了,就可以等待和生产方交换缓存
           if (currentBuffer.isEmpty())
             currentBuffer = exchanger.exchange(currentBuffer);
         }
       } catch (InterruptedException ex) { ... handle ...}
     }
   }

   void start() {
     new Thread(new FillingLoop()).start();
     new Thread(new EmptyingLoop()).start();
   }
 }
 

 




原创文章,如转载请注明:转载自金色坐标 [ http://www.kingxy.com/ ]

本文链接地址:http://www.kingxy.com/archives/204.html

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

订阅博客

  • 订阅我的博客:订阅我的博客
  • 通过Google订阅本站
  • 通过bloglines订阅本站
  • 通过抓虾订阅本站
  • 通过yahoo订阅本站

Search

Google

最新评论及回复

最近发表

金色坐标博客——京ICP备09009094号

本站采用创作共用版权协议, 要求署名、非商业用途和保持一致. 转载本站内容必须也遵循“署名-非商业用途-保持一致”的创作共用协议.
KingXY Blog - This site is licensed under a Creative Commons Attribution-NonCommercial-ShareAlike 2.5 License.