본문 바로가기

django

Django에서 API GateWay와 Dynamo DB를 이용한 채팅

Django에서 DynamoDB와 RDS를 조합하여 채팅 시스템을 구축한 방법

 

1. DynamoDB란?

 

📌 DynamoDB의 기본 개념

AWS에서 제공하는 Managed NoSQL 데이터베이스

Key-Value 및 Document Store 형태의 저장 방식

자동 확장(Auto Scaling) 및 높은 가용성을 제공

RDS와 달리 Schema-less 구조 → 유연한 데이터 모델링 가능

 

📌 주요 기능

1. Primary Key 설계

Partition Key: 데이터를 효율적으로 분산하는 키 (예: chat_room_id)

Sort Key: 같은 Partition 내에서 정렬을 위한 키 (예: timestamp)

2. Global Secondary Index(GSI) & Local Secondary Index(LSI)

DynamoDB에서 효율적인 검색을 위해 활용 가능

3. TTL(Time to Live) 설정

일정 시간이 지난 데이터는 자동 삭제 가능

채팅방의 비활성화 시간 등을 기준으로 관리 가능

4. DynamoDB Streams

테이블 변경 사항을 감지하여 Lambda를 트리거할 수 있음

이를 활용해 메시지 동기화 및 실시간 기능 강화 가능

 

2. Django에서 DynamoDB 연동

 

📌 Django에서 DynamoDB를 사용하려면?

Django의 ORM은 RDBMS(RDS)에 최적화되어 있기 때문에 DynamoDB는 직접 관리해야 함.

Boto3(AWS SDK for Python)을 사용하여 DynamoDB에 접근.

 

📌 DynamoDB 설정 과정

1. AWS IAM 권한 설정

DynamoDB에 접근할 수 있는 IAM 사용자를 생성하고, AWS_ACCESS_KEY, AWS_SECRET_KEY 발급

최소한의 권한을 가진 정책을 설정하여 보안 강화

2. DynamoDB 테이블 생성

chat_room_idPartition Key, timestampSort Key로 설정하여 효율적인 조회 가능

DynamoDB 테이블 생성 페이지

3. API Gateway WebSocket API 생성

1️⃣ API Gateway 콘솔 접속

1. AWS Management Console에서 API Gateway 서비스로 이동

2. API 생성 버튼 클릭

3. WebSocket API 선택 후 다음과 같이 설정

API 이름: wavbor-dev-websocket-api

라우팅 선택 표준식: $request.body.action

설명: “Django 기반 실시간 채팅용 WebSocket API”

4. 생성 후 왼쪽 메뉴에서 경로(Route)로 이동하여 기본 엔드포인트 확인

3. WebSocket 이벤트 핸들러 설정

 

1️⃣ $connect 엔드포인트 설정

$connect클라이언트가 WebSocket에 연결할 때 실행됨.

DynamoDB에 Connection ID 저장을 위해 Lambda 연결.

 

4. Django에서 WebSocket API 호출

 

웹소켓 URL을 Django 설정 파일에 추가

WEBSOCKET_URL = f"https://ws.{APP_ENV}.{DOMAIN}"

 

4. Django에서 Boto3를 이용한 연동

import boto3
from django.conf import settings

# DynamoDB 클라이언트 생성
dynamodb = boto3.resource(
    'dynamodb',
    region_name=settings.AWS_REGION,
    aws_access_key_id=settings.AWS_ACCESS_KEY,
    aws_secret_access_key=settings.AWS_SECRET_KEY
)

# 채팅방 정보를 저장할 테이블 객체 가져오기
chat_table = dynamodb.Table('ChatRooms')

 

5. DynamoDB 기반의 채팅 시스템 설계

 

📌 DynamoDB에서 채팅방 정보 저장 구조

DynamoDB에 채팅방 ID만 저장하고, 메시지는 RDS에 저장하는 방식

테이블 설계:

class Chat(BaseModel):
    user = models.ManyToManyField("user.User", blank=True, related_name="user_set", verbose_name="의뢰인")
    artist = models.ManyToManyField("user.User", blank=True, related_name="artist_set", verbose_name="전문가")
    name = models.CharField(max_length=101, null=True, blank=True, verbose_name="채팅방 이름")

    class Meta:
        db_table = "chat"
        verbose_name = "채팅"
        verbose_name_plural = verbose_name
        ordering = ["-updated_at", "-created_at"]
        

