R2DBC

R2DBC

 

Reactive Relational Database Connectivity(R2DBC)는 비동기·논블로킹 방식으로 관계형 데이터베이스에 접근하기 위해 설계된 프로그래밍 모델입니다. 이 프로젝트는 2017년 Pivotal에서 개발을 시작하여 2018년부터 공식적으로 후원되기 시작했습니다.

 

R2DBC는 Reactive Streams 사양을 기반으로 하며, Project Reactor 위에서 구현된 관계형 데이터베이스용 Reactive 드라이버 표준을 제공합니다. 이를 통해 기존 JDBC의 동기 처리 한계를 벗어나, 완전히 비동기적인 데이터베이스 접근을 지원합니다.

 

 

JDBC/JPA 와 Non-Blocking

 

JDBC는 1997년에 등장한 아주 오래된 API 스펙으로, JDBC 드라이버는 소켓 read/write를 동기 방식으로 수행합니다. WebFlux는 이벤트 루프 기반으로 수천 개 요청을 적은 스레드로 처리해야 하는데, JDBC가 스레드를 멈춘다면 전체 서버의 처리량이 크게 떨어질 수밖에 없습니다.

 

JPA/Hibernate는 내부적으로 JDBC를 사용합니다. JPA 내부 커넥션 풀도 결국 JDBC Connection 객체로 구성되어 있고, EntityManager의 모든 쿼리도 JDBC를 감싸는 구조입니다. 따라서 JPA도 논블로킹을 지원할 수 있는 기반을 갖고 있지 않습니다.

 

JDBC 스펙의 한계는 다음과 같습니다.

 

  • 콜백 기반 API 없음
  • Future/Promise 개념 없음
  • Reactive Streams 개념 없음
  • 드라이버와 커넥션 관리가 스레드에 종속적

 

 

R2DBC SPI

 

R2DBC SPI(Service Provider Interface)는 Reactive 방식으로 관계형 데이터베이스와 통신하기 위한 드라이버 표준 규약입니다.
R2DBC 전체 구조의 핵심을 이루는 계층으로, Spring Data R2DBC와 같은 상위 프레임워크는 이 SPI가 정의한 인터페이스만을 통해 데이터베이스에 접근합니다.

 

전통적인 JDBC가 동기·블로킹 API로 구성되어 있는 반면, R2DBC SPI는 Reactive Streams 사양을 기반으로 비동기·논블로킹 DB 접근 모델을 표준화하기 위해 설계되었습니다. 따라서 각 데이터베이스(MySQL, PostgreSQL 등)는 이 SPI를 구현한 드라이버를 제공하며, 실제 네트워크 I/O와 프로토콜 처리는 구현체 내부의 Reactive DB Client에서 수행됩니다.

 

다음은 R2DBC SPI의 4가지 핵심 인터페이스들입니다.

 

ConnectionFactory

ConnectionFactory는 R2DBC에서 데이터베이스 연결을 비동기로 생성하는 팩토리 인터페이스입니다.
모든 R2DBC 동작의 시작점이며, JDBC의 DriverManager 또는 DataSource 역할에 대응합니다.

public interface ConnectionFactory {

    Publisher<? extends Connection> create();

    ConnectionFactoryMetadata getMetadata();

}

 

Connection

Connection은 데이터베이스와의 실제 세션(Session) 을 의미합니다.
트랜잭션 제어, Statement 생성, Connection 종료 등 DB 작업의 중심이 되는 객체입니다.

public interface Connection extends Closeable {

    Publisher<Void> beginTransaction();

    Publisher<Void> beginTransaction(TransactionDefinition definition);

    @Override
    Publisher<Void> close();

    Publisher<Void> commitTransaction();

    Batch createBatch();

    Publisher<Void> createSavepoint(String name);

    boolean isAutoCommit();

    IsolationLevel getTransactionIsolationLevel();

