温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

Java中多线程Reactor模式怎么实现

发布时间:2021-12-03 11:32:27 来源:亿速云 阅读:185 作者:iii 栏目:开发技术

这篇文章主要讲解了“Java中多线程Reactor模式怎么实现”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“Java中多线程Reactor模式怎么实现”吧!

多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件。

让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求。这样一来连接请求与IO请求分开执行提高通道的并发量。同时多个Reactor带来的好处是多个selector可以提高通道的检索速度

1、 主服务器

package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Set; import java.util.concurrent.atomic.AtomicInteger; class MultiThreadEchoServerReactor {     ServerSocketChannel serverSocket;     AtomicInteger next = new AtomicInteger(0);     Selector bossSelector = null;     Reactor bossReactor = null;     //selectors集合,引入多个selector选择器     //多个选择器可以更好的提高通道的并发量     Selector[] workSelectors = new Selector[2];     //引入多个子反应器     //如果CPU是多核的可以开启多个子Reactor反应器,这样每一个子Reactor反应器还可以独立分配一个线程。     //每一个线程可以单独绑定一个单独的Selector选择器以提高通道并发量     Reactor[] workReactors = null;     MultiThreadEchoServerReactor() throws IOException {         bossSelector = Selector.open();         //初始化多个selector选择器         workSelectors[0] = Selector.open();         workSelectors[1] = Selector.open();         serverSocket = ServerSocketChannel.open();         InetSocketAddress address =                 new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,                         NioDemoConfig.SOCKET_SERVER_PORT);         serverSocket.socket().bind(address);         //非阻塞         serverSocket.configureBlocking(false);         //第一个selector,负责监控新连接事件         SelectionKey sk =                 serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);         //附加新连接处理handler处理器到SelectionKey(选择键)         sk.attach(new AcceptorHandler());         //处理新连接的反应器         bossReactor = new Reactor(bossSelector);         //第一个子反应器,一子反应器负责一个选择器         Reactor subReactor1 = new Reactor(workSelectors[0]);         //第二个子反应器,一子反应器负责一个选择器         Reactor subReactor2 = new Reactor(workSelectors[1]);         workReactors = new Reactor[]{subReactor1, subReactor2};     }     private void startService() {         new Thread(bossReactor).start();         // 一子反应器对应一条线程         new Thread(workReactors[0]).start();         new Thread(workReactors[1]).start();     }     //反应器     class Reactor implements Runnable {         //每条线程负责一个选择器的查询         final Selector selector;         public Reactor(Selector selector) {             this.selector = selector;         }         public void run() {             try {                 while (!Thread.interrupted()) {                     //单位为毫秒                     //每隔一秒列出选择器感应列表                     selector.select(1000);                     Set<SelectionKey> selectedKeys = selector.selectedKeys();                     if (null == selectedKeys || selectedKeys.size() == 0) {                         //如果列表中的通道注册事件没有发生那就继续执行                         continue;                     }                     Iterator<SelectionKey> it = selectedKeys.iterator();                     while (it.hasNext()) {                         //Reactor负责dispatch收到的事件                         SelectionKey sk = it.next();                         dispatch(sk);                     }                     //清楚掉已经处理过的感应事件,防止重复处理                     selectedKeys.clear();                 }             } catch (IOException ex) {                 ex.printStackTrace();             }         }         void dispatch(SelectionKey sk) {             Runnable handler = (Runnable) sk.attachment();             //调用之前attach绑定到选择键的handler处理器对象             if (handler != null) {                 handler.run();             }         }     }     // Handler:新连接处理器     class AcceptorHandler implements Runnable {         public void run() {         try {                 SocketChannel channel = serverSocket.accept();                 Logger.info("接收到一个新的连接");                 if (channel != null) {                     int index = next.get();                     Logger.info("选择器的编号:" + index);                     Selector selector = workSelectors[index];                     new MultiThreadEchoHandler(selector, channel);                 }             } catch (IOException e) {                 e.printStackTrace();             }             if (next.incrementAndGet() == workSelectors.length) {                 next.set(0);             }         }     }     public static void main(String[] args) throws IOException {         MultiThreadEchoServerReactor server =                 new MultiThreadEchoServerReactor();         server.startService();     } }

按上述的设计思想,在主服务器中实际上设计了三个Reactor,一个主Reactor专门负责连接请求并配已单独的selector,但是三个Reactor的线程Run函数是做的相同的功能,都是根据每个线程内部的selector进行检索事件列表,若注册的监听事件发生了则调用dispactch分发到每个Reactor对应的Handler。

这里需要注意的一开始其实只有负责连接事件的主Reactor在注册selector的时候给相应的key配了一个AcceptorHandler()。

 //第一个selector,负责监控新连接事件         SelectionKey sk =                 serverSocket.register(bossSelector, SelectionKey.OP_ACCEPT);         //附加新连接处理handler处理器到SelectionKey(选择键)         sk.attach(new AcceptorHandler());

但是Reactor的run方法里若相应的selector key发生了便要dispatch到一个Handler。这里其他两个子Reactor的Handler在哪里赋值的呢?其实在处理连接请求的Reactor中便创建了各个子Handler,如下代码所示:
主Handler中先是根据服务器channel创建出客服端channel,在进行子selector与channel的绑定。

   int index = next.get();                    Logger.info("选择器的编号:" + index);                    Selector selector = workSelectors[index];                    new MultiThreadEchoHandler(selector, channel);

2、IO请求handler+线程池

package com.crazymakercircle.ReactorModel; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; class MultiThreadEchoHandler implements Runnable {     final SocketChannel channel;     final SelectionKey sk;     final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);     static final int RECIEVING = 0, SENDING = 1;     int state = RECIEVING;     //引入线程池     static ExecutorService pool = Executors.newFixedThreadPool(4);     MultiThreadEchoHandler(Selector selector, SocketChannel c) throws IOException {         channel = c;         channel.configureBlocking(false);         //唤醒选择,防止register时 boss线程被阻塞,netty 处理方式比较优雅,会在同一个线程注册事件,避免阻塞boss         selector.wakeup();         //仅仅取得选择键,后设置感兴趣的IO事件         sk = channel.register(selector, 0);         //将本Handler作为sk选择键的附件,方便事件dispatch         sk.attach(this);         //向sk选择键注册Read就绪事件         sk.interestOps(SelectionKey.OP_READ);         //唤醒选择,是的OP_READ生效         selector.wakeup();         Logger.info("新的连接 注册完成");     }     public void run() {         //异步任务,在独立的线程池中执行         pool.execute(new AsyncTask());     }     //异步任务,不在Reactor线程中执行     public synchronized void asyncRun() {         try {             if (state == SENDING) {                 //写入通道                 channel.write(byteBuffer);                 //写完后,准备开始从通道读,byteBuffer切换成写模式                 byteBuffer.clear();                 //写完后,注册read就绪事件                 sk.interestOps(SelectionKey.OP_READ);                 //写完后,进入接收的状态                 state = RECIEVING;             } else if (state == RECIEVING) {                 //从通道读                 int length = 0;                 while ((length = channel.read(byteBuffer)) > 0) {                     Logger.info(new String(byteBuffer.array(), 0, length));                 }                 //读完后,准备开始写入通道,byteBuffer切换成读模式                 byteBuffer.flip();                 //读完后,注册write就绪事件                 sk.interestOps(SelectionKey.OP_WRITE);                 //读完后,进入发送的状态                 state = SENDING;             }             //处理结束了, 这里不能关闭select key,需要重复使用             //sk.cancel();         } catch (IOException ex) {             ex.printStackTrace();         }     }     //异步任务的内部类     class AsyncTask implements Runnable {         public void run() {             MultiThreadEchoHandler.this.asyncRun();         }     } }

3、客户端

在处理IO请求的Handler中采用了线程池,已达到异步处理的目的。

package com.crazymakercircle.ReactorModel; import com.crazymakercircle.NioDemoConfig; import com.crazymakercircle.util.Dateutil; import com.crazymakercircle.util.Logger; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.util.Iterator; import java.util.Scanner; import java.util.Set; /**  * create by 尼恩 @ 疯狂创客圈  **/ public class EchoClient {     public void start() throws IOException {         InetSocketAddress address =                 new InetSocketAddress(NioDemoConfig.SOCKET_SERVER_IP,                         NioDemoConfig.SOCKET_SERVER_PORT);         // 1、获取通道(channel)         SocketChannel socketChannel = SocketChannel.open(address);         Logger.info("客户端连接成功");         // 2、切换成非阻塞模式         socketChannel.configureBlocking(false);         //不断的自旋、等待连接完成,或者做一些其他的事情         while (!socketChannel.finishConnect()) {         }         Logger.tcfo("客户端启动成功!");         //启动接受线程         Processer processer = new Processer(socketChannel);         new Thread(processer).start();     }     static class Processer implements Runnable {         final Selector selector;         final SocketChannel channel;         Processer(SocketChannel channel) throws IOException {             //Reactor初始化             selector = Selector.open();             this.channel = channel;             channel.register(selector,                     SelectionKey.OP_READ | SelectionKey.OP_WRITE);         }         public void run() {             try {                 while (!Thread.interrupted()) {                     selector.select();                     Set<SelectionKey> selected = selector.selectedKeys();                     Iterator<SelectionKey> it = selected.iterator();                     while (it.hasNext()) {                         SelectionKey sk = it.next();                         if (sk.isWritable()) {                             ByteBuffer buffer = ByteBuffer.allocate(NioDemoConfig.SEND_BUFFER_SIZE);                             Scanner scanner = new Scanner(System.in);                             Logger.tcfo("请输入发送内容:");                             if (scanner.hasNext()) {                                 SocketChannel socketChannel = (SocketChannel) sk.channel();                                 String next = scanner.next();                                 buffer.put((Dateutil.getNow() + " >>" + next).getBytes());                                 buffer.flip();                                 // 操作三:发送数据                                 socketChannel.write(buffer);                                 buffer.clear();                             }                         }                         if (sk.isReadable()) {                             // 若选择键的IO事件是“可读”事件,读取数据                             SocketChannel socketChannel = (SocketChannel) sk.channel();                             //读取数据                             ByteBuffer byteBuffer = ByteBuffer.allocate(1024);                             int length = 0;                             while ((length = socketChannel.read(byteBuffer)) > 0) {                                 byteBuffer.flip();                                 Logger.info("server echo:" + new String(byteBuffer.array(), 0, length));                                 byteBuffer.clear();                             }                         }                         //处理结束了, 这里不能关闭select key,需要重复使用                         //selectionKey.cancel();                     }                     selected.clear();                 }             } catch (IOException ex) {                 ex.printStackTrace();             }         }     }     public static void main(String[] args) throws IOException {         new EchoClient().start();     } }

感谢各位的阅读,以上就是“Java中多线程Reactor模式怎么实现”的内容了,经过本文的学习后,相信大家对Java中多线程Reactor模式怎么实现这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI