마이파이 서비스(사주 및 인적성 웹 앱)에는 무료 분석과 유료 분석이 존재합니다. 유료 분석의 경우, 나의 파이 분석과 사주 분석의 두 가지 유형으로 나뉘며, 특히 나의 파이 분석에서는 천간 분석, 음양 분석, 계절 분석, 오행 분석, 직업 적합도 분석 총 5개의 내용을 보고서 형태로 제공합니다.
이 과정에서 유료 분석 보고서 생성 API는 많은 데이터를 처리하고, GPT API와의 통신을 통해 생성된 결과를 DB에 저장해야 하므로 작업 시간이 길어질 수 있습니다. 따라서 이러한 작업은 Celery를 활용하여 비동기로 처리하도록 구현했습니다.
그러나 비동기 처리의 특성상 사용자 경험(UX)에 큰 불편함이 있었습니다.
유저들은 분석 요청을 보낸 후 진행 상태를 알 수 없었으며, 작업이 언제 완료될지 예상할 수 없는 문제점이 있었습니다. 물론 보고서 생성이 완료되면 푸시 알림(FCM) 및 서비스 내 알림(Notification)을 통해 사용자에게 안내가 이루어지지만, 기다리는 동안 현재 진행 상황을 실시간으로 확인할 수 없는 것이 주요 불편 사항이었습니다.
비동기 작업 진행 상태 추적 기능 도입
이러한 문제를 해결하기 위해 Celery 작업의 진행 상태를 추적할 수 있도록 새로운 로깅 및 추적 시스템을 도입했습니다. 이를 통해 사용자들은 분석 요청이 접수된 후 실시간으로 진행 상태를 확인할 수 있게 되었으며, 다음과 같은 기능을 추가하여 UX를 개선했습니다.
1. Celery 작업 진행 상태 추적 시스템 구축
기존에는 작업을 요청하고 완료 알림만 받을 수 있었던 구조에서, 각 단계별 진행 상태를 저장하고 사용자에게 전달하는 시스템을 추가했습니다. 이를 위해 다음과 같은 모델을 구성했습니다.
class CeleryLog(BaseModel):
task_progress = models.OneToOneField("task_progress.TaskProgress", on_delete=models.CASCADE, null=True, blank=True)
name = models.CharField(max_length=100, verbose_name="태스크 명")
task_id = models.CharField(max_length=36, verbose_name="태스크 ID")
status = models.CharField(
max_length=10,
verbose_name="태스크 상태",
choices=CeleryLogStatusChoices.choices,
default=CeleryLogStatusChoices.PENDING,
)
message = models.TextField(verbose_name="메세지", blank=True, default="")
args = models.CharField(default=list, max_length=65535, blank=True, verbose_name="태스크 args")
kwargs = models.CharField(default=dict, max_length=65535, blank=True, verbose_name="태스크 kwargs")
class Meta:
db_table = "celery_log"
verbose_name = "셀러리 로그"
verbose_name_plural = verbose_name
ordering = ["-created_at"]
class TaskProgress(BaseModel):
four_pillar = models.ForeignKey("four_pillar.FourPillar", on_delete=models.CASCADE, verbose_name="사주")
payment = models.ForeignKey("payment.Payment", on_delete=models.CASCADE, null=True, blank=True, verbose_name="결제")
product_name = models.CharField(max_length=100, verbose_name="상품 이름")
status = models.CharField(
max_length=100,
choices=TaskProgressStatusChoices.choices,
default=TaskProgressStatusChoices.IN_PROGRESS,
verbose_name="테스크 상태",
)
class Meta:
db_table = "task_progress"
verbose_name = "테스크 진행 상황"
verbose_name_plural = verbose_name
ordering = ["-created_at"]
위의 모델을 구성함으로써 비동기 작업의 상태 조회가 가능했습니다.
2. Celery 시그널을 활용한 작업 상태 업데이트
Celery의 시그널을 활용하여 작업이 시작되기 전(task_prerun), 성공(task_success), 실패(task_failure) 상태를 기록하도록 개선했습니다.
작업 시작 전 진행 상태 추적
@task_prerun.connect
def task_celery_prerun(sender=None, **kwargs):
if not sender:
return
task_progress_id = kwargs.get("args", [None])[-1]
if not TaskProgress.objects.filter(id=task_progress_id).exists():
raise ValidationError(f"TaskProgress ID {task_progress_id} does not exist.")
CeleryLog.objects.get_or_create(
task_progress_id=task_progress_id,
defaults={
"task_id": kwargs["task_id"],
"name": sender.name.split(".")[-1],
"status": CeleryLogStatusChoices.PENDING,
"args": json.dumps(kwargs.get("args", [])),
"kwargs": json.dumps(kwargs.get("kwargs", {})),
},
)
- 작업이 시작되기 전에 진행 상태를 PENDING으로 기록.
- 작업 요청과 함께 전달된 task_progress_id를 기반으로 데이터 검증.
작업 성공 시 상태 업데이트 및 알림 발송
@task_success.connect
def task_celery_success(sender=None, **kwargs):
celery_log = CeleryLog.objects.filter(task_id=sender.request.id)
celery_log.update(
status=CeleryLogStatusChoices.SUCCESS,
)
task_progress = celery_log.first().task_progress
four_pillar = task_progress.four_pillar
all_task_progress = TaskProgress.objects.filter(four_pillar=four_pillar, product_name=task_progress.product_name)
user = four_pillar.child if four_pillar.child is not None else four_pillar.user
# 모든 작업 완료 시 푸시 알림 전송
if all(task.status == TaskProgressStatusChoices.COMPLETE for task in TaskProgress.objects.filter(four_pillar=four_pillar)):
push_log = PushLog.objects.create(
push_type=PushTypeChoices[task_progress.product_name],
title=f"[{four_pillar.user.username}]님의 보고서가 완성되었습니다.",
content="보고서를 확인해보세요!",
target_id=four_pillar.id,
)
push_log.send()
- 작업이 성공적으로 완료되면 상태를 SUCCESS로 업데이트.
- 모든 하위 작업이 완료되면 푸시 알림을 전송하여 사용자에게 즉시 인지시킴.
작업 실패 시 상태 업데이트 및 오류 처리 및 환불 처리
@task_failure.connect
def task_celery_failure(sender=None, **kwargs):
if not sender:
return
celery_log = CeleryLog.objects.filter(task_id=kwargs["task_id"])
celery_log.update(
status=CeleryLogStatusChoices.FAILURE,
message=kwargs.get("einfo").exception,
)
task_progress = celery_log.first().task_progress
# 실패한 테스크의 환불 처리 및 롤백 처리
fail_task_progress(task_progress)
- 작업 실패 시 FAILURE 상태로 업데이트하고, 오류 내용을 기록.
- 실패한 작업에 대해 별도의 핸들링 로직 실행.
3. UX 개선 효과
1. 진행 상태 확인 가능:
• 유저가 분석 요청 후, 현재 작업이 진행 중인지, 완료되었는지, 실패했는지를 조회 가능.
• 프론트엔드에서 진행률 UI 제공 (예: “분석 중… 50% 완료”).
2. 즉각적인 피드백 제공:
• 완료 시 푸시 알림을 통해 사용자 경험 향상.
• 실패 시 빠른 재시도를 유도할 수 있는 UX 제공.
3. 운영 효율성 증가:
• 관리자 페이지에서도 각 작업의 상태를 모니터링 가능.
• 실패한 작업을 분석하고 재처리할 수 있는 로직 추가.
이전에는 비동기 작업이 완료될 때까지 유저가 상태를 알 수 없었으나, 이번 개선을 통해 현재 작업 상태를 실시간으로 추적할 수 있는 기능이 추가되었습니다.
사용자는 보고서 생성 진행률을 실시간으로 확인할 수 있게 되었고 UX 적으로 크게 개선이 되었습니다.
마이파이 서비스 보러가기 : https://www.mypie.me/
마이파이 | 메인
조각들이 모여 완성되는 나의 정체성
www.mypie.me
추가로 어떻게 celery가 추적이 가능한 지에 대해서 설명드릴게요.
Celery의 시그널(@task_prerun.connect, @task_success.connect, @task_failure.connect)은 비동기 작업의 진행 상태를 추적하는 강력한 도구입니다. 이러한 시그널을 사용하면 Celery 작업의 라이프사이클 이벤트(시작, 성공, 실패)를 감지하고 특정 작업을 수행할 수 있습니다.
Celery 시그널 개요
Celery는 작업(task)이 실행될 때 발생하는 다양한 이벤트를 추적할 수 있도록 시그널(Signal) 기능을 제공합니다. 이를 통해 특정 작업이 실행되기 전, 완료된 후, 실패했을 때 등의 상태를 감지하고, 원하는 처리를 수행할 수 있습니다.
주요 Celery 시그널 종류
1. @task_prerun.connect - 작업이 실행되기 직전에 실행됨.
2. @task_success.connect - 작업이 성공적으로 완료되었을 때 실행됨.
3. @task_failure.connect - 작업이 실패했을 때 실행됨.
4. 기타 시그널
• @task_postrun.connect: 작업이 실행된 후(성공/실패 상관없이) 실행됨.
• @before_task_publish.connect: 작업이 브로커로 전달되기 전에 실행됨.
• @task_retry.connect: 작업이 재시도될 때 실행됨.
1. @task_prerun.connect - 작업 실행 직전 (Before Execution)
이 시그널은 Celery 작업이 시작되기 바로 직전에 실행됩니다. 작업이 실행되기 전에 로깅, 인증, 환경 설정 등의 작업을 수행할 수 있습니다.
시그널 트리거 시점: 작업이 Celery worker에서 실행되기 직전.
2. @task_success.connect - 작업 성공 후 (After Successful Execution)
이 시그널은 작업이 성공적으로 완료되었을 때 실행됩니다. 이를 통해 성공적인 작업을 로깅하고, 후속 작업(알림 발송, 상태 업데이트 등)을 수행할 수 있습니다.
시그널 트리거 시점:Celery 작업이 정상적으로 완료된 직후.
3. @task_failure.connect - 작업 실패 시 (On Task Failure)
이 시그널은 Celery 작업이 실패했을 때 호출됩니다. 실패 원인을 기록하고, 재시도 로직 또는 알림을 트리거하는 데 사용할 수 있습니다.
시그널 트리거 시점:작업 실행 중 예외가 발생하여 실패했을 때.
3. @task_postrun.connect - 작업 완료 시 (On Task Done)
이 시그널은 Celery 작업이 종료되었을 때 호출됩니다. 성공 / 실패 여부 상관없이 종료 후 실행됩니다.
시그널 트리거 시점:작업 실행이 완료 되었을 때
'django' 카테고리의 다른 글
Django에서 API GateWay와 Dynamo DB를 이용한 채팅 (0) | 2025.02.12 |
---|---|
구글 인앱 결제 웹훅 pub sub (0) | 2025.02.10 |
Django Admin 커스텀 (0) | 2022.11.22 |
Django ORM (0) | 2022.03.09 |
Django와 Axios를 이용한 좋아요 구현하기 (0) | 2022.01.15 |