Java IO与NIO总结

Posted by lan_cyl on August 17, 2016
IO NIO
面向流 面向缓存
阻塞IO 非阻塞IO
选择器

NIO简介

NIO包(java.nio.* )主要包含一下内容:

  1. Buffer,包含数据且用于读写的线性表结构,跟某一特定类型绑定;
  2. charset,处理ByteBuffer中的字符编码问题
  3. channel,用来读写Buffer,静态方法创建Stream或Reader以与传统IO相容
  4. Selector,选择可用通道,对每个通道注册感兴趣的事件,事件发生时该通道可用
  5. files,工具类,封装了对文件的操作:copy, move, createXxx, delete, exists, getAttribute, isXxx, newBufferedReader, newByteChannel, newInputStream, readAllBytes, readAllLines, size, write

SocketChannel客户端程序

客户端用阻塞方式打开SocketChannel

注意Buffer操作:clear 当前指针指向0,限制指针指向尾,可以在整个buf内写入;flip 限制指针指向当前位置,当前指针指向0,只能读当前写入的数据

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;

public class EchoClient {
	private final Charset CHARSET = Charset.forName("UTF-8");
	private final ByteBuffer buf = ByteBuffer.allocate(1024);
	private SocketChannel channel;

	public EchoClient(String ip, int port) {
		try {
			channel = SocketChannel.open(new InetSocketAddress(ip, port));
			channel.configureBlocking(true);
			System.out.println("客户端发起连接成功");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void sendMsg(String msg) {
		try {
			channel.write(CHARSET.encode(msg));
			System.out.println("客户端发送消息成功");
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	private void rcvMsg() {
		try {
			buf.clear();
			if (channel.read(buf) > 0) {
				buf.flip();
				System.out.println("客户端收到消息:" + CHARSET.decode(buf).toString());
			}
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String[] args) {
		EchoClient client = new EchoClient("127.0.0.1", 8081);

		Scanner sc = new Scanner(System.in);
		String msg = null;
		while (sc.hasNext()) {
			msg = sc.nextLine();
			if ("quit".equals(msg))
				return;
			client.sendMsg(msg);
			client.rcvMsg();
		}
	}
}

ServerSocketChannel服务器程序

一个连接一个线程的处理方式,似乎没人这样用;一个请求一个线程比较合理,见下节

网上搜了下,Netty等框架用的是 bind,accept与read,write的线程分离,connect与read、write线程分离;;;较复杂,线程也较多,所以效率并不一定优于下节基于事件的线程处理方式

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * echo server
 *
 * receive client's msg, then response.
 *
 * @author yonglecai
 *
 */
public class EchoServer implements Runnable {
	private ServerSocketChannel ssc;
	private final ExecutorService pool;

	public EchoServer(int port, int poolSize) {
		pool = Executors.newFixedThreadPool(poolSize);
		try {
			ssc = ServerSocketChannel.open();
			ssc.socket().bind(new InetSocketAddress(port));
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		try {
			for (;;) {
				pool.execute(new Handler(ssc.accept()));
			}
		} catch (IOException ex) {
			ex.printStackTrace();
		} finally {
			pool.shutdown();
		}
	}
}

class Handler implements Runnable {
	private final ByteBuffer buf = ByteBuffer.allocate(1024);
	private final SocketChannel channel;

	Handler(SocketChannel sc) {
		this.channel = sc;
		System.out.println("oh! 来了个新客户端");
	}

	public void run() {
		if (channel == null)
			return;

		try {
			int count = 0;
			for (;;) {
				if ((count = channel.read(buf)) > 0) {
					buf.flip();
					while (buf.hasRemaining())
						channel.write(buf);

					buf.clear();
				} else if (count < 0) {// 传输结束了
					return;
				}
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			try {
				channel.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		System.out.println("服务器开启中...");
		new Thread(new EchoServer(8081, 4)).start();
		System.out.println("服务器开启成功");
	}
}

Selector重写服务端

上面的服务器程序使用线程池来处理每个连接,连接过多且连接时间都很长时,线程池就堵塞了

Selector面向每个事件,有新的连接来了我连接一下,有新的请求来了我处理一下,这更加符合线程池,每次处理一个小事件,不会对其他连接阻塞

Selector这里有个坑啊,对于read事件,如果把读取请求的操作放到线程里执行,此时该线程有可能阻塞,导致Selector判断该事件仍然有效,多次抛出线程处理这一个事件

处理accept的线程池和read write逻辑业务的线程池可以分开

accept如果用线程池处理,要注意selector的同步

相应的channel可读、到达文件尾端、远端关闭、未处理的错误,都会将当前key加到selected-key set里。所以客户端关闭连接的时候,read会返回负值,我们也可以关闭连接了

换句话说,可read 的时候,读到的数据并不一定完整,比如write buffer满了,发来了部分数据,这时候就要对数据进行校验,确定是否和下次数据合并处理

package nioDemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * echo server
 *
 * @author yonglecai
 *
 */
public class EchoServerSelector implements Runnable {
	private Selector sel;
	private ServerSocketChannel ssc;
	private final ExecutorService pool;
	private final int TIMEOUT = 3000;

	public EchoServerSelector(int port, int poolSize) {
		pool = Executors.newFixedThreadPool(poolSize);
		try {
			ssc = ServerSocketChannel.open();
			ssc.configureBlocking(false);
			ssc.socket().bind(new InetSocketAddress(port));

			sel = Selector.open();
			ssc.register(sel, SelectionKey.OP_ACCEPT);
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	@Override
	public void run() {
		try {
			Iterator<SelectionKey> itr;
			SelectionKey key;
			for (;;) {
				if (sel.select(TIMEOUT) == 0) {
					System.out.print(" " + sel.keys().size());
					continue;
				}
				itr = sel.selectedKeys().iterator();
				while (itr.hasNext()) {
					key = itr.next();
					itr.remove();
					if (key.isAcceptable()) {// 新连接
						System.out.println("oh! 来了个新客户端");
						pool.execute(new AcceptHandler(sel, ((ServerSocketChannel) key.channel()).accept()));
					} else if (key.isReadable()) {// 新请求,就是这里有个坑,要在这里读完请求,在给线程处理,如果把请求的读取也放到线程里,由于线程执行的不确定性,selector会重复对这一请求抛出线程!!!
						System.out.println("oh! 来了个新请求");
						SocketChannel channel = (SocketChannel) key.channel();

						int count = 0;
						ByteBuffer buf = ByteBuffer.allocate(1024);// 为每个线程分配一个buffer
						if ((count = channel.read(buf)) > 0) {// 读取请求数据到buffer
							pool.execute(new ReadHandler(channel, buf));
						} else if (count < 0) {// 客户端关闭的时候,这里会接到一个请求,咱也关了
							channel.close();
						}
					}
				}
			}
		} catch (IOException ex) {
			ex.printStackTrace();
		} finally {
			try {
				sel.close();
				ssc.close();
			} catch (IOException e) {
				e.printStackTrace();
			}
			pool.shutdown();
		}
	}

  // 处理accept的任务:将新来的连接注册到selector里
	class AcceptHandler implements Runnable {
		private final Selector sel;
		private final SocketChannel channel;

		AcceptHandler(Selector sel, SocketChannel channel) {
			this.sel = sel;
			this.channel = channel;
		}

		@Override
		public void run() {
			try {
				channel.configureBlocking(false);
				channel.register(sel, SelectionKey.OP_READ);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}
  // 处理客户端请求的任务:根据客户发来的请求,进行回复
	class ReadHandler implements Runnable {
		private final ByteBuffer buf;
		private final SocketChannel channel;

		ReadHandler(SocketChannel channel, ByteBuffer buf) {
			this.channel = channel;
			this.buf = buf;
		}

		public void run() {
			try {
				// 处理请求并返回结果,这里可能做大量工作哦
				buf.flip();
				while (buf.hasRemaining())
					channel.write(buf);
			} catch (IOException e) {
				e.printStackTrace();
			}
		}
	}

	public static void main(String[] args) {
		System.out.println("服务器开启中...");
		new Thread(new EchoServerSelector(8081, 4)).start();
		System.out.println("服务器开启成功");
	}
}

Creative Commons License
This work is licensed under a CC A-S 4.0 International License.