Kafka 카프카 스트림즈 애플리케이션 개발
Gradle
```groovy
dependencies {
compile 'org.apache.kafka:kafka-streams:2.5.0'
}
```
필터링 스트림즈 애플리케이션
토픽으로 들어온 문자열 데이터 중 문자열의 길이가 5보다 큰 경우만 필터링 하는 스트림즈 애플리케이션을 스트림 프로세서를 사용하여 만들 수 있다.
메세지 키 또는 메세키 값을 필터링하여 특정 조건에 맞는 데이터를 골라낼 때는 filter() 메서드를 사용하면 된다. filter() 메서드는 스트림즈 DSL 에서 사용 가능한 필터링 스트림 프로세서이다.
-
code
import org.apache.kafka.streams.StreamsBuilder import org.apache.kafka.streams.kstream.KStream class StreamFilter { private val APPLICATION_NAME = "streams-filter-application" private val BOOTSTRAP_SERVERS = "my-kafka:9092" private val STREAM_LOG = "stream_log" private val STREAM_LOG_FILTER = "stream_log_filter" fun streaming() { val builder = StreamsBuilder() val streamLog: KStream<String, String> = builder.stream(STREAM_LOG) streamLog .filter { _, value -> value.length > 5 } .to(STREAM_LOG_FILTER) } } fun main() { StreamFilter().streaming() }
-
shell
-
토픽 생성
$ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic stream_log --create $ bin/kafka-topics.sh --bootstrap-server my-kafka:9092 --topic stream_log_filter --create
-
produce [stream_log]
$ bin/kafka-console-producer.sh --bootstrap-server my-kafka:9092 --topic stream_log > 0 > 01 > 012 > 0123 > 01234 > 012345 // stream filter 통과
-
consume [stream_log_filter]
$ bin/kafka-console-consumer.sh --bootstrap-server my-kafka:9092 --topic stream_log_filter 012345 // 메세지 컨슘
-
Comments