Non-Blocking IO with Netty Mariano Cortesi Fernando Zunino @marianocortesi @fzunino
Por que Non Blocking IO?
Por que Non Blocking IO?
Por que Non Blocking IO?
Por que Non Blocking IO?
Construyendo servidores Estructura básica Leer Decodificar Codificar Enviar Procesar Request Request Respuesta Respuesta
El Camino Tradicional ✓ Thread por conexión ✓ Uso de IO bloqueante ✓ Modelo sencillo de programación Leer Decodificar Procesar Codificar Enviar Cliente Handler Cliente Servidor Leer Decodificar Procesar Codificar Enviar Handler Cliente Leer Decodificar Procesar Codificar Enviar Handler
An Echo Server • Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
An Echo Server • Java OIO public class ThreadPoolEchoServer { Inicialización de socket para escucha public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
An Echo Server • Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { Inicialización de ServerSocket servSock = new ServerSocket(20007); pool de threads Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
An Echo Server • Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Se acepta conexión Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
An Echo Server • Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } Se ejecuta worker thread }
An Echo Server • Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
An Echo Server • Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
An Echo Server • Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } Obtención de Streams de public void run() { entrada y salida try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
An Echo Server • Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; Se lee hasta que el cliente cierre la conexión byte[] receiveBuf = new byte[256]; (bloqueante) while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
An Echo Server • Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } Se reenvia lo leido al } catch (IOException e) { // ... cliente (bloqueante) } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
An Echo Server • Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } el socket Se cierra } } }
An Echo Server • Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
El Camino Tradicional: Problemas
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes ✓Sincronización
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K)
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K) ✓ Conexiones Persistentes
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K) ✓ Conexiones Persistentes ✓ Context Switching Overhead
El Camino Tradicional: Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K) ✓ Conexiones Persistentes ✓ Context Switching Overhead ✓ Consumo de memoria
Non Blocking IO: What’s the story? ✓ Modelo orientado a eventos Reactor Pattern ✓ Un único thread de procesamiento ✓ Uso de Readiness Notification y Non Blocking IO Leer Cliente Decodificar read events Procesar Cliente Reactor write events Codificar Cliente Enviar
Non Blocking IO vs. Blocking IO
Non Blocking IO en Java
Non Blocking IO en Java ✓ Java NIO
Non Blocking IO en Java ✓ Java NIO ✓ Frameworks
Non Blocking IO en Java ✓ Java NIO ✓ Frameworks ✓ Apache Mina
Non Blocking IO en Java ✓ Java NIO ✓ Frameworks ✓ Apache Mina ✓ JBoss Netty
Non Blocking IO en Java - NIO Diseño Java NIO Dispatcher EventHandler * 1 Selector SelectionKey SelectableChannel SocketChannel java.nio.channels
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... Crea selector para } monitorear sockets public void run() throws IOException { pasivos y conexiones Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Inicialización de socket Selector selector = Selector.open(); para escucha en forma no bloqueante ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); Registra socket listenChannel.register(selector, SelectionKey.OP_ACCEPT); declarando interés en nuevas conexiones while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); Monitorea actividad en listenChannel.configureBlocking(false); todos los sockets listenChannel.register(selector, SelectionKey.OP_ACCEPT); registrados while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { Detecta y dispara eventos SelectionKey key = keyIter.next(); de acuerdo al tipo if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; Acepta nueva conexión public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } Registra nuevo socket public void handleRead(SelectionKey key) throws IOException { declarando interés en SocketChannel clntChan = (SocketChannel) key.channel(); lectura y asocia buffer ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); Lee datos en buffer if (bytesRead == -1) { // Did the other end close? asociado clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); Detecta fin de conexión } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } Declara interés en public void handleWrite(SelectionKey key) throws IOException { escritura si hay datos a ByteBuffer buf = (ByteBuffer) key.attachment(); escribir buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); Envia datos al socket if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } Revoca interés en escritura buf.compact(); si no hay nada para escribir } }
An Echo Server • Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
An Echo Server • Java NIO public interface EventHandler { void handleAccept(SelectionKey key) throws IOException; void handleRead(SelectionKey key) throws IOException; void handleWrite(SelectionKey key) throws IOException; } public class NIOEchoServer { public static void main(String[] args) throws IOException { Dispatcher dispatcher = new Dispatcher(20007, new EchoProtocolEventHandler()); dispatcher.run(); } }
Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients
Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients
Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven application network framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients
Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of high performance & high scalability maintainable protocol servers & clients
Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application rapid development framework and tools for of maintainable high performance & high scalability protocol servers & clients
Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients Warning! Netty no es un WebServer
Netty Model • Channels creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Representa a una conexion P2P (point to point) ✓ Abstrae UDP / TCP ✓ Abstrae NIO / OIO ✓ API interacción conexión (abrir, cerrar, escribir, leer)
Netty Model • ChannelBuffers creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
Netty Model • ChannelBuffers creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Abstrae de NIO Buffers & byte arrays ✓ API simplificada respecto NIO Buffers ✓ Toda lectura y escritura se hace con ellos
Netty Model • ChannelFactory creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
Netty Model • ChannelFactory creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Crear Channels ✓ ChannelFactory impl. para UDP / TCP y OIO / NIO ✓ Non Blocking con: NioServerSocketChannelFactory
Netty Model • ChannelEvent creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
Netty Model • ChannelEvent creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Representa un evento I/O asociado a un Channel ✓ Puede ser upstream (entrada) or downstream (salida) ✓ Se pueden crear eventos custom ✓ Algunos tipos de evento: ★ messageReceived ★ channelOpen ★ channelClosed ★ write
Netty Model • ChannelPipeline creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
Netty Model • ChannelPipeline creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Asociado a un Channel ✓ Atrapa y Handlea los ChannelEvents ✓ Implementa el patron Intercepting Filter
Netty Model • ChannelHandler creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
Netty Model • ChannelHandler creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Building block de un ChannelPipeline ✓ Handlea ChannelEvents (upstream y/o downstream) ✓ Decide sobre el forwarding de eventos a sus pares upstream Handler Handler Handler Handler event N -1 First Last 2nd downstream event
An Echo Server • Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Main Class public class EchoServer { Utility Class para public static void main(String[] args) throws Exception {inicializar un Server // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( ChannelFactory para Tcp new NioServerSocketChannelFactory( w/ NIO Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. Boss pool thread ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); Workers pool thread // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipelineFactory para cada nuevo Channel return Channels.pipeline(new EchoServerHandler()); establcido } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); Un solo handler. El echo Handler // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Finalmente inicializamos el servidor Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Handler Utility class para handlear public class EchoServerHandler extends SimpleChannelHandler { eventos típicos @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override Event Handler para public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { nuevo mensaje e.getChannel().write(e.getMessage()); } @Override Event Handler para void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { public excepciones e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Contexto del Canal Channel ch = e.getChannel(); Permite interactuar con su ch.close(); pipeline } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } Interacción con el Channel: Lectura y Escritura @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
An Echo Server • Netty OIO Vuelta a OIO! public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new OioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } }
An Echo Server • Netty OIO Vuelta a OIO! public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( ChannelFactory para Tcp new OioServerSocketChannelFactory( w/ OIO Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } }
A Distributed Logger • Diseño Protocolo ✓ Formato textual, de linea ✓ Paquete tamaño variable ✓ Mensaje por fin de linea ✓ Formato: “${level} - ${mensaje}” DEBUG - Algo esta mal!!
Framer Handler Decoder Handler pipeline LoggerPrinter Handler A Distributed Logger • Diseño
Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Framer
A Distributed Logger • Framer Decoder Handler Framer Handler LoggerPrinter Handler Framer import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; public class LoggerFramerHandler extends DelimiterBasedFrameDecoder { public LoggerFramerHandler() { super(8000, Delimiters.lineDelimiter()); } }
A Distributed Logger • Framer Decoder Handler Framer Handler LoggerPrinter Handler Framer import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; public class LoggerFramerHandler extends DelimiterBasedFrameDecoder { public LoggerFramerHandler() { Framer para demarcar super(8000, Delimiters.lineDelimiter()); paquetes a través de un } delimitador }
A Distributed Logger • Framer Decoder Handler Framer Handler LoggerPrinter Handler Framer import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; public class LoggerFramerHandler extends DelimiterBasedFrameDecoder { public LoggerFramerHandler() { super(8000, Delimiters.lineDelimiter()); } } Delimitador para lineas
Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Framer
Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Decoder
A Distributed Logger • Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); }
A Distributed Logger • Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override El framer deja un public void messageReceived(ChannelHandlerContext ctx, MessageEventmsg { ChannelBuffer como e) Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); }
A Distributed Logger • Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); parseo y creación de un LogEvent LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); }
A Distributed Logger • Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); } propago el mensaje, como LogEvent
Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Decoder
Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Printer
A Distributed Logger • Printer Decoder Handler Framer Handler LoggerPrinter Handler Printer @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { LogEvent logEvent = (LogEvent) e.getMessage(); System.out.println(logEvent); }
A Distributed Logger • Printer Decoder Handler Framer Handler LoggerPrinter Handler Printer El msg ahora es un @Override LogEvent public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { LogEvent logEvent = (LogEvent) e.getMessage(); System.out.println(logEvent); }
A Distributed Logger • Printer Decoder Handler Framer Handler LoggerPrinter Handler Printer @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { LogEvent logEvent = (LogEvent) e.getMessage(); System.out.println(logEvent); }
Conclusiones ✓ Non-Blocking IO ✓ Importa! ✓ Aplicaciones altamente concurrentes (c10k) ✓ Netty ✓ java nio made easy ✓ modelo flexible para crear servidores NIO (o OIO)
¿¿ Preguntas ?? We are hiring! Apply at jobs@zaubersoftware.com

