Netty Server

Netty Server

 

이번 글에서는 앞서 Netty - EventLoop와 Channel 에서 살펴본 Netty로 간단한 echo server를 만들어보고, Codec과 Bootstrap에 대해 살펴보겠습니다.

 

Netty - EventLoop와 Channel

Netty 고성능 네트워크 서버를 설계할 때 가장 먼저 맞닥뜨리는 한계는 스레드 기반 동시성 모델의 비효율성입니다. 요청마다 스레드를 생성하거나, 블로킹 I/O로 인해 스레드가 대기 상태에 머무

freshdev.tistory.com

 

BaseEchoServer

 

@Slf4j
public class BaseEchoServer {
    public static void main(String[] args) {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup(4);

        NioServerSocketChannel serverSocketChannel = new NioServerSocketChannel();
        parentGroup.register(serverSocketChannel);
        serverSocketChannel.pipeline().addLast(acceptor(childGroup));

        serverSocketChannel.bind(new InetSocketAddress(8080))
                .addListener(future -> {
                    if (future.isSuccess()) {
                        log.info("Server bound to port 8080");
                    }
                });
    }

    private static ChannelInboundHandler acceptor(EventLoopGroup childGroup) {
        EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(4);

        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.info("Acceptor.channelRead");
                if (msg instanceof SocketChannel socketChannel) {
                    socketChannel.pipeline()
                            .addLast(executorGroup, new LoggingHandler(LogLevel.INFO))
                            .addLast(echoHandler());
                    childGroup.register(socketChannel);
                }
            }
        };
    }

    private static ChannelInboundHandler echoHandler() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof ByteBuf buf) {
                    try {
                        int len = buf.readableBytes();
                        CharSequence body = buf.readCharSequence(len, StandardCharsets.UTF_8);
                        log.info("EchoHandler.channelRead: " + body);

                        buf.readerIndex(0);  // rewind
                        ByteBuf result = buf.copy();
                        ctx.writeAndFlush(result)
                                .addListener(ChannelFutureListener.CLOSE);
                    } finally {
                        ReferenceCountUtil.release(msg);
                    }
                }
            }
        };
    }
}

 

Netty 서버는 일반적으로 두 개의 EventLoopGroup을 사용합니다.

  • parentGroup: ACCEPT 이벤트 담당
    (ServerSocketChannel 등록, 연결 수락)
  • childGroup: READ/WRITE 이벤트 담당
    (실제 데이터 I/O 처리)

1. ServerSocketChannel 생성 및 parentGroup 등록

먼저 NioEventLoopGroup을 parentGroup과 childGroup으로 나누어 생성하고, NioServerSocketChannel을 직접 생성하여 parentGroup의 EventLoop에 등록합니다. 서버 채널이 EventLoop에 등록되면 Selector가 ACCEPT 이벤트를 감지할 준비가 완료되며, 이후 서버 채널의 pipeline에 Acceptor 역할을 수행하는 핸들러를 등록하여 새로운 연결이 들어왔을 때 child 채널 초기화를 처리할 수 있도록 구성합니다.

Netty에서는 accept 이벤트를 위한 전용 메서드가 없습니다. Java NIO에서 ServerSocketChannel.accept() 가 발생하면 Netty는 그 결과로 생성된 SocketChannel을 inbound read 이벤트처럼 처리하여 channelRead()로 전달한다.

 

2. Acceptor 핸들러에서 socketChannel 초기화 및 childGroup 등록

ACCEPT 이벤트가 발생하면 Acceptor 핸들러의 channelRead가 호출되며, msg로 전달된 SocketChannel에 대해 child pipeline을 설정합니다. 여기에서는 LoggingHandler와 Echo 기능을 제공하는 EchoHandler를 순서대로 추가하고, 이렇게 초기화된 socketChannel을 childGroup의 EventLoop 중 하나에 등록하여 향후 READ/WRITE 이벤트를 처리하도록 분리합니다.

 

