본문 바로가기
공부/NoSQL

[NoSQL] Redis Pub/Sub 이론부터 코드까지! A to Z (2)

by persi0815 2025. 12. 18.

진행하고 있는 프로젝트의 운영환경의 서버가 다중 인스턴스로 되어 있다. 한 서버로의 변경 요청이 왔을 때, 이를 SSE로 해당 서버와 연결된 모든 클라이언트에 전달하고 있었는데, 다른 서버에 연결된 클라이언트들은 해당 변경 사항을 전달받지 못하는 문제가 생겼다. 해당 문제를 해결하기 위해 메시지 브로커 방식을 고려해 보았고, Redis Pub/Sub을 활용한 메시징 큐 방식을 적용할 수 있었다. 이 과정에서 학습했던 Redis의 Pub/Sub 내용에 대해 정리해보고자 글을 쓴다. 

 

참고한 자료는 Redis 공식문서, Spring Data Redis 공식문서, 개발자를 위한 레디스다. 

 

해당 글은 이전 글에서 다룬 이론을 토대로, Spring Boot 프로젝트에 적용한 코드를 다룬다.  


Spring Data Redis 파해치며 코드로 작성해보자. 

Spring Data Redis는 Redis를 위한 전용 메시징 통합 기능을 제공하는데, 두가지 핵심 기능으로 나뉜다.

메시지를 발생하는 생산자 역할인 Publication과 메시지를 구독하는 소비자 역할인 Subscription. 

 

생산자는 평소 데이터 저장시 사용하던 RedisTemplate을 사용해서 메시지를 발행합니다. 

 

소비자는 비동기 수신과 동시 수신 두가지 방법이 모두 가능한데,

비동기 수신은 메시지가 올 때까지 기다리지 않고 자기가 할 일을 하다가 메시지가 오면 반응하는 방식이다. 이때, 전용 메시지 리스터 컨테이너를 사용하여 메시지 수신기인 Message-Driven POJOs(MDPs)를 만든다

동기 방식은 메시지가 올 때까지 실시간으로 연결을 붙잡고 기다리는 방식으로, RedisConnection 수준의 저수준 접근이 필요하다. 

 

Publishing (Sending Messages)

메시지를 발행할 때 두 가지 레벨의 도구가 있다. 둘 다 publish 메서드를 제공하지만, 로직이 다르다. 

1) Low-level : (Reactive)RedisConnection

날것의 바이트 배열을 직접 다뤄야 한다. 

// send message through connection
RedisConnection con = …
byte[] msg = …
byte[] channel = …
con.pubSubCommands().publish(msg, channel);

 

2) High-level : (Reactive)RedisOperations

객체를 메시지에 담을 수 있으며, convertAndSend를 통해 직렬화와 전송이 한번에 가능하다. 

또한, 메시지를 성공적으로 수신한 클라이언트의 수를 리턴해주기도 한다. 

// send message through RedisOperations
RedisOperations operations = …
Long numberOfClients = operations.convertAndSend("hello!", "world");

 

*Reactive는 비동기 논블로킹 환경을 위한 것으로, 결과를 기다리지 않고, 스트림 형태로 처리한다. 

 

그렇다면, 먼저 RedisOperations 인터페이스를 구현한 객체를 만들어야 한다. new로 객체를 만들고, connectionFactory와 ObjectMapper, Serializer를 설정해줬다. 

@Bean
public RedisTemplate<String, SseRedisMessage> sseRedisTemplate(RedisConnectionFactory connectionFactory) {
    RedisTemplate<String, SseRedisMessage> template = new RedisTemplate<>();
    template.setConnectionFactory(connectionFactory);

    // ObjectMapper 설정
    ObjectMapper objectMapper = new ObjectMapper();
    objectMapper.registerModule(new JavaTimeModule());
    objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);

    // Serializer 설정
    Jackson2JsonRedisSerializer<SseRedisMessage> serializer =
        new Jackson2JsonRedisSerializer<>(objectMapper, SseRedisMessage.class);

    template.setKeySerializer(new StringRedisSerializer());
    template.setValueSerializer(serializer);
    template.setHashKeySerializer(new StringRedisSerializer());
    template.setHashValueSerializer(serializer);

    template.afterPropertiesSet();
    return template;
}

