Kafka 카프카 스트림즈 애플리케이션 개발

less than 1 minute read

Gradle

```groovy
dependencies {
	compile 'org.apache.kafka:kafka-streams:2.5.0'
}
```

필터링 스트림즈 애플리케이션

토픽으로 들어온 문자열 데이터 중 문자열의 길이가 5보다 큰 경우만 필터링 하는 스트림즈 애플리케이션을 스트림 프로세서를 사용하여 만들 수 있다.

메세지 키 또는 메세키 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 때는 filter() 메서드를 사용하면 된다. filter() 메서드는 스트림즈 DSL 에서 사용 가능한 필터링 스트림 프로세서이다.

Untitled

  • code

      import org.apache.kafka.streams.StreamsBuilder
      import org.apache.kafka.streams.kstream.KStream
        
      class StreamFilter {
          private val APPLICATION_NAME = "streams-filter-application"
          private val BOOTSTRAP_SERVERS = "my-kafka:9092"
          private val STREAM_LOG = "stream_log"
          private val STREAM_LOG_FILTER = "stream_log_filter"
        
          fun streaming() {
              val builder = StreamsBuilder()
              val streamLog: KStream<String, String> = builder.stream(STREAM_LOG)
              streamLog
                  .filter { _, value ->
                      value.length > 5
                  }
                  .to(STREAM_LOG_FILTER)
          }
      }
        
      fun main() {
          StreamFilter().streaming()
      }
    
  • shell

    • 토픽 생성

        $ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic stream_log --create
        $ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic stream_log_filter --create
      
    • produce [stream_log]

        $ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic stream_log
        > 0
        > 01
        > 012
        > 0123
        > 01234
        > 012345 // stream filter 통과
      
    • consume [stream_log_filter]

        $ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic stream_log_filter
        012345 // 메세지 컨슘
      
-->

Categories:

Updated:

Comments