컬리 검색이 카프카를 들여다본 이야기 2

카프카 스트림즈를 추가하다

1. 들어가며

안녕하세요, 컬리에서 검색/추천 서비스를 개발하고 있는 서민우입니다. 최근 컬리에는 대규모 시스템 개편이 있었고, 이에 따라 저희는 검색 인덱싱 파이프라인 구조를 변경해야 했습니다. 이 기회에 카프카를 조금 더 깊게 들여다볼 기회가 있었는데요. 컬리 기술 블로그를 통해 그 경험을 공유드리고자 합니다.

이번 포스팅은 두번째 포스팅으로, 같은 키를 공유하는 서로 다른 토픽의 메시지를 조합해 하나의 메시지를 완성하여 유통하는 카프카 스트림즈를 적용해본 기록입니다.

첫번째 포스팅부터 읽기

2. 검색 시스템 변경 요구사항

1편에서 자세히 언급한 바를 요약하면, 저희는 2가지의 정보를 구독하여 검색엔진 외부에서 하나의 메시지로 조합하여 인덱싱을 해야 했습니다. 이런 요구사항을 해결하기 위한 1차 구현은 다음과 같이 레디스를 일종의 중간 저장소로 활용하는 방법이었습니다.

  • 각각의 토픽에서 수신한 메시지를 정제해 레디스에 적재한다
  • 하나의 상품을 구성하는 데이터가 모두 적재되면 레디스에서 읽어들여
  • 데이터를 하나로 합쳐 검색엔진에 색인을 요청한다

한편 저희 팀은 두가지 데이터를 조합하기 위해 좀더 효과적이고 단순한 방법은 없을지 고민했고, 그 결과 카프카 스트림즈 라이브러리를 발견할 수 있었습니다.

3. 스트림즈를 공부해보자

다음은 아파치에서 제공하는 카프카 스트림즈 문서의 일부입니다.

카프카 스트림즈는 카프카에 저장된 데이터를 처리하고 분석하는 클라이언트 라이브러리로 진입장벽이 굉장히 낮다는 장점을 가지고 있습니다.

출처: Apache Kafka Streams

아래는 Stream DSL을 이용해 순수 자바로 구현한 스트림즈 파이프라인 로직입니다.

public class StreamsProcessor {
  
  private static final String sourceTopicA = "MSG-SOURCE-A";
  private static final String sourceTopicB = "MSG-SOURCE-B";
  private static final String mergedTopic = "MSG-MERGED";

  public static void buildPipeline() {

    StreamsBuilder builder = new StreamsBuilder();

    KTable<String, SourceAMessage> sourceATable
        = builder.table(sourceTopicA, Consumed.with(
        STRING_SERDE,
        CustomSerdes.sourceAMessageSerde()
    ));

    KStream<String, SourceBMessage> sourceBStream
        = builder.stream(sourceTopicB, Consumed.with(
        STRING_SERDE,
        CustomSerdes.sourceBMessageSerde()
    ));

    sourceBList.join(sourceATable, joiner)
        .filter((key, value) -> Objects.nonNull(value))
        .to(mergedTopic);

    KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProperties);
    streams.start();
  }
  public static void main(String[] args) {
    buildPipeline();
  }
}

각 토픽마다 카프카 컨슈머와 카프카 프로듀서를 만들어서 일일이 동작시키는 것이 아니라 자바 코드 만으로 같은 키를 가진 메시지끼리 병합하여 메시지를 발행하는 것까지 손쉽게 할 수 있습니다. 다음으로 우리는 이 데이터 처리 파이프라인 로직을 스프링 프레임워크에서 동작할 수 있도록 수정해야합니다.

4. 스트림즈를 스프링 어플리케이션으로 구성하자

먼저 도입했던 방법은 스프링 프레임워크에서 제공하는 이벤트 기능을 사용하는 것이었습니다. Application Context 가 만들어진 이후에 동작하면 되기 때문에 스프링에서 기본적으로 제공하는 ApplicationStaredEventEventListener 로 받아서 동작시켰습니다.