Redis는 기본적으로 Binary Safe한 시스템으로 어떤 바이트 데이터가 들어오든 그대로 저장할 수 있다. 하지만, 사람이 cli로 데이터를 조회했을 때 알아듣기 쉽도록, 문제없이 다시 객체로 만들 수 있도록 직렬화 설정을 해준다. 여기서는 크게 상관없지만, Redis 내부의 연산을 위해서 Redis가 이해할 수 있는 data type 중 하나로 모양을 바꾸어 전달하기도 한다. *Redis의 data type에 대해서 궁금하다면, https://redis.io/docs/latest/develop/data-types/를 참고하자. 

 

만일 따로 직렬화 설정을 안하면, 기본값으로 JdkSerializationRedisSerializer 직렬화 방식을 쓰는데, 이는 자바 객체를 이진 바이트로 바꾸어 Redis에서 조회하면 ' \xac\xed\x00\x05sr\x00\x12...'와 같이 알아들 수 없다. 

그래서 String 타입은 StringRedisSerializer로 문자열 그대로로 두게끔 하고, SseRedisMessage 클래스 객체들은 Jackson2JsonRedisSerializer로 Json 타입으로 직렬화했다. 

 

또한, LocalDateTime 타입을 ObjectMapper 없이 넘기면 Json 변환기가 이를 숫자의 배열로 만든다. 하지만, 배열 방식은 가독성이 너무 떨어져 ObjectMapper를 통해 "2023-10-27T10:00:00"과 같은 문자열로 바꿔주었다. 

 

위에서 만든 RedisTemplate 객체를 롬복으로 주입받아, SSE_TOPIC에 메시지를 발행한다면 다음과 같다. 

@Slf4j
@Component
@RequiredArgsConstructor
public class SseRedisPublisher {

	private static final String SSE_TOPIC = "SSE_TOPIC";
	private final RedisTemplate<String, SseRedisMessage> sseRedisTemplate;

	public void publish(SseRedisMessage message) {
		try {
			sseRedisTemplate.convertAndSend(SSE_TOPIC, message);
			log.debug("Published SSE message to Redis - topic: {}, event: {}, correlationId: {}",
				message.topic(), message.event(), message.correlationId());
		} catch (Exception e) {
			log.error("Failed to publish SSE message to Redis - topic: {}, event: {}, error: {}",
				message.topic(), message.event(), e.getMessage(), e);
		}
	}
}

 

Subscribing (Receiving Messages)

RedisConnection은 subscribe와 pSubscribe 메서드를 통해 여러 채널이나 패턴을 인자로 넘겨 구독할 수 있다. 또한, getSubscription, isSubscribed 메서드로 현재 구독 상태를 조회하거나 수정할 수 있다. 

 

하지만, 특정 연결(Connection)에서 subscribe를 호출하면, 해당 연결을 관리하는 스레드는 메시지가 올 때까지 '메시지 수신 대기 상태'가 되어 아무것도 못하고 Block된다. 다른 스레드가 해당 연결에 대해 unsubscribe를 호출하면 그제서야 스레드는 풀려나게 된다. 이로 인해 스레드 자원이 낭비되는 문제가 있었다. 이러한 스레드 Block 문제를 해결하기 위한 솔루션으로 Message Listener Container가 등장했다.

 

메시지 수신 대기 상태가 되면, 구독 관련 명령 외에 SET, GET 과 같은 일반 명령을 내리면 예외가 발생한다고 나와있었는데, 이를 통해 Spring Data Redis는 기본적으로 RESP2 프로토콜을 사용한다는 것을 알 수 있었고, RESP3은 드라이버 설정을 통해 활성화할 수 있다고 한다. 다만, Message Listener Container를 사용하게 되면, 연결 하나로 구독과 일반 명령을 섞어 쓰는 멀티플렉싱 기술이 필요없어진다. 

 

메시지를 받았을 때 무엇을 할지 정의하려면, MessageListener 인터페이스를 구현하면 된다. onMessage()는 새 메시지가 도착할 때마다 호출되는데, 해당 메서드를 통해 message의 내용, message가 들어온 채널 이름, 패턴 구독시 일치했던 패턴 이름 정보를 얻을 수 있다. 

 

Message Listener Containers

