- 分割List,根据采用的线程(任务)数平均分配,即list.size()/threadCounts。
- 定义一个记录“很大List”中所有整数和的变量sum,采用一个线程(任务)处理一个分割后的子List,计算子List中所有整数和(subSum),然后把和(subSum)累加到sum上。
- 等待所有线程(任务)完成后输出总和(sum)的值。
示意图如下: (Edraw) 三:详细编码实现 代码中有很详细的注释,这里就不解释了。
/** * 计算List中所有整数的和 * 采用多线程,分割List计算 */ public class CountListIntegerSum { private long sum;//存放整数的和 private CyclicBarrier barrier;//障栅集合点(同步器) private Listlist;//整数集合List private int threadCounts;//使用的线程数 public CountListIntegerSum(List list,int threadCounts) { this.list=list; this.threadCounts=threadCounts; } /** * 获取List中所有整数的和 * @return */ public long getIntegerSum(){ ExecutorService exec=Executors.newFixedThreadPool(threadCounts); int len=list.size()/threadCounts;//平均分割List //List中的数量没有线程数多(很少存在) if(len==0){ threadCounts=list.size();//采用一个线程处理List中的一个元素 len=list.size()/threadCounts;//重新平均分割List } barrier=new CyclicBarrier(threadCounts+1); for(int i=0;i list.size()?list.size():len*(i+1)))); } } try { barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处 } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName()+":Interrupted"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName()+":BrokenBarrier"); } exec.shutdown(); return sum; } /** * 分割计算List整数和的线程任务 * */ public class SubIntegerSumTask implements Runnable{ private List subList; public SubIntegerSumTask(List subList) { this.subList=subList; } public void run() { long subSum=0L; for (Integer i : subList) { subSum += i; } synchronized(CountListIntegerSum.this){ //在CountListIntegerSum对象上同步 sum+=subSum; } try { barrier.await();//关键,使该线程在障栅处等待,直到所有的线程都到达障栅处 } catch (InterruptedException e) { System.out.println(Thread.currentThread().getName()+":Interrupted"); } catch (BrokenBarrierException e) { System.out.println(Thread.currentThread().getName()+":BrokenBarrier"); } System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum); } } }
有人可能对barrier=new CyclicBarrier(threadCounts+1);//创建的线程数和主线程main有点不解,不是采用的线程(任务)数是threadCounts个吗?怎么为CyclicBarrier设置的给定数量的线程参与者比我们要采用的线程数多一个呢?
答案就是这个多出来的一个用于控制main主线程的,主线程也要等待,它要等待其他所有的线程完成才能输出sum值,这样才能保证sum值的正确性,如果main不等待的话,那么结果将是不可预料的。/** * 计算List中所有整数的和测试类 */ public class CountListIntegerSumMain { /** * @param args */ public static void main(String[] args) { Listlist = new ArrayList (); int threadCounts = 10;//采用的线程数 //生成的List数据 for (int i = 1; i <= 1000000; i++) { list.add(i); } CountListIntegerSum countListIntegerSum=new CountListIntegerSum(list,threadCounts); long sum=countListIntegerSum.getIntegerSum(); System.out.println("List中所有整数的和为:"+sum); } }
四:总结 本文主要通过一个淘宝的面试题为引子,介绍了并发的一点小知识,主要是介绍通过CyclicBarrier同步辅助器辅助多个并发任务共同完成一件工作。Java SE5的java.util.concurrent引入了大量的设计来解决并发问题,使用它们有助于我们编写更加简单而健壮的并发程序。 附mathfox提到的ExecutorService.invokeAll()方法的实现 这个不用自己控制等待,invokeAll执行给定的任务,当所有任务完成时,返回保持任务状态和结果的 Future 列表。sdh5724也说用了同步,性能不好。这个去掉了同步,根据返回结果的 Future 列表相加就得到总和了。
/** * 使用ExecutorService的invokeAll方法计算 * */ public class CountSumWithCallable { /** * @param args * @throws InterruptedException * @throws ExecutionException */ public static void main(String[] args) throws InterruptedException, ExecutionException { int threadCounts =19;//使用的线程数 long sum=0; ExecutorService exec=Executors.newFixedThreadPool(threadCounts); List> callList=new ArrayList >(); //生成很大的List List list = new ArrayList (); for (int i = 0; i <= 1000000; i++) { list.add(i); } int len=list.size()/threadCounts;//平均分割List //List中的数量没有线程数多(很少存在) if(len==0){ threadCounts=list.size();//采用一个线程处理List中的一个元素 len=list.size()/threadCounts;//重新平均分割List } for(int i=0;i subList; if(i==threadCounts-1){ subList=list.subList(i*len,list.size()); }else{ subList=list.subList(i*len, len*(i+1)>list.size()?list.size():len*(i+1)); } //采用匿名内部类实现 callList.add(new Callable (){ public Long call() throws Exception { long subSum=0L; for(Integer i:subList){ subSum+=i; } System.out.println("分配给线程:"+Thread.currentThread().getName()+"那一部分List的整数和为:\tSubSum:"+subSum); return subSum; } }); } List > futureList=exec.invokeAll(callList); for(Future future:futureList){ sum+=future.get(); } exec.shutdown(); System.out.println(sum); } }
一些感言 这篇文章是昨天夜里11点多写好的,我当时是在网上看到了这个题目,就做了一下分析,写了实现代码,由于水平有限,难免有bug,这里感谢xifo等人的指正。这些帖子从发表到现在不到24小时的时间里创造了近9000的浏览次数,回复近100,这是我没有想到的,javaeye很久没这么疯狂过啦。这不是因为我的算法多好,而是因为这个题目、这篇帖子所体现出的意义。大家在看完这篇帖子后不光指正错误,还对方案进行了改进,关键是思考,人的思维是无穷的,只要我们善于发掘,善于思考,总能想出一些意想不到的方案。 从算法看,或者从题目场景对比代码实现来看,或许不是一篇很好的帖子,但是我说这篇帖子是很有意义的,方案也是在很多场景适用,有时我们可以假设这不是计算和,而是把数据写到一个个的小文件里,或者是分割进行网络传输等等,都有一定的启发,特别是回帖中的讨论。 单说一下回帖,我建议进来的人尽量看完所有的回帖,因为这里是很多人集思广益的精华,这里有他们分析问题,解决问题的思路,还有每个人提到的解决方案,想想为什么能用?为什么不能用?为什么好?为什么不好? 我一直相信:讨论是解决问题、提高水平的最佳方式!
http://www.iteye.com/topic/711162InvokeAll的一个demo
import java.util.ArrayList;import java.util.List;import java.util.concurrent.*;public class InvokeAllDemoWhenException { public static void main(String[] args) { Listtasks = new ArrayList (); for (int i = 0; i < 11; i++) { tasks.add(new TaskInvokeAll(i)); } ExecutorService es = Executors.newCachedThreadPool(); try { List > invokeAll = es.invokeAll(tasks); for (Future future : invokeAll) { try { System.out.println(future.get()); } catch (ExecutionException e) { e.printStackTrace(); } } } catch (InterruptedException e) { e.printStackTrace(); } }}class TaskInvokeAll implements Callable { private int flag; public TaskInvokeAll(int flag) { this.flag = flag; } @Override public String call() throws Exception { Thread currentThread = Thread.currentThread(); if (flag == 5) { System.out.println(currentThread + " " + this.flag); throw new IllegalArgumentException("current :" + flag); } if (flag == 2) { for (int i = 0; i < 5; i++) { System.out.println(currentThread + "Begin to sleep 1s"); TimeUnit.SECONDS.sleep(1); } } System.out.println(currentThread + " T'm finish:" + this.flag); return String.valueOf(flag); }}
Output(抛异常的打印,在输出结果时不固定):
Thread[pool-1-thread-1,5,main] T'm finish:0Thread[pool-1-thread-5,5,main] T'm finish:4Thread[pool-1-thread-2,5,main] T'm finish:1Thread[pool-1-thread-3,5,main]Begin to sleep 1sThread[pool-1-thread-5,5,main] T'm finish:10Thread[pool-1-thread-7,5,main] T'm finish:6Thread[pool-1-thread-9,5,main] T'm finish:8Thread[pool-1-thread-4,5,main] T'm finish:3Thread[pool-1-thread-8,5,main] T'm finish:7Thread[pool-1-thread-6,5,main] 5Thread[pool-1-thread-2,5,main] T'm finish:9Thread[pool-1-thread-3,5,main]Begin to sleep 1sThread[pool-1-thread-3,5,main]Begin to sleep 1sThread[pool-1-thread-3,5,main]Begin to sleep 1sThread[pool-1-thread-3,5,main]Begin to sleep 1sThread[pool-1-thread-3,5,main] T'm finish:201234678910java.util.concurrent.ExecutionException: java.lang.IllegalArgumentException: current :5 at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222) at java.util.concurrent.FutureTask.get(FutureTask.java:83) at thread.InvokeAllDemoWhenException.main(InvokeAllDemoWhenException.java:25) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)Caused by: java.lang.IllegalArgumentException: current :5 at thread.TaskInvokeAll.call(InvokeAllDemoWhenException.java:50) at thread.TaskInvokeAll.call(InvokeAllDemoWhenException.java:37) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) at java.util.concurrent.FutureTask.run(FutureTask.java:138) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) at java.lang.Thread.run(Thread.java:619)