![[Spring] SSE(Server Sent Events)를 사용하여 실시간 알림 기능 사용하기](https://img1.daumcdn.net/thumb/R750x0/?scode=mtistory2&fname=https%3A%2F%2Fblog.kakaocdn.net%2Fdn%2FkVRSe%2FbtsG6XVZcpO%2FVVK2aPcYr3mxkvjvwxUkyK%2Fimg.jpg)
SSE (Servet Sent Events)란
SSE는 Server Sent Events의 줄임말입니다. 서버에서 클라이언트로 실시간 이벤트를 전달하는 기술입니다.
위 사진을 보면 클라이언트가 서버와 연결을 먼저 진행합니다. 이를 구독(subscribe)라고 합니다. 이후에 서버에서 클라이언트로 단방향 통신을 진행하며, 클라이언트의 요청이 없이도 서버에서 응답을 내릴 수 있습니다.
SSE의 장점은 다음과 같습니다.
- 실시간으로 이벤트를 발생시킬 수 있다.
- 연결이 끊어지면 자동으로 연결을 시도한다.
- HTTP 통신이다.
실시간 알림의 경우 서버에서 클라이언트 방향으로만 데이터를 전송하면 되기 때문에, 양방향으로 통신을 진행할 필요가 없습니다. 따라서 SSE를 사용하여 실시간 알림을 구현해보도록 하겠습니다.
스프링부트에서 SSE 구현하기
Controller
먼저 클라이언트와 서버가 구독 상태이어야 합니다. GET 요청을 통해 처리할 수 있으며, 요청 Mime Type으로 "text/event-stream"으로 전송해야 합니다. text/event-stream은 SSE를 사용하기 위한 미디어 타입입니다.
@RestController
@RequiredArgsConstructor
@RequestMapping("/sse")
public class SseController {
private final SseEmitterService sseEmitterService;
/**
* 클라이언트의 이벤트 구독을 수락한다. text/event-stream은 SSE를 위한 Mime Type이다. 서버 -> 클라이언트로 이벤트를 보낼 수 있게된다.
*/
@GetMapping(value = "/subscribe/{userId}", produces = "text/event-stream")
public SseEmitter subscribe(@PathVariable Long userId) {
return sseEmitterService.subscribe(userId);
}
/**
* 이벤트를 구독 중인 클라이언트에게 데이터를 전송한다.
*/
@PostMapping("/broadcast/{userId}")
public void broadcast(@PathVariable Long userId, @RequestBody EventPayload eventPayload) {
sseEmitterService.broadcast(userId, eventPayload);
}
}
Service
서비스 레이어에서는 구독 메서드와 데이터 전송 메서드를 작성합니다.
구독 메서드 - subscribe()
SseEmitter 객체는 클라이언트의 sse 연결 요청에 응답을 하기 위해 사용됩니다. 객체 생성시에 DEFAULT_TIMEOUT를 통해 sse 연결 시간을 설정할 수 있습니다. 시간이 만료되어 연결이 끊어진다면 연결을 다시 시도합니다.
- onCompletion() : 모든 데이터가 정상적으로 전송되면 호출됩니다.
- onTimeout() : 유효 시간이 만료되면 호출됩니다. 클라이언트의 활동이 감지 되지 않는 경우를 의미합니다.
클라이언트와 서버를 구독하는 시점에 이벤트를 강제로 발생시킵니다. SSE 연결 이후에 클라이언트의 활동이 감지되지 않아 유효시간이 만료되어 503 에러가 발생하는 것을 방지하기 위해 사용됩니다.
이벤트 발생 메서드 - broadcast()
이벤트를 전달할 클라이언트의 정보(userId)와 전송할 데이터를 받습니다. 클라이언트 정보를 통해 SseEmitter 객체를 조회하고, 해당 객체에 데이터를 전송합니다.
@Service
@RequiredArgsConstructor
public class SseEmitterService {
// SSE 이벤트 타임아웃 시간
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
/**
* 클라이언트의 이벤트 구독을 허용하는 메서드
*/
public SseEmitter subscribe(Long userId) {
// sse의 유효 시간이 만료되면, 클라이언트에서 다시 서버로 이벤트 구독을 시도한다.
SseEmitter sseEmitter = emitterRepository.save(userId, new SseEmitter(DEFAULT_TIMEOUT));
// 사용자에게 모든 데이터가 전송되었다면 emitter 삭제
sseEmitter.onCompletion(() -> emitterRepository.deleteById(userId));
// Emitter의 유효 시간이 만료되면 emitter 삭제
// 유효 시간이 만료되었다는 것은 클라이언트와 서버가 연결된 시간동안 아무런 이벤트가 발생하지 않은 것을 의미한다.
sseEmitter.onTimeout(() -> emitterRepository.deleteById(userId));
// 첫 구독시에 이벤트를 발생시킨다.
// sse 연결이 이루어진 후, 하나의 데이터로 전송되지 않는다면 sse의 유효 시간이 만료되고 503 에러가 발생한다.
sendToClient(userId, "subscribe event, userId : " + userId);
return sseEmitter;
}
/**
* 이벤트가 구독되어 있는 클라이언트에게 데이터를 전송
*/
public void broadcast(Long userId, EventPayload eventPayload) {
sendToClient(userId, eventPayload);
}
private void sendToClient(Long userId, Object data) {
SseEmitter sseEmitter = emitterRepository.findById(userId);
try {
sseEmitter.send(
SseEmitter.event()
.id(userId.toString())
.name("sse")
.data(data)
);
} catch (IOException ex) {
emitterRepository.deleteById(userId);
throw new RuntimeException("연결 오류 발생");
}
}
}
Repository
Map 인터페이스를 사용하여 SseEmitter 객체를 관리합니다. ConcurrentHashMap은 멀티스레드 환경에서 동시성을 보장하기 위해 사용합니다.
- findById() : userId를 사용하여 해당 클라이언트의 SseEmitter 객체를 조회한다.
- boardcast를 위해 구독 중인 사용자의 SseEmitter를 조회한다.
- save() : 해당 클라이언트를 위한 SseEmitter를 생성하고 Map에 저장한다.
- deleteById() : 클라이언트의 SseEmitter 객체를 제거한다.
@Repository
public class EmitterRepository {
private final Map<Long, SseEmitter> emitters = new ConcurrentHashMap<>();
public SseEmitter findById(Long userId) {
return emitters.get(userId);
}
public SseEmitter save(Long userId, SseEmitter sseEmitter) {
emitters.put(userId, sseEmitter);
return emitters.get(userId);
}
public void deleteById(Long userId) {
emitters.remove(userId);
}
}
회원가입 시에 실시간 알림 발생시키기
사용자가 회원 가입을 하게 되면 관리자에게 실시간으로 알림이 전송되도록 해보겠습니다.
UserController
관리자의 Id가 1L이라고 가정하겠습니다.
사용자 등록 DTO
@Getter
public class SaveUserRequestDto {
private String username;
}
@RestController
@RequiredArgsConstructor
@RequestMapping("/users")
public class UserController {
private final UserService userService;
@PostMapping
public ResponseEntity<SaveUserResponseDto> saveUser(@RequestBody SaveUserRequestDto request) {
SaveUserResponseDto response = userService.save(1L, request);
return ResponseEntity.status(HttpStatus.CREATED).body(response);
}
}
Service
관리자 Id로 SseEmitter 객체를 조회하고, 해당 객체에 실시간으로 알림을 발생시키게 됩니다.
@Service
@RequiredArgsConstructor
public class UserService {
private final SseEmitterService emitterService;
private final UserRepository userRepository;
public SaveUserResponseDto save(Long userId, SaveUserRequestDto request) {
User user = createUser(request);
userRepository.save(user); // 신규 사용자 저장
User targetUser = getTargetUser(userId); // 관리자 정보
// 관리자에게 신규 사용자 정보를 실시간으로 전송
emitterService.broadcast(targetUser.getId(),
EventPayload.builder()
.userId(user.getId().toString())
.username(user.getUsername())
.build()
);
return SaveUserResponseDto.builder().userId(user.getId()).username(user.getUsername())
.build();
}
private User getTargetUser(Long userId) {
return userRepository.findById(userId)
.orElseThrow(() -> new NoSuchElementException("사용자를 찾을 수 없습니다. userId : " + userId));
}
private static User createUser(SaveUserRequestDto request) {
return User.builder()
.username(request.getUsername())
.build();
}
}
API 호출
관리자 계정과 서버가 구독 상태로 유지됩니다.
URL을 브라우저로 접속하면 아래 화면이 출력됩니다. 화면에 보이는 메시지는 SseEmitter 객체를 생성하는 시점에 503 에러를 방지하기 위해서 강제로 broadcast를 발생시킨 메시지입니다.
신규 사용자가 등록되면 관리자에게만 실시간으로 알림이 전송됨을 확인할 수 있습니다.
아래만 실시간으로 알림이 오지 않습니다. 의도했던대로 관리자에게만 실시간 알림이 전송되도록 구현했기 때문입니다.