컴퓨터/아파치 카프카

4. 카프카 스트림즈와 카프카 커넥트

sidedoor 2024. 9. 25. 00:03

카프카 스트림즈는 토픽에 적제된 데이터를 상태기반 또는 비상태기반으로 실시간 변환하여 다른 토픽에 적재하는 라이브러리이다.

 

 - 상태기반: 클라이언트와 서버 간의 상호작용이 계속될 때, 시스템이 이전 상호작용의 상태를 기억하여 이전의 데이터를 메모리에 저장하고, 이를 기반으로 처리한다.

시스템이 상태를 기억해야 하기 때문에 복잡도가 증가할 수 있으며, 상태 관리가 필요합니다. 서버는 클라이언트의 상태를 유지하고, 이를 바탕으로 연속적인 요청을 처리한다.

이의 장점은 세션이나 사용자 관련 데이터를 계속 유지하기 때문에, 여러 단계의 상호작용이 필요한 경우 유용하다.

하지만 상태를 저장하고 관리하기 때문에 메모리 사용량이 많아질 수 있으며, 확장성이 떨어질 수 있다.

 

 - 비상태기반: 클라이언트의 요청을 받을 때마다 해당 요청에 대해 독립적으로 처리하는데, 이전 요청의 상태나 데이터를 기억하지 않고 각 요청을 독립적으로 처리한다.

따라서 데이터의 유실과 중복이 없지만, 연속적인 상호작용이 필요한 서비스에서는 클라이언트가 매번 상태 정보를 다시 제공해야한다는 단점이 있다.

 

스트림 데이터 처리를 위해서 다른 애프리케이션도 많지만 카프카 스트림을 사용해야만 하는 이유는?

카프카 클러스터와 완벽하게 호환이 되면서 처리에 필요한 편리한 기능을 제공해준다.

 

JVM위에서 하나의 프로세스로 진행이 되어 기존의 빅데이터 처리에 필요했던 분산 시스템이나 스케줄링 프로그램들은 스트림즈를 운영하는데 필요가 없다.

 

스트림즈 내부적으로 스레드를 1개 이상 생성할 수 있고 스레드는 1개 이상의 태스크를 가진다.

여기서 테스크는 스트림즈 애플리케이션을 실행하면 생기는 데이터 처리의최소 단위이다.

만약 3개의 파티션을 처리를 하면 내부에 3개의 태스크가 생긴다.

이는 컨슈머 스레드를 늘리는 방법과 동일하게 병렬처리를 위해 스트림즈 스레드의 개수를 늘리면서 처리량을 늘릴 수 있다.

이때 태스크의 수는 변하지 않는다.

스트림즈 애플리케이션 구조

 

토폴로지

토폴로지는 2개 이상의 노드와 선으로 이루어진 집합을 뜻한다.

토폴로지 종류

카프카 스트림즈에서 토폴로지를 이루는 노드를 하나의 프로세서라고 부르고 노드와 노드를 이은 선을 스트림이라고 부른다.

스트림은 토픽의 데이터를 뜻하는데 이전에 프로듀서와 컨슈머에서 활용한 레코드와 동일하다.

프로세스에는 소스 프로세서, 스트림 프로세서, 싱크 프로세서가 존재한다.

 

소스 프로세서

데이터 처리를 위해 최초로 선언하는 노드로 하나 이상의 토픽에서 데이터를 가져온다.

스트림 프로세서는 다른 프로세서가 반환한 데이터를 가져오는 역할로 변환이나 분기처리같은 데이터 처리를 한다.

싱크 프로세서는 데이터를 특정 카프카 토픽으로 저장하는 역할을 하여 처리된 데이터의 최종 종착지이다.

프로세서 진행 과정

DSL은 특정 비즈니스 도메인의 문제를 해결하려고 만든, 작은, 범용이 아닌 특정 도메인을 대상으로 만들어진 특수 프로그래밍 언어다.

스트림즈 DSL은 스트림즈 프로세싱에 쓰일만한 기능을 API로 만들어두어 변호나 로직을 어별지 않게 개발 할 수 있다.

여기서 부족한 부분들은 프로세서 API를 사용해서 구현하면 된다.

 

스트림즈DSL

KStream

레코드의 흐름을 표현한 것으로 메시지 키와 메시지 값으로 구성되어있다.

데이터를 조회 했을 때 토픽에 존재하는 모든 레코드가 출력이 된다.

이는 컨슈머로 토픽을 구독하는 것과 동일한 것으로 볼 수 있다. 

KStream 예시

KTable

메시지 키를 기준으로 묶어서 사용한다.

유니크한 메시지 키를 기준으로 가장 최신 레코드를 사용한다.

여기서 선언된 토픽은 1개의 파티션이 1개의 태스크에 할당되어 사용된다.

