IO如何实现

描述

IO模型

我们的程序基本上都是对数据的IO操作以及基于CPU的运算。

基于Java的开发大部分是网络相关的编程,不管是基于如Tomcat般的Web容器,或是基于Netty开发的应用间的RPC服务。为了提供系统吞吐量, 降低硬件资源的开销,IO模型也在不断适应大规模、高并发需求不断演进,今天我们就来看看这个在网络上高频出现的词汇IO模型

linux IO模型

首先我们要明确,用户程序从计算机硬件读取数据(包括文件、网络数据等),会经历数据从硬件设备中读取到系统内核后,再拷贝到用户空间的过程。在linux系统中,针对这一操作提供了5种IO模型用于优化不同场景下的IO操作。

  • 同步阻塞IO 系统程序调用recvfrom阻塞等待内核将数据准备(从网卡将数据读取到内存中)。之后用户通过recvfrom等待内核将数据准备好,此时内核将数据从内核缓冲区复制到用户态缓冲区。

Linux系统

blocking I/O发起system call recvfrom()时,进程将一直阻塞等待另一端Socket的数据到来。在该模式下,会阻塞其他连接的建立,因此一般都会通过多线程处理Socket数据的读取。

Blocking I/O优点是简单易用,对于本地I/O而言性能很高。缺点是处理网络I/O时,造成进程阻塞,以及创建线程的资源消耗。

  • 同步非阻塞IO
    系统程序调用recvfrom时并不会阻塞等待,但是需要调用方不停的去轮询内核,获取数据准备状态。之后用户发起的(同步)recvfrom检查到内核将数据准备好后,进行数据由内核到用户空间的复制。

Linux系统

相对于阻塞I/O的等待,非阻塞I/O隔一段时间就就需要发起system call判断数据是否就绪。如果数据就绪,就从kernel space复制到user space,操作数据; 否则,kernel会立即返回EWOULDBLOCK这个错误。

recvfrom有个参数叫flags,默认情况下阻塞。可以设置flag为非阻塞让kernel在数据未就绪时直接返回。这就是”非阻塞”主要是指数据准备阶段。

  • IO多路复用
    系统程序调用select/poll/epoll会阻塞等待至少有一个套接字就绪则返回。用户(同步)调用recvfrom,获取这些就绪的套接字,轮询将数据由内核复制到用户态缓冲区。

Linux系统

I/O Multiplexing首先向kernel发起system call,传入file descriptor和感兴趣的事件(readable、writable等)让kernel监测, 当其中一个或多个fd数据就绪,就会返回结果。程序再发起真正的I/O操作recvfrom读取数据。

  • 信号驱动IO
    系统调用sigaction不会阻塞。当数据准备完成之后,会主动的通知用户进程数据已经准备完成,对用户进程做一个回调。用户发起的(同步)recvfrom将就绪的数据由内核复制到用户态缓冲区。

Linux系统

第一次发起system call不会阻塞进程,kernel的数据就绪后会发送一个signal给进程。进而发起真正的IO操作。

  • 异步IO
    系统调用aio_read不会阻塞。直到I/O数据准备好内核会直接将数据复制到用户空间,然后内核主动会给用户进程发送通知,告诉用户进程信号表示并进行数据处理。

Linux系统

既然说到异步IO,则前面的几种IO模型都是同步的,由上图可以看到,在数据拷贝(内核态到用户态)时,仍然是阻塞的。在异步IO中,请求连接到内核后,从数据准备到复制整个过程 都是在内核中完成,对应用户程序不会阻塞,直到请求数据完全准备好后,通过回调函数通知用户程序完成整个IO操作。

Java中的IO模型

Java中提供的IO相关的API,主要是基于操作系统底层的IO的操作。在Java中的BIO、NIO、AIO属于Java对操作系统的各种IO模型的封装。当我们使用这些API时,不用关注底层IO的实现。

  • BIO

同步阻塞IO ,服务端通过阻塞输入流来监听客户端是否有数据写入,当处理输入数据时,程序会等待内核完成处理完成并返回后才会继续执行。

Linux系统

上图可以看到,服务端通过ServerSocket#accept阻塞方法监听客户端的接入,然后阻塞在通过阻塞输入流等待客户端的输入,如果一直没有输入,则其他客户端都会被阻塞在此。

Linux系统

我们可以通过多线程来改善,每个客户端连接时,都由独立的线程来处理,虽然通过多线程可以解决客户端间的阻塞问题,但单个线程内然是阻塞模式, 并且当客户端过多时需要足够的线程来支持,比较耗费系统资源。

Linux系统

  • NIO

同步非阻塞IO ,基于多路复用模型,依赖于服务器操作系统,通过一个Selector即可监听多个连接,并进行IO处理。但要注意,如果处理IO的过程较长一样会影响到其他的连接。

Linux系统

服务端通过Selector#select阻塞方法,监听Channel状态,一旦有Channel准备就绪,程序才会继续往下执行,因此需要不断轮询并监控Channel的状态变更。与BIO的多线程模式非常相似,只不过BIO是基于多线程技术实现,而NIO是基于操作系统底层提供的函数,效率更好且资源消耗更少。

Linux系统

  • AIO

