본문 바로가기

django

Django 비동기 작업 추적 기능 개선

마이파이 서비스(사주 및 인적성 웹 앱)에는 무료 분석과 유료 분석이 존재합니다. 유료 분석의 경우, 나의 파이 분석사주 분석의 두 가지 유형으로 나뉘며, 특히 나의 파이 분석에서는 천간 분석, 음양 분석, 계절 분석, 오행 분석, 직업 적합도 분석 총 5개의 내용을 보고서 형태로 제공합니다.

 

이 과정에서 유료 분석 보고서 생성 API는 많은 데이터를 처리하고, GPT API와의 통신을 통해 생성된 결과를 DB에 저장해야 하므로 작업 시간이 길어질 수 있습니다. 따라서 이러한 작업은 Celery를 활용하여 비동기로 처리하도록 구현했습니다.

 

그러나 비동기 처리의 특성상 사용자 경험(UX)에 큰 불편함이 있었습니다.

유저들은 분석 요청을 보낸 후 진행 상태를 알 수 없었으며, 작업이 언제 완료될지 예상할 수 없는 문제점이 있었습니다. 물론 보고서 생성이 완료되면 푸시 알림(FCM) 및 서비스 내 알림(Notification)을 통해 사용자에게 안내가 이루어지지만, 기다리는 동안 현재 진행 상황을 실시간으로 확인할 수 없는 것이 주요 불편 사항이었습니다.

 

비동기 작업 진행 상태 추적 기능 도입

 

이러한 문제를 해결하기 위해 Celery 작업의 진행 상태를 추적할 수 있도록 새로운 로깅 및 추적 시스템을 도입했습니다. 이를 통해 사용자들은 분석 요청이 접수된 후 실시간으로 진행 상태를 확인할 수 있게 되었으며, 다음과 같은 기능을 추가하여 UX를 개선했습니다.

 

1. Celery 작업 진행 상태 추적 시스템 구축

 

기존에는 작업을 요청하고 완료 알림만 받을 수 있었던 구조에서, 각 단계별 진행 상태를 저장하고 사용자에게 전달하는 시스템을 추가했습니다. 이를 위해 다음과 같은 모델을 구성했습니다.

 

class CeleryLog(BaseModel):
    task_progress = models.OneToOneField("task_progress.TaskProgress", on_delete=models.CASCADE, null=True, blank=True)
    name = models.CharField(max_length=100, verbose_name="태스크 명")
    task_id = models.CharField(max_length=36, verbose_name="태스크 ID")
    status = models.CharField(
        max_length=10,
        verbose_name="태스크 상태",
        choices=CeleryLogStatusChoices.choices,
        default=CeleryLogStatusChoices.PENDING,
    )
    message = models.TextField(verbose_name="메세지", blank=True, default="")
    args = models.CharField(default=list, max_length=65535, blank=True, verbose_name="태스크 args")
    kwargs = models.CharField(default=dict, max_length=65535, blank=True, verbose_name="태스크 kwargs")

    class Meta:
        db_table = "celery_log"
        verbose_name = "셀러리 로그"
        verbose_name_plural = verbose_name
        ordering = ["-created_at"]
        
class TaskProgress(BaseModel):
    four_pillar = models.ForeignKey("four_pillar.FourPillar", on_delete=models.CASCADE, verbose_name="사주")
    payment = models.ForeignKey("payment.Payment", on_delete=models.CASCADE, null=True, blank=True, verbose_name="결제")
    product_name = models.CharField(max_length=100, verbose_name="상품 이름")
    status = models.CharField(
        max_length=100,
        choices=TaskProgressStatusChoices.choices,
        default=TaskProgressStatusChoices.IN_PROGRESS,
        verbose_name="테스크 상태",
    )

    class Meta:
        db_table = "task_progress"
        verbose_name = "테스크 진행 상황"
        verbose_name_plural = verbose_name
        ordering = ["-created_at"]

 

