CLOUD/[P] 실시간 채팅 프로그램

[WebSocket/AWS] WebSocket API - Lambda 함수 작성

alsruds 2023. 4. 13. 04:30


 

① 기본 구성

2023.04.06 - [클라우드/Public Cloud] - [AWS] API Gateway : WebSocket API 이용하기

 

[AWS] API Gateway : WebSocket API 이용하기

안뇽하세요~ 지금 매우 신이 난 상태에용~ 왜냐면.. AWS WebSocket API를 이용하고 싶었는데ㅠㅠㅠ 자꾸 error: Unexpected server response: 500 이런 거 나오고.. ㅠㅠ ㅎㅎ 근데 그냥 cmd 창 껐다 키니까 되더라

alsrudalsrudalsrud.tistory.com

》 변경 : DynamoDB 대신 RDS 연결

 

② Lambda 코드 작성

※ zip 파일로 import 하려는 모듈 같이 업로드 해주어야 함

♨ Connect

const AWS = require('aws-sdk');
const mysql = require('mysql2'); # Promise 반환을 지원하는 mysql2 라이브러리로 변경

// Connection RDS MySQL
const connection = mysql.createConnection({
  host: 'RDS endpoint',
  port: '3306',
  user: 'admin',
  password: 'qwer1234',
  database: 'rds01',
});

exports.handler = async function (event, context) {
  const connectionId = event.requestContext.connectionId;
  const roomName = parseInt(event.queryStringParameters.roomName);
  
  // ApiGatewayManagementApi
  const callbackAPI = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage,
  });
  
  // roomName, connectionId 저장 : class ConnectionId
  try {
    // insert data
    let params = {
      roomName : roomName,
      cid : connectionId,
    };
    
    // insert data into table
    const save_sql = 'insert into chat01_connectionid set ?';
    await connection.promise().query(save_sql, params);
  } catch (error) {
    console.error(`error:: ${error}`); # 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있을 시 오류날 확률이 매우 높음

    return {
      statusCode: 500,
      error_message: error.message,
    };
  }

  return {
    statusCode: 200,
  };
};

같은 채팅방 입장 : roomName 이용

 

sendMessage  

const AWS = require('aws-sdk');
const mysql = require('mysql2'); # Promise 반환을 지원하는 mysql2 라이브러리로 변경

// Connection RDS MySQL
let connection = mysql.createConnection({
  host: 'RDS endpoint',
  port: '3306',
  user: 'admin',
  password: 'qwer1234',
  database: 'rds01',
});

exports.handler = async function (event, context) {
  console.log('메세지 이벤트 실행');

  // ApiGatewayManagementApi
  const callbackAPI = new AWS.ApiGatewayManagementApi({
    apiVersion: '2018-11-29',
    endpoint: event.requestContext.domainName + '/' + event.requestContext.stage,
  });

  // Django에서 보낸 채팅 메세지
  let message = JSON.parse(event.body).message;
  const to_user = parseInt(JSON.parse(event.body).to_user);
  const from_user = parseInt(JSON.parse(event.body).from_user);
  const room_id = parseInt(JSON.parse(event.body).room_id);
  
  // 원래 메세지 DB 에서 불러오기
  let sql = 'select cid from chat01_connectionid where roomName=' + room_id;
  const return_sql = 'select message from chat01_chatting where room_id =' + room_id;
  try {
    // 저장되어 있던 DB array 저장
    const [results] = await connection.promise().query(return_sql);
    
    let return_arr = [];
    for (let i = 0; i < results.length; i++) {
      return_arr.push('0'+results[i]['message']);
    }
    console.log(return_arr);

    // 메세지 전송
    const [row, fields] = await connection.promise().query(sql);

    for (let j = 0; j < return_arr.length; j++) {
      const ReturnMessages = row.map(({ cid: ConnectionId }) => {
        return callbackAPI.postToConnection({ ConnectionId, Data: return_arr[j] }).promise();
      });
      console.log(return_arr[j]);
      await Promise.all(ReturnMessages);
    }
  } catch (error) {
    console.error(`error:: ${error}`); # 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있을 시 오류날 확률이 매우 높음

    return {
      statusCode: 500,
      error_message: error.message,
    };
  }
  
  // DB에 새로운 메세지 저장 : class Chatting
  try {
    // insert data
    let params = {
      room_id : room_id,
      message : message,
      to : to_user,
      from_user : from_user,
    };
    
    // insert data into table
    let insert_sql = 'insert into chat01_chatting set ?';
    await connection.promise().query(insert_sql, params);
  } catch (error) {
    console.error(`error:: ${error}`); // 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있어 오류날 확률이 매우 높으므로 DB 데이터 정리 후 실행해야할 듯

    return {
      statusCode: 500,
      error_message: error.message,
    };
  }
  
  // 새로운 메세지 전송
  sql = 'select cid from chat01_connectionid where roomName=' + room_id;
  try {
    const [rows, fields] = await connection.promise().query(sql);
    
    message = '1'+ message;
    // 기존 코드에서 오류가 있었던 이유는 DB 컬럼명이 cid인데, ConnectionId로 불러와 제대로 참조하지 못함.
    // cid 컬럼 변수 명을 ConnectionId로 변경
    const sendMessages = rows.map(({ cid: ConnectionId }) => {
      return callbackAPI.postToConnection({ ConnectionId, Data: message }).promise();
    });
    
    // Array.map 메소드는 동기함수이고, callbackAPI... 메소드는 비동기함수라 map 내부 콜백에 async-await을 사용해도 소용없음.
    // Promise.all로 비동기 함수가 모두 실행될 수 있도록 binding해주고, await으로 함수 실행을 보장시켜줘야함.
    const result = await Promise.all(sendMessages);
    console.log(result);
  } catch (error) {
    console.error(`error:: ${error}`); // 현재 DB에 웹 소켓 연결이 끊긴 Connection_id가 계속 들어있어 오류날 확률이 매우 높으므로 DB 데이터 정리 후 실행해야할 듯

    return {
      statusCode: 500,
      error_message: error.message,
    };
  } //finally {
    // 이 람다함수가 매우 자주 실행되는 경우 Connection을 끊지 않고 계속 사용하는 것도 고려
    //await connection.end();
  //}

  return { statusCode: 200 };
};