Redis의 저수준 구독은 Blocking 방식이라서, 리스너 하나하나마다 연결과 스레드를 직접 관리해야 하는 번거로움이 있었다. 이를 해결하기 위해 Spring은 MDP와 Redis 사이에서 채널 등록, 메시지 수신 위한 스레드 관리, 자원(연결) 획득 및 해제, 예외 변환 등 모든 인프라적인 설정을 돕는 컨테이너(RedisMessageListenerContainer)를 제공하고, 개발자는 비즈니스 로직(MDP)에만 집중하도록 했다. 

 

만일 구독 확인이나 구독 해지 확인 이벤트를 받기 위해서는 SubscriptionListener를 구현하면 된다. 컨테이너를 사용하는 Redis Pub/Sub은 비동기로 작동하기에 subscribe()를 호출했다고 해서 즉시 메시지를 받을 상태가 되는 것이 아니다. 네트워크를 타고 Redis 서버에 도달해 등록되는 과정이 필요하기에 동기화 문제를 방지하거나 해결하기 위해 SubscriptionListener를 사용할 수 있다. 

 

Message Listener Container를 통해 채널을 구독하고 나면, 컨테이너가 Redis 채널로부터 메시지를 받아 등록된 Message Listener들에게 전달(Dispatch)한다.

"Message Listener Container lets one connection and one thread be shared by multiple listeners even though they do not share a subscription."
by https://docs.spring.io/spring-data/redis/reference/redis/pubsub.html

이때, 여러 리스너가 서로 다른 채널을 구독하더라도, 컨테이너는 "단 하나의 연결과 단 하나의 스레드를 공유"해서 사용할 수 있게 해준다. 이를 통해 애플리케이션이 수백 개의 채널을 추적하더라도 실행 비용이 일정하게 유지할 수 있고, 서버를 재시작하지 않고도 실행 중에 리스너를 추가하거나 제거할 수 있다. 또한, 리스너가 필요할 때만 연결을 맺고, 모든 리스너가 사라지면 자동으로 연결을 끊고 스레드를 해제한다. 

 

비동기 처리를 하기에 이를 효율적으로 처리하기 위해 컨테이너는 자바의 Executor 또는 Spring의 TaskExecutor를 필요로 한다. 또한, 서비스의 부하에 따라 스레드 풀 크기를 조절하여 동시 처리에 최적의 성능을 낼 수 있도록 설정하기 위해서 운영환경에서는 적절한 TaskExecutor 선택이 권장된다고 한다. (해당 내용의 코드도 아래에서 살펴보자)

 

The MessageListenerAdapter

지금까지 MDP와 Redis 사이에서 인프라적인 설정을 돕는 컨테이너에 대해 알아보았는데, 그러면 이제 실질적으로 메시지를 받은 후의 처리를 담당하는 MessageListenerAdapter를 알아보자.

"In a nutshell, it lets you expose almost any class as a MDP"

MessageListenerAdapter는 거의 모든 종류의 일반 클래스를 메시지 구독 객체(MDP)로 탈바꿈 시켜준다. 원래는 위에 나왔던것처럼 Redis의 메시지를 받으려면 MessageListener 인터페이스를 상속받아 onMessage(Message message, byte[] pattern) 같은 정해진 메서드를 구현해야 한다. 하지만, adapter를 사용하면 해당 인터페이스를 구현할 필요가 없어진다. 

 

먼저, 수신한 메시지를 처리하는 메서드(handleMessage)를 담은 MDP(SseRedisSubscriber)를 만든다. 아래와 같이 Redis로 메시지를 받아서 메시지 토픽의 Emitter를 통해 SSE를 전달하게 했다. 이처럼 따로 만든 클래스에는 redis 의존성을 하나도 import하지 않았는데, 그렇기에 Redis 없이도 단독으로 테스트하기 좋고, 다른 메시징 시스템으로 바꿔도 로직을 수정할 필요가 없어 확장성이 뛰어나다

@Slf4j
@Component
@RequiredArgsConstructor
public class SseRedisSubscriber {

	private final SseEmitterRegistry sseEmitterRegistry;
    
	private final ObjectMapper objectMapper = new ObjectMapper()
		.registerModule(new JavaTimeModule());