class Message(BaseModel):
    chat = models.ForeignKey("chat.Chat", on_delete=models.CASCADE, verbose_name="채팅")
    user = models.ForeignKey("user.User", on_delete=models.SET_NULL, null=True, blank=True, verbose_name="유저")
    type = models.CharField(max_length=1, choices=MessageTypeChoices.choices, verbose_name="메세지 타입")
    text = models.TextField(null=True, blank=True, verbose_name="텍스트")
    image = models.ImageField(max_length=512, upload_to="message/image/", null=True, blank=True, verbose_name="이미지")
    file = models.FileField(max_length=512, upload_to="message/file/", null=True, blank=True, verbose_name="파일")
    audio = models.FileField(max_length=512, upload_to="message/audio/", null=True, blank=True, verbose_name="오디오")
    recording = models.FileField(
        max_length=512, upload_to="message/recording/", null=True, blank=True, verbose_name="음성 녹음"
    )
    recording_time = models.FloatField(default=0.0, verbose_name="음성 녹음 시간")

    class Meta:
        db_table = "message"
        verbose_name = "메세지"
        verbose_name_plural = verbose_name
        ordering = ["-created_at"]

 

6. 채팅 메시지 저장 및 채팅방 저장

채팅 메세지는 RDS에 저장하고 채팅방은 Dynamo DB에 저장

# serializer.py
class MessageSerializer(serializers.ModelSerializer):
    chat_id = serializers.IntegerField(write_only=True, label="채팅방 id")
    type = serializers.CharField(write_only=True, label="타입")
    text = serializers.CharField(allow_null=True, required=False, write_only=True, label="텍스트")
    image = serializers.CharField(allow_null=True, required=False, write_only=True, label="이미지")
    file = serializers.CharField(allow_null=True, required=False, write_only=True, label="파일")
    audio = serializers.CharField(allow_null=True, required=False, write_only=True, label="오디오")
    recording = serializers.CharField(allow_null=True, required=False, write_only=True, label="음성 녹음")

    class Meta:
        model = Message
        fields = ["id", "chat_id", "type", "text", "image", "file", "audio", "recording"]

    def validate(self, attrs):
        attrs["user_id"] = self.context["request"].user.id

        try:
            attrs["chat"] = Chat.objects.get(id=attrs.pop("chat_id"))
            user_list = [attrs["chat"].user.all(), attrs["chat"].artist.all()]
            attrs["user_list"] = list(chain(*user_list))

            if attrs["chat"].is_blocked and attrs["type"] != MessageTypeChoices.S:
                raise ValidationError({"chat_id": ["해당 채팅방은 블로킹되어 메세지를 보낼 수 없습니다."]})

            if attrs["user_id"] not in [data.id for data in attrs["user_list"]]:
                raise ValidationError({"chat_id": ["채팅방에 참여하고 있는 사용자가 아닙니다."]})

        except Chat.DoesNotExist:
            raise ValidationError({"chat_id": ["채팅방이 존재하지 않습니다."]})

        return attrs

    @transaction.atomic
    def create(self, validated_data):
        user_id = validated_data.get("user_id")
        chat = validated_data.get("chat")
        user_list = validated_data.pop("user_list", None)

        if validated_data.get("image"):
            validated_data["image"] = parse.unquote(validated_data["image"])
        if validated_data.get("file"):
            validated_data["file"] = parse.unquote(validated_data["file"])
        if validated_data.get("audio"):
            validated_data["audio"] = parse.unquote(validated_data["audio"])
        if validated_data.get("recording"):
            validated_data["recording"] = parse.unquote(validated_data["recording"])

            s3_client = boto3.client("s3")
            bucket_name = f"wavbor-{settings.APP_ENV}-bucket"

            response = s3_client.get_object(Bucket=bucket_name, Key="".join(["_media/", validated_data["recording"]]))
            audio_data = BytesIO(response["Body"].read())

            audio = AudioSegment.from_file(audio_data)
            duration_seconds = len(audio) / 1000

            validated_data["recording_time"] = duration_seconds

        # message 생성
        instance = Message.objects.create(**validated_data)

        # message가 속한 채팅방에 소속되어 있는 사용자들의 id를 가져와 현재 메세지를 작성한 사용자 이외의 사람들에게 메세지를 보낸다.
        for user in user_list:
            instance.send(user, user_id)

        return instance

 

