본문 바로가기
Backend/Spring Boot

SSE 동작 원리 네트워크 단까지 파해쳐보자!

by persi0815 2025. 9. 2.

프로젝트에서 SSE를 적용하게 될 것 같아 공부해보았다. 
 
아래의 자료들을 참고했다. 
https://docs.spring.io/spring-framework/reference/web/webmvc/mvc-ann-async.html#mvc-ann-async-sse
https://www.baeldung.com/spring-server-sent-events


1. SSE란 무엇일까? 

클라이언트의 요청으로 인해 서버와 클라이언트가 TCP 연결을 맺고, 해당 연결을 유지하며 서버가 클라이언트에게 단방향으로 여러차례 스트림 데이터를 전달(스트리밍)하는 것이다. 이를 통해 클라이언트의 지속적인 요청 없이도 서버에 변화가 생기면 즉시 클라이언트에게 전달할 수 있다는 특징이 있다. 
 
ResponseBodyEmitter라는 반환 타입을 이용해 객체들의 스트림(연속적인 흐름)을 만들어낼 수 있다. 즉, 보통의 API는 데이터(객체) 하나를 주고 끝내지만, 해당 클래스를 사용하면, 하나의 연결 안에서 데이터를 여러번 흐름처럼 보낼 수 있다는 뜻이다. 이때 각 객체는 HttpMessageConverter에 의해 JSON이나 텍스트 같은 데이터 형식으로 자동 직렬화된다. 그 결과가 HTTP 응답에 바로 쓰여지게 된다. 
 
SseEmitter는 ResponseBodyEmitter의 자식 클래스이며, '어떤 데이터'를 보낼 것인가와 '언제까지 연결을 유지'할 것인가를 다룬다. 이때 '누구'에게 보낼 것인가에 대해서는 서블릿 컨테이너(Tomcat)이 쥐고 있는 소켓 연결에 달려있다. 
 
클라이언트가 서버에게 SSE 연결을 요청하게 되면, 서버는 Emitter 객체를 생성하고, 메모리에 저장한 후, 사용자에게 SseEmitter를 반환한다. SseEmitter에는 사용자 관련 정보가 일절 없다. 하지만, 이때 Tomcat이 컨트롤러의 리턴타입을 보고 해당 클라이언트와의 소켓 연결을 유지한다. 그리고 스프링이 알아서 응답 헤더를 설정하고 스트리밍 준비를 하게 된다.