위의 모델을 구성함으로써 비동기 작업의 상태 조회가 가능했습니다.

 

2. Celery 시그널을 활용한 작업 상태 업데이트

 

Celery의 시그널을 활용하여 작업이 시작되기 전(task_prerun), 성공(task_success), 실패(task_failure) 상태를 기록하도록 개선했습니다.

 

작업 시작 전 진행 상태 추적

@task_prerun.connect
def task_celery_prerun(sender=None, **kwargs):
    if not sender:
        return

    task_progress_id = kwargs.get("args", [None])[-1]

    if not TaskProgress.objects.filter(id=task_progress_id).exists():
        raise ValidationError(f"TaskProgress ID {task_progress_id} does not exist.")

    CeleryLog.objects.get_or_create(
        task_progress_id=task_progress_id,
        defaults={
            "task_id": kwargs["task_id"],
            "name": sender.name.split(".")[-1],
            "status": CeleryLogStatusChoices.PENDING,
            "args": json.dumps(kwargs.get("args", [])),
            "kwargs": json.dumps(kwargs.get("kwargs", {})),
        },
    )

- 작업이 시작되기 전에 진행 상태를 PENDING으로 기록.

- 작업 요청과 함께 전달된 task_progress_id를 기반으로 데이터 검증.

 

작업 성공 시 상태 업데이트 및 알림 발송

@task_success.connect
def task_celery_success(sender=None, **kwargs):
	celery_log = CeleryLog.objects.filter(task_id=sender.request.id)
    celery_log.update(
        status=CeleryLogStatusChoices.SUCCESS,
    )
    task_progress = celery_log.first().task_progress
    four_pillar = task_progress.four_pillar
    all_task_progress = TaskProgress.objects.filter(four_pillar=four_pillar, product_name=task_progress.product_name)
    user = four_pillar.child if four_pillar.child is not None else four_pillar.user

    # 모든 작업 완료 시 푸시 알림 전송
    if all(task.status == TaskProgressStatusChoices.COMPLETE for task in TaskProgress.objects.filter(four_pillar=four_pillar)):
        push_log = PushLog.objects.create(
            push_type=PushTypeChoices[task_progress.product_name],
            title=f"[{four_pillar.user.username}]님의 보고서가 완성되었습니다.",
            content="보고서를 확인해보세요!",
            target_id=four_pillar.id,
        )
        push_log.send()

- 작업이 성공적으로 완료되면 상태를 SUCCESS로 업데이트.

- 모든 하위 작업이 완료되면 푸시 알림을 전송하여 사용자에게 즉시 인지시킴.

 

작업 실패 시 상태 업데이트 및 오류 처리 및 환불 처리

@task_failure.connect
def task_celery_failure(sender=None, **kwargs):
    if not sender:
        return

    celery_log = CeleryLog.objects.filter(task_id=kwargs["task_id"])

    celery_log.update(
        status=CeleryLogStatusChoices.FAILURE,
        message=kwargs.get("einfo").exception,
    )
    task_progress = celery_log.first().task_progress
	
    # 실패한 테스크의 환불 처리 및 롤백 처리
    fail_task_progress(task_progress)

- 작업 실패 시 FAILURE 상태로 업데이트하고, 오류 내용을 기록.

- 실패한 작업에 대해 별도의 핸들링 로직 실행.

 

3. UX 개선 효과

 

1. 진행 상태 확인 가능:

유저가 분석 요청 후, 현재 작업이 진행 중인지, 완료되었는지, 실패했는지를 조회 가능.

프론트엔드에서 진행률 UI 제공 (예: “분석 중… 50% 완료”).

2. 즉각적인 피드백 제공:

완료 시 푸시 알림을 통해 사용자 경험 향상.

실패 시 빠른 재시도를 유도할 수 있는 UX 제공.

3. 운영 효율성 증가:

