IO | NIO |
---|---|
面向流 | 面向缓存 |
阻塞IO | 非阻塞IO |
无 | 选择器 |
NIO简介
NIO包(java.nio.* )主要包含一下内容:
- Buffer,包含数据且用于读写的线性表结构,跟某一特定类型绑定;
- charset,处理ByteBuffer中的字符编码问题
- channel,用来读写Buffer,静态方法创建Stream或Reader以与传统IO相容
- Selector,选择可用通道,对每个通道注册感兴趣的事件,事件发生时该通道可用
- files,工具类,封装了对文件的操作:copy, move, createXxx, delete, exists, getAttribute, isXxx, newBufferedReader, newByteChannel, newInputStream, readAllBytes, readAllLines, size, write
SocketChannel客户端程序
客户端用阻塞方式打开SocketChannel
注意Buffer操作:clear 当前指针指向0,限制指针指向尾,可以在整个buf内写入;flip 限制指针指向当前位置,当前指针指向0,只能读当前写入的数据
package nioDemo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Scanner;
public class EchoClient {
private final Charset CHARSET = Charset.forName("UTF-8");
private final ByteBuffer buf = ByteBuffer.allocate(1024);
private SocketChannel channel;
public EchoClient(String ip, int port) {
try {
channel = SocketChannel.open(new InetSocketAddress(ip, port));
channel.configureBlocking(true);
System.out.println("客户端发起连接成功");
} catch (IOException e) {
e.printStackTrace();
}
}
private void sendMsg(String msg) {
try {
channel.write(CHARSET.encode(msg));
System.out.println("客户端发送消息成功");
} catch (IOException e) {
e.printStackTrace();
}
}
private void rcvMsg() {
try {
buf.clear();
if (channel.read(buf) > 0) {
buf.flip();
System.out.println("客户端收到消息:" + CHARSET.decode(buf).toString());
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
EchoClient client = new EchoClient("127.0.0.1", 8081);
Scanner sc = new Scanner(System.in);
String msg = null;
while (sc.hasNext()) {
msg = sc.nextLine();
if ("quit".equals(msg))
return;
client.sendMsg(msg);
client.rcvMsg();
}
}
}
ServerSocketChannel服务器程序
一个连接一个线程的处理方式,似乎没人这样用;一个请求一个线程比较合理,见下节
网上搜了下,Netty等框架用的是 bind,accept与read,write的线程分离,connect与read、write线程分离;;;较复杂,线程也较多,所以效率并不一定优于下节基于事件的线程处理方式
package nioDemo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* echo server
*
* receive client's msg, then response.
*
* @author yonglecai
*
*/
public class EchoServer implements Runnable {
private ServerSocketChannel ssc;
private final ExecutorService pool;
public EchoServer(int port, int poolSize) {
pool = Executors.newFixedThreadPool(poolSize);
try {
ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress(port));
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
for (;;) {
pool.execute(new Handler(ssc.accept()));
}
} catch (IOException ex) {
ex.printStackTrace();
} finally {
pool.shutdown();
}
}
}
class Handler implements Runnable {
private final ByteBuffer buf = ByteBuffer.allocate(1024);
private final SocketChannel channel;
Handler(SocketChannel sc) {
this.channel = sc;
System.out.println("oh! 来了个新客户端");
}
public void run() {
if (channel == null)
return;
try {
int count = 0;
for (;;) {
if ((count = channel.read(buf)) > 0) {
buf.flip();
while (buf.hasRemaining())
channel.write(buf);
buf.clear();
} else if (count < 0) {// 传输结束了
return;
}
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
System.out.println("服务器开启中...");
new Thread(new EchoServer(8081, 4)).start();
System.out.println("服务器开启成功");
}
}
Selector重写服务端
上面的服务器程序使用线程池来处理每个连接,连接过多且连接时间都很长时,线程池就堵塞了
Selector面向每个事件,有新的连接来了我连接一下,有新的请求来了我处理一下,这更加符合线程池,每次处理一个小事件,不会对其他连接阻塞
Selector这里有个坑啊,对于read事件,如果把读取请求的操作放到线程里执行,此时该线程有可能阻塞,导致Selector判断该事件仍然有效,多次抛出线程处理这一个事件
处理accept的线程池和read write逻辑业务的线程池可以分开
accept如果用线程池处理,要注意selector的同步
相应的channel可读、到达文件尾端、远端关闭、未处理的错误,都会将当前key加到selected-key set里。所以客户端关闭连接的时候,read会返回负值,我们也可以关闭连接了
换句话说,可read 的时候,读到的数据并不一定完整,比如write buffer满了,发来了部分数据,这时候就要对数据进行校验,确定是否和下次数据合并处理
package nioDemo;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* echo server
*
* @author yonglecai
*
*/
public class EchoServerSelector implements Runnable {
private Selector sel;
private ServerSocketChannel ssc;
private final ExecutorService pool;
private final int TIMEOUT = 3000;
public EchoServerSelector(int port, int poolSize) {
pool = Executors.newFixedThreadPool(poolSize);
try {
ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(port));
sel = Selector.open();
ssc.register(sel, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
try {
Iterator<SelectionKey> itr;
SelectionKey key;
for (;;) {
if (sel.select(TIMEOUT) == 0) {
System.out.print(" " + sel.keys().size());
continue;
}
itr = sel.selectedKeys().iterator();
while (itr.hasNext()) {
key = itr.next();
itr.remove();
if (key.isAcceptable()) {// 新连接
System.out.println("oh! 来了个新客户端");
pool.execute(new AcceptHandler(sel, ((ServerSocketChannel) key.channel()).accept()));
} else if (key.isReadable()) {// 新请求,就是这里有个坑,要在这里读完请求,在给线程处理,如果把请求的读取也放到线程里,由于线程执行的不确定性,selector会重复对这一请求抛出线程!!!
System.out.println("oh! 来了个新请求");
SocketChannel channel = (SocketChannel) key.channel();
int count = 0;
ByteBuffer buf = ByteBuffer.allocate(1024);// 为每个线程分配一个buffer
if ((count = channel.read(buf)) > 0) {// 读取请求数据到buffer
pool.execute(new ReadHandler(channel, buf));
} else if (count < 0) {// 客户端关闭的时候,这里会接到一个请求,咱也关了
channel.close();
}
}
}
}
} catch (IOException ex) {
ex.printStackTrace();
} finally {
try {
sel.close();
ssc.close();
} catch (IOException e) {
e.printStackTrace();
}
pool.shutdown();
}
}
// 处理accept的任务:将新来的连接注册到selector里
class AcceptHandler implements Runnable {
private final Selector sel;
private final SocketChannel channel;
AcceptHandler(Selector sel, SocketChannel channel) {
this.sel = sel;
this.channel = channel;
}
@Override
public void run() {
try {
channel.configureBlocking(false);
channel.register(sel, SelectionKey.OP_READ);
} catch (IOException e) {
e.printStackTrace();
}
}
}
// 处理客户端请求的任务:根据客户发来的请求,进行回复
class ReadHandler implements Runnable {
private final ByteBuffer buf;
private final SocketChannel channel;
ReadHandler(SocketChannel channel, ByteBuffer buf) {
this.channel = channel;
this.buf = buf;
}
public void run() {
try {
// 处理请求并返回结果,这里可能做大量工作哦
buf.flip();
while (buf.hasRemaining())
channel.write(buf);
} catch (IOException e) {
e.printStackTrace();
}
}
}
public static void main(String[] args) {
System.out.println("服务器开启中...");
new Thread(new EchoServerSelector(8081, 4)).start();
System.out.println("服务器开启成功");
}
}

This work is licensed under a CC A-S 4.0 International License.