NIO相关基础篇二
转载请注明原创出处,谢谢!
上篇NIO相关基础篇一,主要介绍了一些基本的概念以及缓冲区(Buffer)和通道(Channel),本篇继续NIO相关话题内容,主要就是文件锁、以及比较关键的Selector,后续还会继续有一到二篇左右与NIO内容相关。
文件锁(FileLock)
在看RocketMQ源码中,发现有关于文件锁的import,但是具体使用代码里面注释调了[回头看看为什么,理解下,到时候会在某篇文章里进行说明](实现一个事情的方法很多,所以不一定就一种),但是为了知识的完整性,还是准备讲下文件锁,可能以后或者那个地方可以使用,或者大家在那里使用到都可以继续留言讨论。
文件锁和其他我们了解并发里面的锁很多概念类似,当多个人同时操作一个文件的时候,只有第一个人可以进行编辑,其他要么关闭(等第一个人操作完成之后可以操作),要么以只读的方式进行打开。
在java nio中提供了新的锁文件功能,当一个线程将文件锁定之后,其他线程无法操作此文件,文件的锁操作是使用FileLock类来进行完成的,此类对象需要依赖FileChannel进行实例化。
文件锁方式:
- 共享锁:允许多个线程进行文件读取。
- 独占锁:只允许一个线程进行文件的读写操作。
备注:文件锁定以整个 Java 虚拟机来保持。但它们不适用于控制同一虚拟机内多个线程对文件的访问。多个并发线程可安全地使用文件锁定对象。
Java文件依赖FileChannel的主要涉及如下4个方法:
方法 | 说明 |
---|---|
lock() | 获取对此通道的文件的独占锁定。 |
lock(long position, long size, boolean shared) | 获取此通道的文件给定区域上的锁定。 |
tryLock() throws IOException | 试图获取对此通道的文件的独占锁定。 |
tryLock(long position, long size, boolean shared) throws IOException | 试图获取对此通道的文件给定区域的锁定。 |
无 | lock()等同于lock(0L, Long.MAX_VALUE, false) |
无 | tryLock()等同于tryLock(0L, Long.MAX_VALUE, false) |
lock()和tryLock()的区别:
- lock()阻塞的方法,锁定范围可以随着文件的增大而增加。无参lock()默认为独占锁;有参lock(0L, Long.MAX_VALUE, true)为共享锁。
- tryLock()非阻塞,当未获得锁时,返回null。无参tryLock()默认为独占锁;有参tryLock(0L, Long.MAX_VALUE, true)为共享锁。
简单实例代码:
File file = new File("d:" + File.separator + "test.txt") ; FileOutputStream output = null ; FileChannel fout = null ; try { output = new FileOutputStream(file,true) ; fout = output.getChannel() ;// 得到通道 FileLock lock = fout.tryLock() ; // 进行独占锁的操作 if(lock!=null){ System.out.println(file.getName() + "文件锁定") ; Thread.sleep(5000) ; lock.release() ; // 释放 System.out.println(file.getName() + "文件解除锁定。") ; } } catch (IOException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(fout!=null){ try { fout.close(); } catch (IOException e) { e.printStackTrace(); } } if(output!=null){ try { output.close(); } catch (IOException e) { e.printStackTrace(); } } }
运行结果:
**说明:**目前也很少接触到关于文件锁的灵活运用,RocketMQ那块代码给注释了,如果那我了解,欢迎留言讨论。
Selector
说明:
- FileChannel是可读可写的Channel,它必须阻塞,不能用在非阻塞模式中。
- SocketChannel与FileChannel不同:新的Socket Channel能在非阻塞模式下运行并且是可选择的。不再需要为每个socket连接指派线程了。使用新的NIO类,一个或多个线程能管理成百上千个活动的socket连接,使用Selector对象可以选择可用的Socket Channel。
以前的Socket程序是阻塞的,服务器必须始终等待客户端的连接,而NIO可以通过Selector完成非阻塞操作。
备注:其实NIO主要的功能是解决服务端的通讯性能。上篇NIO相关基础篇一的知识,马上这块也是需要使用到的。
Selector一些主要方法:
方法 | 说明 |
---|---|
open() | 打开一个选择器。 |
select() | 选择一组键,其相应的通道已为 I/O 操作准备就绪。 |
selectedKeys() | 返回此选择器的已选择键集。 |
SelectionKey的四个重要常量:
字段 | 说明 |
---|---|
OP_ACCEPT | 用于套接字接受操作的操作集位。 |
OP_CONNECT | 用于套接字连接操作的操作集位。 |
OP_READ | 用于读取操作的操作集位。 |
OP_WRITE | 用于写入操作的操作集位。 |
**说明:**其实四个常量就是Selector监听SocketChannel四种不同类型的事件。
如果你对不止一种事件感兴趣,那么可以用"位或"操作符将常量连接起来,如下:int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
#NIO 简单实例代码服务端代码:
int port = 8000; // 通过open()方法找到Selector Selector selector = Selector.open(); // 打开服务器的通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); // 服务器配置为非阻塞 serverSocketChannel.configureBlocking(false); ServerSocket serverSocket = serverSocketChannel.socket(); InetSocketAddress address = new InetSocketAddress(port); // 进行服务的绑定 serverSocket.bind(address); // 注册到selector,等待连接 serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务器运行,端口:" + 8000); // 数据缓冲区 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (true) { if ((selector.select()) > 0) { // 选择一组键,并且相应的通道已经准备就绪 Set<SelectionKey> selectedKeys = selector.selectedKeys();// 取出全部生成的key Iterator<SelectionKey> iter = selectedKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); // 取出每一个key if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 接收新连接 和BIO写法类是都是accept SocketChannel client = server.accept(); // 配置为非阻塞 client.configureBlocking(false); byteBuffer.clear(); // 向缓冲区中设置内容 byteBuffer.put(("当前的时间为:" + new Date()).getBytes()); byteBuffer.flip(); // 输出内容 client.write(byteBuffer); client.register(selector, SelectionKey.OP_READ); } else if (key.isReadable() && key.isValid()) { SocketChannel client = (SocketChannel) key.channel(); byteBuffer.clear(); // 读取内容到缓冲区中 int readSize = client.read(byteBuffer); if (readSize > 0) { System.out.println("服务器端接受客户端数据:" + new String(byteBuffer.array(), 0, readSize)); client.register(selector, SelectionKey.OP_WRITE); } } else if (key.isWritable() && key.isValid()) { SocketChannel client = (SocketChannel) key.channel(); byteBuffer.clear(); // 向缓冲区中设置内容 byteBuffer.put(("欢迎关注匠心零度,已经收到您的反馈,会第一时间回复您。感谢支持!!!").getBytes()); byteBuffer.flip(); // 输出内容 client.write(byteBuffer); client.register(selector, SelectionKey.OP_READ); } } selectedKeys.clear(); // 清楚全部的key } }
客户端代码:
// 打开socket通道 SocketChannel socketChannel = SocketChannel.open(); // 设置为非阻塞方式 socketChannel.configureBlocking(false); // 通过open()方法找到Selector Selector selector = Selector.open(); // 注册连接服务端socket动作 socketChannel.register(selector, SelectionKey.OP_CONNECT); // 连接 socketChannel.connect(new InetSocketAddress("127.0.0.1", 8000)); /* 数据缓冲区 */ ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (true) { if ((selector.select()) > 0) { // 选择一组键,并且相应的通道已经准备就绪 Set<SelectionKey> selectedKeys = selector.selectedKeys();// 取出全部生成的key Iterator<SelectionKey> iter = selectedKeys.iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); // 取出每一个key if (key.isConnectable()) { SocketChannel client = (SocketChannel) key.channel(); if (client.isConnectionPending()) { client.finishConnect(); byteBuffer.clear(); // 向缓冲区中设置内容 byteBuffer.put(("isConnect,当前的时间为:" + new Date()).getBytes()); byteBuffer.flip(); // 输出内容 client.write(byteBuffer); } client.register(selector, SelectionKey.OP_READ); } else if (key.isReadable() && key.isValid()) { SocketChannel client = (SocketChannel) key.channel(); byteBuffer.clear(); // 读取内容到缓冲区中 int readSize = client.read(byteBuffer); if (readSize > 0) { System.out.println("客户端接受服务器端数据:" + new String(byteBuffer.array(), 0, readSize)); client.register(selector, SelectionKey.OP_WRITE); } } else if (key.isWritable() && key.isValid()) { SocketChannel client = (SocketChannel) key.channel(); byteBuffer.clear(); // 向缓冲区中设置内容 byteBuffer.put(("nio文章学习很多!").getBytes()); byteBuffer.flip(); // 输出内容 client.write(byteBuffer); client.register(selector, SelectionKey.OP_READ); } } selectedKeys.clear(); // 清楚全部的key } }
程序运行结果截图:
**说明:**上面仅仅是一个demo,其实使用nio开发复杂度很高的,需要考虑:链路的有效性校验机制(安全认证、半包和粘包等)、链路的断连重连机制(网络闪断重连)、可靠性设计(心跳检测,消息重发、黑白名单)以及可扩展性的考虑等等,这些都是很复杂,那里搞不好就容易出错,看netty 权威指南的时候 记得作者他们那个时候还没有netty,经常出现一些莫名问题需要进行解决,而很多问题netty已经帮我们解决了,所以有必要好好看看netty了(目前作者也在看netty权威指南,唯一不爽的时候,里面大量代码,习惯用工具查看代码(编辑器查看代码变色,可以跳转等),求netty权威指南代码地址,看书里面代码特别变扭!,谢谢)
简单聊几句AIO
虽然NIO在网络操作中提供了非阻塞方法,但是NIO的IO行为还是同步的,对于NIO来说,我们的业务线程是在IO操作准备好时,才得到通知,接着就有这个线程自行完成IO操作,但是IO操作的本身其实还是同步的。
AIO是异步IO的缩写,相对与NIO来说又进了一步,它不是在IO准备好时再通知线程,而是在IO操作完成后在通知线程,所以AIO是完全不阻塞的,我们的业务逻辑看起来就像一个回调函数了。
**备注:**AIO就是简单提提,NIO还没有搞明白,后续还有一到二篇左右与NIO内容相关,主要谈谈一些select、poll、epoll、零拷贝等一些内容,如果有关零拷贝比较好的资料,欢迎在留言区进行留言,让零度也学习下,系统的了解下,谢谢!!!
如果读完觉得有收获的话,欢迎点赞、关注、加公众号【匠心零度】。
个人公众号,欢迎关注,查阅更多精彩历史!!!