관리자 페이지에서도 각 작업의 상태를 모니터링 가능.

실패한 작업을 분석하고 재처리할 수 있는 로직 추가.

 

이전에는 비동기 작업이 완료될 때까지 유저가 상태를 알 수 없었으나, 이번 개선을 통해 현재 작업 상태를 실시간으로 추적할 수 있는 기능이 추가되었습니다.

사용자는 보고서 생성 진행률을 실시간으로 확인할 수 있게 되었고 UX 적으로 크게 개선이 되었습니다.

개선된 페이지

마이파이 서비스 보러가기 : https://www.mypie.me/

 

마이파이 | 메인

조각들이 모여 완성되는 나의 정체성

www.mypie.me

 

 

추가로 어떻게 celery가 추적이 가능한 지에 대해서 설명드릴게요.

Celery의 시그널(@task_prerun.connect, @task_success.connect, @task_failure.connect)은 비동기 작업의 진행 상태를 추적하는 강력한 도구입니다. 이러한 시그널을 사용하면 Celery 작업의 라이프사이클 이벤트(시작, 성공, 실패)를 감지하고 특정 작업을 수행할 수 있습니다.

 

Celery 시그널 개요

 

Celery는 작업(task)이 실행될 때 발생하는 다양한 이벤트를 추적할 수 있도록 시그널(Signal) 기능을 제공합니다. 이를 통해 특정 작업이 실행되기 전, 완료된 후, 실패했을 때 등의 상태를 감지하고, 원하는 처리를 수행할 수 있습니다.

 

주요 Celery 시그널 종류

1. @task_prerun.connect - 작업이 실행되기 직전에 실행됨.

2. @task_success.connect - 작업이 성공적으로 완료되었을 때 실행됨.

3. @task_failure.connect - 작업이 실패했을 때 실행됨.

4. 기타 시그널

@task_postrun.connect: 작업이 실행된 후(성공/실패 상관없이) 실행됨.

@before_task_publish.connect: 작업이 브로커로 전달되기 전에 실행됨.

@task_retry.connect: 작업이 재시도될 때 실행됨.

 

1. @task_prerun.connect - 작업 실행 직전 (Before Execution)

이 시그널은 Celery 작업이 시작되기 바로 직전에 실행됩니다. 작업이 실행되기 전에 로깅, 인증, 환경 설정 등의 작업을 수행할 수 있습니다.

시그널 트리거 시점: 작업이 Celery worker에서 실행되기 직전.

 

2. @task_success.connect - 작업 성공 후 (After Successful Execution)

이 시그널은 작업이 성공적으로 완료되었을 때 실행됩니다. 이를 통해 성공적인 작업을 로깅하고, 후속 작업(알림 발송, 상태 업데이트 등)을 수행할 수 있습니다.

시그널 트리거 시점:Celery 작업이 정상적으로 완료된 직후.

 

3. @task_failure.connect - 작업 실패 시 (On Task Failure)

이 시그널은 Celery 작업이 실패했을 때 호출됩니다. 실패 원인을 기록하고, 재시도 로직 또는 알림을 트리거하는 데 사용할 수 있습니다.

시그널 트리거 시점:작업 실행 중 예외가 발생하여 실패했을 때.

 

3. @task_postrun.connect - 작업 완료 시 (On Task Done)

이 시그널은 Celery 작업이 종료되었을 때 호출됩니다. 성공 / 실패 여부 상관없이 종료 후 실행됩니다.

시그널 트리거 시점:작업 실행이 완료 되었을 때

'django' 카테고리의 다른 글

Django에서 API GateWay와 Dynamo DB를 이용한 채팅  (0) 2025.02.12
구글 인앱 결제 웹훅 pub sub  (0) 2025.02.10
Django Admin 커스텀  (0) 2022.11.22
Django ORM  (0) 2022.03.09
Django와 Axios를 이용한 좋아요 구현하기  (0) 2022.01.15