3. EchoHandler에서 메시지 처리 및 writeAndFlush 수행

childGroup에 등록된 socketChannel이 데이터를 수신하면 EchoHandler의 channelRead가 실행됩니다. 이 핸들러는 수신한 ByteBuf의 내용을 읽어 로그로 출력하고, 원본 데이터를 유지하기 위해 copy()로 새로운 ByteBuf를 생성한 뒤 writeAndFlush를 통해 클라이언트로 응답을 전송합니다. write 작업이 완료되면 Listener를 통해 채널을 종료하며, 마지막으로 원본 ByteBuf는 명시적으로 release하여 메모리 누수를 방지합니다.

 

 

서버를 실행시킨 후 다음 명령어를 통해 메세지를 보내보겠습니다.

$ echo "hello" | nc 127.0.0.1 8080

 

로그를 확인해보면, 먼저 서버가 포트 8080에 바인딩된 뒤 클라이언트가 메시지를 전송하면 가장 먼저 Acceptor의 channelRead가 활성화되며 새로운 SocketChannel이 생성됩니다.

 

이후 해당 채널은 childGroup의 EventLoop에 등록되고, 등록 과정에서 REGISTERED와 ACTIVE 로그가 출력됩니다. 클라이언트가 보낸 "hello" 메시지는 pipeline의 LoggingHandler에서 먼저 READ 로그로 기록된 뒤, 다음 단계인 EchoHandler에서 실제 메시지 내용을 처리하고 그대로 응답을 전송합니다.

 

최종적으로 writeAndFlush() 호출을 통해 데이터가 클라이언트로 전송되고, 전송이 완료된 시점에 등록해 둔 ChannelFutureListener.CLOSE가 실행되면서 채널이 종료됩니다. 이때 INACTIVE → UNREGISTERED → CLOSE 로그가 순차적으로 출력되며 연결이 정상적으로 닫힌 것을 확인할 수 있습니다.

 

 

EnhancedEchoServer

 

앞의 BaseEchoServer의 경우 로깅을 제외한 모든 처리를 echoHandler에서 처리하는 구조였지만, EnhancedEchoServer에서는 요청과 응답 레이어가 분리된 구조입니다. 추가로 서버 바인딩이 실패할 경우 parentGroup과 childGroup을 종료하는 로직을 추가했습니다.

@Slf4j
public class EnhancedEchoServer {
    public static void main(String[] args) {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup(4);

        NioServerSocketChannel serverSocketChannel = new NioServerSocketChannel();
        parentGroup.register(serverSocketChannel);
        serverSocketChannel.pipeline().addLast(acceptor(childGroup));

        serverSocketChannel.bind(new InetSocketAddress(8080))
                .addListener(future -> {
                    if (future.isSuccess()) {
                        log.info("Server bound to port 8080");
                    } else {
                        log.info("Failed to bind to port 8080");
                        parentGroup.shutdownGracefully();
                        childGroup.shutdownGracefully();
                    }
                });
    }

    private static ChannelInboundHandler acceptor(EventLoopGroup childGroup) {
        EventExecutorGroup executorGroup = new DefaultEventExecutorGroup(4);

        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                log.info("Acceptor.channelRead");
                if (msg instanceof SocketChannel socketChannel) {
                    socketChannel.pipeline()
                            .addLast(executorGroup, new LoggingHandler(LogLevel.INFO))
                            .addLast(requestHandler(),
                                    responseHandler(),
                                    echoHandler()
                            );
                    childGroup.register(socketChannel);
                }
            }
        };
    }

    private static ChannelInboundHandler requestHandler() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof ByteBuf buf) {
                    try {
                        int len = buf.readableBytes();
                        CharSequence body = buf.readCharSequence(len, StandardCharsets.UTF_8);
                        log.info("RequestHandler.channelRead: " + body);

                        ctx.fireChannelRead(body);
                    } finally {
                        ReferenceCountUtil.release(msg);
                    }
                }
            }
        };
    }

    private static ChannelOutboundHandler responseHandler() {
        return new ChannelOutboundHandlerAdapter() {
            @Override
            public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                if (msg instanceof String body) {
                    log.info("ResponseHandler.write: " + msg);
                    final ByteBuf buf = ctx.alloc().buffer();
                    buf.writeCharSequence(body, StandardCharsets.UTF_8);
                    ctx.write(buf, promise);
                }
            }
        };
    }

    private static ChannelInboundHandler echoHandler() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof String request) {
                    log.info("EchoHandler.channelRead: " + request);

                    ctx.writeAndFlush(request)
                            .addListener(ChannelFutureListener.CLOSE);
                }
            }
        };
    }
}

 