@Component
@RequiredArgsConstructor
public class IndexStreamProcessor implements ApplicationListener<ApplicationStartedEvent> {

  /**
   * 카프카 스트림즈 파이프라인을 구성합니다.
   */
  public void buildPipeline() {

    StreamsBuilder builder = new StreamsBuilder();

    ...
    
    KafkaStreams streams = new KafkaStreams(builder.build(), kafkaProperties);
    streams.start();

  }

  @Override
  public void onApplicationEvent(ApplicationStartedEvent event) {
    buildPipeline();
  }
}

이렇게 스프링 프레임워크 위에 얹은 카프카 스트림즈 어플리케이션은 메시지를 수신할 때마다 데이터를 처리하고 병합하는 과정을 신속하게 하는 본연의 임무와 더불어, 스프링 프레임워크가 지닌 기능까지 활용할 수 있는 훌륭한 어플리케이션이 되었습니다.

5. 소리없이 죽어버린 스트림즈

하지만 기쁨도 잠시, 스트림즈 스레드가 다음과 같이 로그를 남기고 죽어버렸습니다.

... stream-client [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d] All stream threads have died. The instance will be in error state and should be closed.
... stream-thread [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d-StreamThread-2] Shutdown complete
... Unregistering application SEARCH-KAFKA-STREAMS with eureka with status DOWN
... stream-client [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d] State transition from ERROR to PENDING_SHUTDOWN
... stream-thread [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d-StreamThread-1] Informed to shut down
... stream-thread [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d-StreamThread-2] Informed to shut down
... stream-thread [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d-StreamThread-3] Informed to shut down
... stream-client [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d] State transition from PENDING_SHUTDOWN to NOT_RUNNING
... stream-client [kafka-streams-app-58be416e-9c5d-41d4-be8c-f6c046cea86d] Streams client stopped completely

동작을 멈춘 이유는 간단했습니다. 어플리케이션을 구동한 테스트 환경에서 발행된 일부 메시지가 약속된 규격에 어긋나있었고, 따라서 처리과정에서 오류를 일으켰기 때문입니다.

규격에 맞지 않는 메시지에 대한 핸들링을 해야 하는 것과는 별개로, 우리가 다뤄야 할 문제가 있었는데요. 이처럼 더이상 메시지가 처리되지 않는 상황임에도 불구하고 스프링 어플리케이션 자체의 상태는 정상으로 나타난다는 것이었습니다.

시스템을 운영하는 입장에서, 카프카 스트림즈의 헬스체크를 지속적으로 수행해 스레드가 죽으면 어플리케이션에 알리는 추가 조치가 필요했습니다.

6. 스프링 카프카를 통해 가시성을 확보하자

찾아본 결과, 다행히도 스프링 카프카에서 카프카 스트림즈를 지원한다는 것을 알 수 있었습니다. 그렇다면 의존 관계를 통해 스프링이 스트림즈 객체를 불러올 수 있고 상태에 대한 모니터링도 주기적으로 할 수 있을 거라 판단했습니다.

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(KafkaStreamsDefaultConfiguration.class)
public @interface EnableKafkaStreams {
}

출처: docs.spring.io

구현하기

다음과 같이 @EnableKafkaStreams 어노테이션을 활용해 Configuration 과 데이터 처리 파이프라인의 구조를 수정했습니다.

@Configuration
@EnableKafkaStreams
public class KafkaStreamsConfig {

  @Value("${spring.kafka.consumer.bootstrap-servers}")
  private String bootstrapAddress;

  @Value("${spring.kafka.streams.application-id:kafka-streams-app}")
  private String applicationName;

  @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
  public KafkaStreamsConfiguration kafkaStreamsConfig() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationName);
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, String().getClass().getName());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, String().getClass().getName());
    props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, "3");
    return new KafkaStreamsConfiguration(props);
  }
}
@Component
@RequiredArgsConstructor
public class IndexStreamProcessor {

