본문 바로가기

django

Django 비동기 작업 추적 기능 개선

마이파이 서비스(사주 및 인적성 웹 앱)에는 무료 분석과 유료 분석이 존재합니다. 유료 분석의 경우, 나의 파이 분석사주 분석의 두 가지 유형으로 나뉘며, 특히 나의 파이 분석에서는 천간 분석, 음양 분석, 계절 분석, 오행 분석, 직업 적합도 분석 총 5개의 내용을 보고서 형태로 제공합니다.

 

이 과정에서 유료 분석 보고서 생성 API는 많은 데이터를 처리하고, GPT API와의 통신을 통해 생성된 결과를 DB에 저장해야 하므로 작업 시간이 길어질 수 있습니다. 따라서 이러한 작업은 Celery를 활용하여 비동기로 처리하도록 구현했습니다.

 

그러나 비동기 처리의 특성상 사용자 경험(UX)에 큰 불편함이 있었습니다.

유저들은 분석 요청을 보낸 후 진행 상태를 알 수 없었으며, 작업이 언제 완료될지 예상할 수 없는 문제점이 있었습니다. 물론 보고서 생성이 완료되면 푸시 알림(FCM) 및 서비스 내 알림(Notification)을 통해 사용자에게 안내가 이루어지지만, 기다리는 동안 현재 진행 상황을 실시간으로 확인할 수 없는 것이 주요 불편 사항이었습니다.

 

비동기 작업 진행 상태 추적 기능 도입

 

이러한 문제를 해결하기 위해 Celery 작업의 진행 상태를 추적할 수 있도록 새로운 로깅 및 추적 시스템을 도입했습니다. 이를 통해 사용자들은 분석 요청이 접수된 후 실시간으로 진행 상태를 확인할 수 있게 되었으며, 다음과 같은 기능을 추가하여 UX를 개선했습니다.

 

1. Celery 작업 진행 상태 추적 시스템 구축

 

기존에는 작업을 요청하고 완료 알림만 받을 수 있었던 구조에서, 각 단계별 진행 상태를 저장하고 사용자에게 전달하는 시스템을 추가했습니다. 이를 위해 다음과 같은 모델을 구성했습니다.

 

class CeleryLog(BaseModel):
    task_progress = models.OneToOneField("task_progress.TaskProgress", on_delete=models.CASCADE, null=True, blank=True)
    name = models.CharField(max_length=100, verbose_name="태스크 명")
    task_id = models.CharField(max_length=36, verbose_name="태스크 ID")
    status = models.CharField(
        max_length=10,
        verbose_name="태스크 상태",
        choices=CeleryLogStatusChoices.choices,
        default=CeleryLogStatusChoices.PENDING,
    )
    message = models.TextField(verbose_name="메세지", blank=True, default="")
    args = models.CharField(default=list, max_length=65535, blank=True, verbose_name="태스크 args")
    kwargs = models.CharField(default=dict, max_length=65535, blank=True, verbose_name="태스크 kwargs")

    class Meta:
        db_table = "celery_log"
        verbose_name = "셀러리 로그"
        verbose_name_plural = verbose_name
        ordering = ["-created_at"]
        
class TaskProgress(BaseModel):
    four_pillar = models.ForeignKey("four_pillar.FourPillar", on_delete=models.CASCADE, verbose_name="사주")
    payment = models.ForeignKey("payment.Payment", on_delete=models.CASCADE, null=True, blank=True, verbose_name="결제")
    product_name = models.CharField(max_length=100, verbose_name="상품 이름")
    status = models.CharField(
        max_length=100,
        choices=TaskProgressStatusChoices.choices,
        default=TaskProgressStatusChoices.IN_PROGRESS,
        verbose_name="테스크 상태",
    )

    class Meta:
        db_table = "task_progress"
        verbose_name = "테스크 진행 상황"
        verbose_name_plural = verbose_name
        ordering = ["-created_at"]

 

위의 모델을 구성함으로써 비동기 작업의 상태 조회가 가능했습니다.

 

2. Celery 시그널을 활용한 작업 상태 업데이트

 

Celery의 시그널을 활용하여 작업이 시작되기 전(task_prerun), 성공(task_success), 실패(task_failure) 상태를 기록하도록 개선했습니다.

 

작업 시작 전 진행 상태 추적

@task_prerun.connect
def task_celery_prerun(sender=None, **kwargs):
    if not sender:
        return

    task_progress_id = kwargs.get("args", [None])[-1]

    if not TaskProgress.objects.filter(id=task_progress_id).exists():
        raise ValidationError(f"TaskProgress ID {task_progress_id} does not exist.")

    CeleryLog.objects.get_or_create(
        task_progress_id=task_progress_id,
        defaults={
            "task_id": kwargs["task_id"],
            "name": sender.name.split(".")[-1],
            "status": CeleryLogStatusChoices.PENDING,
            "args": json.dumps(kwargs.get("args", [])),
            "kwargs": json.dumps(kwargs.get("kwargs", {})),
        },
    )

- 작업이 시작되기 전에 진행 상태를 PENDING으로 기록.

- 작업 요청과 함께 전달된 task_progress_id를 기반으로 데이터 검증.

 

작업 성공 시 상태 업데이트 및 알림 발송