    Publisher<Void> releaseSavepoint(String name);

    Publisher<Void> rollbackTransaction();

    Publisher<Void> rollbackTransactionToSavepoint(String name);

    Publisher<Void> setAutoCommit(boolean autoCommit);

    Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel);

}

 

Statement

Statement는 SQL 문장과 파라미터를 보관하고 실행하는 객체입니다.
JDBC의 PreparedStatement와 유사한 역할을 하지만 완전 비동기입니다.

public interface Statement {

    Statement add();

    Statement bind(int index, Object value);

    Statement bind(String name, Object value);

    Statement bindNull(int index, Class<?> type);

    Statement bindNull(String name, Class<?> type);

    Publisher<? extends Result> execute();

}

 

Result & Row

Result는 Statement 실행 결과를 나타내는 Reactive 객체입니다.
DB에서 전달되는 Row를 스트리밍 방식으로 Flux Row 형태로 제공합니다.

public interface Result {

    Publisher<Long> getRowsUpdated();

    <T> Publisher<T> map(BiFunction<Row, RowMetadata, ? extends T> mappingFunction);

    Result filter(Predicate<Segment> filter);

    <T> Publisher<T> flatMap(Function<Segment, ? extends Publisher<? extends T>> mappingFunction);

    interface Segment {

    }

    interface RowSegment extends Segment {

        Row row();

    }

    interface Message extends Segment {

        R2dbcException exception();

        int errorCode();

        @Nullable
        String sqlState();

        String message();

    }

}
public interface Row extends Readable {

    RowMetadata getMetadata();

}

public interface RowMetadata {

    ColumnMetadata getColumnMetadata(int index);

    ColumnMetadata getColumnMetadata(String name);

    List<? extends ColumnMetadata> getColumnMetadatas();

}

 

 

R2DBC MySQL

 

R2DBC SPI는 단순한 인터페이스 모음이기 때문에, 실제 동작을 이해하려면 드라이버가 SPI를 어떻게 구현했는지를 확인해야 합니다. MySQL의 R2DBC 드라이버는 오픈소스로 공개되어 있으며, Netty 기반 Reactive Client와 MySQL 프로토콜을 직접 구현하고 있습니다.

 

이제부터 r2dbc-mysql의 실제 구현체를 살펴보면서 SPI 규약이 어떻게 구현되어 있는지, 그리고 내부적으로 Reactive DB Client(Netty)와 어떻게 연결되는지 그 흐름을 살펴보겠습니다.

 

MySqlConnectionFactory

MySqlConnectionFactory는 R2DBC MySQL 드라이버에서 데이터베이스 연결을 생성하는 진입점입니다.
create()를 호출하면 설정 객체(MySqlConnectionConfiguration)를 기반으로 Client.connect()를 호출하여 Netty 기반의 Reactive DB Client를 만들고, 이를 포함한 MySqlSimpleConnection을 반환합니다.

public final class MySqlConnectionFactory implements ConnectionFactory {

    private final MySqlConnectionConfiguration configuration;
    private final LazyQueryCache queryCache;

    private MySqlConnectionFactory(MySqlConnectionConfiguration configuration) {
        this.configuration = configuration;
        this.queryCache = new LazyQueryCache(configuration.getQueryCacheSize());
    }

    @Override
    public Mono<? extends MySqlConnection> create() {
        ...
        return getMySqlConnection(
                configuration, ssl,
                queryCache,
                address,
                user,
                password
        );
    }

    @Override
    public ConnectionFactoryMetadata getMetadata() {
        return MySqlConnectionFactoryMetadata.INSTANCE;
    }

    public static MySqlConnectionFactory from(MySqlConnectionConfiguration configuration) {
        return new MySqlConnectionFactory(configuration);
    }