RequestHandler

RequestHandler는 클라이언트가 전송한 네트워크 데이터를 담고 있는 ByteBuf를 읽어, 애플리케이션 레이어에서 처리하기 쉬운 String 형태로 변환하는 역할을 담당합니다.

 

ResponseHandler

ResponseHandler는 애플리케이션 레이어에서 처리된 결과(String)를 네트워크로 전송 가능한 ByteBuf 형태로 다시 변환하는 역할을 담당합니다. ByteBuf에 대한 자세한 내용은 Netty - ByteBuf에서 살펴보겠습니다.

 

서버를 실행시킨 후 다음 명령어를 통해 메세지를 보내보겠습니다.

$ echo "hello" | nc 127.0.0.1 8080

 

로그를 확인해보면, 이전 BaseEchoServer와 동일하게 서버가 포트 8080에 바인딩된 뒤 클라이언트가 메시지를 전송하면 가장 먼저 Acceptor의 channelRead가 활성화되며 새로운 SocketChannel이 생성됩니다.

 

이후 해당 채널은 childGroup의 EventLoop에 등록되고, 등록 과정에서 REGISTERED와 ACTIVE 로그가 출력됩니다. 클라이언트가 보낸 "hello" 메시지는 pipeline의 LoggingHandler에서 먼저 READ 로그로 기록된 뒤, RequestHandler를 지나면서 ByteBuf → String 형태로 디코딩됩니다. 이후 EchoHandler에서 해당 문자열을 출력하고 응답으로 다시 내려보내면, 이 응답 문자열은 outbound 흐름을 따라 ResponseHandler로 전달되어 네트워크 전송이 가능한 String → ByteBuf 형태로 인코딩됩니다.

 

최종적으로 writeAndFlush() 호출을 통해 데이터가 클라이언트로 전송되고, 전송이 완료된 시점에 등록해 둔 ChannelFutureListener.CLOSE가 실행되면서 채널이 종료됩니다. 이때 INACTIVE → UNREGISTERED → CLOSE 로그가 순차적으로 출력되며 연결이 정상적으로 닫힌 것을 확인할 수 있습니다.

 

 

CodecEchoServer

 

Encoder, Decoder, Codec

Netty에서는 앞에서 구현한 EnhancedEchoServer의 RequestHandler(Decoder 역할)와 ResponseHandler(Encoder 역할)와 동일한 책임을 수행하는 다양한 Codec 클래스들을 이미 기본으로 제공하고 있습니다.

Message는 객체를, Byte는 bytes를 가리킵니다.

 

이 중에서 StringEncoder와 StringDecoder를 사용해 RequestHandler와 ResponseHandler를 대체할 수 있습니다.

 

ServerBootstrap

Netty에서 서버를 생성할 때 서버 설정을 단계적으로 구성할 수 있는 편의 클래스인 사용하는 ServerBootstrap을 제공합니다. 이 객체는 EventLoopGroup 등록부터 채널 타입 지정, 핸들러 초기화, 포트 바인딩까지 서버 구동에 필요한 설정을 체계적으로 적용할 수 있도록 도와줍니다.

 

