Java并发编程(二十一)AIO

AIO

在网络编程的一步一步进化中,NIO取代了旧的IO方式。而如今,NIO这样的通过selector不断去监控channel的方式,已经开始不满足于现如今的优化需要,也因为NIO一直都是一个同步的IO队列,对于高并发情况下,产生的阻塞依然会让人苦恼,于是乎,像AIO这样的异步IO方式,便诞生了。

aio

AIO非常的简单,甚至可以说是完全的从面向对象出发,对方发送什么,我就立即返回。请注意这里的立即,因为AIO是异步的,所以它注定不会在服务端缓慢的处理信息,这里就要引入之前学过的Future模式,AIO正是这样的模式,得益于Future,收到了信息,便立即返回,然后在返回的途中处理完全部的数据。这样,即完成了异步(因为是立即返回,所以并不产生阻塞),又使得程序更为简洁。

以下便是使用AIO去再次重制服务器。注意,AIO的编程方式,虽然最终是面向对象,但还是引入了一个新的方式,那就是函数式编程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AIOEchoServer {
public final static int port=8000;
private AsynchronousServerSocketChannel server;
public AIOEchoServer() throws IOException {
server=AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port));
}

public static void main(String[] args) throws IOException, InterruptedException {
new AIOEchoServer().start();
while (true){
Thread.sleep(1000);
}
}

public void start(){
System.out.println("服务端口为:"+port);

server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
final ByteBuffer byteBuffer=ByteBuffer.allocate(1024);
@Override
public void completed(AsynchronousSocketChannel result, Object attachment) {
System.out.println("执行的线程为:"+Thread.currentThread().getName());
Future<Integer> writeResult=null;
//使用future模式,接收到数据便立即返回,在返回中处理,便可异步的使用
try {
byteBuffer.clear();
//清除上次的缓存
result.read(byteBuffer).get(100, TimeUnit.SECONDS);
byteBuffer.flip();
writeResult=result.write(byteBuffer);
//将数据立即写回给客户端
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}finally {
try {
server.accept(null,this);
writeResult.get();
//服务器将进行下一次客户端接收的准备,
// 使用future.get,通过等待,保证write操作写完,再关闭
result.close();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
}
}

@Override
public void failed(Throwable exc, Object attachment) {
System.out.println("失败"+exc);
}


});
}
}

这是echo服务器,它的实际代码量其实非常的少,只有buffer那一段才是最为重要的。这里的AsynchronousServerSocketChannel调用了accept和read和write,它们的共同点就是,都继承了一个叫做CompletionHandler的接口,而这个CompletionHandler的接口又能够再接收一个AsynchronousSocketChannel,这样,服务器开始了它的俄罗斯套娃之旅。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public abstract <A> void accept(A attachment,
CompletionHandler<AsynchronousSocketChannel,? super A> handler);

public final <A> void write(ByteBuffer src,
A attachment,
CompletionHandler<Integer,? super A> handler)

{
write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}


public final <A> void read(ByteBuffer dst,
A attachment,
CompletionHandler<Integer,? super A> handler)
{
read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler);
}

值得注意的是,这样的函数式编程思维,使得AIO更为简洁,每次只有服务器接收到了数据才会返回。函数式编程在《Java8实战》中有着重的讲述,也算是JDK8或以上才拥有的特点。而我们也可以应用到客户端上:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;

public class AIOClient {
public static void main(String[] args) throws IOException, InterruptedException {
final AsynchronousSocketChannel client= AsynchronousSocketChannel.open();
//建立通道
client.connect(new InetSocketAddress("localhost", 8000), null,
new CompletionHandler<Void, Object>() {
//先连接,连接过程中传入一个CompletionHandler写入
@Override
public void completed(Void result, Object attachment) {
client.write(ByteBuffer.wrap("hello!".getBytes()),null,
new CompletionHandler<Integer,Object>(){
//开始写入,完了接着往回read
@Override
public void completed(Integer result, Object attachment) {
try {
ByteBuffer buffer=ByteBuffer.allocate(1024);
client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() {
//开始读取,读取完了关闭
@Override
public void completed(Integer result, ByteBuffer buffer) {
buffer.flip();
System.out.println(new String(buffer.array()));
try {
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, ByteBuffer attachment) {

}
});
} catch (Exception e) {
e.printStackTrace();
}
}

@Override
public void failed(Throwable exc, Object attachment) {

}
});
}

@Override
public void failed(Throwable exc, Object attachment) {

}
});
Thread.sleep(1000);
}
}


/*
服务端口为:8000
执行的线程为:Thread-9
//
hello!

这里使用了connect去连接服务器,连接的时候,传入了一个CompletionHandler接口,完成连接之后便开始写入,而在写入的参数中又有一个CompletionHandler接口,完成它的读回,读取的过程中又有一个CompletionHandler,继续完成它的关闭连接操作,这样,它们便连接起来的了。每当有数据发送至服务端时,便可以立即被返回,不在服务器逗留。AIO的方式,又极大的提高了效率。