    private static Mono<MySqlConnection> getMySqlConnection(
        final MySqlConnectionConfiguration configuration,
        final MySqlSslConfiguration ssl,
        final LazyQueryCache queryCache,
        final SocketAddress address,
        final String user,
        @Nullable final CharSequence password
    ) {
        return Mono.fromSupplier(() -> {
            ...
        }).flatMap(context -> Client.connect(
            ssl,
            address,
            configuration.isTcpKeepAlive(),
            configuration.isTcpNoDelay(),
            context,
            configuration.getConnectTimeout(),
            configuration.getLoopResources(),
            configuration.getResolver(),
            configuration.isMetrics()
        )).flatMap(client -> {
            ...
            
            new MySqlSimpleConnection(
                client,
                codecs,
                queryCache.get(),
                configuration.getPreferPrepareStatement()
            )
        });
    }

    ...
    
}

 

MySqlConnectionConfiguration

MySqlConnectionConfiguration은 MySQL 접속에 필요한 정보(호스트, 포트, 사용자, 비밀번호, 데이터베이스, 타임아웃 등)를 담는 불변(immutable) 구성 객체입니다. ConnectionFactory와 Client가 연결 과정을 수행할 때 필요한 모든 설정을 제공하는 역할을 합니다.

public final class MySqlConnectionConfiguration {

    private static final int DEFAULT_PORT = 3306;

    private final String domain;

    private final int port;

    @Nullable
    private final Duration connectTimeout;

    private final String user;

    @Nullable
    private final CharSequence password;

    private final String database;

    @Nullable
    private final Duration statementTimeout;
    
    ...
}

 

Client

Client.connect()는 Reactor Netty의 TcpClient를 사용해 MySQL 서버와 비동기 TCP 연결을 생성하고, 이 연결을 처리하는 ReactorNettyClient를 반환합니다. ReactorNettyClient는 실제 MySQL 프로토콜 송수신, 패킷 처리, 스트리밍 결과 전송 등 R2DBC 드라이버의 핵심 I/O 동작을 담당합니다.

public interface Client {

    Mono<Void> close();

    Mono<Void> forceClose();
    
    ...

    static Mono<Client> connect(MySqlSslConfiguration ssl, SocketAddress address, boolean tcpKeepAlive,
        boolean tcpNoDelay, ConnectionContext context, @Nullable Duration connectTimeout,
        LoopResources loopResources, @Nullable AddressResolverGroup<?> resolver, boolean metrics) {

        TcpClient tcpClient = TcpClient.newConnection()
            .runOn(loopResources)
            .metrics(metrics);

        ...

        return tcpClient.remoteAddress(() -> address).connect()
            .map(conn -> new ReactorNettyClient(conn, ssl, context));
    }
}

 

 

다음은 직접 살펴본 메서드를 바탕으로 R2DBC MySQL 드라이버를 직접 사용해서 커넥션을 생성 후 쿼리를 실행하는 예제입니다.

public static void main(String[] args) {

    MySqlConnectionConfiguration configuration =
        MySqlConnectionConfiguration.builder()
            .host("127.0.0.1")
            .port(3306)
            .user("root")
            .password("password")
            .database("testdb")
            .build();

    ConnectionFactory connectionFactory = MySqlConnectionFactory.from(configuration);

    Mono.from(connectionFactory.create())
        .flatMapMany(connection ->
            connection.createStatement("SELECT id, name FROM users")
                      .execute()
                      .flatMap(result ->
                          result.map((row, metadata) ->
                              row.get("name", String.class)
                          )
                      )
                      .concatWith(close(connection))
        )
        .doOnNext(System.out::println)
        .blockLast();
}

private static Mono<Void> close(Connection connection) {
    return Mono.from(connection.close());
}
실제로는 blockLast()를 호출하지 않고 비동기적으로 호출을 해야하나, 위 예제에서는 메인 함수가 먼저 종료될 수 있어 블로킹 호출을 하였습니다.

'Spring > Data' 카테고리의 다른 글

Spring Data R2DBC  (0) 2025.12.23