ServerBootstrap은 다음과 같은 주요 메서드를 갖습니다.

 

  • group()
    서버 스레드 구조를 설정 (parentGroup은 연결 수락, childGroup은 read/write를 처리)
  • channel()
    사용할 Channel 타입을 지정 (ServerSocketChannel)
  • childHandler(new ChannelInitializer<SocketChannel>() { ... })
    새로 accept된 Channel마다 실행할 초기화 로직을 등록 (파이프라인 설정)
  • option()
    서버 소켓 채널(ServerSocketChannel, 부모 채널)에 적용되는 옵션을 설정
  • childOption()
    accept된 후 생성되는 클라이언트 소켓 채널(SocketChannel, 자식 채널)에 적용할 옵션을 설정
  • bind(port)
    지정한 포트로 서버를 바인딩 (결과는 ChannelFuture로 반환)

childHandler()의 인자로 사용되는 ChannelInitializer는 새로 생성된 Channel을 초기화하는 데 특화된 핸들러로, 보통 이 안에서 애플리케이션 로직을 구현하기 위해 ChannelPipeline에 여러 핸들러를 추가합니다. 애플리케이션 규모가 커질수록 파이프라인에 추가되는 핸들러 수도 증가할 것이며, 결국 이 익명 클래스를 별도의 최상위 클래스로 분리하게 되는 경우도 많습니다.

 

ServerBootstrap의 bind()가 호출되면 내부적으로 init() 메서드가 실행되고, 이 과정에서 ServerSocketChannel의 파이프라인에 ServerBootstrapAcceptor가 자동으로 등록됩니다.

@Override
void init(Channel channel) {
    ...

    ChannelPipeline p = channel.pipeline();
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs,
                            extensions));
                }
            });
        }
    });
	
    ...
}

 

이전에 작성했던 EnhancedEchoServer에 Codec과 ServerBootstrap을 추가하여 CodecEchoServer를 만들어보겠습니다.

@Slf4j
public class CodecEchoServer {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup parentGroup = new NioEventLoopGroup();
        EventLoopGroup childGroup = new NioEventLoopGroup(4);
        DefaultEventExecutorGroup executorGroup = new DefaultEventExecutorGroup(4);
        StringEncoder stringEncoder = new StringEncoder();
        StringDecoder stringDecoder = new StringDecoder();

        try {
            ServerBootstrap b = new ServerBootstrap();
            ChannelFuture f = b.group(parentGroup, childGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(executorGroup, new LoggingHandler(LogLevel.INFO))
                                    .addLast(stringEncoder, stringDecoder, echoHandler());
                        }
                    })
                    .option(ChannelOption.SO_BACKLOG, 128)
                    .childOption(ChannelOption.SO_KEEPALIVE, true)
                    .bind(8080);

            log.info("Server bound to port 8080");

            f.channel().closeFuture().sync();
        } finally {
            parentGroup.shutdownGracefully();
            childGroup.shutdownGracefully();
            executorGroup.shutdownGracefully();
        }
    }

    private static ChannelInboundHandler echoHandler() {
        return new ChannelInboundHandlerAdapter() {
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                if (msg instanceof String request) {
                    log.info("EchoHandler.channelRead: " + request);

                    ctx.writeAndFlush(request)
                            .addListener(ChannelFutureListener.CLOSE);
                }
            }
        };
    }
}

 

동일하게 서버를 실행시킨 후 다음 명령어를 통해 메세지를 보내보겠습니다.

$ echo "hello" | nc 127.0.0.1 8080

 

로그를 확인해보면 대체한 Codec과 ServerBootstrap이 잘 동작하는 것을 알 수 있습니다.

'Spring > Webflux' 카테고리의 다른 글

Reactor operators  (0) 2025.12.08
Netty - ByteBuf  (2) 2025.12.03
Netty - EventLoop와 Channel  (0) 2025.12.02
Spring portfolio  (0) 2025.11.27
Reactive Programing  (0) 2025.11.25