AIO
在网络编程的一步一步进化中,NIO取代了旧的IO方式。而如今,NIO这样的通过selector不断去监控channel的方式,已经开始不满足于现如今的优化需要,也因为NIO一直都是一个同步的IO队列,对于高并发情况下,产生的阻塞依然会让人苦恼,于是乎,像AIO这样的异步IO方式,便诞生了。
AIO非常的简单,甚至可以说是完全的从面向对象出发,对方发送什么,我就立即返回。请注意这里的立即,因为AIO是异步的,所以它注定不会在服务端缓慢的处理信息,这里就要引入之前学过的Future模式,AIO正是这样的模式,得益于Future,收到了信息,便立即返回,然后在返回的途中处理完全部的数据。这样,即完成了异步(因为是立即返回,所以并不产生阻塞),又使得程序更为简洁。
以下便是使用AIO去再次重制服务器。注意,AIO的编程方式,虽然最终是面向对象,但还是引入了一个新的方式,那就是函数式编程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousServerSocketChannel; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException;
public class AIOEchoServer { public final static int port=8000; private AsynchronousServerSocketChannel server; public AIOEchoServer() throws IOException { server=AsynchronousServerSocketChannel.open().bind(new InetSocketAddress(port)); }
public static void main(String[] args) throws IOException, InterruptedException { new AIOEchoServer().start(); while (true){ Thread.sleep(1000); } }
public void start(){ System.out.println("服务端口为:"+port);
server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() { final ByteBuffer byteBuffer=ByteBuffer.allocate(1024); @Override public void completed(AsynchronousSocketChannel result, Object attachment) { System.out.println("执行的线程为:"+Thread.currentThread().getName()); Future<Integer> writeResult=null; try { byteBuffer.clear(); result.read(byteBuffer).get(100, TimeUnit.SECONDS); byteBuffer.flip(); writeResult=result.write(byteBuffer); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (TimeoutException e) { e.printStackTrace(); }finally { try { server.accept(null,this); writeResult.get(); result.close(); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } } }
@Override public void failed(Throwable exc, Object attachment) { System.out.println("失败"+exc); }
}); } }
|
这是echo服务器,它的实际代码量其实非常的少,只有buffer那一段才是最为重要的。这里的AsynchronousServerSocketChannel调用了accept和read和write,它们的共同点就是,都继承了一个叫做CompletionHandler的接口,而这个CompletionHandler的接口又能够再接收一个AsynchronousSocketChannel,这样,服务器开始了它的俄罗斯套娃之旅。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public abstract <A> void accept(A attachment, CompletionHandler<AsynchronousSocketChannel,? super A> handler);
public final <A> void write(ByteBuffer src, A attachment, CompletionHandler<Integer,? super A> handler)
{ write(src, 0L, TimeUnit.MILLISECONDS, attachment, handler); }
public final <A> void read(ByteBuffer dst, A attachment, CompletionHandler<Integer,? super A> handler) { read(dst, 0L, TimeUnit.MILLISECONDS, attachment, handler); }
|
值得注意的是,这样的函数式编程思维,使得AIO更为简洁,每次只有服务器接收到了数据才会返回。函数式编程在《Java8实战》中有着重的讲述,也算是JDK8或以上才拥有的特点。而我们也可以应用到客户端上:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.AsynchronousSocketChannel; import java.nio.channels.CompletionHandler;
public class AIOClient { public static void main(String[] args) throws IOException, InterruptedException { final AsynchronousSocketChannel client= AsynchronousSocketChannel.open(); client.connect(new InetSocketAddress("localhost", 8000), null, new CompletionHandler<Void, Object>() { @Override public void completed(Void result, Object attachment) { client.write(ByteBuffer.wrap("hello!".getBytes()),null, new CompletionHandler<Integer,Object>(){ @Override public void completed(Integer result, Object attachment) { try { ByteBuffer buffer=ByteBuffer.allocate(1024); client.read(buffer, buffer, new CompletionHandler<Integer, ByteBuffer>() { @Override public void completed(Integer result, ByteBuffer buffer) { buffer.flip(); System.out.println(new String(buffer.array())); try { client.close(); } catch (IOException e) { e.printStackTrace(); } }
@Override public void failed(Throwable exc, ByteBuffer attachment) {
} }); } catch (Exception e) { e.printStackTrace(); } }
@Override public void failed(Throwable exc, Object attachment) {
} }); }
@Override public void failed(Throwable exc, Object attachment) {
} }); Thread.sleep(1000); } }
|
这里使用了connect去连接服务器,连接的时候,传入了一个CompletionHandler接口,完成连接之后便开始写入,而在写入的参数中又有一个CompletionHandler接口,完成它的读回,读取的过程中又有一个CompletionHandler,继续完成它的关闭连接操作,这样,它们便连接起来的了。每当有数据发送至服务端时,便可以立即被返回,不在服务器逗留。AIO的方式,又极大的提高了效率。