NIO实现多用户聊天demo,通过demo深入理解NIO三大组件:buffer、channel、selector

Obelia ·
更新时间:2024-11-01
· 672 次阅读

目录运行效果截图:源代码:Server端:Client端:使用时注意:(持续更新) 运行效果截图:

启动server端:
在这里插入图片描述
依次启动client1、client2、client3并输入相应昵称:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
三个clent端各自说一句话(红框内为输入的话):
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

源代码: Server端: package demo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.Channel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.HashSet; import java.util.Iterator; import java.util.Set; /** * 网络多客户端聊天室 * 1. 支持多客户端的连接 * 2. 客户端初次连接时,服务端提示输入昵称,如果昵称已经有人使用,提示重新输入,如果昵称唯一,则登录成功,之后发送消息都需要按照规定格式带着昵称发送消息 * 3. 有客户端登录后,将当前登录人和在线人数发送所有客户端 * 4. 服务器收到已登录客户端输入内容,转发至其他所有登录客户端。 */ public class NIOChatServer { private int port = 8080; private Charset charset = Charset.forName("UTF-8"); //用来记录在线人数,以及昵称 private static HashSet users = new HashSet(); private static String USER_EXIST = "系统提示:该昵称已经存在,请换一个昵称"; //相当于自定义协议格式,与客户端协商好 //用于分割当前登录人信息和发送内容 private static String USER_CONTENT_SPILIT = "#@#"; private Selector selector = null; public NIOChatServer(int port) throws IOException{ this.port = port; //创建通道 ServerSocketChannel server = ServerSocketChannel.open(); //绑定端口号 server.bind(new InetSocketAddress(this.port)); //设置为非阻塞 server.configureBlocking(false); //创建选择器 selector = Selector.open(); //注册事件 //OP_ACCEPT就绪条件:当收到一个客户端的连接请求时,该操作就绪。这是ServerSocketChannel上唯一有效的操作。 //OP_CONNECT就绪条件:只有客户端SocketChannel会注册该操作,当客户端调用SocketChannel.connect()时,该操作会就绪。 //OP_READ就绪条件:该操作对客户端和服务端的SocketChannel都有效,当OS的读缓冲区中有数据可读时,该操作就绪。 //OP_WRITE就绪条件:该操作对客户端和服务端的SocketChannel都有效,当OS的写缓冲区中有空闲的空间时(大部分时候都有),该操作就绪。 server.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务已启动,监听端口是:" + this.port); } /* * 开始监听 */ public void listen() throws IOException{ while(true) { //当没有满足条件的事件的时候,会一直阻塞 //返回值wait,表示有多少通道已经就绪 int wait = selector.select(); if(wait == 0) continue; //可以通过这个方法,知道可用通道的集合 Set keys = selector.selectedKeys(); Iterator iterator = keys.iterator(); while(iterator.hasNext()) { SelectionKey key = (SelectionKey) iterator.next(); //移除,防止重复处理 iterator.remove(); //处理 process(key); } } } public void process(SelectionKey key) throws IOException { //当通道处于就绪状态并且注册的就绪条件为SelectionKey.OP_ACCEPT时: if(key.isValid() && key.isAcceptable()){ ServerSocketChannel server = (ServerSocketChannel)key.channel(); SocketChannel client = server.accept(); //非阻塞模式 client.configureBlocking(false); //注册选择器,并设置为读取模式,收到一个连接请求,然后起一个SocketChannel,并注册到selector上,之后这个连接的数据,就由这个SocketChannel处理 client.register(selector, SelectionKey.OP_READ); //将此对应的channel设置为准备接受其他客户端请求 key.interestOps(SelectionKey.OP_ACCEPT); client.write(charset.encode("请输入你的昵称")); } //处理来自客户端的数据读取请求 if(key.isValid() && key.isReadable()){ //返回该SelectionKey对应的 Channel,其中有数据需要读取 SocketChannel client = (SocketChannel)key.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); StringBuilder content = new StringBuilder(); try{ while(client.read(buff) > 0) { buff.flip(); content.append(charset.decode(buff)); } //将此对应的channel设置为准备下一次接受数据 key.interestOps(SelectionKey.OP_READ); }catch (IOException io){ key.cancel(); if(key.channel() != null) { key.channel().close(); } } if(content.length() > 0) { String[] arrayContent = content.toString().split(USER_CONTENT_SPILIT); //注册用户 if(arrayContent != null && arrayContent.length == 1) { String nickName = arrayContent[0]; if(users.contains(nickName)) { client.write(charset.encode(USER_EXIST)); } else { users.add(nickName); int onlineCount = onlineCount(); String message = "欢迎 " + nickName + " 进入聊天室! 当前在线人数:" + onlineCount; broadCast(null, message); } } //注册完了,发送消息 else if(arrayContent != null && arrayContent.length > 1) { String nickName = arrayContent[0]; String message = content.substring(nickName.length() + USER_CONTENT_SPILIT.length()); message = nickName + " 说 " + message; if(users.contains(nickName)) { //不回发给发送此内容的客户端 broadCast(client, message); } } } } } public int onlineCount() { int res = 0; for(SelectionKey key : selector.keys()){ Channel target = key.channel(); if(target instanceof SocketChannel){ res++; } } return res; } public void broadCast(SocketChannel client, String content) throws IOException { //广播数据到所有的SocketChannel中 for(SelectionKey key : selector.keys()) { Channel targetchannel = key.channel(); //如果client不为空,不回发给发送此内容的客户端 if(targetchannel instanceof SocketChannel && targetchannel != client) { SocketChannel target = (SocketChannel)targetchannel; target.write(charset.encode(content)); } } } public static void main(String[] args) throws IOException { new NIOChatServer(8080).listen(); } } Client端: package demo; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Iterator; import java.util.Scanner; import java.util.Set; public class NIOChatClient { private final InetSocketAddress serverAdrress = new InetSocketAddress("localhost", 8080); private Selector selector = null; private SocketChannel client = null; private String nickName = ""; private Charset charset = Charset.forName("UTF-8"); private static String USER_EXIST = "系统提示:该昵称已经存在,请换一个昵称"; private static String USER_CONTENT_SPILIT = "#@#"; public NIOChatClient() throws IOException{ selector = Selector.open(); //连接远程主机的IP和端口 client = SocketChannel.open(serverAdrress); client.configureBlocking(false); client.register(selector, SelectionKey.OP_READ); } public void session(){ //开辟一个新线程从服务器端读数据 new Reader().start(); //开辟一个新线程往服务器端写数据 new Writer().start(); } private class Writer extends Thread{ @Override public void run() { try{ //在主线程中 从键盘读取数据输入到服务器端 Scanner scan = new Scanner(System.in); while(scan.hasNextLine()){ String line = scan.nextLine(); if("".equals(line)) continue; //不允许发空消息 if("".equals(nickName)) { nickName = line; line = nickName + USER_CONTENT_SPILIT; } else { line = nickName + USER_CONTENT_SPILIT + line; } // client.register(selector, SelectionKey.OP_WRITE); client.write(charset.encode(line));//client既能写也能读,这边是写 } scan.close(); }catch(Exception e){ } } } private class Reader extends Thread { public void run() { try { while(true) { int readyChannels = selector.select(); if(readyChannels == 0) continue; Set selectedKeys = selector.selectedKeys(); //可以通过这个方法,知道可用通道的集合 Iterator keyIterator = selectedKeys.iterator(); while(keyIterator.hasNext()) { SelectionKey key = (SelectionKey) keyIterator.next(); keyIterator.remove(); process(key); } } } catch (IOException io){ } } private void process(SelectionKey key) throws IOException { if(key.isReadable()){ //使用 NIOServerDemoBak 读取 Channel中的数据,这个和全局变量client是一样的,因为只注册了一个SocketChannel //client既能写也能读,这边是读 SocketChannel sc = (SocketChannel)key.channel(); ByteBuffer buff = ByteBuffer.allocate(1024); String content = ""; while(sc.read(buff) > 0) { buff.flip(); content += charset.decode(buff); } //若系统发送通知名字已经存在,则需要换个昵称 if(USER_EXIST.equals(content)) { nickName = ""; } System.out.println(content); key.interestOps(SelectionKey.OP_READ); } } } public static void main(String[] args) throws IOException { new NIOChatClient().session(); } } 使用时注意:(持续更新) SelectKey注册了写事件,不在合适的时间去除掉,会一直触发写事件,因为写事件是代码触发的 client.register(selector, SelectionKey.OP_WRITE); 或者sk.interestOps(SelectionKey.OP_WRITE) 执行了这以上任一代码都会无限触发写事件,跟读事件不同,一定注意 每个channel只对应一个SelectionKey

▄█▀█●各位同仁,如果我的代码对你有帮助,请给我一个赞吧,为了下次方便找到,也可关注加收藏呀
如果有什么意见或建议,也可留言区讨论


作者:程序人生_小高



buffer 多用户 demo nio

需要 登录 后方可回复, 如果你还没有账号请 注册新账号