  /**
   * 카프카 스트림즈 파이프라인을 구성합니다.
   */
  @Autowired
  public void buildPipeline(StreamsBuilder builder) {

    KTable<String, SourceAMessage> sourceATable
        = builder.table(sourceTopicA, Consumed.with(
        STRING_SERDE,
        CustomSerdes.sourceAMessageSerde()
    ));

    KStream<String, SourceBMessage> sourceBStream
        = builder.stream(sourceTopicB, Consumed.with(
        STRING_SERDE,
        CustomSerdes.sourceBMessageSerde()
    ));

    sourceBList.join(sourceATable, joiner)
        .filter((key, value) -> Objects.nonNull(value))
        .to(mergedTopic);
  }
}

처음에 구현한 내용과의 큰 차이는 보시다시피 직접 KafkaStreams를 생성하거나 시작하지 않고 StreamsBuilder@Autowired 로 주입했다는 점입니다.

즉, 스프링이 직접 스트림즈 클라이언트를 생성하여 생명 주기를 관리하도록 맡길 수 있게된 것입니다. 그 결과 개발자는 비즈니스 로직에만 온전히 집중할 수 있게 됩니다.

헬스체크

카프카 스트림즈의 상태 주기를 통해 정상적으로 동작하고 있는 상태는 Created, Running, Re-Balacing 로, 문제가 있는 상태는 Pending Shutdown, Not Running, Error 로 구분할 수 있습니다. 한 가지 문제는, 스트림즈 클라이언트가 Error 상태로 빠지면 재시작을 하는 것 이외에 클라이언트를 살릴 수 있는 방법이 마땅히 없다는 점입니다.

그래서 저희는 실시간 헬스체크를 통해 스트림즈 클라이언트의 상태를 확인하고 문제가 생겼을 때 빠르게 대처할 수 있는 환경을 만들었습니다. 카프카 스트림즈 클라이언트가 스프링에서 관리하도록 수정을 했기에 가능한 일이었습니다.

@Component
@RequiredArgsConstructor
public class KafkaStreamsHealthIndicator implements HealthIndicator {

  private final StreamsBuilderFactoryBean streamBuilder;

  @Override
  public Health health() {
    KafkaStreams kafkaStreams = streamBuilder.getKafkaStreams();
    State kafkaStreamsState = kafkaStreams.state();

    // CREATED, RUNNING, RE-BALANCING
    if (kafkaStreamsState == State.CREATED || kafkaStreamsState.isRunningOrRebalancing()) {
      return Health.up().build();
    }

    // ERROR, NOT_RUNNING, PENDING_SHUTDOWN
    return Health.down().withDetail("state", kafkaStreamsState.name()).build();
  }
}

7. 마무리

이번 편에 소개한 저희의 경험을 정리하면 다음과 같습니다.

  • 카프카 스트림즈를 이용하여 필요한 정보의 병합을 쉽고 효과적으로 할 수 있다.
  • 카프카 스트림즈 어플리케이션의 관리는 스프링에 일임할 수 있다.
  • 카프카 스트림즈의 상태 또한 스프링을 통해 접근 및 모니터링 할 수 있다.

저희가 시도해본 것 보다 더 좋은 설계는 얼마든지 존재할 것입니다. 하지만 이번 기회를 통해 요구사항에 대응하기 위한 구조를 팀 차원에서 고민하고, 서로 방법을 제시해가며 다양한 시도를 해본 경험이 더 나은 검색시스템 개발을 향한 밑거름이 될 거라 생각합니다.

이상으로 2부에 걸친 컬리 검색의 카프카 이야기를 마치겠습니다.

궁금한 내용은 댓글로 남겨주세요. 읽어주셔서 감사합니다!


컬리에서 함께 검색/추천 서비스를 개발해보고 싶으시다면: 커머스플랫폼 검색/추천 개발자 채용