dax 클러스터가 있는 vpc에 인스턴스 생성
반드시 같은 VPC의 라우팅 가능한 범위 내에 있는 인스턴스만 DAX에 접근 가능함
sudo yum install pip -y
pip3 install amazon-dax-client
pip3 install boto3
Shell
복사
import boto3
import time
import uuid
import argparse
from datetime import datetime
import amazondax
from boto3.dynamodb.conditions import Key
TABLE_NAME = "GlobalRequestLog"
REGION = "us-east-1"
parser = argparse.ArgumentParser()
parser.add_argument(
'--endpoint-url', required=False,
help="DAX endpoint URL (e.g., dax://<cluster>.clustercfg.dax-clusters.<region>.amazonaws.com:8111)")
args = parser.parse_args()
DAX_ENDPOINT = args.endpoint_url
dynamodb = boto3.resource('dynamodb', region_name=REGION)
table = dynamodb.Table(TABLE_NAME)
def write_sample_data(dyn_resource, n=5):
table = dyn_resource.Table(TABLE_NAME)
print("\n[INSERTING SAMPLE DATA]")
for i in range(n):
request_id = str(uuid.uuid4())
user_id = f"user{i % 2}"
timestamp = datetime.utcnow().isoformat()
region = REGION
service = f"service{i % 3}"
item = {
'request_id': request_id,
'timestamp': timestamp,
'user_id': user_id,
'region': region,
'service': service
}
table.put_item(Item=item)
print(f" user_id={user_id} | service={service} | request_id={request_id}")
time.sleep(0.3)
def benchmark_gsi(dyn_resource, user_id, label="GSI"):
table = dyn_resource.Table(TABLE_NAME)
start = time.time()
response = table.query(
IndexName='UserIndexGSI',
KeyConditionExpression=Key('user_id').eq(user_id)
)
duration = time.time() - start
return duration, len(response['Items'])
def benchmark_lsi(dyn_resource, request_id, label="LSI"):
table = dyn_resource.Table(TABLE_NAME)
start = time.time()
response = table.query(
KeyConditionExpression=Key('request_id').eq(request_id)
)
duration = time.time() - start
return duration, len(response['Items'])
def get_dax_resource():
if not DAX_ENDPOINT:
print("[ERROR] DAX endpoint not provided. Use --endpoint-url")
return None
try:
dax_resource = amazondax.AmazonDaxClient.resource(endpoint_url=DAX_ENDPOINT, region_name=REGION)
return dax_resource
except Exception as e:
print("[ERROR] DAX 연결 실패:", e)
return None
def print_summary(results):
print("\n[Performance Summary] (seconds)")
print(f"{'Method':<15} {'Query Type':<10} {'Items':<6} {'Time'}")
print("-" * 42)
for method, query_type, items, t in results:
print(f"{method:<15} {query_type:<10} {items:<6} {t:.4f}")
print("-" * 42)
if __name__ == "__main__":
user_id = "user0"
results = []
if DAX_ENDPOINT:
print(f"Using DAX at {DAX_ENDPOINT}")
dax_resource = get_dax_resource()
if dax_resource:
with dax_resource:
write_sample_data(dax_resource)
request_id = input("Enter a request_id to test LSI: ")
t1, c1 = benchmark_gsi(dax_resource, user_id, "GSI")
results.append(("DAX", "GSI (cold)", c1, t1))
t2, c2 = benchmark_gsi(dax_resource, user_id, "GSI")
results.append(("DAX", "GSI (warm)", c2, t2))
t3, c3 = benchmark_lsi(dax_resource, request_id, "LSI")
results.append(("DAX", "LSI", c3, t3))
else:
print("Using regular Boto3 DynamoDB client")
write_sample_data(dynamodb)
request_id = input("Enter a request_id to test LSI: ")
t1, c1 = benchmark_gsi(dynamodb, user_id, "GSI")
results.append(("DynamoDB", "GSI (cold)", c1, t1))
t2, c2 = benchmark_gsi(dynamodb, user_id, "GSI")
results.append(("DynamoDB", "GSI (warm)", c2, t2))
t3, c3 = benchmark_lsi(dynamodb, request_id, "LSI")
results.append(("DynamoDB", "LSI", c3, t3))
print_summary(results)
Python
복사
# DAX 사용 테스트
python3 dynamodb.py --endpoint-url <dax_endpoint>
# DynamoDB 사용 테스트
python3 dynamodb.py
Shell
복사

