博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Java7中的ForkJoin并发框架初探(下)—— ForkJoin的应用
阅读量:5978 次
发布时间:2019-06-20

本文共 4645 字,大约阅读时间需要 15 分钟。

  前两篇文章已经对Fork Join的设计和JDK中源码的简要分析。这篇文章,我们来简单地看看我们在开发中怎么对JDK提供的工具类进行应用,以提高我们的需求处理效率。

  Fork Join这东西确实用好了能给我们的任务处理提高效率,也为开发带来方便。但Fork Join不是那么容易用好的,我们先来看几个例子(反例)。

0. 反例错误分析

  我们先来看看这篇文章中提供的例子:http://www.iteye.com/topic/643724 (因为是反例,就不提供超链接了,只以普通文本给出URL)

  这篇文章是我学习和整理Fork Join时搜索到的一篇文章,其实总的来说这篇文章前面分析得还是比较好的,只是给出的第一个例子(有返回结果的RecursiveTask应用的例子) 没有正确地对Fork Join进行应用。为了方便分析,还是贴下这个例子中具体的的代码吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public
class
Calculator
extends
RecursiveTask {
 
    
private
static
final
int
THRESHOLD =
100
;
    
private
int
start;
    
private
int
end;
 
    
public
Calculator(
int
start,
int
end) {
        
this
.start = start;
        
this
.end = end;
    
}
 
    
@Override
    
protected
Integer compute() {
        
int
sum =
0
;
        
if
((start - end) < THRESHOLD){
            
for
(
int
i = start; i< end;i++){
                
sum += i;
            
}
        
}
else
{
            
int
middle = (start + end) /
2
;
            
Calculator left =
new
Calculator(start, middle);
            
Calculator right =
new
Calculator(middle +
1
, end);
            
left.fork();
            
right.fork();
 
            
sum = left.join() + right.join();
        
}
        
return
sum;
    
}
 
}

  我们看到其中一段已经高亮的代码,显示对两个子任务进行fork()调用,即分别提交给当前线程的任务队列,依次加到末尾。紧接着,又按照调用fork()的顺序执行两个子任务对象的join()方法。

  其实,这样就有一个问题,在每次迭代中,第一个子任务会被放到线程队列的倒数第二个位置,第二个子任务是最后一个位置。当执行join()调用的时 候,由于第一个子任务不在队列尾而不能通过执行ForkJoinWorkerThread的unpushTask()方法取出任务并执行,线程最终只能挂 起阻塞,等待通知。而Fork Join本来的做法是想通过子任务的合理划分,避免过多的阻塞情况出现。这样,这个例子中的操作就违背了Fork Join的初衷,每次子任务的迭代,线程都会因为第一个子任务的join()而阻塞,加大了代码运行的成本,提高了资源开销,不利于提高程序性能。

  除此之外,这段程序还是不能进入Fork Join的过程,因为还有一个低级错误。看下第15、16行代码的条件,就清楚了。按照逻辑,start必然是比end小的。这将导致所有任务都将以循环累加的方式完成,而不会执行fork()和join()。

  由此可见,Fork Join的使用还是要注意对其本身的理解和对开发过程中细节的把握的。我们看下JDK中RecursiveAction和RecursiveTask这两个类。

1. RecursiveAction分析及应用实例

  这两个类都是继承了ForkJoinTask,本身给出的实现逻辑并不多不复杂,在JDK的类文件中,它的注释比源码还要多。我们可以看下它的实现代码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public
abstract
class
RecursiveAction
extends
ForkJoinTask<Void> {
    
private
static
final
long
serialVersionUID = 5232453952276485070L;
 
    
protected
abstract
void
compute();
 
    
public
final
Void getRawResult() {
return
null
; }
 
    
protected
final
void
setRawResult(Void mustBeNull) { }
 
    
protected
final
boolean
exec() {
        
compute();
        
return
true
;
    
}
}

  我们看到其中两个方法是关于处理空返回值的方法。而exec方法则是调用了compute(),这个compute就是我们使用Fork Join时需要自己实现的逻辑。

  我们可以看下API中给出的一个最简单最具体的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class
IncrementTask
extends
RecursiveAction {
   
final
long
[] array;
final
int
lo;
final
int
hi;
   
IncrementTask(
long
[] array,
int
lo,
int
hi) {
     
this
.array = array;
this
.lo = lo;
this
.hi = hi;
   
}
   
protected
void
compute() {
     
if
(hi - lo < THRESHOLD) {
       
for
(
int
i = lo; i < hi; ++i)
         
array[i]++;
     
}
     
else
{
       
int
mid = (lo + hi) >>>
1
;
       
invokeAll(
new
IncrementTask(array, lo, mid),
                 
new
IncrementTask(array, mid, hi));
     
}
   
}
 
}

  大致的逻辑就是,对给定一个特定数组的某段,进行逐个加1的操作。我们看到else中的代码块,显示取一个lo和hi的中间值,此后分割成两个子任务,并进行invokeAll()调用。我们来看下继承自FutureTask的invokeAll()方法实现。很简单:

