본문 바로가기

사이드 프로젝트/Http 요청 순차 처리 솔루션 개발 프로젝트

HTTP 요청 순차 처리 솔루션 개발 프로젝트 - 2

안녕하세요.

이번 글에서는 프로그램 동작 방식, 그리고 사용 방법에 대해서 소개하도록 하겠습니다.

1. 프로그램 동작 방식

각 인스턴스와 구성요소에 대한 설명은 아래와 같습니다.

  • ServiceQueuingManager(A) : 클라이언트 요청의 순차 처리를 위한 매니저 인스턴스
    • DeferredServiceQueuingEventHolder(A-1) : 클라이언트 요청에 비동기 응답을 위한 임시 객체 저장하는 인스턴스
    • ServiceQueuingEventProducer(A-2) : 클라이언트 요청을 이벤트 스트림에 발행하는 것을 담당하는 인스턴스
    • ServiceQueuingEventResultListener(A-3) : 클라이언트가 요청한 서비스의 결과 응답을 기다리는 인스턴스
  • ServiceQueuingEventSubscriber(B) : 이벤트 스트림에 저장된 클라이언트 요청을 구독하는 인스턴스
    • ApplicationExecutor(B-1) : 클라이언트 요청을 실제로 처리하여 응답을 반환하는 인스턴스
    • ServiceQueuingResultPublisher(B-2) : 메소드 응답을 전달하는 인스턴스

 

각 인스턴스가 동작하는 흐름은 아래와 같습니다.

Service Flow

  • 1. 클라이언트가 Http 요청을 전송합니다.
  • 2. ServiceQueuingManager에서 이를 처리합니다
    • 2.1 비동기로 응답하기 위해 요청에 대한 응답인 DeferredResult를 임시로 저장합니다.
    • 2.2 실행할 서비스를 Event Stream(Kafka)에 이벤트로 발행합니다.
    • 2.3 실행할 서비스의 결과를 받아오기 위해 Message Queue(Redis)를 구독합니다.
  • 3. Event Stream(Kafka)에 발행된 이벤트를 구독합니다.
  • 4. 클라이언트가 요청한 서비스를 실행, 그리고 결과를 생성하여 Message Queue(Redis) 메세지 발행자에게 전달합니다.
  • 5. Message Queue(Redis)에 서비스 실행 결과를 메세지로 발행합니다.
  • 6. A-3은 Message Queue(Redis)로부터 실행 결과 메세지를 구독하여 DeferredResult에 담아 클라이언트에 반환합니다.

 

2. 프로그램 사용 방법

 

프로그램을 사용하기 위해 아래와 같이 스프링 빈을 등록해야 합니다.

@Configuration
@RequiredArgsConstructor
public class ServiceQueuingManagerConfig {
    private final ServiceQueuingEventProducer queuedEventPublisher;
    private final ServiceQueuingEventResultListener queuedEventResultListener;
    private final ConsumerFactory<String, ServiceQueuingDetails> queuedEventConsumerFactory;
    private final ApplicationContext applicationContext;
    private final RedisTemplate<String, Object> genericRedisTemplate;
    private final Environment env;

    @Bean
    public ServiceQueuingManager<YourResponseDto> defaultServiceQueuingManager() {
        DefaultServiceQueuingManagerFactory<YourResponseDto> factory = new DefaultServiceQueuingManagerFactory<>(
                env.getProperty("spring.kafka.topic.service-queuing.topic-name"),
                deferredResultHolder(),
                queuedEventPublisher,
                kafkaServiceQueuingEventSubscriber(),
                queuedEventResultListener
        );
        return factory.createManager();
    }
    @Bean
    public DeferredServiceQueuingEventHolder<YourResponseDto> deferredResultHolder() {
        return new DeferredServiceQueuingEventHolder<YourResponseDto>();
    }
    @Bean
    public DefaultApplicationServiceExecutor applicationServiceExecutor() {
        return new DefaultApplicationServiceExecutor(applicationContext);
    }
    @Bean
    public RedisServiceQueuingResultPublisher<YourResponseDto> serviceQueuingResultPublisher() {
        return new RedisServiceQueuingResultPublisher<>(genericRedisTemplate);
    }
    @Bean
    public KafkaServiceQueuingEventSubscriber<YourResponseDto> kafkaServiceQueuingEventSubscriber() {
        return new KafkaServiceQueuingEventSubscriber<>(
                serviceQueuingResultPublisher(),
                applicationServiceExecutor(),
                queuedEventConsumerFactory
        );
    }

