温馨提示×

温馨提示×

您好,登录后才能下订单哦!

密码登录×
登录注册×
其他方式登录
点击 登录注册 即表示同意《亿速云用户服务条款》

netty的怎么实现及运用到gmq中

发布时间:2022-03-14 16:33:17 来源:亿速云 阅读:161 作者:iii 栏目:web开发

本篇内容主要讲解“netty的怎么实现及运用到gmq中”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“netty的怎么实现及运用到gmq中”吧!

一、背景

书接上文手写MQ框架(三)-客户端实现 ,前面通过web的形式实现了mq的服务端和客户端,现在计划使用netty来改造一下。前段时间学习了一下netty的使用。大概有一些想法。

netty封装了socket的使用,我们通过简单的调用即可构建高性能的网络应用。我计划采用以下例子来对gmq进行改造。

二、netty是什么

Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。

netty是一个java框架,是网络编程框架,支持异步、事件驱动的特性,所以性能表现很好。

三、netty的简单实现

1、服务端

1)SimpleServerHandler

Handler是处理器,handler 是由 Netty 生成用来处理 I/O 事件的。

package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.DefaultChannelGroup; import io.netty.util.concurrent.GlobalEventExecutor; public class SimpleServerHandler extends SimpleChannelInboundHandler<String> {     public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);     @Override     public void handlerAdded(ChannelHandlerContext ctx) throws Exception {         Channel incoming = ctx.channel();         System.out.println("[SERVER] - " + incoming.remoteAddress() + " 加入\n");         channels.add(ctx.channel());     }     @Override     public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {         Channel incoming = ctx.channel();         System.out.println("[SERVER] - " + incoming.remoteAddress() + " 离开\n");         channels.remove(ctx.channel());     }          @Override     protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {         Channel incoming = ctx.channel();         System.out.println("[" + incoming.remoteAddress() + "]" + s);         if(s == null || s.length() == 0) {             incoming.writeAndFlush("消息是空的呀!\n");         } else { //            MqRouter<?> mqRouter = JSONObject.parseObject(s, MqRouter.class); //            System.out.println(mqRouter.getUri());             String responseMsg = "收到了," + s + "\n";             incoming.writeAndFlush(responseMsg);         }     }     @Override     public void channelActive(ChannelHandlerContext ctx) throws Exception {         Channel incoming = ctx.channel();         System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"在线");     }     @Override     public void channelInactive(ChannelHandlerContext ctx) throws Exception {         Channel incoming = ctx.channel();         System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"掉线");     }     @Override     public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {         Channel incoming = ctx.channel();         System.out.println("SimpleChatClient:"+incoming.remoteAddress()+"异常");                  cause.printStackTrace();         ctx.close();     } }

2)SimpleServerInitializer

SimpleServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleServerHandler 等。

package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleServerInitializer extends ChannelInitializer<SocketChannel> {     @Override     protected void initChannel(SocketChannel ch) throws Exception {         ChannelPipeline pipeline = ch.pipeline();         pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));         pipeline.addLast("decoder", new StringDecoder());         pipeline.addLast("encoder", new StringEncoder());         pipeline.addLast("handler", new SimpleServerHandler());                  System.out.println("SimpleChatClient:" + ch.remoteAddress() + "连接上");     } }

3)SimpleServer

package me.lovegao.netty.learnw3c.mqdemo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; public class SimpleServer {     private int port;     public SimpleServer(int port) {         this.port = port;     }     public void run() throws Exception {         EventLoopGroup bossGroup = new NioEventLoopGroup();         EventLoopGroup workerGroup = new NioEventLoopGroup();         try {             ServerBootstrap b = new ServerBootstrap();             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class)                     .childHandler(new SimpleServerInitializer()).option(ChannelOption.SO_BACKLOG, 128)                     .childOption(ChannelOption.SO_KEEPALIVE, true);             System.out.println("SimpleChatServer 启动了");             ChannelFuture f = b.bind(port).sync();             f.channel().closeFuture().sync();         } finally {             workerGroup.shutdownGracefully();             bossGroup.shutdownGracefully();             System.out.println("SimpleChatServer 关闭了");         }     }     public static void main(String[] args) throws Exception {         int port;         if (args.length > 0) {             port = Integer.parseInt(args[0]);         } else {             port = 8080;         }         new SimpleServer(port).run();     } }

 2、客户端

1)SimpleClientHandler

package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class SimpleClientHandler extends SimpleChannelInboundHandler<String> {     @Override     protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {         System.out.println("收到的信息:" + s);     } }

2)SimpleClientInitializer

package me.lovegao.netty.learnw3c.mqdemo; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.DelimiterBasedFrameDecoder; import io.netty.handler.codec.Delimiters; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {     @Override     protected void initChannel(SocketChannel ch) throws Exception {         ChannelPipeline pipeline = ch.pipeline();         pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));         pipeline.addLast("decoder", new StringDecoder());         pipeline.addLast("encoder", new StringEncoder());         pipeline.addLast("handler", new SimpleClientHandler());     } }

3)SimpleClient

package me.lovegao.netty.learnw3c.mqdemo; import java.io.BufferedReader; import java.io.InputStreamReader; import io.netty.bootstrap.Bootstrap; import io.netty.channel.Channel; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; public class SimpleClient {     private final String host;     private final int port;          public SimpleClient(String host, int port) {         this.host = host;         this.port = port;     }     public static void main(String[] args) throws Exception {         new SimpleClient("localhost", 8080).run();     }          public void run() throws Exception {         EventLoopGroup group = new NioEventLoopGroup();         try {             Bootstrap bootstrap = new Bootstrap()                     .group(group)                     .channel(NioSocketChannel.class)                     .handler(new SimpleClientInitializer());             Channel channel = bootstrap.connect(host, port).sync().channel();             BufferedReader in = new BufferedReader(new InputStreamReader(System.in));             while(true) {                 String line = in.readLine();                 if(line.equals("exit!")) {                     break;                 }                 channel.writeAndFlush(line + "\r\n");             }         } catch(Exception e) {             e.printStackTrace();         } finally {             group.shutdownGracefully();         }     } }

3、学习中的一些事

在我把教程上的代码略微改了一下,测试时发现客户端能够发出消息,服务端能够收到消息,服务端也走到了回复客户端的流程,但是客户端却收不到消息。还原代码后是正常的,想了半天,最后才发现是改代码的的时候漏掉了“\n”这个标识,以此导致客户端始终不打印消息。

四、netty如何运用到gmq中

1、运用有什么问题

netty只封装了网络交互,gmq整体使用了gmvc框架,而gmvc框架目前还无法脱离servlet。而我又不太想把之前写的代码全部改为自己new的方式。

2、解决方式

1)改造gmvc框架

对gmvc框架进行重构,使得能够脱离servlet使用。也就是将IOC功能剥离开。

优点:一步到位,符合整体的规划。

缺点:gmq的迭代会延迟一段时间。

2)暂时抛弃gmvc框架

暂时将目前依赖的gmvc框架给去除掉,优先完成gmq的迭代。待后期gmvc框架改造完成后再进行改造。

优点:能够尽早的完成gmq的功能。

缺点:先移除框架,后期再套上框架,相当于做了两次多余的功。费时费力。

到此,相信大家对“netty的怎么实现及运用到gmq中”有了更深的了解,不妨来实际操作一番吧!这里是亿速云网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!

向AI问一下细节

免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。

AI