KTable 예시

GlobalKTable

메시지 키를 기준으로 묶어서 사용한다.

선언된 토픽은 모든 파티션 데이터가 각 태스크에 할당되어 사용된다.

 

코파티셔닝

조인을 하는 2개의 데이터의 파티션 수가 동일하고 파티셔닝 전략을 동일하게 맞춰야 한다.

코파티셔닝 예시

이러한 경우만 조인을 수행할 수 있다.

하지만 이런 경우는 흔한 것이 아니므로 일반적으로 코파티셔닝 되지 않아 ToplogyException이 발생한다.

코파티셔닝이 안된 토픽들

따라서 리파티셔닝을 통해서 새로운 토픽에 새로운 메시지 키를 가지도록 재배열을 하면 되지만 이러한 과정은 기존의 데이터를 토픽에 중복해서 생성하고 추가적인 프로세싱과정도 들어가게 된다.

따라서 이러한 경우 GlobalKTable로 선언하여 KTable을 사용하면 KStream과 데이터 조인을 할 수 있게 된다.

다만 각 태스크마다 정의된 모든 데이터를 저장하고 사용하기 때문에 로컬 스토리지 사용량이 증가하고 네트워크 브로커에 부하가 생길 수 있어 용량이 작은 경우만 사용하는 것이 좋다.

GlobalKTable

 

프로세서 API

스트림즈 DSL로 할 수 없는 추가적인 상세 로직의 구현이 필요하다면 프로세서 API를 활용해야한다.

 

카프카 커넥트

데이터 파이프라인 생성시 반복 작업을 줄이고 효율적인 전송을 이루기 위한 애플리케이션이다.

파이프라인을 생성할 때 매번 프로듀서, 컨슈머를 개발하는 것은 비효율적이다.

따라서 커넥트를 통해서 특정한 작업 형태를 탬플릿으로 만들어놓은 커넥터를 실행하여 반복작업을 줄일 수 있다.

이때 토픽의 이름이나 파일의 이름과 같이 파이프라인 생성시 자주 반복되는 값들을 커넥터 코드로 작성하여 고유한 설정값을 받아 데이터를 처리한다.

커넥터에는 프로듀서 역할을 하는 소스 커넥터와 컨슈머 역할을 하는 싱크 커넥터가 있다.

소스 커넥터는 토픽의 데이터를 파일로 저장하고 싱크 커넥터는 카프카에서 데이터를 가져올 수 있다.

소스 커넥터와 싱크 커넥터가 카프카 커넥트에서 실행

만약 추가적인 커넥터를 사용하고 싶으면 플러그인 형태로 파일을 추가하여 사용할 수 있다.

여기에는 커넥터를 구현하는 클래스를 빌드한 파일이 포함되어 있어 직접 이를 만들어 사용할 수 있다.

그리고 오픈소스 커넥터를 쓰면 직접 만들 필요 없이 다운로드 해서 사용할 수 있는데 이미 커넥터들이 많이 공개되어있어 필요한 부분을 검색하여 아키텍처에 맞게 다운받아 사용하면 된다.

추가적으로 필요한 커넥터

 

사용자가 커넥트를 생성하면 내부에 커넥터와 태스크가 생성된다.

여기서 커넥터는 태스크들을 관리하고, 태스크는 실질적인 데이터 처리를 한다.

 

커넥트를 실행하는 방법

단일 모드 커넥트

단일 애플리케이션으로 실행하는 것으로 커넥터를 정의하고 파일을 작성하고 해당 파일을 참조하는 과정을 한다.

이는 1개의 프로세스만 실행되는 점이 특징인데 이러한 이유로 고가용성이 구성되지 않아 단일 장애점이 될 수 있다.

따라서 중요도가 낮은 파이프라인을 운영할 때 사용한다.

단일 모드 커넥트

분산모드 커넥트

2대 이상의 서버에서 클러스터 형태로 운영하여 안전하게 운영할 수 있다.

다른 커넥트가 문제가 생겨도 다른 커넥트가 파이프라인을 지속적으로 처리할 수 있기 때문이다.

이는 데이터 처리량의 변화에도 유연하게 대응할 수 있는데 실행되는 서버 개수를 늘림으로 무중단 스케일 아웃하여 처리량을 늘릴 수 있다.

분산 모드 커넥트

 

소스 커넥터

소스 애플리케이션이나 소스 파일로부터 데이터를 가져와 토픽으로 넣는 역할을 한다.

오픈소스 소스 커넥터에서 라이선스 문제나 원하는 로직이 없어 직접 개발해야하는 경우 카프카 라이브러리의 SourceConnector와 SourceTask클래스를 사용해 직접 구현할 수 있다.

소스 커넥트

