[AWS]Analytics on AWS 워크샵 실습하기(5)
이전 실습 : [AWS]Analytics on AWS 워크샵 실습하기(4)
마지막 실습 시간입니다. 이번 시간은 매우 구체적인 사용 사례 예제로 Lambda 함수를 생성 해 볼 것입니다.
7. 람다(Lambda) 함수
1. Lambda 함수 생성
우리가 이번에 작성할 람다 함수는 Athena가 S3의 processsed data에서 Hits 별 Top 5 Popular Songs를 쿼리하고 가져 오는 코드를 호스팅하는 것입니다.
AWS Lambda 콘솔로 이동합니다.리전이 버지니아 북주(us-east-1)인지 체크해줍니다. 그리고 함수 생성을 클릭해줍니다.
함수 생성 단계는 다음과 같이 작성합니다. 그리고 함수 생성을 눌러 마무리합니다.
2. Lambda 함수 작성
boto3를 사용하여 Athena 클라이언트에 액세스합니다. 함수 Code 섹션까지 아래로 스크롤하고 lambda_function.py를 더블 클릭하여 기존 코드를 아래의 Python 코드로 바꿉니다.
boto와 boto3 Athena API 메서드에 대한 자세한 사항을 아래 링크를 참조하면 도움이 될 것 같습니다.
https://boto3.amazonaws.com/v1/documentation/api/latest/index.html?id=docs_gateway
https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/athena.html
import boto3
import time
import os
# Environment Variables
DATABASE = os.environ['DATABASE']
TABLE = os.environ['TABLE']
# Top X Constant
TOPX = 5
# S3 Constant
S3_OUTPUT = f's3://{os.environ["BUCKET_NAME"]}/query_results/'
# Number of Retries
RETRY_COUNT = 10
def lambda_handler(event, context):
client = boto3.client('athena')
# query variable with two environment variables and a constant
query = f"""
SELECT track_name as \"Track Name\",
artist_name as \"Artist Name\",
count(1) as \"Hits\"
FROM {DATABASE}.{TABLE}
GROUP BY 1,2
ORDER BY 3 DESC
LIMIT {TOPX};
"""
response = client.start_query_execution(
QueryString=query,
QueryExecutionContext={ 'Database': DATABASE },
ResultConfiguration={'OutputLocation': S3_OUTPUT}
)
query_execution_id = response['QueryExecutionId']
# Get Execution Status
for i in range(0, RETRY_COUNT):
# Get Query Execution
query_status = client.get_query_execution(
QueryExecutionId=query_execution_id
)
exec_status = query_status['QueryExecution']['Status']['State']
if exec_status == 'SUCCEEDED':
print(f'Status: {exec_status}')
break
elif exec_status == 'FAILED':
raise Exception(f'STATUS: {exec_status}')
else:
print(f'STATUS: {exec_status}')
time.sleep(i)
else:
client.stop_query_execution(QueryExecutionId=query_execution_id)
raise Exception('TIME OVER')
# Get Query Results
result = client.get_query_results(QueryExecutionId=query_execution_id)
print(result['ResultSet']['Rows'])
# Function can return results to your application or service
# return result['ResultSet']['Rows']
3. 환경 변수
이제 구성 탭에서 환경 변수를 클릭하고 편집을 눌러 아래의 3가지 환경 변수를 추가하고 저장합니다.
- Key: DATABASE, Value: analyticsworkshopdb
- Key: TABLE, Value: processed_data
- Key: BUCKET_NAME, Value: yourname-analytics-workshop-bucket
이번엔 좌측 일반 구성을 선택 후 편집에 들어갑니다. 메모리는 12MB로 놔두고 제한 시간을 10초로 변경합니다. 그리고 저장을 누릅니다.
4. 실행 역할
좌측 권한 탭으로 들어가서 실행 역할 아래의 역할 이름을 클릭하면 새로운 IAM 탭이 열립니다.
그 후 우측 '권한 추가' -> '정책 연결'을 눌러 아래 두 정책을 연결시킵니다.
- AmazonS3FullAccess
- AmazonAthenaFullAccess
이 정책들이 추가 되었으면 탭을 닫습니다.
5. 테스트 이벤트 구성
이제 함수를 테스트 할 준비가 되었습니다. 함수 Code 섹션에서 Deploy를 클릭하여 먼저 함수를 배포합니다.
다음으로 새로 생성된 람다 함수의 실행 결과를 확인하기 위해 Dummy 테스트 이벤트를 구성해 보겠습니다.
Deploy 옆에 Test를 누르면 테스트 이벤트 구성 화면이 나타납니다. 다음과 같이 작성하고 나머지는 그대로 두고 저장 누릅니다.
그 후 Test를 다시 클릭 하면 Json 창에 Execution result가 나타나면서 Json 형식의 출력을 볼 수 있습니다.
6. Athena를 통한 확인
Athena 콘솔로 이동해서 Athena를 통해 결과를 확인해 보겠습니다.
왼쪽 패널에서, Database 드롭 다운에서 analyticsworkshopdb 선택하고 아래의 쿼리를 실행합니다.
SELECT track_name as "Track Name",
artist_name as "Artist Name",
count(1) as "Hits"
FROM analyticsworkshopdb.processed_data
GROUP BY 1,2
ORDER BY 3 DESC
LIMIT 5;
마지막으로 aws redshift가 있는데 설정 중 요금 발생하는 것 같아 일단 미루었습니다. 관심있으신 분은 아래 링크로 해보시기 바랍니다.