    @Bean
    public KafkaMessageListenerContainer<String, ServiceQueuingDetails> defaultQueuedEventListener() {
        String topic = env.getProperty("spring.kafka.topic.service-queuing.topic-name");
        String groupId = env.getProperty("spring.kafka.topic.service-queuing.group-id");
        KafkaMessageListenerContainer<String, ServiceQueuingDetails> listener = kafkaServiceQueuingEventSubscriber().createListener(topic, groupId);
        listener.start();
        return listener;
    }
}

 

프로그램 동작 방식에서 살펴볼 수 있듯, ServiceQueuingManage를 스프링 빈으로 등록해야 합니다.

ServiceQueuingManager는 deferredResultHolder, queuedEventPublisher,  ServiceQueuingEventSubscriber, queuedEventResultListener를 반드시 필드로 갖고 있어야 합니다. 해당 필드들은 이미 구현된 것을 사용해도 되고, 또는 새롭게 구현하여 사용해도 상관 없습니다.

 

ServiceQueuingEventSubscriber 또한 applicationExecutor와 serviceQueuingResultPublisher를 필드로 가져야 하며, 디폴트로 제공되는 것을 사용하거나 직접 구현할 수 있습니다. ApplicationExecutor는 리플렉션을 활용하여 서비스를 실행하도록 구현되어 있습니다.

 

개발자는 순차 처리하고자 하는 서비스의 반환 타입(클래스)을 지정해야 합니다. 위 예제에서는 <YourResponseDto>로 지정되어 있습니다. 제네릭을 활용하여 여러 서비스들에도 보편적으로 활용될 수 있도록 구현하였습니다.

 

위 프로그램을 아래와 같이 사용할 수 있습니다.

@Service("queuedEventTestService")
@RequiredArgsConstructor
@Slf4j
public class QueuedEventTestService {
    private final ServiceQueuingManager<ResponseDTO> serviceQueuingManager;
    private final DeferredServiceQueuingEventHolder<ResponseDTO> deferredServiceQueuingEventHolder;

    public DeferredResult<ResponseDTO> doTest(String clientUniqueKey, String s) {
        DeferredResult<ResponseDTO> execute = serviceQueuingManager.execute(response -> {
            Long requestId = response.getRequestId();
            ResponseDTO responseDTO = response.getResponse();
            DeferredResult<ResponseDTO> deferredResult = deferredServiceQueuingEventHolder.get(requestId)
                    .get();
            deferredResult.setResult(responseDTO);
        }, clientUniqueKey, "queuedEventTestService", "doService", s);
        return execute;
    }

    public ResponseDTO doService(String s) {
        return new ResponseDTO("success", "[doService Method] " + s);
    }

}

 

doTest 메소드를 해석하면,

스프링 빈으로 등록된 "queuedEventTestService" Bean의 "doService" 메소드를 "파라미터 s"와 함께 실행하라는 것을 의미하고, 이때 doService 메소드는 "clientUniqueKey"에 따라 Kafka Event Stream의 특정 파티션에 발행되어 순차적으로 처리됩니다.

 

실행이 완료되면,

첫번째 파라미터인 람다 함수가 실행되어 deferredResult에 결과 값을 바인딩 한 후, 클라이언트에 결과를 반환합니다.

 

 

다음 글에서는 해당 기능을 활용하여 테스트를 해본 결과에 대해서 소개하도록 하겠습니다.

감사합니다.