	public void handleMessage(String message) {
		try {
			SseRedisMessage redisMessage = objectMapper.readValue(message, SseRedisMessage.class);
			sendToLocalEmitters(redisMessage);
		} catch (Exception e) {
			log.error("Failed to deserialize Redis message: {}", e.getMessage(), e);
		}
	}
       ...

 

그리고 Adapter가 메시지 처리를 할 수 있도록, 만들어둔 MDP를 생성자 주입으로 주입받는 MessageListenerAdapter 타입의 객체를 만든다. 그러면 Redis에서 메시지가 오면 adapter가 지정된 객체 안에서 "handleMessage"라는 이름의 메서드를 찾아 실행한다. 

@Bean
public MessageListenerAdapter sseMessageListenerAdapter(SseRedisSubscriber subscriber) {
    return new MessageListenerAdapter(subscriber, "handleMessage");
}

 

수신 메서드들은 메시지의 내용에 따라 다양한 타입으로 선언될 수 있는데, apapter가 중간에서 Redis의 바이트 데이터를 원하는 자바 타입으로 자동 변환해줄 수 있다. 예를 들어 메시지가 문자열이면, handleMessage(String s)가 호출되고, 메시지가 JSON 객체면 handleMessage(Map m)가 호출된다.  

 

Adapter에 Serializer를 설정하면, 저수준 바이트 데이터를 고수준 자바 객체로 변환할 수 있다. 그리고, 메서드 실행 중에 에러가 나도 서버가 뻗지 않고, 컨테이너가 예외를 낚아채서 로그를 남기는 등 기본 처리를 대신 해준다. 위 코드에서는 handleMessage 메서드가 String으로 받아서 메서드 안에서 객체 변환을 하고 있는데, Adapter에 직렬화 기능을 추가하려면 다음과 같이 작성하면 된다. 

@Bean
public MessageListenerAdapter sseMessageListenerAdapter(SseRedisSubscriber subscriber) {
    MessageListenerAdapter adapter = new MessageListenerAdapter(subscriber, "handleMessage");
    
    // 만약 날짜 타입(LocalDateTime 등)이 있다면 ObjectMapper 설정이 필요
    ObjectMapper objectMapper = new ObjectMapper()
        .registerModule(new JavaTimeModule()); // Java 8 날짜 모듈 등록
    serializer.setObjectMapper(objectMapper); // 생성한 ObjectMapper 주입!!
    
    adapter.setSerializer(serializer);
    return adapter;
}

 

그러면 수신부는 다음과 같이 바뀔 수 있다. 

// AS-IS: String으로 받아서 직접 파싱
public void handleMessage(String message) {
    SseRedisMessage redisMessage = objectMapper.readValue(message, SseRedisMessage.class);
    // ... 로직
}

// TO-BE: 어댑터가 이미 변환해준 객체를 바로 받음
public void handleMessage(SseRedisMessage redisMessage) {
    // 이미 redisMessage는 완벽한 객체 상태
    log.debug("Received Redis message - topic: {}", redisMessage.topic());
    sendToLocalEmitters(redisMessage);
}

 

다음으로 Adapter를 컨테이너에 등록을 해주면 된다. 근데, 이전에 언급됐던 TaskExecutor로 스레드 개수 설정하는 부분과 함께 알아보자. 

Batch로 최대 100개 메시지를 뭉쳐 최대 10개의 스레드로 처리하도록 하는 예시를 작성해봤다. 스레드 개수나 큐 크기는 상황에 맞게 조절하면 된다. 

@Bean
public TaskExecutor messageBatchTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(5);    // 기본적으로 5개 스레드 유지
    executor.setMaxPoolSize(10);     // 최대 10개까지 확장
    executor.setQueueCapacity(100);  // 대기 줄은 100개까지
    executor.setThreadNamePrefix("RedisSub-");
    executor.initialize();
    return executor;
}

이렇게 messageBatchTaskExecutor 빈을 만들어줬고, 이제 컨테이너에 Adapter와 함께 다음과 같이 설정해주면 된다. 

@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(
    RedisConnectionFactory connectionFactory,
    TaskExecutor messageBatchTaskExecutor,
    MessageListenerAdapter sseMessageListenerAdapter,
    ChannelTopic sseChannelTopic) {
    RedisMessageListenerContainer container = new RedisMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.addMessageListener(sseMessageListenerAdapter, sseChannelTopic);
    container.setTaskExecutor(messageBatchTaskExecutor);
    return container;
}