@GetMapping(value = "/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
Public SseEmitter subscribe(
    @RequestParam String topics,
    HttpServletRequest request,
    HttpServletResponse response
){
    String sessionId = request.getSession().getId();
    Set<String> topicSet = parseTopics(topics);

    SseEmitter emitter = new SseEmitter(60 * 60 * 1000L);
    try {
        emitter.send(SseEmitter.event()
            .name(SseEventNames.CONNECTED)
            .reconnectTime(5000L)
            .data("SSE connection established"));

        sseEmitterRegistry.subscribe(sessionId, topicSet, emitter);
        ...
        
     return emitter;
}

produces=MediaType.TEXT_EVENT_STREAM_VALUE 이 부분을 통해 브라우저에게 지금부터 보내는 데이터는 일반 텍스트가 아니라 SSE 형식의 스트림임을 선언해 헤더에 'text/event-stream'을 붙여준다. 이를 통해 event-stream 타입은 데이터가 한 줄(data: ... \n\n)만 들어와도 즉시 자바스크립트의 EventSource 객체로 넘겨준다. 그래서 EventSource 객체로 자바 스크립트에서 아래와 같이 연결 에러처리나 연결 성공 알림 등 이벤트 처리를 하는 것이다. 

const eventSource = new EventSource('/api/sse/subscribe?userId=user123');

// 연결 성공
eventSource.addEventListener('connected', function(event) {
	console.log('SSE 연결됨:', event.data);
	showConnectionStatus('연결됨', 'success');
});

// 주문 완료 알림
eventSource.addEventListener('order_completed', function(event) {
	const orderData = JSON.parse(event.data);
	showNotification(`주문 #${orderData.orderId}이 완료되었습니다.`);
});

// 시스템 공지
eventSource.addEventListener('system_notice', function(event) {
	const notice = JSON.parse(event.data);
	showSystemAlert(notice.message, notice.priority);
});

// Heartbeat (연결 상태 확인)
eventSource.addEventListener('ping', function(event) {
	updateLastHeartbeat(new Date());
});

// 서버 종료 알림
eventSource.addEventListener('shutdown', function(event) {
	showWarning('서버가 재시작됩니다. 잠시 후 다시 연결됩니다.');
	// 자동 재연결 로직 구현 가능
});

// 연결 에러 처리
eventSource.onerror = function(event) {
	console.error('SSE 연결 에러:', event);
	showConnectionStatus('연결 끊김', 'error');
};

// 페이지 종료 시 연결 정리
window.addEventListener('beforeunload', function() {
	eventSource.close();
});

 

또한, 데이터를 전송할 때에는 브라우저가 알아들을 수 있는 W3C SSE 표준 규격에 따라 형식이 지정되어 보낸다. 
 
다만, SSE는 Internet Explorer(IE) 브라우저를 지원하지 않기 때문에 IE까지 포함한 브라우저 호환성이 필요하다면 다른 통신 방식을 고려해야 한다. 또한 HTTP/1.1 환경에서는 브라우저 구현 정책에 따라 동일 호스트에 대해 동시에 유지할 수 있는 TCP 연결 수가 약 6개로 제한될 수 있다. 반면 HTTP/2.0에서는 단일 TCP 연결 내에서 여러 요청을 스트림 단위로 멀티플렉싱할 수 있어 이러한 연결 수 제약이 완화되었다. 

2. 코드로 알아보는 이벤트 발행과 처리

SseEventPublisher: 이벤트 발행

private final ApplicationEventPublisher eventPublisher;

// Topic에 이벤트 발행
public void publishToTopic(String topic, String event, Object data) {
    publishToTopic(topic, event, data, null);
}

// correlationId 포함 Topic 이벤트 발행
public void publishToTopic(String topic, String event, Object data, String correlationId) {
    SseEventDto sseEvent = SseEventDto.builder()
        .topic(topic)
        .event(event)
        .data(data)
        .timestamp(LocalDateTime.now())
        .id(id)
        .build();

    eventPublisher.publishEvent(sseEvent);
    log.debug("SSE event published - topic: {}, event: {}", topic, event);
}

 

SseEventListener

// SSE 이벤트 처리
@EventListener
public void handleSseEvent(SseEventDto event) {
    try {
        if (event.topic() != null && !event.topic().isBlank()) {
            // 발행
            sseService.sendToLocalEmitters(event);
        } else {
            // 전체 브로드캐스트
            sseService.broadcast(event);
        }
    } catch (Exception e) {
        log.error("Failed to handle SSE event: {}", event, e);
    }
}

 

SseServiceImpl: emitter 찾아서 전송

@Override
private void sendToLocalEmitters(SseEventDto sseMessage) {
 	if (!sseMessage.isValid()) {
        log.warn("Invalid SSE event: {}", sseEvent);
        return;
    }
    
    Set<SseEmitter> emitters;

    // Handle broadcast (wildcard topic)
    if ("*".equals(sseMessage.topic())) {
        emitters = sseEmitterRegistry.getAllEmitters();
        log.debug("Broadcasting to all local emitters");
    } else {
        emitters = sseEmitterRegistry.getTopicEmitters(sseMessage.topic());
    }

    if (emitters.isEmpty()) {
        log.debug("No local emitters for topic: {}", sseMessage.topic());
        return;
    }

    emitters.removeIf(emitter -> {
        try {
            emitter.send(SseEmitter.event()
                .name(sseMessage.event())
                .data(sseMessage.data())
                .id(sseMessage.correlationId())
            );
            return false;
        } catch (Exception e) {
            if (e instanceof IOException io && isClientDisconnectionError(io)) {
                log.debug("Client disconnected, removing emitter");
            } else {
                log.warn("Failed to send SSE event to emitter, removing: {}", e.getMessage());
            }
            sseEmitterRegistry.removeEmitter(emitter);
            return true; // true 시 연결이 끊겼다 판단돼, emitter에서 remove
        }
    });
}

 

3. SSE의 format과 전송 과정

  • id: 이벤트의 고유 식별자 (보통 숫자를 1씩 늘리거나 타임스탬프를 사용).
  • data: 실제 전달할 내용.
  • retry: 연결이 끊겼을 때 브라우저가 "몇 초 뒤에 다시 연결할까?"를 정하는 값 (밀리초 단위).
  • event: SSE 타입
emitter.send(SseEmitter.event()
                .name(sseMessage.event())
                .data(sseMessage.data())
                .id(sseMessage.id())
            );

예를 들어 위와 같은 코드로 메시지를 만들고, 전송할 수 있다. SseEmitter class의 정적 메서드(event)를 통해 SseEventBuilderImpl 객체를 만들고, name(), data(), id() 등의 메서드를 통해 이벤트의 데이터를 넣게 된다. 

SseEmitter.class

 
이렇게 모든 값을 담은 최종 SseEventBuilderImpl 객체가 완성되었다면, send()로 메시지를 보냅니다. 

SseEmitter.class
ResponseBodyEmitter.class

결국 SseEmitter가 상속하고 있는 상위 클래스인 ResponseBodyEmitter를 통해 메시지가 전송이 된다. 여러 스레드가 동시에 send()를 호출할 때, 데이터가 서로 뒤섞이지 않도록 한 번에 한 메시지만 보내도록 동기화(synchronized)하고 있음을 확인할 수 있다. 
또한, Assert.state()를 통해 데이터 보내기 전 이미 닫힌 emitter인가 확인하고, 종료되었다면 IOExecption을 터뜨려 반환하게 된다. 

ResponseBodyEmitter.class
ResponseBodyEmitterReturnValueHandler.class

while을 돌면서 SseEventBuilder로 만든 데이터 조각들을 하나씩 꺼내서 sendInternal로 넘긴다. 그리고 마지막에 flush()를 호출함으로써 데이터를 버퍼에 쌓아두지 말고 즉시 클라이언트에게 쏘게 된다.

ResponseBodyEmitterReturnValueHandler.class

sendInternal에서는 SseEventBuilder 객체를 누가 HTTP 응답으로 바꿀 수 있는지 while문을 통해 찾는다. 여기 핸들러에는 ServerHttpResponse 타입의 outputMessage 객체가 있는데, 이는 스프링이 서블릿의 HttpServletResponse를 감싸서 만든 객체다. 결국 아래처럼 outputMessage.getBody()를 호출하면, response.getOutputStream()을 호출하는 것과 같다. 해당 스트림에 데이터를 쓰면 네트워크를 타고 클라이언트(브라우저)에게 전송되는 것이다. 

HttpMessageConverter

SSE는 데이터를 한 번에 다 보내는게 아니라 계속 스트림을 유지하는 형태로 instanceof StreamingHttpOutputMessage 조건문에 들어갈 확률이 높다. 조건문 안과 밖에서 헤더와 바디가 설정된다. 그리  new StreamingHttpOutputMessage.Body() 를 통해 일종의 "콜백" 예약이 되고, if문이 포함된 메서드 종료 후, outputMessage.flush()가 트리거로 작동하며 writeTo 메서드가 실행된다. 이를 통해 설정한 값들이 writeInternal에 의해 버퍼에 작성된다. 

ServletServerHttpResponse.class

outputMessage.flush()가 실행되면, writeTo를 DelegatingServerHttpResponse에서 기다리다가, 끝나면 delegate.flush로  ServletServerHttpResponse로 전달되어 servletResponse.flushBuffer()가 실행된다. 이로써 최종적으로 버퍼에 쌓아뒀던 데이터들을 Tomcat이 TCP/IP 소켓 스트림으로 쏟아붓게 된다. 
 

4. SSE의 전송 과정

자바는 컴파일을 통해 JVM이 이해하는 자바 바이트 코드로 변환되는데, 자바 바이트 코드는 JVM 프로세스 위에서 동작한다. JVM의 OutputStream은 네이티브 코드인 C/C++로 작성된 write() 시스템 콜을 호출한다. 그리고 커널 모드로 전환되면, 유저 프로세스가 가진 메모리(버퍼)에 담긴 데이터가 커널 메모리 영역에 있는 소켓 송신 버퍼로 복사된다. 데이터가 커널의 소켓 송신 버퍼에 도착하면, TCP 프로토콜 스텍이 MSS 단위로 쪼개어 세그먼트를 만든다. 그리고, 캡슐화를 통해 패킷으로 만든 후, DMA가 커널 메모리에 있는 패킷을 NIC의 메모리로 직접 옮긴다. 이후, 이더넷 프레임으로 감 싼 뒤, 전기 신호로 케이블을 통해 밖으로 쏘아진다. 이렇게 쏘아진 데이터까 클라이언트(브라우저)에 도착하면, OS 커널은 ACK 패킷을 서버로 보내고, 커널 메모리(송신 버퍼)에서 해당 데이터를 비운다. 

 

TCP 소켓을 통해 데이터가 클라이언트에게 전달되는 과정은 일반적인 요청과 별 다를 것이 없는데, 하나 다른 점은 SSE는 전이중 통신 파이프라인을 계속 열어둔 채로, 서버에서 클라이언트로 흐르는 방향(Downstream)을 계속 사용한다.

 

파이프라인을 계속 열어두며 좋은 점은 데이터 하나 보낼 때마다 TCP 3-Way Handshake를 통해 연결을 맺고, 4-Way Handshake로 연결을 끊으며 매번 새로운 소켓을 생성하고 소멸시키는 오버헤드를 줄이고, 매번 SYN/ACK를 주고 받는 지연시간이 사라진다는 점이다. 

 

5. 연결이 끊긴다면? 

위에서 데이터 보내기 전 스프링이 이미 닫힌 emitter인가 확인하고, 종료되었다면 IOExecption을 터뜨리다고 했다. 만일 클라이언트가 연결을 끊어서, 서버측에서 데이터를 보낼 시 IOException이 발생한다면, 애플리케이션은 연결을 정리할 책임이 없고, complete()나 completeWithError()를 직접 호출하면 안된다. 대신, 서블릿 컨테이너(Tomcat 등)가 자동으로 AsyncListener를 통해 에러 알림을 시작하고, 이를 통해 Spring MVC가 내부적으로 completWithError를 호출하고, 마지막으로 ASYNC dispatch를 수행하도록 둬야 한다. 서블릿 API는 클라이언트가 연결을 끊었을 때 알림을 주지 않는데, 그렇기에 데이터를 주기적으로 보내는 것이 중요하다. 그래서 등장한 것이 HeartBeat 로직이며 이를 통해 좀비 연결을 비교적 빠르게 인지하고 처리해 리소스를 확보할 수 있다.
 
ID를 담아서 브라우저에게 SSE를 전달하면, 브라우저는 이를 저장하고 있다가 연결이 끊기면 재연결 후, 직전에 받았던 ID(Last-Event-ID)를 담아서 보내면 서버가 해당 id를 통해 DB나 캐시에서 데이터를 가져와 다시 쏴줄 수 있다. Last-Event-ID를 통해 놓친 메시지에 대한 복구를 하려면, 분산환경에서 fire-and-forget인 Pub/Sub이 아니라 Redis Streams과 같이 데이터를 바로바로 삭제하지 않는 방식을 선택해야 한다.