1
2
3
4
5
public
static
void
invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
    
t2.fork();
    
t1.invoke();
    
t2.join();
}

  对于参数中的两个子任务,对第二个子任务进行fork(),即放入线程对应队列的结尾,然后执行第一个子任务,再调用第二个子任务的join(),实际上就是跳转到第二个子任务,进行执行(当然如果不能执行,就需要阻塞等待了)。

  其实invokeAll()是个重载方法,同名的还有另外两个,基本逻辑都是一样的,我们拿出一个通用一点的来看一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public
static
void
invokeAll(ForkJoinTask<?>... tasks) {
    
Throwable ex =
null
;
    
int
last = tasks.length -
1
;
    
for
(
int
i = last; i >=
0
; --i) {
        
ForkJoinTask<?> t = tasks[i];
        
if
(t ==
null
) {
            
if
(ex ==
null
)
                
ex =
new
NullPointerException();
        
}
        
else
if
(i !=
0
)
            
t.fork();
        
else
if
(t.doInvoke() < NORMAL && ex ==
null
)
            
ex = t.getException();
    
}
    
for
(
int
i =
1
; i <= last; ++i) {
        
ForkJoinTask<?> t = tasks[i];
        
if
(t !=
null
) {
            
if
(ex !=
null
)
                
t.cancel(
false
);
            
else
if
(t.doJoin() < NORMAL && ex ==
null
)
                
ex = t.getException();
        
}
    
}
    
if
(ex !=
null
)
        
UNSAFE.throwException(ex);
}

  我们发现第一个子任务(i==0的情况)没有进行fork,而是直接执行,其余的统统先调用fork()放入任务队列,之后再逐一join()。其 实我们注意到一个要点就是第一个任务不要fork()再join(),也就是上面中例子的错误所在,这样会造成阻塞,而不能充分利用Fork Join的特点,也就不能保证任务执行的性能。

2. RecursiveTask简要说明

  其实说完了RecursiveAction,RecursiveTask可以用“同理”来解释。实现代码也很简单:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public
abstract
class
RecursiveTask<V>
extends
ForkJoinTask<V> {
    
private
static
final
long
serialVersionUID = 5232453952276485270L;
 
    
V result;
 
    
protected
abstract
V compute();
 
    
public
final
V getRawResult() {
        
return
result;
    
}
 
    
protected
final
void
setRawResult(V value) {
        
result = value;
    
}
 
    
protected
final
boolean
exec() {
        
result = compute();
        
return
true
;
    
}
 
}

  我们看到唯一不同的是返回结果的处理,其余都可以和RecursiveAction一样使用。

3. Fork Join应用小结

  Fork Join是为我们提供了一个非常好的“分而治之”思想的实现平台,并且在一定程度上实现了“变串行并发为并行”。但Fork Join不是万能的页不完全是通用的,对于可很好分解成子任务的场景,我们可以对其进行应用,更多时候要考虑需求和应用场景,并且注意其使用要点才行。

转载于:https://www.cnblogs.com/wxgblogs/p/5477113.html

你可能感兴趣的文章
你有足够的勇气改变不良的数据习惯吗?
查看>>
SQL中存储过程的创建和使用
查看>>
荷兰政府:保证不强制在任何产品中留有后门
查看>>
编写单元测试的10条理由
查看>>
LINUX-SAMBA服务配置
查看>>
图像处理------光束效果
查看>>
剑指offer 面试题6:重建二叉树
查看>>
智能合约从入门到精通:Solidity语法之内存变量的布局和状态变量的存储模型...
查看>>
初识Vue
查看>>
Android之Handler消息传递机制详解
查看>>
iOS 离屏渲染
查看>>
OpenCV 离散傅里叶变换
查看>>
小程序多图上传
查看>>
(入门)使用webpack 4.x定制自己的react开发环境
查看>>
笔记3 es6,7,正则
查看>>
CSS结构原理(二)——伪类、伪元素与样式特性
查看>>
Flutter 系列文章:Flutter SliverAppbar 控件介绍
查看>>
vuepress搭建技术文档实战(带github项目地址)
查看>>
第一个React项目总结
查看>>
我用两种方式写的省市区判断转换
查看>>