异步非阻塞IO ,在JDK1.7之后提供了异步的相关Channel,AIO提供异步功能, 基于回调函数实现 ,同样依赖于操作系统底层的异步IO模型,异步操作的实现是在对应的 accept、connection、read、write等方法异步执行,完成后会主动调用回调函数。

Linux系统

其中accept、read等方法都是非阻塞的,即立即返回结果,几乎所有的异步操作都是基于回调函数实现,这种方式不管是对操作系统资源的利用以及效率上都是最佳的实现。

Linux系统

虽然三种IO模型的演进是为了提升系统处理IO的能力,但是开发的复杂度也同步上升:

  • BIO方式适用于连接数目比较小且固定的架构,需要依赖于线程来支持多个客户端接入,但程序直观简单易理解。
  • NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂。
  • AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂。

同/异步与(非)阻塞

关于阻塞、非阻塞、同步、异步这些名词的解释,可以在网上找到很多解释,但是如何能够从本质上描述其含义,正如IO与NIO中说到的阻塞与非阻塞,又是怎么体现的呢?

我们一般说说的IO模型,其实是服务端进行IO操作执行与实现的形式,程序将数据从程序写入或读写时,与硬件设备(比如硬盘、网卡)间,基于操作系统提供的系统api实现数据由用户态与内核态交互的一种形式。

  • 同步
    程序执行需要等待返回后才会继续。
  • 异步
    与同步相反,比较直观的就是线程。
  • 阻塞IO
    程序需要等待内核IO操作完成后返回到用户空间继续执行用户程序的操作指令。这里的阻塞主要是调用操作系统api被阻塞导致程序挂起,描述的是程序当前执行的状态。
  • 非阻塞IO
    既然阻塞是调用操作系统api被阻塞,那么非阻塞则相反,得益于操作系统提供的函数支持,一般是通过轮询机制与回调函数实现。

同步与异步属于程序发起请求的方式;阻塞与非阻塞属于服务响应IO操作的底层实现方式。

示例

基于上面的理解,我们看下在Java中如何实现BIO、NIO以及AIO。

BIO

Server:

serverSocket = new ServerSocket(port);
  // 阻塞直到有连接
  Socket clientSocket = serverSocket.accept();
  // 阻塞读取数据
  BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  log.info(" > > > > > Server接收消息:{}" , reader.readLine());
  socket.shutdownInput();
  
  log.info(" > > > > > Server回复消息:{}" , message);
  PrintWriter writer = new PrintWriter(socket.getOutputStream());
  writer.println(message);

Client:

// 连接服务端
  socket = new Socket("127.0.0.1",port);
  OutputStream out = socket.getOutputStream();
  out.write(message.getBytes());
  socket.shutdownOutput();
  
  BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
  log.info("接收Server回复:{}", reader.readLine());

NIO

省略

AIO

Server:

//
    serverSocketChannel = AsynchronousServerSocketChannel.open();
    //绑定端口
    serverSocketChannel.bind(new InetSocketAddress(port));
    //异步接收客户端连接
    serverSocketChannel.accept(null, new AcceptCompletionHandler< String >());

    /**
     * 处理客户端连接
     * @param < T >
     */
    public class AcceptCompletionHandler< T > implements CompletionHandler< AsynchronousSocketChannel,T > {

        @Override
        public void completed(AsynchronousSocketChannel result, T attachment) {
            log.info(" > > > 客户端接入...");
            ByteBuffer byteBuffer = ByteBuffer.allocate(512);
            //异步读客户端数据
            result.read(byteBuffer, byteBuffer, new ReadCompletionHandler());
            //接收其他的客户端连接的
            serverSocketChannel.accept(null, this);
        }

        @Override
        public void failed(Throwable exc, T attachment) {
            log.error(" > > > 客户端接入失败:{}", exc.getMessage());
        }
    }

    /**
     * 处理ServerChannel读取
     * @param < T >
     */
    public class ReadCompletionHandler< T extends Buffer > implements CompletionHandler< Integer, T >{

        @Override
        public void completed(Integer result, T attachment) {
            if(attachment.hasRemaining()){
                // 切换成读模式
                attachment.flip();
                //
                if( attachment instanceof ByteBuffer ){
                    byte[] bytes = new byte[attachment.remaining()];
                    ((ByteBuffer)attachment).get(bytes); // 从Buffer中取数据 get
                    log.info("Server接收消息:{}", new String(bytes));
                }
            }
        }

        @Override
        public void failed(Throwable exc, T attachment) {
            log.error("Server接收消息失败:{}", exc.getMessage());
        }
    }

Client:

//创建异步通道实例
    socketChannel = AsynchronousSocketChannel.open();
    //连接服务端,异步方式
    socketChannel.connect(new InetSocketAddress("127.0.0.1",port), null, new ConnetionComplateHandler());
    // 消息发送
    this.socketChannel.write(Charset.defaultCharset().encode(message));
    /**
     *
     * @param < T >
     */
    public class ConnetionComplateHandler< T > implements CompletionHandler< Void, T > {

        @Override
        public void completed(Void result, T attachment) {
            log.info("Client连接服务的成功...");
        }

        @Override
        public void failed(Throwable exc, T attachment) {

        }
    }

结束语

通过了解操作系统层面的IO模型可以让我们理解IO是如何实现,以及通过Java语言提供的类库实现了操作系统底层API调用的复杂性。

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分