@task_success.connect
def task_celery_success(sender=None, **kwargs):
	celery_log = CeleryLog.objects.filter(task_id=sender.request.id)
    celery_log.update(
        status=CeleryLogStatusChoices.SUCCESS,
    )
    task_progress = celery_log.first().task_progress
    four_pillar = task_progress.four_pillar
    all_task_progress = TaskProgress.objects.filter(four_pillar=four_pillar, product_name=task_progress.product_name)
    user = four_pillar.child if four_pillar.child is not None else four_pillar.user

    # 모든 작업 완료 시 푸시 알림 전송
    if all(task.status == TaskProgressStatusChoices.COMPLETE for task in TaskProgress.objects.filter(four_pillar=four_pillar)):
        push_log = PushLog.objects.create(
            push_type=PushTypeChoices[task_progress.product_name],
            title=f"[{four_pillar.user.username}]님의 보고서가 완성되었습니다.",
            content="보고서를 확인해보세요!",
            target_id=four_pillar.id,
        )
        push_log.send()

- 작업이 성공적으로 완료되면 상태를 SUCCESS로 업데이트.

- 모든 하위 작업이 완료되면 푸시 알림을 전송하여 사용자에게 즉시 인지시킴.

 

작업 실패 시 상태 업데이트 및 오류 처리 및 환불 처리

@task_failure.connect
def task_celery_failure(sender=None, **kwargs):
    if not sender:
        return

    celery_log = CeleryLog.objects.filter(task_id=kwargs["task_id"])

    celery_log.update(
        status=CeleryLogStatusChoices.FAILURE,
        message=kwargs.get("einfo").exception,
    )
    task_progress = celery_log.first().task_progress
	
    # 실패한 테스크의 환불 처리 및 롤백 처리
    fail_task_progress(task_progress)

- 작업 실패 시 FAILURE 상태로 업데이트하고, 오류 내용을 기록.

- 실패한 작업에 대해 별도의 핸들링 로직 실행.

 

3. UX 개선 효과

 

1. 진행 상태 확인 가능:

유저가 분석 요청 후, 현재 작업이 진행 중인지, 완료되었는지, 실패했는지를 조회 가능.

프론트엔드에서 진행률 UI 제공 (예: “분석 중… 50% 완료”).

2. 즉각적인 피드백 제공:

완료 시 푸시 알림을 통해 사용자 경험 향상.

실패 시 빠른 재시도를 유도할 수 있는 UX 제공.

3. 운영 효율성 증가:

관리자 페이지에서도 각 작업의 상태를 모니터링 가능.

실패한 작업을 분석하고 재처리할 수 있는 로직 추가.

 

이전에는 비동기 작업이 완료될 때까지 유저가 상태를 알 수 없었으나, 이번 개선을 통해 현재 작업 상태를 실시간으로 추적할 수 있는 기능이 추가되었습니다.

사용자는 보고서 생성 진행률을 실시간으로 확인할 수 있게 되었고 UX 적으로 크게 개선이 되었습니다.

개선된 페이지

마이파이 서비스 보러가기 : https://www.mypie.me/

 

마이파이 | 메인

조각들이 모여 완성되는 나의 정체성

www.mypie.me

 

 

추가로 어떻게 celery가 추적이 가능한 지에 대해서 설명드릴게요.

Celery의 시그널(@task_prerun.connect, @task_success.connect, @task_failure.connect)은 비동기 작업의 진행 상태를 추적하는 강력한 도구입니다. 이러한 시그널을 사용하면 Celery 작업의 라이프사이클 이벤트(시작, 성공, 실패)를 감지하고 특정 작업을 수행할 수 있습니다.

 

Celery 시그널 개요

 

Celery는 작업(task)이 실행될 때 발생하는 다양한 이벤트를 추적할 수 있도록 시그널(Signal) 기능을 제공합니다. 이를 통해 특정 작업이 실행되기 전, 완료된 후, 실패했을 때 등의 상태를 감지하고, 원하는 처리를 수행할 수 있습니다.

 

주요 Celery 시그널 종류

1. @task_prerun.connect - 작업이 실행되기 직전에 실행됨.

2. @task_success.connect - 작업이 성공적으로 완료되었을 때 실행됨.

3. @task_failure.connect - 작업이 실패했을 때 실행됨.

4. 기타 시그널

@task_postrun.connect: 작업이 실행된 후(성공/실패 상관없이) 실행됨.

@before_task_publish.connect: 작업이 브로커로 전달되기 전에 실행됨.

@task_retry.connect: 작업이 재시도될 때 실행됨.

 

1. @task_prerun.connect - 작업 실행 직전 (Before Execution)

이 시그널은 Celery 작업이 시작되기 바로 직전에 실행됩니다. 작업이 실행되기 전에 로깅, 인증, 환경 설정 등의 작업을 수행할 수 있습니다.

시그널 트리거 시점: 작업이 Celery worker에서 실행되기 직전.

 

2. @task_success.connect - 작업 성공 후 (After Successful Execution)

이 시그널은 작업이 성공적으로 완료되었을 때 실행됩니다. 이를 통해 성공적인 작업을 로깅하고, 후속 작업(알림 발송, 상태 업데이트 등)을 수행할 수 있습니다.

시그널 트리거 시점:Celery 작업이 정상적으로 완료된 직후.

 

3. @task_failure.connect - 작업 실패 시 (On Task Failure)

이 시그널은 Celery 작업이 실패했을 때 호출됩니다. 실패 원인을 기록하고, 재시도 로직 또는 알림을 트리거하는 데 사용할 수 있습니다.

시그널 트리거 시점:작업 실행 중 예외가 발생하여 실패했을 때.

 

3. @task_postrun.connect - 작업 완료 시 (On Task Done)

이 시그널은 Celery 작업이 종료되었을 때 호출됩니다. 성공 / 실패 여부 상관없이 종료 후 실행됩니다.

시그널 트리거 시점:작업 실행이 완료 되었을 때

'django' 카테고리의 다른 글