[세미나] 202203 우아한테크세미나-분산 이벤트 스트리밍 정리(feat.카프카 스트림즈)
2022-04-01
이번달 우아한 테크 세미나 주제는 분산 이벤트 스트리밍! 유튜브에 올라온 걸 들으면서(일이 있어서 실시간으로 듣진 못함 🥲) 빠르게 간단 정리해 보았다.
최근에 메세징 큐 방식으로 서비스 처리하는 업무를 했어서 그런지 조금 더 이해하기 수월했다(평소엔 이해하기 어려운 토픽이 많았…)
덕분에 좀 더 깊게 분산 이벤트 스트리밍을 배울 수 있었다. 카프카를 써서 대용량 처리를 해보고 싶기도..ㅎㅎ
도입 이유는 아래에 더 길게 쓸 테지만 짧게 요약하자면
- 데이터처리: 실제 처리량이 많아지다 보니까 시스템화(자동화)해서 해서 만들 필요가 있었음.
- 실제 수치를 알아야 한다. 이 지역에 대한 배달 인프라 지표 산출하는 방법을 어떻게 처리할 지 고민이 필요했음.
1. 주문에 대한 트래킹 처리 방식
1.1. 통계 수치 뽑기 (기본)
-
주문하고 특정 시간이상 인 주문건에 대한 처리
처리 방법: 스케쥴러 → RDS
스케쥴러 돌리는 방법 다양함. (쿼리 돌리기)
- 스프림프레임웍 : batch ? (잘 안들렸음)
- 젠킨스 시간마다 job 돌리기
- unix/linux crontab
-
주문을 배달원에게 배차 : 쿼리를 정기적으로 실행
1.2. 문제: 서버 한대가 모든 로직 수행한다
지표를 1분만에 뽑아야 하는데 수천건에 대한 처리는 어려움. (서비스가 커짐에 따라)
실제 배차 산출하는 기간 15초.. 등등 매우 짧다. 1분은 예시일 뿐.
문제 해결 방법 list:
- 멀티쓰레드 형식으로 처리하거나..
- 서버마다 VM을 따로 띄워서(지역별 파티션 나눠서) 처리
- 지역별 서버 따로 띄우기 ⇒ 관리의 허들이 있음. capa 관리 복잡.
1.3. 문제 해결? (1)
AWS SQS 를 사용해서 지역별 파티션 키가 있다고 가정, 큐로 처리.
예를 들어, 모든 지역의 파티션 키가 1~100 이라고 하면
- Command Sender (지역 파티션 키와 함께 해당 정보를 큐로 보냄) - 명령을 보내는 배치서버. 이벤트를 보냄.
- Queue (파티션 키에 맞는 command Receiver 찾고 해당 리시버에 큐를 보냄) - like AWS SQS.
사실 한대로 처리함. 문제가 생기면 2대로 할 수 있긴 하지만 이건 처리 이슈가 발생할 수 있음. 어디로 분배되는지 트래킹이 어려울 수도?
- Command Receiver (해당 정보를 실제로 처리) - 마치 워커 쓰레드가 분산처리하듯.
Spring boot 에서는 sqs listener 처리가 잘 되어있어서 좋음..ㅎㅎ
문제
-
메세지 중복처리 문제 : SQS가 중복처리 되었는지 안되었는지 개발자가 처리해야 된다.
→ 이따 뒤에서 설명. 파티션은 쏠림현상이 있을 수밖에 없다. 서울..!! 서울/경기: 전체 트래픽의 90%.
SQS FIFO 안씀. ‘굳이 써야할 필요가 있나?’ 싶다고 함. (비싸기도 하고..)
우발적인 상황에 대처하는 어플리케이션을 만드는 게 낫다.
-
Command Server (Batch server) 이중화 불가. 분산처리가 안된다.
서버가 죽을 수 있기 때문에 최소 3(매직넘버)을 동시 운용하진 못하지만, active-standby 설정함.
global lock을 선언해 둠(redis , rds) - 문제가 되었을 때 복구화할 수 있도록 함.
-
쿼리 리시버에서 지속적인 RDS fetch
모든 RDS remote에 값을 갱신처리해야 트래킹이 가능하기 때문 ㅠ
→ latency 문제 + RDS 매우 비쌈. reader 여러개 꽂았다고 하더라도 돈이..! (근본적인 문제점)
2. 카프카를 사용해보자! 문제 해결? (2)
2-0. 서적 추천
Apache Kafka 관련 서적 : MANNING에서 나온 책. Kafka Streams IN ACTION.
생각보다는 쉽다고 하시는데 음..!
2-1 카프카 장점
- high-performance data pipeline :
네트웍을 타는 순간 latency가 발생함. 메모리에 있는게 가장 빠르니까.
- streaming analysis
- data integration
- mission-critical : 미션을 완료하기에 도움이 될 수 있다~ 장애대응이 더 유용할 수 있다~ (이걸 하기 위해 너가 잘 만들어야한다~ 라고 이해하는게 좋다고 하심 ㅎㅎ)
2-2. 카프카 기본.
Broker(중개인) , Producer, Consumer,
-
브로커 : 카프카
를 지칭.
-
The log = event streaming, event log. 토픽 주제별로 다 모여있는 것임.
- 로그시스템: ELK, Kibana 타임스탬프 기반으로 로그가 나열되어 있다. 이 로그는 토픽으로 구분되어 있다.
-
publisher = producer
-
subscriber = consumer = receiver
-
connector = 스토리지에 있는 로그를 바로 떠서 log로 보내주거나, log를 스토리지로 넘기거나. 발표자는 커넥터는 쓰지 않는다고 함.
2-3. 카프카 스트림즈
선택 이유?
- 개발자 자율도가 매우 높음. 거의 실시간, 준 실시간으로 빠르게 쓸 수 있어서 썼다.
개발자가 만든 서버(어플리케이션 =
클러스터 1개
라고 함)와 연결해서 쓸 수 있음. 카프카는 운영팀에서 만들어주고, 이 카프카를 이용할 뿐이다.
- 리소스를 많이 줄임(연산 속도 확 낮아진다)
2-4 기존 서비스(1-3.) vs kafka 를 이용한 배치서비스
카프카: 파티션 갯수만큼 consumer 분산 할 수 있다.
group_id 에 따라 분산처리가 된다~
(파티션이 없으면 컨슈머 분산할 수 없다..ㅎㅎ 분산101)
데이터 처리 요구조건
- 중앙 저장소로 데이터 신속 전송 방법
- 피할 수 없는 오류로 인해 가동중지 시간이 짧고, 데이터 손실이 발생하지 않아야 함.
- 데이터 컨슈머를 확장할 수 있어야 함
- 모든 사용자가 데이터를 사용할 수 있어야 한다
- 누가 데이터를 보았는지, 보지 않았는지 추적할 필요가 없다.
2-5 컨슈머(group_id 로 나뉘어진) 처리 순서
같은 파티션일 때 : 타임스탬프, 즉 순서에 따라 처리가 가능하다.
즉 파티션 바깥에선 순서가 없음. 분산처리기 때문에.
우리 서비스는 순서가 중요해!! 하면 카프카 쓸 수 없음. 1개로만 처리해야하는데.. 왜 굳이?
2-6. 카프카 아키텍쳐 정리(1)
- 파티션 개수에 맞게 consumer(group id) 개수가 정해진다. (3개 파티션이면 컨슈머 3개~. 처리하는 서버 개수가 3개~ )
- 키에 의한 분할:
hashCode(keyBytes) % n
- 카프카는 파티션을 확장할 수 있음.
그러면 모듈러 연산이 바뀌고, 리밸런싱이 일어나면 → 갑자기 난리가 난다… 조심..
- 축소는 할 수 없음!
- 파티션을 커스텀 할 수 있다. 룰을 직접 만들 수 있음.
- 키를 null로 주면 round-robin으로 돌 수 있다고 함. 실제로 써본 적은 없다고 하심.
3. 데이터 유실 방지 전략
3-1. 카프카 복제
복제로 데이터 유실을 방어한다. 각 행위자별 행동 정리
-
프로듀서
- 비동기식. 콜백 call back 제공
- 속성 중,
acks
: all, 0, 1 (최소 복제 n개를 해야 성공했다는 메세지를 보내도록 세팅해야 된다)
-
컨슈머와 파티션
- 모든 인스턴스의 총 스레드 수 = 해당 토픽의 총 파티션 수 (이상적)
- 1 consumer = 1 partition 관리. (이상적인..)
→ 받은 메세지의 오프셋을 주기적으로 커밋해야 함.
오프셋
? 메세지를 고유하게 식별, 로그에서의 메세지 시작 위치.
-
리밸런싱
?
- 발생조건: 컨슈머 개수 변경되었을 때, 파티션 개수 변경되었을 때. 서버 배포할 때.
- 컨슈머에 대한 토픽-파티션 할당은 동적 할당.
3-2. 프로듀싱, 컨슈밍 실패시 전략
프로듀싱 실패시 전략
- 브로커가 죽었고, 브로커가 새로 뜨기 전까지는 메세지가 유실될 수 있음.
- 실패전략 : 로컬 큐에 넣고 워커쓰레드가 다시 보내는, 우회전략을 만들어야 함.
로컬큐가 불안하다면 remote queue를 만들어야 함. remote queue는 카프카가 아닌 SQS/SNS 등 다른 플랫폼으로 써야 한다.
컨슈밍 실패시 전략
- 카프카문제가 아닌 어플리케이션 문제이기 때문에 fallback topic을 따로 만들고,
이 실패용 데이터에 대한 어플리케이션(fallback consumer)을 따로 만들어야한다.
- 스프링 클라우드에서 컨슈밍 실패전략을 잘 정리했으므로 해당 부분 참고할 것.
4. 카프카 스트림즈
토폴로지 생성
- 토폴로지 : 위상수학에서 전체 그래프를 설명할 때. 각 프로세서 생성~!
- 서드(Serde)객체 : 역직렬화에 필요함.
- 간단한 구현 코드는 매닝 책에 있음.
https://www.manning.com/downloads/1526
(?)
KafkaStreamsYellingApp.java
- application_id = group_id라고 생각하면 됨.
스트림과 상태
- 상태가 중요함. rds 접근하지 않고 빠른 처리속도를 원하면 메모리 즉 로컬데이터로 처리해야한다. 해당 데이터의 처리 상태가 어떻게 되었는지 관리가 필요함.
- 새로 데이터가 입소하면 상태정보를 파악해야 한다.
- valueTransformer… 등을 쓸 수 있다.
- 스트림처리에서는 추가된 문맥을 **상태(state)**라 함. = 데이터베이스 테이블 같은 이미지 (최신 데이터를 업데이트 한다~)
- 카프카는 changelog (상태의 변경이 어떻게 이루어지고 있는지 로그)를 구성할 수 있음. 따라서 컨슈머가 줄거나 늘 때 데이터가 재배치 되는데 이 재배치되는 시간을 짧게 구성할 수 있음. status store를 재복구 가능함. 오류에 내구성이 강하다.
스트림즈의 타임스탬프
- 스트림 조인시 사용
- 변경로그(changelog) 업데이트 시 타임스탬프 기준으로 업데이트 & 사용
- 저수준 API (
Processor.puntuator()
)언제 작동할지 결정
KTable API
- 스트림과 테이블 사이의 관계
- 집계, 카운트 위해 업데이트 작업 (구조적으로 체인지로그, status store 를 쿼리로 사용할 수 있음)
- cf. 이벤트 vs. 업데이트 스트림?
- 이벤트스트림(kstream)
- 업데이트 스트림(k-table. 이 테이블은 status store와 연결됨) 비교
- 집계와 윈도 작업(windowing).
- 스트리밍 데이터에서 집계, 정렬, 그룹화는 필수 도구.
- 윈도잉? 윈도 연산?
- 세션 윈도 (타임스탬프를 기준으로 나눈다)
- 텀블링 윈도 [20s간격으로] [20][20][20]
- 슬라이딩(호핑) 윈도 [20초 간격인데 5초 뒤에 다시 나눈다] → [20] 5 [20] 5 [20] …
5. 할당- Uber H3 사용
이번 세션에서는 배차 최적화 계산은 안하고, 어떻게 분산처리를 할 수 있을지에 대해 나눠볼 거임.
우버에서 만든 공간을 분리하는 API : H3 User’s HHH..(헥소 지역 단위..)
경계지역은 주문과 배달원 미스매치가 일어남. 어쩔 수 없음.
다른 지역을 동시에 계산할 경우 배달원 중복이 발생함
제약사항이 있으므로, 헥사곤 7개를 15초 동안 한 바퀴 다 돌린다. (우회, 알고리즘으로..)
(이 부분은 우아한형제들 기술블로그에 자세히 나와있음. 봤던 기억이 남..!)
6. Q&A
- 메세지가 유실되어서 문제가 있는 경우는 있어도 유실되지 않았는데 문제되는 경우는 없었음. 물론 개발자가 개발하는 부분이라, hole이 있을 수 있고 이는 모니터링으로 채울 수 있음. 데이터 정합성이 100% 지켜질 순 없다..ㅠ
- fallback 용 컨슈머는 따로 만들자~ 정답은 없으나..! 현 컨슈머가 판단하는 건 문제이니까.
- Ktable windowing 을 잘 써서, 리밸런싱 시 빠르게 업데이트 할 수 있다. (위에 나온 ktable 부분, status store 참고)
- 카프카는 중복메세지 처리를 해줌. SNS에서는 보장이 안됨. ㅠ
- 모든 메세지 스키마를 동일하게 구성. 그래야지 모니터링이 수월하다. 파티셔닝하기도 쉽고~
- 해당 서비스(어플리케이션)의 역할은 프로듀싱까지 해야한다고 생각. 이벤트 리스너로 연결시켜서 처리함.
- avro IDL ? → 찾아보겠다.
- 모니터링? Ktable → Streaming 에 붓고, → 프로메테우스가 데이터를 갖고 있음. → 그라파나가 해당 로그 polling
추가로 읽어보면 좋을 내용 : 라인
내부 데이터 파이프라인에 Kafka Streams 적용하기