Kafka 스트림즈 DSL Window Processing

2 minute read

스트림즈 DSL - window processing

스트림 데이터를 분석할 때 가장 많이 활용하는 프로세싱 중 하나는 윈도우 연산이다. 윈도우 연산은 특정 시간에 대응하여 취합 연산을 처리할 때 활용한다.

카프카 스트림즈에서 제공하는 윈도우 프로세싱 4가지를 지원한다. 모든 프로세싱은 메세지 키를 기준으로 취합된다. 그러므로 해탕 토픽에 동일한 파티션에는 동일만 메세지 키가 있는 레코드가 존재해야지만 정확한 취합이 가능하다.

만약 커스텀 파티셔너를 사용하여 동일 메세지 키가 동일한 파티션에 저장되는 것을 보장하지 못하거나 메세지 키를 넣지 않으면 관련 연산이 불가능하다.

카프카 스트림즈에서 제공하는 윈도우 연산 종류는 다음과 같다.


텀블링 윈도우

Untitled

  • 텀블링 윈도우는 서로 겹치지 않은 윈도우를 특정 간격으로 지속적으로 처리할 때 사용한다.
  • 윈도우 최대 사이즈에 도달하면 해당 시점에 데이터를 취합하여 결과를 도출한다.
  • 텀블링 윈도우는 단위 시간당 데이터가 필요한 경우 사용할 수 있다.
    • 예를 들어 매 5분간 접속한 고객의 수를 측정하여 방문자 추이를 실시간 취합하는 경우 텀플링 윈도우를 사용할 수 있다.

호핑 윈도우

Untitled

  • 호핑 윈도우는 일정 시간 간격으로 겹치는 윈도우가 존재하는 윈도우 연산을 처리할 경우 사용한다.
  • 호핑 윈도우는 윈도우 사이즈와 윈도우 간격 2가지 변수를 가진다.
  • 윈도우 사이즈는 연산을 수행할 최대 윈도우 사이즈를 뜻하고 윈도우 간격은 서로 다른 윈도우 간 간격을 뜻한다.
  • 텀플링 윈도와 다르게 동일한 키의 데이터를 서로 다른 윈도우에서 여러번 연산될 수 있다.

슬라이딩 윈도우

Untitled

  • 슬라이딩 윈도우는 호핑 윈도우와 유사하지만 데이터의 정확한 시간을 바탕으로 윈도우 사이즈에 포함되는 데이터를 모두 연산에 포함시키는 특징이 있다.
  • 각각의 레코드에 포함된 타임스탬프의 시간을 토대로 한다.

세션 윈도우

Untitled

  • 세션 윈도우는 동일 메세지 키의 데이터를 한 세션에 묶어 연산할 때 사용한다.
  • 세션의 최대 만료시간에 따라 윈도우 사이즈가 달라진다.
  • 세션 만료 시간이 지나게 되면 세션 윈도우가 종료되고 해당 윈도우의 모든 데이터를 취합하여 연산한다.
  • 그렇기 때문에 세션 윈도우의 윈도우 사이즈는 가변적이다.

텀블링 윈도우 예제 코드

카프카 스트림즈에서 텀블링 윈도우를 사용하기 위해서는 groupByKey와 windowed를 사용해야 한다.

windowedBy의 파라미터는 텀블링 윈도우의 사이즈를 뜻한다.

이후 텀블링 연산으로 출력된 데이터는 KTable로 커밋 internal 마다 출력된다.

val builder = StreamsBuilder()
val stream = builder.stream(TEST_LOG);

val countTable = 
	stream
		.groupByKey()
		.windowedBy(
			TimeWindows.of(
				Duration.ofSeconds(5)
			)
		)
		.count()

countTable
	.toStream()
	.foreach { key, value ->
		log.info {
			"${key.key()} is [${key.window().startTime()}~${key.window().endTime()}] count : $value"
		}
	}

윈도우 연산시 주의해야할 사항

카프카 스트림즈는 커밋(기본값 30초)을 수행할 때 윈도우 사이즈가 종료되지 않아도 중간 정산 데이터를 출력한다.

Untitled

커밋 시점마다 윈도우의 연산 데이터를 출력하기 때문에 동일 윈도우 사이즈(시간)의 데이터는 2개 이상 출력 될 수 있다.

Untitled

  1. 최종적으로 각 윈도우에 맞는 데이터를 출력하고 싶다면 Windowed를 기준으로 동일 윈도우 시간 데이터는 겹쳐쓰기(upsert)하는 방식으로 처리하는 것이 좋다.
  2. 예를 들어 0~5초의 A데이터가 포함도니 윈도우 취합 데이터가 들어오면 해당 데이터를 유니크 키로 설정하고 새로 들어온 데이터를 겹쳐쓰는 것이다.
  3. 위 경우에는 최초에 0~5초 A데이터가 2개 취합된 데이터가 처음 저장되고, 추후에 6초에 출력된 3개 취합된 데이터가 최종 저장된다.
  4. 결과적으로 A가 0~5초에 3개 count된 것을 확인할 수 있게 된다.
-->

Categories:

Updated:

Comments