Java并发编程(十九)并行算法

串行计算在并行的情况

在串行条件下,许多计算方法,都比较的贴近自然语言,就好似a=b+c; 这样的计算方法,无疑那么的明显,就是a要等于b和c的和,和我们使用手写写出来的,并没与什么区别。但是,这样的计算方法,在并行条件下,却有那么点不同。或者说,串行计算在并行条件(多线程)下,也只会执行串行计算。举个例子:(B+C)*D-2;这样的计算式,我们可能将它们拆分开来计算,不可能先计算D-2再去计算 *(B+C),这样有违于计算的基本常识。所以,想要得到最终的答案,就不得不一直去等待(B+C)的结果,这样的话,很多并行计算其实也没有那么多优势。但人们的眼光不仅仅拘于此,人们想到了使用流水线的方式,去发挥并行计算所能达到的最佳性能,你可将以上式子,在并行条件下拆分为,

1
T1=A+B;T2=T1*D;T3=T2-2;

这样的形式,让每一个线程在运行时,不必去重新计算他们的值,使用分工合作,一个线程负责加操作,一个负责乘操作,一个负责减操作,如此一来,就将我们的串行计算,大大的优化了。我们也可以使用例子去验证一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
//执行一个(B+C)*D/2的例子
public class msg {
//信息交换的载体
public double i;
public double j;
public String s=null;
}
//
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class puls implements Runnable {
public static BlockingQueue<msg> bq=new LinkedBlockingQueue<msg>();
@Override
public void run() {
while (true){
try {
msg m=bq.take();
//从队列获得值
m.j=m.i+m.j;
//求两数之和
multiply.bq.add(m);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class multiply implements Runnable {
public static BlockingQueue<msg> bq=new LinkedBlockingQueue<msg>();

@Override
public void run() {
while (true){
try {
msg m=bq.take();
m.i=m.i*m.j;
div.bq.add(m);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class div implements Runnable {
public static BlockingQueue<msg> bq=new LinkedBlockingQueue<msg>();

@Override
public void run() {
while (true){
try {
msg m=bq.take();
m.i=m.i/m.j;
System.out.println(m.s+"="+m.i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
//
public class test {
public static void main(String[] args) throws InterruptedException {
new Thread(new puls()).start();
new Thread(new multiply()).start();
new Thread(new div()).start();
//开启三个线程,但在并未传入参数之前,都会被take所阻塞
for (int i = 1; i <=1000 ; i++) {
for (int j = 1; j <1000 ; j++) {
Thread.sleep(1000);
msg m=new msg();
m.i=i;
m.j=j;
m.s="(("+i+"+"+j+")*"+i+")/2";
puls.bq.add(m);
}
}
}
}

//
((1+1)*1)/2=1.0
((1+2)*1)/2=1.0
((1+3)*1)/2=1.0
((1+4)*1)/2=1.0
((1+5)*1)/2=1.0
((1+6)*1)/2=1.0
((1+7)*1)/2=1.0

这样的并行计算,就结束啦。

并行下的搜索

在并行条件下 ,除了使用的计算方式会有所不同之外,使用的算法也有着相应的改变。就比如搜索这一个算法,搜索在串行条件下,无论是二分搜索或者是其他搜索,我们都只需要在一个数组中遍历就了,最基本的搜索方式,就是在无序数组中,使用逐步遍历的方式进行搜索。那么在并行条件下,使用搜索,怎么把多线程里面的能力利用起来呢?这就要涉及到了一个线程之间的通信,我们可以想办法把一个数组分成两块,一个线程搜一块,这不就提高效率了吗,而之前所提到的Future模式,便可以应用到这里,搜索到了并返回搜索结果,这也是runnable接口所不具有的呢。来试试:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class test {

static int[] arr={11,12,13,14,15,16};
//表示要搜索的原数组
static ExecutorService es= Executors.newCachedThreadPool();
//线程池
static final int Thread_num=2;
//线程数量
static AtomicInteger result=new AtomicInteger(-1);
//定义搜索结果,没有搜到则返回-1
public static int search(int value,int begin,int end){
int i=0;
for ( i = begin; i <end ; i++) {
if (result.get()>=0){
return result.get();
//表示找到了
}
if (arr[i]==value){
if (!result.compareAndSet(-1,i)){
//如果设置失败,表示其他线程先找到了
return result.get();
}
return i;
}
}
return -1;
}

public static class task implements Callable<Integer>{
int begin,end,value;

public task(int value, int begin, int end) {
this.begin = begin;
this.end = end;
this.value = value;
}

@Override
public Integer call() throws Exception {
int r=search(value,begin,end);
return r;
}
}

public static int psearch(int value) throws ExecutionException, InterruptedException {
//根据线程数量对数组进行划分
int sub=arr.length/Thread_num+1;
List<Future<Integer>> r=new ArrayList<Future<Integer>>();
for (int i = 0; i <arr.length ; i+=sub) {
int end=i+sub;
if (end>=arr.length){
end=arr.length;
}
r.add(es.submit(new task(value,i,end)));
}
for (Future<Integer> f:
r) {
if (f.get()>=0){
return f.get();
}
}
return -1;
}

public static void main(String[] args) throws ExecutionException, InterruptedException {
int i=psearch(12);
if (i>=0){
System.out.println("找到了,数组下标为:"+i);
}
else {
System.out.println("没有找到");
}
}
}

//找到了,数组下标为:1

通过使用Callable,就把并行搜索给实现了

并行下的排序

讲到排序,想必大家一开始学到的排序大多数冒泡排序之类的吧,像冒泡排序这种排序手法,在此不多赘述,但冒泡排序的特点就是,如果右边的数值比左边大,那么就将这两个相邻的数字进行一次交换,这个排序手法在串行条件下是那么的明确和简单。冒泡

但是在并行条件下,两个不同的线程要怎么把这种即比较手法结合到一起呢?这似乎是一个难题,因为这不等于搜索,可以拆分成两块去搜,排序的要求至少对整体而言,必须是可见的,毕竟部分有序不等于整体有序,为了解决这种难题,从冒泡排序中更改了一些逻辑,诞生了一个新的排序方法:奇偶排序

交换型排序:奇偶排序

奇偶排序并不是字面上意思,即奇数与奇数排序偶数与偶数排序,奇偶排序实际上是分两个线程,一边使得该数组中的奇数与其相邻的数字进行比较和交换,一边让该数组中偶数和相邻的数字进行比较和交换。这样不断重复下来即可,这样就可以将冒泡排序的排序手法,运用到多线程中了。使用实际例子试一试吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test {
static int[] arr={16,12,14,11,15,13};
static ExecutorService es= Executors.newCachedThreadPool();
//线程池

static int flag=1;
//设置是否进行交换的标志,以此控制奇偶数
static synchronized void setFlag(int f){
flag = f;
}

public static class sort implements Runnable{
int i;
CountDownLatch countDownLatch;

public sort(int i, CountDownLatch countDownLatch) {
this.i = i;
this.countDownLatch = countDownLatch;
}

@Override
public void run() {
if (arr[i]>arr[i+1]){
int temp=arr[i];
arr[i]=arr[i+1];
arr[i+1]=temp;
//交换
setFlag(1);
}
countDownLatch.countDown();
}
}

public static void psort(int[] arr) throws InterruptedException {
int start=0;
while (flag==1 || start==1){
setFlag(0);
CountDownLatch latch=new CountDownLatch(arr.length/2-(arr.length%2==0?start:0));
//偶数的数组长度,当start等于1时,只有len/2-1个线程
for (int i = start; i <arr.length-1 ; i+=2) {
es.submit(new sort(i,latch));
//此处是核心所在,每次i+2,可以跳过 奇/偶 的位置
// 不断地去提交任务给线程执行,每次执行完都会set一个flag,
// 开始下一次 奇/偶 排序
}
latch.await();
//等待所有线程结束
if (start==0){
start=1;
}else{
start=0;
}
}
}

public static void main(String[] args) throws InterruptedException {
System.out.println("交换前:");
for (int a:
arr) {
System.out.print(" "+a);
}
psort(arr);
System.out.println();
System.out.println("交换后:");
for (int a:
arr) {
System.out.print(" "+a);
}
}
}

交换前:
16 12 14 11 15 13
交换后:
11 12 13 14 15 16

这样一来,就排序成功了,内部运行的效果可看下图:

奇偶排序))

插入型排序:希尔排序

插入排序是一种常见的排序类型,这个和交换类型排序有着比较大的区别,直接选择排序就是一种插入型排序。把数组分成两部分,一部分是排序好的,一部分是还未排序好的,把未排序好的数组中的元素插入到排序好的数组中,就是插入排序。如果想要把这样排序应用到并行程序当中,未免也太过于困难,于是乎,便根据插入排序的基本原则,诞生了一种适合多线程下运行的排序:希尔排序。

希尔

我们来看看希尔排序如何在多线程下实现吧。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class test {

static int[] arr={16,12,14,11,15,13};
static ExecutorService es= Executors.newCachedThreadPool();
//线程池
public static class shell implements Runnable{
int i=0;
int h=0;
CountDownLatch latch;

public shell(int i, int h, CountDownLatch latch) {
this.i = i;
this.h = h;
this.latch = latch;
}

@Override
public void run() {
if (arr[i]<arr[i-h]){
int temp=arr[i];
int j=i-h;
while (j>=0 && arr[j]>temp){
arr[j+h]=arr[j];
j-=h;
}
arr[j+h]=temp;
}
latch.countDown();
}
}
public static void pshell(int[]arr) throws InterruptedException {
int h=1;
CountDownLatch latch=null;
while (h<=arr.length/3){
h=h*3+1;
//计算出最大的h值
}
while (h>0){
System.out.println();
System.out.println("gap="+h);
if (h>=4){
latch=new CountDownLatch(arr.length-h);
}
System.out.println("交换的步骤:");
for (int i = h; i <arr.length ; i++) {
//控制线程的数量
if (h>=4){
es.execute(new shell(i,h,latch));
}else {
if(arr[i]<arr[i-h]){
int temp=arr[i];
int j=i-h;
while (j>=0&&arr[j]>temp){
arr[j+h]=arr[j];
j-=h;
}
arr[j+h]=temp;
}
System.out.println(Arrays.toString(arr));
}
}
latch.await();
//等待排序完成
h=(h-1)/3;
}
}

public static void main(String[] args) throws InterruptedException {
System.out.println("交换前:");
for (int a:
arr) {
System.out.print(" "+a);
}
pshell(arr);
System.out.println("交换后:");
for (int a:
arr) {
System.out.print(" "+a);
}
}
}


交换前:
16 12 14 11 15 13
gap=4
交换的步骤:

gap=1
交换的步骤:
[12, 15, 14, 11, 16, 13]
[12, 14, 15, 11, 16, 13]
[11, 12, 14, 15, 16, 13]
[11, 12, 14, 15, 16, 13]
[11, 12, 13, 14, 15, 16]
交换后:
11 12 13 14 15 16

如此一来,就排序成功了。