Kafka 스트림즈 DSL Queryable store

less than 1 minute read

카프카 스트림즈에서 KTable은 카프카 토픽의 데이터를 로컬의 rocksDB에 Meterialized View로 만들어 두고 사용하기 때문에 레코드의 메세지 키, 메세지 값을 기반으로 keyValueStore로 사용할 수 있다.

특정 토픽을 KTable로 사용하고 ReadOnlyKeyValueStore로 뷰를 가져오면 메세지 키를 기반으로 토픽 데이터를 조회할 수 있게 된다.

마치 카프카를 사용하여 로컬 캐시를 구현한것과 유사하다고 볼 수 있다.

  • code

      val builder = StreamsBuilder()
      val addressTable = builder.table(ADDRESS_TABLE, Meterialized.as(ADDRESS_TABLE))
        
      val keyValueStore = streams.store(
      	StoreQueryParameters.fromNameAndType(
      		ADDRESS_TABLE, QueryableStoreTypes.keyValueStore()
      	)
      )
        
      val address: KeyValueIterator = keyValueStore.all()
      address.forEachRemaining { keyValue ->
      	log.info { keyValue }
      }
    
-->

Categories:

Updated:

Comments