Non blocking io with netty

  • 1.
    Non-Blocking IO with Netty Mariano Cortesi Fernando Zunino @marianocortesi @fzunino
  • 2.
    Por que NonBlocking IO?
  • 3.
    Por que NonBlocking IO?
  • 4.
    Por que NonBlocking IO?
  • 5.
    Por que NonBlocking IO?
  • 6.
    Construyendo servidores Estructura básica Leer Decodificar Codificar Enviar Procesar Request Request Respuesta Respuesta
  • 7.
    El Camino Tradicional ✓ Thread por conexión ✓ Uso de IO bloqueante ✓ Modelo sencillo de programación Leer Decodificar Procesar Codificar Enviar Cliente Handler Cliente Servidor Leer Decodificar Procesar Codificar Enviar Handler Cliente Leer Decodificar Procesar Codificar Enviar Handler
  • 8.
    An Echo Server• Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
  • 9.
    An Echo Server• Java OIO public class ThreadPoolEchoServer { Inicialización de socket para escucha public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
  • 10.
    An Echo Server• Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { Inicialización de ServerSocket servSock = new ServerSocket(20007); pool de threads Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
  • 11.
    An Echo Server• Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Se acepta conexión Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
  • 12.
    An Echo Server• Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } Se ejecuta worker thread }
  • 13.
    An Echo Server• Java OIO public class ThreadPoolEchoServer { public static void main(String[] args) throws IOException { ServerSocket servSock = new ServerSocket(20007); Executor service = Executors.newCachedThreadPool(); while (!Thread.interrupted()) { Socket clntSock = servSock.accept(); service.execute(new EchoWorker(clntSock)); } } }
  • 14.
    An Echo Server• Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
  • 15.
    An Echo Server• Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } Obtención de Streams de public void run() { entrada y salida try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
  • 16.
    An Echo Server• Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; Se lee hasta que el cliente cierre la conexión byte[] receiveBuf = new byte[256]; (bloqueante) while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
  • 17.
    An Echo Server• Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } Se reenvia lo leido al } catch (IOException e) { // ... cliente (bloqueante) } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
  • 18.
    An Echo Server• Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } el socket Se cierra } } }
  • 19.
    An Echo Server• Java OIO public class EchoWorker implements Runnable { public EchoWorker(final Socket s) { this.socket = s; } public void run() { try { InputStream in = this.socket.getInputStream(); OutputStream out = this.socket.getOutputStream(); int recvMsgSize; byte[] receiveBuf = new byte[256]; while ((recvMsgSize = in.read(receiveBuf)) != -1) { out.write(receiveBuf, 0, recvMsgSize); } } catch (IOException e) { // ... } finally { try { this.socket.close(); } catch (IOException e) { // ... } } } }
  • 20.
  • 21.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes
  • 22.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes ✓Sincronización
  • 23.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes
  • 24.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K)
  • 25.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K) ✓ Conexiones Persistentes
  • 26.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K) ✓ Conexiones Persistentes ✓ Context Switching Overhead
  • 27.
    El Camino Tradicional:Problemas ✓ Estado compartido entre clientes ✓Sincronización ✓ Priorización de clientes ✓ Alta escala (C10K) ✓ Conexiones Persistentes ✓ Context Switching Overhead ✓ Consumo de memoria
  • 28.
    Non Blocking IO:What’s the story? ✓ Modelo orientado a eventos Reactor Pattern ✓ Un único thread de procesamiento ✓ Uso de Readiness Notification y Non Blocking IO Leer Cliente Decodificar read events Procesar Cliente Reactor write events Codificar Cliente Enviar
  • 29.
    Non Blocking IOvs. Blocking IO
  • 30.
  • 31.
    Non Blocking IOen Java ✓ Java NIO
  • 32.
    Non Blocking IOen Java ✓ Java NIO ✓ Frameworks
  • 33.
    Non Blocking IOen Java ✓ Java NIO ✓ Frameworks ✓ Apache Mina
  • 34.
    Non Blocking IOen Java ✓ Java NIO ✓ Frameworks ✓ Apache Mina ✓ JBoss Netty
  • 35.
    Non Blocking IOen Java - NIO Diseño Java NIO Dispatcher EventHandler * 1 Selector SelectionKey SelectableChannel SocketChannel java.nio.channels
  • 36.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 37.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... Crea selector para } monitorear sockets public void run() throws IOException { pasivos y conexiones Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 38.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Inicialización de socket Selector selector = Selector.open(); para escucha en forma no bloqueante ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 39.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); Registra socket listenChannel.register(selector, SelectionKey.OP_ACCEPT); declarando interés en nuevas conexiones while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 40.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); Monitorea actividad en listenChannel.configureBlocking(false); todos los sockets listenChannel.register(selector, SelectionKey.OP_ACCEPT); registrados while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 41.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { Detecta y dispara eventos SelectionKey key = keyIter.next(); de acuerdo al tipo if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 42.
    An Echo Server• Java NIO public class Dispatcher { public Dispatcher(final int port, final EventHandler handler) { ... } public void run() throws IOException { Selector selector = Selector.open(); ServerSocketChannel listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(this.port)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); while (!Thread.interrupted()) { if (selector.select(3000) == 0) continue; Iterator<SelectionKey> keyIter = selector.selectedKeys().iterator(); while (keyIter.hasNext()) { SelectionKey key = keyIter.next(); if (key.isAcceptable()) { handler.handleAccept(key); } if (key.isReadable()) { handler.handleRead(key); } if (key.isValid() && key.isWritable()) { handler.handleWrite(key); } keyIter.remove(); // remove from set of selected keys } } } }
  • 43.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 44.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; Acepta nueva conexión public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 45.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } Registra nuevo socket public void handleRead(SelectionKey key) throws IOException { declarando interés en SocketChannel clntChan = (SocketChannel) key.channel(); lectura y asocia buffer ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 46.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); Lee datos en buffer if (bytesRead == -1) { // Did the other end close? asociado clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 47.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); Detecta fin de conexión } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 48.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } Declara interés en public void handleWrite(SelectionKey key) throws IOException { escritura si hay datos a ByteBuffer buf = (ByteBuffer) key.attachment(); escribir buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 49.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); Envia datos al socket if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 50.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } Revoca interés en escritura buf.compact(); si no hay nada para escribir } }
  • 51.
    An Echo Server• Java NIO public class EchoProtocolEventHandler implements EventHandler { private static final int BUFSIZE = 256; public void handleAccept(SelectionKey key) throws IOException { SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept(); clntChan.configureBlocking(false); clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer.allocate(BUFSIZE)); } public void handleRead(SelectionKey key) throws IOException { SocketChannel clntChan = (SocketChannel) key.channel(); ByteBuffer buf = (ByteBuffer) key.attachment(); long bytesRead = clntChan.read(buf); if (bytesRead == -1) { // Did the other end close? clntChan.close(); } else if (bytesRead > 0) { key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE); } } public void handleWrite(SelectionKey key) throws IOException { ByteBuffer buf = (ByteBuffer) key.attachment(); buf.flip(); SocketChannel clntChan = (SocketChannel) key.channel(); clntChan.write(buf); if (!buf.hasRemaining()) { key.interestOps(SelectionKey.OP_READ); } buf.compact(); } }
  • 52.
    An Echo Server• Java NIO public interface EventHandler { void handleAccept(SelectionKey key) throws IOException; void handleRead(SelectionKey key) throws IOException; void handleWrite(SelectionKey key) throws IOException; } public class NIOEchoServer { public static void main(String[] args) throws IOException { Dispatcher dispatcher = new Dispatcher(20007, new EchoProtocolEventHandler()); dispatcher.run(); } }
  • 53.
    Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients
  • 54.
    Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients
  • 55.
    Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven application network framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients
  • 56.
    Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of high performance & high scalability maintainable protocol servers & clients
  • 57.
    Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application rapid development framework and tools for of maintainable high performance & high scalability protocol servers & clients
  • 58.
    Netty!! from Netty Homepage The Netty project is an effort to provide an asynchronous event-driven network application framework and tools for rapid development of maintainable high performance & high scalability protocol servers & clients Warning! Netty no es un WebServer
  • 59.
    Netty Model •Channels creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Representa a una conexion P2P (point to point) ✓ Abstrae UDP / TCP ✓ Abstrae NIO / OIO ✓ API interacción conexión (abrir, cerrar, escribir, leer)
  • 60.
    Netty Model •ChannelBuffers creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
  • 61.
    Netty Model •ChannelBuffers creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Abstrae de NIO Buffers & byte arrays ✓ API simplificada respecto NIO Buffers ✓ Toda lectura y escritura se hace con ellos
  • 62.
    Netty Model •ChannelFactory creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
  • 63.
    Netty Model •ChannelFactory creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Crear Channels ✓ ChannelFactory impl. para UDP / TCP y OIO / NIO ✓ Non Blocking con: NioServerSocketChannelFactory
  • 64.
    Netty Model •ChannelEvent creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
  • 65.
    Netty Model •ChannelEvent creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Representa un evento I/O asociado a un Channel ✓ Puede ser upstream (entrada) or downstream (salida) ✓ Se pueden crear eventos custom ✓ Algunos tipos de evento: ★ messageReceived ★ channelOpen ★ channelClosed ★ write
  • 66.
    Netty Model •ChannelPipeline creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
  • 67.
    Netty Model •ChannelPipeline creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Asociado a un Channel ✓ Atrapa y Handlea los ChannelEvents ✓ Implementa el patron Intercepting Filter
  • 68.
    Netty Model •ChannelHandler creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler
  • 69.
    Netty Model •ChannelHandler creates handles ChannelFactory Channel ChannelPipeline es with & writ generates is sorted list of s read handles ChannelBuffer ChannelEvent ChannelHandler ✓ Building block de un ChannelPipeline ✓ Handlea ChannelEvents (upstream y/o downstream) ✓ Decide sobre el forwarding de eventos a sus pares upstream Handler Handler Handler Handler event N -1 First Last 2nd downstream event
  • 70.
    An Echo Server• Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 71.
    An Echo Server• Netty NIO Main Class public class EchoServer { Utility Class para public static void main(String[] args) throws Exception {inicializar un Server // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 72.
    An Echo Server• Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( ChannelFactory para Tcp new NioServerSocketChannelFactory( w/ NIO Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 73.
    An Echo Server• Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. Boss pool thread ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); Workers pool thread // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 74.
    An Echo Server• Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { ChannelPipelineFactory para cada nuevo Channel return Channels.pipeline(new EchoServerHandler()); establcido } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 75.
    An Echo Server• Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); Un solo handler. El echo Handler // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 76.
    An Echo Server• Netty NIO Main Class public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } } Finalmente inicializamos el servidor Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 77.
    An Echo Server• Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 78.
    An Echo Server• Netty NIO Handler Utility class para handlear public class EchoServerHandler extends SimpleChannelHandler { eventos típicos @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 79.
    An Echo Server• Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override Event Handler para public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { nuevo mensaje e.getChannel().write(e.getMessage()); } @Override Event Handler para void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { public excepciones e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 80.
    An Echo Server• Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Contexto del Canal Channel ch = e.getChannel(); Permite interactuar con su ch.close(); pipeline } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 81.
    An Echo Server• Netty NIO Handler public class EchoServerHandler extends SimpleChannelHandler { @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { e.getChannel().write(e.getMessage()); } Interacción con el Channel: Lectura y Escritura @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) { e.getCause().printStackTrace(); Channel ch = e.getChannel(); ch.close(); } } Ejemplo en: http://docs.jboss.org/netty/3.2/xref/org/jboss/netty/example/echo/package-summary.html
  • 82.
    An Echo Server• Netty OIO Vuelta a OIO! public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new OioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } }
  • 83.
    An Echo Server• Netty OIO Vuelta a OIO! public class EchoServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( ChannelFactory para Tcp new OioServerSocketChannelFactory( w/ OIO Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the pipeline factory. bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() throws Exception { return Channels.pipeline(new EchoServerHandler()); } }); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8080)); } }
  • 84.
    A Distributed Logger• Diseño Protocolo ✓ Formato textual, de linea ✓ Paquete tamaño variable ✓ Mensaje por fin de linea ✓ Formato: “${level} - ${mensaje}” DEBUG - Algo esta mal!!
  • 85.
    Framer Handler Decoder Handler pipeline LoggerPrinter Handler A Distributed Logger • Diseño
  • 86.
    Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Framer
  • 87.
    A Distributed Logger• Framer Decoder Handler Framer Handler LoggerPrinter Handler Framer import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; public class LoggerFramerHandler extends DelimiterBasedFrameDecoder { public LoggerFramerHandler() { super(8000, Delimiters.lineDelimiter()); } }
  • 88.
    A Distributed Logger• Framer Decoder Handler Framer Handler LoggerPrinter Handler Framer import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; public class LoggerFramerHandler extends DelimiterBasedFrameDecoder { public LoggerFramerHandler() { Framer para demarcar super(8000, Delimiters.lineDelimiter()); paquetes a través de un } delimitador }
  • 89.
    A Distributed Logger• Framer Decoder Handler Framer Handler LoggerPrinter Handler Framer import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder; import org.jboss.netty.handler.codec.frame.Delimiters; public class LoggerFramerHandler extends DelimiterBasedFrameDecoder { public LoggerFramerHandler() { super(8000, Delimiters.lineDelimiter()); } } Delimitador para lineas
  • 90.
    Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Framer
  • 91.
    Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Decoder
  • 92.
    A Distributed Logger• Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); }
  • 93.
    A Distributed Logger• Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override El framer deja un public void messageReceived(ChannelHandlerContext ctx, MessageEventmsg { ChannelBuffer como e) Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); }
  • 94.
    A Distributed Logger• Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); parseo y creación de un LogEvent LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); }
  • 95.
    A Distributed Logger• Decoder Decoder Handler Framer Handler LoggerPrinter Handler Decoder @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { Charset ch = Charset.defaultCharset(); String msg = ((ChannelBuffer) e.getMessage()).toString(ch); String[] args = msg.split("-", 2); LogEvent logEvent = new LogEvent(Level.valueOf(args[0].trim()), args[1].trim()); Channels.fireMessageReceived(ctx, logEvent); } propago el mensaje, como LogEvent
  • 96.
    Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Decoder
  • 97.
    Framer Handler Decoder Handler LoggerPrinter Handler A Distributed Logger • Printer
  • 98.
    A Distributed Logger• Printer Decoder Handler Framer Handler LoggerPrinter Handler Printer @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { LogEvent logEvent = (LogEvent) e.getMessage(); System.out.println(logEvent); }
  • 99.
    A Distributed Logger• Printer Decoder Handler Framer Handler LoggerPrinter Handler Printer El msg ahora es un @Override LogEvent public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { LogEvent logEvent = (LogEvent) e.getMessage(); System.out.println(logEvent); }
  • 100.
    A Distributed Logger• Printer Decoder Handler Framer Handler LoggerPrinter Handler Printer @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) { LogEvent logEvent = (LogEvent) e.getMessage(); System.out.println(logEvent); }
  • 101.
    Conclusiones ✓ Non-Blocking IO ✓ Importa! ✓ Aplicaciones altamente concurrentes (c10k) ✓ Netty ✓ java nio made easy ✓ modelo flexible para crear servidores NIO (o OIO)
  • 102.
    ¿¿ Preguntas ?? Weare hiring! Apply at jobs@zaubersoftware.com

Editor's Notes