개요
회사에서 설계했던 STT 프로젝트의 아키텍처를 후회없이 개선해보고 떠나보내고 싶었다 💌
따라서 며칠간 STT api의 트래픽 제어를 위해 어떤 기술을 도입해 사용해볼지 고민했다. 이번주에 Non-blocking/Blocking에 대한 개념에 대해 공부하고 보니 event queue에 대해 좀 더 넓은 시야로 접근해볼 수 있었다. 메시지큐에는 Kafka,, RabbitMQ,, Redis,, 종류가 정말 다양한데 비교적 러닝커브가 낮은 Redis를 도입해보기로 하였다. 우선 Redis, 그리고 Event Queue에 대해 간략히 알아보자.
Redis
Redis(REmote DIctionary Server)는 주로 캐시나 빠른 응답 속도를 가진 DB가 필요할 때 사용하는 오픈 소스 인메모리 NoSQL 이다. 키/값 으로 저장되고, Redis는 디스크나 SSD가 아닌 메모리에 데이터를 저장하기 때문에 탁월한 속도, 안정성, 성능을 한다.
애플리케이션에서 RDBMS 서버만을 사용하는 경우 소스의 대기 시간과 처리량에 있어서, 트래픽이 증가하거나 애플리케이션이 확장될 때 성능 병목 현상을 일으킬 수 있다. 이런 경우 애플리케이션과 물리적으로 더 가까운 메모리에 데이터를 저장하고 조작하면 된다. 즉, Redis를 사용해 데이터를 메모리에 저장하여 데이터를 읽거나 쓸 때의 성능을 최대화하고, 데이터를 사용자에게 더 가깝게 배치하여 대기 시간을 최소화할 수 있다.
asynchronous, non-blocking
Redis Event Queue에 대한 공식문서를 살펴보자. 아주 반가운,, async-non blocking 방식이 보인다. event queue, 그리고 pub-sub 구조는 기본적으로 asynchronous, non-blocking 방식이다.
async-non blocking의 가장 큰 특징은 "A에서 B 호출 이후 다른 무언가(예를 들어 작업 C)를 하고 있던지 신경쓰지 않고 B는 자기가 끝나면 return한다"는 것이었다. pub은 메시지를 만들어 queue 속에 넣어둔다. 그러면 해당 pub을 구독하고 있는 sub들에게는 이 메시지들이 수신하게 된다. pub은 sub이 메시지를 받았는지 여부에 개의치 않고, 큐에 메시지들을 넣는다. sub 은 pub에서 다른 요청을 처리하고 있던지 관계없이 메시지들을 소비한다.
asynchronous, blocking 도 지원 !
추가로, Redis는 이 이벤트큐를 blocking으로 꺼내오는 방식 또한 사용할 수 있도록 제공해준다고 한다. 명령어에 차이가 있는데, RPOP은 non-blocking, BRPOP은 blocking하도록 지원한다. blocking 로직에서는 sending client가 한 번 보낸 내용을 처리하는 동안 sending client가 다른 작업을 할 수 없도록 막는다는 특징이 있다.
아직 sync/async, blocking/nonblocking에 대한 개념을 모르겠다면 여기를 참고하자.
초기 고민들
내가 구현했던 api는 각각 다른 GPU로 돌아가는 flask api 두 개였다. 이 때 유저들의 요청이 한번에 들어오게 되면 GPU에 부하가 걸려 GPU OOM 오류가 빈번하게 일어났다. 따라서 이 두개의 서버에 적절히 나누어 처리한 후, 대기열을 통해 여러 유저들의 요청을 대기시켜놓고 "나중에라도 꼭 처리될 수 있도록" return 해주고 싶었다.
우선 구현 전에 내가 했던 바보같은 고민들에 대해 설명해보려고한다.. 내용이 길지만 핵심은 'Redis를 로드밸런싱에도 활용할 수 있을까?'였고 답은 '굳이?! 그런 의도로 만들어진 게 아니다 '이다. 아래 고민을 해결하는 과정에서 pub/sub 구조와 event 큐 이해도가 높아질 수 있었다. 쓰잘대기 없는 고민일 수 있음 주의.
구현 이전 고민1. 큐에 publisher들이 데이터를 적재하면 적절히 두 서버가 나누어 처리하도록 구현할 수 없을까?
결론적으로 내가 한 질문에 대한 대답은 "그건 load balancing으로 해라" 이다. pub-sub에 대한 이해를 잘 하도록 하자!
redis queue를 사용한 pub-sub 모델에서 push 와 pull 용어에 대해 잘 구분해야 한다. push 방식은 publisher가 메시지를 Push 하면 subscriber들이 모두 같은 메시지들을 받게 되는 것을 의미하고. pull 방식은 subscriber가 publisher이 만들어둔 메시지를 가져오는 것을 의미한다.
자자, 이 가운데에 낀 이것 !! 이 이벤트 채널은 publisher들이 발행하는 이벤트들을 알아서 subscriber에게 push 해주며 pub-sub 간을 디커플링하는 역할을 한다. 이 때 중요한 게, subscriber는 메시지를 "필요에 따라 꺼내가는(push)" 방식이 아니라, 채널을 구독하고 메시지가 도착할 때까지 기다리는 "풀(pull)" 방식이라는 거다.
구현 이전 고민2. 그러면 Redis cluster를 이용해서 데이터를 자동으로 여러 개의 Redis 노드에 나누어 저장한 후에 각각의 sub한테 보내면 되지 않나?
쿠버네티스와 redis 등에서 분산시스템을 위해 파드, 클러스터 등의 용어를 가진 여러 노드를 사용한다. Redis에서도 cluster로 이 분산시스템을 적용하고 있다. 일단 결론적으로 말하면 이 분산시스템은 로드밸런싱 목적으로 만들어진 게 아니다. Redis queue에는 Kafka의 컨슈머 그룹같은 분산처리 개념이 없다. 😥
Redis cluster의 존재 이유는 아래와 같다.
1. 자동으로 여러 노드에 데이터를 나누어 저장할 수 있다.
2. 따라서 일부 노드가 죽거나 통신이 되지 않을 때 작업을 계속할 수 있다.
물론 여러 노드에 데이터를 나누어 저장할 수 있지만 이는 "가용성 증진을 위한 노드간 데이터 분산"에 초점을 맞춘 것이지, 내가 의도한 것처럼 로드밸런싱을 의도한 것이 아니다.
... 이쯤 이론을 파악하면 로드밸런싱에 대한 고민이 바보같은 고민이었다는 걸 알게된다.
아키텍처
따라서 기존에 있던 Kong API GATEWAY에서 로드밸런싱을 처리하고, 받은 데이터들에 대해서 Redis Queue를 도입해 각각의 api에 연결하기로 했다.
자세한 내용은 아래 코드를 보며 더 알아보자. flask-rq에 대한 구현 글이 많지 않아 github 에서 여러 소스코드 예제를 찾아서 구현해 볼 수 있었다.
구현
우선 아래와 같이 redis와 rq를 사용하기 위해 연결 주소를 할당해준다. 이 파일은 따로 cmd 창으로 들어가 백그라운드에서 계속 돌아가도록 한다.
worker.py
import os
import redis
from rq import Worker, Queue, Connection
listen = ['default']
redis_url = os.getenv('REDISTOGO_URL', 'redis://localhost:6379')
conn = redis.from_url(redis_url)
if __name__ == '__main__':
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
순서가 보장되는 FIFO 자료구조 REDIS QUEUE(rq) 사용
Redis에서는 여러 자료구조를 제공해준다. Redis의 sorted set과 rq라는 python redis queue 자료구조 중 어떤 걸로 구현할지 고민했다. 현재 아키텍처는 우선순위가 계속해서 바뀌는 상황이 아니고, sorted set 자료구조는 주로 랭킹, 우선순위, 범위 기반의 조회 등에 사용되므로, 간단히 FIFO 구조를 가지는 rq 자료구조를 써서 선입선출로 이벤트를 가져오기로 하였다.
async/non-blocking
아래 작업에서는 client가 POST 요청으로 job_id를 body에 포함시키게 보낸다. 이벤트 큐에 던진다는 느낌이라고 생각해도 좋을 것 같다. 이렇게 이벤트가 던져지면, 즉 push 되면 아래에서 q.enqueue(random_job, job_id=job_id) 에 해당 내용이 반영되며 random_job 함수가 이 큐 속 내용들을 처리하게 된다.
큐에서 작업이 완료되면 Redis 에 위에서 말했던 메인메모리 저장방식으로 이 결과값을 저장해둔다. 그리고 rq에서는 해당 job을 삭제한다. 이후 client는 GET으로 이 내용을 꺼내올 수 있다. 이 내용은 polling 방식으로 가져오면 되겠다.
대기자 정보 조회
아래와 같은 화면을 살면서 한번쯤은 봤을 것이다. 내 앞에 몇 명의 대기자가 있는지 파악하는 api도 추가해 보았다.(아래 elif job.is_queued: 조건) 이 api를 사용하려면 실시간으로 계속 가져오는 Websocket 방식으로 (클라이언트와 서버가 자신이 원할 때 데이터를 보낼 수 있는 방법) 사용해 볼 수도 있겠고, polling 방식으로 주기를 정해 api를 요청해올 수도 있을 것이다. 이 때 websocket은 sync-blocking 방식, polling은 sync-nonblocking 방식이다. 내 api는 Polling을 의도하고 설계되었다.
main.py
from flask import Flask, jsonify, request
from redis import Redis
from rq import Queue
from method import random_job
app = Flask(__name__)
redis = Redis()
q = Queue(connection=redis)
@app.route('/job_result/<job_id>', methods=['GET'])
def poll_result(job_id):
job = q.fetch_job(job_id)
if job is not None:
if job.is_finished:
result = job.result
# 작업이 완료되면 Redis에 결과 저장하고 q에서 job 삭제
redis.set(f'result:{job_id}', result)
job.cleanup()
return jsonify({'status': 'success', 'message': 'Job finished', 'result': result})
elif job.is_failed:
job.cleanup()
return jsonify({'status': 'error', 'message': 'Job failed'})
elif job.is_started:
# 현재 작업 중
return jsonify({'status': 'success', 'message': f'Job still in progress, position in queue: 0'})
elif job.is_queued:
# 큐에서 현재 작업 위치
queued_jobs = q.job_ids
position = queued_jobs.index(job_id) + 1
return jsonify({'status': 'success', 'message': f'Job queued, 현재 진행상황: {position}/{len(queued_jobs)}'})
else:
return jsonify({'status': 'error', 'message': 'Job not found'})
@app.route('/enqueue', methods=['POST'])
def enqueue_job():
data = request.get_json()
job_id = data.get('job_id')
if job_id:
q.enqueue(random_job, job_id=job_id)
return jsonify({'status': 'success', 'message': f'Job {job_id} enqueued'})
else:
return jsonify({'status': 'error', 'message': 'Invalid data'})
if __name__ == '__main__':
app.run('0.0.0.0', port=8081, debug=True)
아래 random_job 함수에 이벤트들을 이용해 처리하고자 하는 작업 내용이 들어가 주면 된다. 여기에서는 임의로 random 값동안 대기하는 함수로 구현했다.
RQ 사용법 글에서 확인해볼 수 있는데, 다음과 같이 함수들을 다른 파일에 빼서 작성해두어야 한다고 한다. 그리고 위에서 봤던 이 코드 방식(q.enqueue(random_job, job_id=job_id) )으로 이 job_id,, 등의 parameter를 넘길 수 있다. RQ 사용법 글 → 여기 잘 나와있다.
method.py
import time
import random
def random_job(*args, **kargs): #작업할 내용에 대해 적어주면 된다. 여기에서는 일단 랜덤값으로 기다리도록 구현
t = random.randint(1,10)
time.sleep(t)
return t
def on_success(job, connection, result, *args, **kargs):
print(f"the job {job.latest_result().job_id} terminated successfully with result: {result}")
def on_failure(job, connection, type, value, traceback, *args, **kargs):
print(f"the job {job.latest_result().job_id} terminated with errors: {traceback}")
postman과 queue 모습
우선 Redis와 서버를 켜준다.
이후 post 요청으로 내가 넣고 싶은 데이터를 넣는다.
우리 회사라면 wav url을 넣으면 된다.
이후 내가 원하는 데이터명(wav url) 에 대해서 조회해본다. 지금은 5번째 애(test5)를 처리하고 있어서 test6는 큐에서 전체 1개 중 1번째에 존재해 있는다.
(만약 큐에 3개의 데이터가 들어가 있고 내가 찾는 값은 큐에서 2번째 위치에 존재한다면 2/3으로 표시된다.)
이후 test6이 모두 실행되면 해당 job이 끝났음을 알 수 있고 그에 대한 결과값(여기에서는 랜덤값을 2로 설정했나보다)을 도출해준다. 회사 api의 경우에는 stt result 값이 들어오게 된다.
참고
https://www.ibm.com/kr-ko/topics/redis
https://realpython.com/flask-by-example-implementing-a-redis-task-queue/
https://redis.com/glossary/event-queue/
https://python-rq.org/
마무리
간단한 아키텍처라 구현이 비교적 간단하긴 했으나 그래도 원하던 아키텍처가 제대로 나온 것 같아 기쁘다 vV 😆
담번엔 좀 더 고도화된 디커플링 과정을 경험해보자 !!
'BackEnd' 카테고리의 다른 글
[Java|Spring] 동시성 문제 (0) | 2024.07.21 |
---|---|
[OS] Sync/Async, Blocking/NonBlocking (0) | 2024.01.22 |
[DB] @Transactional 이해하고 사용하기 (0) | 2024.01.17 |
[Java|Spring|JSP] Koala 출석부 제작 후기 ..그리고 보완 방향! (0) | 2024.01.16 |
[DB] RDS DB 마이그레이션 과정 - RDB와 NoSQL (0) | 2024.01.06 |