SourceConnector는 태스크를 실행하기 전 커넥터 설정 파일을 초기화 하고 어떤 태스크 클래스를 사용할지 정의하는데 사용한다.

SourceTask는 실제로 데이터를 다루는 클래스로 소스 애플리케이션으로부터 데이터를 가져와서 토픽으로 데이터를 보내는 역할을 한다.

 

싱크 커넥터

토픽의 데이터를 타킷 애플리케이션 또는 타킷 파일로 저장하는 역할을 한다.

이는 SinkConnector와 SinkTask클래스를 사용하여 직접 싱크 커넥터를 구현할 수 있다.

싱크 커넥트

SinkConnector는 태스크를 실행하기 전에 사용자로부터 입력받은 설정값을 초기화하고 어떤 태스크 클래스를 사용할 지 정의하는데 사용한다.

그리고 SinkTask는 실제 데이터를 처리하는 로직으로 커넥트에서 컨슈머 역할을 하고 데이터를 저장하는 코드를 가지게 된다.

 

카프카 미러메이커2

카프카 미러메이커2는 서로 다른 두 개의 카프카 클러스터 간의 토픽을 복제하는 애플리케이션이다.

프로듀서와 컨슈머를 사용해서 직접 미러링하는 애플리케이션을 만들어도 되지만 토픽의 모든 것을 복제할 필요성이 있기 때문에 이를 사용한다.

프로듀서와 컨슈머를 사용해서 2개의 서로 다른 클러스터에 토픽의 고유한 메시지키, 메시지 값, 파티션과 같은 데이터를 완전히 옮기는 것은 힘들고, 동일한 파티션에 동일한 레코드가 들어가게 하는 작업은 클러스터에서 사용하던 파티셔너에 대한 정보가 필요하다.

또한 복제하는 토픽의 파티션 개수가 달라지면 저장하는 토픽의 파티션 수도 달라져 어드민까지 조합한 형태로 개발을 해야한다.

따라서 미러 메이커2를 사용해서 이러한 기능을 할 수 있다.

참고로 미러 메이커1은 카프카의 최초의 토픽 데이터 복제 기능인데 이는 토픽 복제에 충분한 기능을 가지지 않아서 토픽의 데이터를 복제할 때 기본 파티셔너를 사용해 복제전 데이터와 복제 후 데이터의 파티션 정보가 달랐다.

그리고 복제하는 토픽이 달라지면 수정하기 위해서 미러 메이커를 재시작해야하는 문제가 있고, 전달의 정확도를 보장하지 못해 데이터의 유실 또는 중복이 발생할 가능성이 있었기 때문에 이러한 기능을 개선하여 미러 메이터2가 나왔다.

 

미러메이커2를 활용한 지리적 복제

액티브-스탠바이 클러스터 운영

재해복구를 위해 임시 카프카 클러스터를 하나 더 구성하는 경우 액티브-스탠바이 클러스터로 운영할 수 있다.

이때 애플리케이션이 직접 통신하는 클러스터를 액티브 클러스터라고 하고 미러메이커2를 통해서 액티브 클러스터의 모든 토픽을 스탠바이 클러스터에 복제하여 액티브 클러스터의 장애에 대응하는 것이 스탠바이 클러스터이다.

다만 액티브 클러스터를 미러메이커2로 스탠바이 클러스터로 복제할 때 복제가 지연되는 현상인 복제 랙이 발생할 수 있다.

따라서 이에 따라 데이터가 중복 또는 유실 처리가 될 수 있어 대응 방안을 사전에 정하고 운영하는 것이 중요하다.

 

액티브-액티브 클러스터 운영

애플리케이션의 통신 지연을 최소화하기 위해 2개 이상의 클러스터를 두고 서로 데이터를 미러링 하면서 사용하는 경우이다.

물리적으로 아주 멀리 떨어진 곳에서 유저의 데이터를 저장하고 사용하는 방법은 각 지역마다 클러스터를 두고 필요한 데이터만 복제하여 사용하면 된다.

이 경우 각 지역의 클러스터에서 데이터를 가져옴으로 데이터의 지연을 줄일 수 있다.

허브 앤 스포크 클러스터 운영

각 팀에서 소규모 카프카 클러스터를 사용하고 있을 때 카프카 클러스터의 데이터를 한 개의 카프카 클러스터에 모아 데이터 레이크로 사용하는 경우 사용한다.

허브는 중앙의 한 개의 점을 뜻하고 스포크는 중앙의 점과 다른 점들을 연결한 선을 뜻한다.

데이터 레이크 특성상 서비스에서 생성된 데이터를 수집, 가공, 분석하는 격리된 플랫폼이 필요하다.

따라서 미러메이커2를 통해 각 팀에서 사용하는 카프카클러스테에 존재하는 데이터를 수집하고 데이터 레이크용 카프카 클러스테어서 가공, 분석한다.