Search

EC2

DAX 클러스터가 있는 VPC에 인스턴스 생성
반드시 같은 VPC의 라우팅 가능한 범위 내에 있는 인스턴스만 DAX에 접근 가능함
# Install pip, then the DAX client and Boto3 (one command per line).
sudo yum install pip -y
pip3 install amazon-dax-client
pip3 install boto3
Shell
복사
"""Benchmark DynamoDB queries through DAX or the regular Boto3 resource.

Run with ``--endpoint-url`` to route requests through a DAX cluster;
run without it to talk to DynamoDB directly. The script inserts a few
sample items, asks for a ``request_id`` on stdin, times three queries
and prints a summary table.
"""
import argparse
import time
import uuid
from datetime import datetime, timezone

import amazondax
import boto3
from boto3.dynamodb.conditions import Key

TABLE_NAME = "GlobalRequestLog"
REGION = "us-east-1"

# NOTE: arguments are parsed at import time because get_dax_resource()
# reads the module-level DAX_ENDPOINT.
parser = argparse.ArgumentParser()
parser.add_argument(
    '--endpoint-url',
    required=False,
    help="DAX endpoint URL (e.g., dax://<cluster>.clustercfg.dax-clusters.<region>.amazonaws.com:8111)")
args = parser.parse_args()
DAX_ENDPOINT = args.endpoint_url

dynamodb = boto3.resource('dynamodb', region_name=REGION)
# Kept for backward compatibility; the functions below build their own
# Table handle from whichever resource they are given.
table = dynamodb.Table(TABLE_NAME)


def write_sample_data(dyn_resource, n=5):
    """Insert ``n`` sample request-log items into ``TABLE_NAME``.

    Args:
        dyn_resource: A Boto3 DynamoDB resource or a DAX resource.
        n: Number of items to insert.

    Each item gets a random UUID ``request_id``, a UTC ISO-8601
    ``timestamp``, a ``user_id`` alternating between ``user0``/``user1``
    and a ``service`` cycling through ``service0``..``service2``.
    """
    table = dyn_resource.Table(TABLE_NAME)
    print("\n[INSERTING SAMPLE DATA]")
    for i in range(n):
        request_id = str(uuid.uuid4())
        user_id = f"user{i % 2}"
        # datetime.utcnow() is deprecated; dropping tzinfo keeps the
        # stored string format identical to what utcnow() produced.
        timestamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()
        region = REGION
        service = f"service{i % 3}"
        item = {
            'request_id': request_id,
            'timestamp': timestamp,
            'user_id': user_id,
            'region': region,
            'service': service,
        }
        table.put_item(Item=item)
        print(f" user_id={user_id} | service={service} | request_id={request_id}")
        time.sleep(0.3)


def benchmark_gsi(dyn_resource, user_id, label="GSI"):
    """Time one query against the ``UserIndexGSI`` index.

    Args:
        dyn_resource: A Boto3 DynamoDB resource or a DAX resource.
        user_id: Value matched against the index key ``user_id``.
        label: Unused; kept so existing callers keep working.

    Returns:
        ``(duration_seconds, item_count)`` where ``item_count`` is the
        number of items in the single response returned by the query.
    """
    table = dyn_resource.Table(TABLE_NAME)
    # perf_counter() is monotonic, so the measured interval cannot be
    # distorted by system clock adjustments.
    start = time.perf_counter()
    response = table.query(
        IndexName='UserIndexGSI',
        KeyConditionExpression=Key('user_id').eq(user_id)
    )
    duration = time.perf_counter() - start
    return duration, len(response['Items'])


def benchmark_lsi(dyn_resource, request_id, label="LSI"):
    """Time one query keyed on ``request_id``.

    NOTE(review): no ``IndexName`` is passed, so despite the name this
    queries the base table (presumably ``request_id`` is its partition
    key) rather than a local secondary index -- confirm the intended
    index name.

    Args:
        dyn_resource: A Boto3 DynamoDB resource or a DAX resource.
        request_id: Value matched against the key ``request_id``.
        label: Unused; kept so existing callers keep working.

    Returns:
        ``(duration_seconds, item_count)`` where ``item_count`` is the
        number of items in the single response returned by the query.
    """
    table = dyn_resource.Table(TABLE_NAME)
    start = time.perf_counter()
    response = table.query(
        KeyConditionExpression=Key('request_id').eq(request_id)
    )
    duration = time.perf_counter() - start
    return duration, len(response['Items'])


def get_dax_resource():
    """Create a DAX resource for ``DAX_ENDPOINT``.

    Returns:
        The DAX resource, or ``None`` when no endpoint was supplied or
        creating the resource raised (the error is printed).
    """
    if not DAX_ENDPOINT:
        print("[ERROR] DAX endpoint not provided. Use --endpoint-url")
        return None
    try:
        dax_resource = amazondax.AmazonDaxClient.resource(
            endpoint_url=DAX_ENDPOINT, region_name=REGION)
        return dax_resource
    except Exception as e:
        # Best-effort: report the failure and let the caller skip DAX.
        print("[ERROR] DAX 연결 실패:", e)
        return None


def print_summary(results):
    """Print a fixed-width table of benchmark results.

    Args:
        results: Iterable of ``(method, query_type, items, seconds)``
            tuples.
    """
    print("\n[Performance Summary] (seconds)")
    print(f"{'Method':<15} {'Query Type':<10} {'Items':<6} {'Time'}")
    print("-" * 42)
    for method, query_type, items, t in results:
        print(f"{method:<15} {query_type:<10} {items:<6} {t:.4f}")
    print("-" * 42)


def _run_benchmarks(dyn_resource, method, user_id):
    """Insert sample data, prompt for a request_id and run all three queries.

    Returns a list of ``(method, query_type, items, seconds)`` rows.
    """
    write_sample_data(dyn_resource)
    request_id = input("Enter a request_id to test LSI: ")

    rows = []
    # The same GSI query is issued twice so the first ("cold") and the
    # second ("warm") timings can be compared.
    t1, c1 = benchmark_gsi(dyn_resource, user_id, "GSI")
    rows.append((method, "GSI (cold)", c1, t1))
    t2, c2 = benchmark_gsi(dyn_resource, user_id, "GSI")
    rows.append((method, "GSI (warm)", c2, t2))
    t3, c3 = benchmark_lsi(dyn_resource, request_id, "LSI")
    rows.append((method, "LSI", c3, t3))
    return rows


if __name__ == "__main__":
    user_id = "user0"
    results = []

    if DAX_ENDPOINT:
        print(f"Using DAX at {DAX_ENDPOINT}")
        dax_resource = get_dax_resource()
        if dax_resource:
            # The context manager closes the DAX connection on exit.
            with dax_resource:
                results.extend(_run_benchmarks(dax_resource, "DAX", user_id))
    else:
        print("Using regular Boto3 DynamoDB client")
        results.extend(_run_benchmarks(dynamodb, "DynamoDB", user_id))

    print_summary(results)
Python
복사
# DAX 사용 테스트 python3 dynamodb.py --endpoint-url <dax_endpoint> # DynamoDB 사용 테스트 python3 dynamodb.py
Shell
복사