# models.py
class Message(BaseModel):
    chat = models.ForeignKey("chat.Chat", on_delete=models.CASCADE, verbose_name="채팅")
    user = models.ForeignKey("user.User", on_delete=models.SET_NULL, null=True, blank=True, verbose_name="유저")

    class Meta:
        db_table = "message"
        verbose_name = "메세지"
        verbose_name_plural = verbose_name
        ordering = ["-created_at"]
        
    def send(self, receiver_user, login_user_id):

        db = boto3.client("dynamodb", region_name=settings.AWS_REGION)

		# Dynamo DB에서 connection ID 조회
        # UserIdIndex라는 GSI(Global Secondary Index)를 사용하여 user_id 기반으로 검색
        # 즉, 해당 유저가 현재 WebSocket에 연결되어 있는지 확인하는 과정
        response = db.query(
            TableName=f"{settings.PROJECT_NAME}-{settings.APP_ENV}-connection",
            IndexName="UserIdIndex",
            KeyConditionExpression="user_id = :user_id",
            ExpressionAttributeValues={
                ":user_id": {"N": str(receiver_user.id)},
            },
        )
        # AWS API Gateway WebSocket을 관리하는 클라이언트 생성
        apigw = boto3.client(
            "apigatewaymanagementapi",
            region_name=settings.AWS_REGION,
            endpoint_url=settings.WEBSOCKET_URL,
        )
        for item in response["Items"]:
            chat = self.chat
            user = self.user
            #  연결된 WebSocket의 Connection ID 가져오기
            connection_id = item["connection_id"]["S"]

            # 연결이 되어 있는 커낵션에는 메세지를 보내고 연결되지 않은 커낵션은 삭제해줌 -> 연결되지 않은 커낵션이 남아있어 에러 발생.
            # 메시지를 JSON 형식으로 변환 후 API Gateway WebSocket을 통해 전송
            try:
                apigw.post_to_connection(
                    ConnectionId=connection_id,
                    Data=json.dumps(
                        {
                            "isWavborChat": is_wavbor_chat,
                            "messageId": self.id,
                            "chatId": self.chat_id,
                            "status": self.chat.status,
                            "user": user,
                            "type": self.type,
                            "text": self.text,
                            "file": None
                            if not self.file
                            else "".join(
                                [f"https://{settings.AWS_S3_CUSTOM_DOMAIN}/_media/", str(self.file)],
                            ),
                            "audio": None
                            if not self.audio
                            else "".join(
                                [f"https://{settings.AWS_S3_CUSTOM_DOMAIN}/_media/", str(self.audio)],
                            ),
                            "recording": None
                            if not self.recording
                            else "".join(
                                [f"https://{settings.AWS_S3_CUSTOM_DOMAIN}/_media/", str(self.recording)],
                            ),
                            "recordingTime": None if not self.recording_time else self.recording_time,
                            "isMine": True if login_user_id and receiver_user.id == login_user_id else False,
                            "createdAt": str(timezone.localtime(self.created_at)).replace(" ", "T"),
                        }
                    ),
                )

            except Exception as e:
                logger.error(f"웹소켓 커낵션 에러 발생 (커낵션 삭제) -> {e}")

                try:
	                # 유효하지 않은 Connection ID 정리하여 불필요한 WebSocket 요청을 방지
                    db.delete_item(
                        TableName=f"{settings.PROJECT_NAME}-{settings.APP_ENV}-connection",
                        Key={"connection_id": {"S": connection_id}},
                    )

                except Exception as e:
                    logger.error(f"dynamoDB connection_id 삭제 시 에러 발생 -> {e}")

 

위의 코드를 정리하면 다음과 같다.

DynamoDB에서 WebSocket 연결 정보(Connection ID) 조회

AWS API Gateway WebSocket을 이용하여 실시간 메시지 전송

유효하지 않은 Connection ID는 DynamoDB에서 자동 삭제

 

⚠️ 고려해야 할 점

1. DynamoDB에서 WebSocket Connection ID가 잘못 남아있는 경우

Connection ID를 정기적으로 정리하는 로직을 추가하면 불필요한 요청을 줄일 수 있음.

WebSocket 종료 이벤트를 감지하는 로직 추가 가능.

2. AWS API Gateway WebSocket 한계

API Gateway WebSocket은 초당 500개 연결 제한이 있음.

초과하는 경우 Redis Pub/Sub 또는 Firebase Cloud Messaging(Firebase FCM) 같은 대안을 고려.

3. 메시지 전송 실패 시 재시도 로직 추가 필요

네트워크 문제로 인해 메시지 전송 실패 시 **재시도 로직 (Retry Mechanism)**을 적용하면 안정성이 증가.