지난 채팅 내역 불러오기

새로운 채팅 저장 & 화면에 출력

같은 채팅방(roomName) 을 가진 connectionId 에만 메세지 전송

 

♨ Disconnect  

import json
import pymysql

def lambda_handler(event, context):
    connectionId = event['requestContext']['connectionId']
    conn = pymysql.connect(
        host='RDS endpoint',
        user='user01',
        password='qwer1234',
        db='rds01',
        charset='utf8'
    )
    cur = conn.cursor()
    sql = "DELETE FROM chat01_connectionid WHERE cid = '" + connectionId + "';";
    cur.execute(sql)
    conn.commit()
    #conn.close()
    
    # TODO implement
    return {
        'statusCode': 200,
    }

Node.js 로 하다가..^^ 안돼서 Python 으로 갈았습니다

채팅방과 연결이 끊어진 connectionId 삭제

 

③ WebSocket API(Lambda) 를 호출하는 Django 코드

<script type="text/javascript">
let roomName = "{{ room_id | escapejs }}";
let to = "{{ to | escapejs }}";

# websocket api url
let chatSocket = new WebSocket(
    `[websocket api 주소]?roomName=`+roomName
);

# websocket connect
chatSocket.onopen = (e) => {
    console.log(e);
};

# websocket 에서 전달받은 메세지 출력
let cnt = 0;
chatSocket.onmessage = (e) => {
    console.log(e.data);
    let message = e.data
    if (message[0] == '0' && cnt == 0) {	# 기존 메세지
        document.querySelector("#chat-log").value += (message.substring(1) + '\n');
    }
    if (message[0] == '1') {	# 새로운 메세지
        cnt = 1;
        document.querySelector("#chat-log").value += (message.substring(1) + '\n');
    }
};

# websocket disconnect
chatSocket.onclose = (e) => {
    console.error('Chat socket closed unexpectedly');
};

# enter 키로 메세지 전송
document.querySelector("#chat-message-input").focus();
document.querySelector("#chat-message-input").addEventListener("keyup",(e) => {
    if (e.keyCode === 13) {
        document.querySelector("#chat-message-submit").click();
    }
});
# send 버튼으로 메세지 전송
document.querySelector("#chat-message-submit").addEventListener("click", (e) => {
    let messageInputDom = document.querySelector("#chat-message-input");
    let message = messageInputDom.value;
    chatSocket.send(JSON.stringify({
        'action': 'sendmessage',
        'message' : '{{ request.session.user_id }} : ' + message,
        'to_user' : to,
        'from_user' : {{ request.session.user_uid }},
        'room_id' : roomName
    }));
    messageInputDom.value = '';
});

</script>