Search

Scaler

pip3 install -r .\requirements.txt
Shell
복사
ECS Scaler
python3 .\ecs_autoscaler.py --cluster apdev-cluster --region ap-northeast-2
PowerShell
복사
Traffic Pattern
python3 .\rps_monitor.py
PowerShell
복사
Ngrep Loader
python3 .\ecs_ngrep_gui.py
PowerShell
복사
WAF Analysis
python3 .\waf_gui.py
PowerShell
복사
"""Terminal dashboard and CPU-based autoscaler for the services of one ECS cluster.

Every cycle the script polls ECS, CloudWatch, CloudWatch Logs, EC2 and Auto
Scaling, renders the result with Rich, and adjusts each service's desired task
count (and, when every instance is full, the ASG's desired capacity).
"""

import argparse
import platform
import re
import sys
import threading
import time
from datetime import datetime, timedelta, UTC

import boto3
from rich.console import Console
from rich.table import Table
from rich.live import Live
from rich.panel import Panel
from rich.layout import Layout

# Windows-compatible keyboard input
if platform.system() == 'Windows':
    import msvcrt
else:
    import select
    import tty
    import termios

# Matches a number followed by a millisecond / microsecond unit in a log line.
_LATENCY_PATTERN = re.compile(r'(\d+(?:\.\d+)?)\s*(ms|μs|us)', re.IGNORECASE)
# Matches the trailing digits of a service name (e.g. "api2" -> "api").
_TRAILING_DIGITS = re.compile(r'\d+$')


class ECSAutoscaler:
    """Monitor the services of one ECS cluster and scale them on CPU utilization.

    Args:
        cluster_name: Name of the ECS cluster to watch.
        region: AWS region used for every boto3 client.
        log_group_prefix: Prefix of the per-service log groups; defaults to
            ``/aws/ecs/{cluster_name}``.
        min_tasks: Lowest desired count scale-in is allowed to reach.
    """

    SCALE_OUT_COOLDOWN_SECONDS = 120
    SCALE_IN_COOLDOWN_SECONDS = 60
    MANUAL_COOLDOWN_SECONDS = 60
    # A desired-count change matching our own target within this window is ours.
    OWN_CHANGE_WINDOW_SECONDS = 10
    SCALE_IN_CPU_THRESHOLD = 25
    EMERGENCY_CPU_THRESHOLD = 10
    ASG_CHECK_INTERVAL_SECONDS = 60
    # Task capacity assumed per instance (the original sizing note says t3.medium).
    MAX_TASKS_PER_INSTANCE = 4
    # NOTE(review): the ASG name prefix is environment-specific — confirm before
    # pointing this tool at a cluster other than the "apdev" one.
    ASG_NAME_PREFIX = 'apdev-node-'
    # DescribeContainerInstances accepts at most 100 ARNs per call.
    _DESCRIBE_BATCH_SIZE = 100

    def __init__(self, cluster_name, region='ap-northeast-2', log_group_prefix=None, min_tasks=1):
        self.ecs = boto3.client('ecs', region_name=region)
        self.cloudwatch = boto3.client('cloudwatch', region_name=region)
        self.elbv2 = boto3.client('elbv2', region_name=region)
        self.logs = boto3.client('logs', region_name=region)
        self.autoscaling = boto3.client('autoscaling', region_name=region)
        self.ec2 = boto3.client('ec2', region_name=region)
        self.cluster_name = cluster_name
        self.log_group_prefix = log_group_prefix or f"/aws/ecs/{cluster_name}"
        self.min_tasks = min_tasks
        self.console = Console()
        self.service_data = {}
        self.node_data = {}
        self.last_update = ""
        self.scaling_history = {}
        self.last_scale_out = {}  # Last scale-out time per service
        self.last_scale_in = {}  # Last scale-in time per service
        self.target_group_cache = {}  # Target group name -> ARN
        self.scale_in_counter = {}  # Consecutive low CPU readings
        self.cpu_history = {}  # CPU readings for 2-cycle monitoring
        self.last_desired_count = {}  # Last known desired count per service
        self.manual_modification_time = {}  # Time of last detected manual change
        self.cpu_metric_mode = 'max'  # 'max', 'avg', or 'off' - default to max
        self.keyboard_thread = None
        self.last_asg_check = None  # Time of the last ASG capacity check
        self.stop_requested = threading.Event()  # Set when the user presses 'Q'

    # ------------------------------------------------------------------
    # Keyboard handling
    # ------------------------------------------------------------------

    def start_keyboard_listener(self):
        """Start a daemon thread that reacts to the 'M' (mode) and 'Q' (quit) keys."""

        def listen_for_keys():
            if platform.system() == 'Windows':
                while not self.stop_requested.is_set():
                    if msvcrt.kbhit():
                        # Special keys (arrows, F-keys) yield bytes that are not
                        # valid UTF-8; ignore them instead of killing the thread.
                        key = msvcrt.getch().decode('utf-8', errors='ignore')
                        if self._handle_key(key):
                            break
                    time.sleep(0.1)
                return

            if not sys.stdin.isatty():
                # No terminal attached (piped / redirected stdin): nothing to read.
                return
            old_settings = termios.tcgetattr(sys.stdin)
            try:
                # cbreak gives unbuffered single-key reads while keeping output
                # post-processing and signal keys (Ctrl-C) intact; raw mode
                # disables both while Rich is drawing to the same terminal.
                tty.setcbreak(sys.stdin.fileno())
                while not self.stop_requested.is_set():
                    if select.select([sys.stdin], [], [], 0.1)[0]:
                        key = sys.stdin.read(1)
                        if key == '' or self._handle_key(key):  # '' means EOF
                            break
            finally:
                termios.tcsetattr(sys.stdin, termios.TCSADRAIN, old_settings)

        self.keyboard_thread = threading.Thread(target=listen_for_keys, daemon=True)
        self.keyboard_thread.start()

    def _handle_key(self, key):
        """Apply one key press; return True when the listener should stop."""
        key = key.lower()
        if key == 'm':
            self.toggle_cpu_metric()
        elif key == 'q':
            # Tell the main loop in run() to exit as well.
            self.stop_requested.set()
            return True
        return False

    def toggle_cpu_metric(self):
        """Cycle the CPU metric mode: max -> avg -> off -> max."""
        next_mode = {'max': 'avg', 'avg': 'off'}
        self.cpu_metric_mode = next_mode.get(self.cpu_metric_mode, 'max')

    # ------------------------------------------------------------------
    # Data collection
    # ------------------------------------------------------------------

    def get_services(self):
        """Return the names of all services in the cluster.

        Uses the paginator because a single ``list_services`` call returns
        only one page of results.
        """
        paginator = self.ecs.get_paginator('list_services')
        names = []
        for page in paginator.paginate(cluster=self.cluster_name):
            names.extend(arn.split('/')[-1] for arn in page['serviceArns'])
        return names

    def get_cpu_value(self, service_name):
        """Return the cached CPU value for the current mode ('max' -> max, else avg).

        Returns 0 when the service has not been cached yet.
        """
        cached = self.service_data.get(service_name)
        if cached is None:
            return 0
        return cached['cpu_max'] if self.cpu_metric_mode == 'max' else cached['cpu_avg']

    def get_cpu_data(self, service_name):
        """Fetch the last 5 minutes of CPUUtilization at 1-minute resolution.

        Returns:
            ``(current_max, current_avg, history)`` where the current values
            are taken from the two newest datapoints and ``history`` holds up
            to 10 ``{'time', 'max', 'avg'}`` entries. When CloudWatch returns
            no datapoints, the previously cached values (or 0) are returned
            with an empty history.
        """
        end_time = datetime.now(UTC)
        start_time = end_time - timedelta(minutes=5)

        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ECS',
            MetricName='CPUUtilization',
            Dimensions=[
                {'Name': 'ServiceName', 'Value': service_name},
                {'Name': 'ClusterName', 'Value': self.cluster_name}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=60,
            Statistics=['Maximum', 'Average']
        )

        datapoints = sorted(response['Datapoints'], key=lambda dp: dp['Timestamp'])
        if not datapoints:
            # No fresh data: fall back to the cached values for display.
            cached = self.service_data.get(service_name)
            if cached is not None:
                return cached['cpu_max'], cached['cpu_avg'], []
            return 0, 0, []

        history = [
            {
                'time': dp['Timestamp'].strftime("%H:%M"),
                'max': dp['Maximum'],
                'avg': dp['Average']
            }
            for dp in datapoints
        ]

        # Latest values (shown in the table).
        recent = datapoints[-2:]
        current_max = max(dp['Maximum'] for dp in recent)
        current_avg = sum(dp['Average'] for dp in recent) / len(recent)
        return current_max, current_avg, history[-10:]

    def get_service_details(self, service_name):
        """Return desired/running/pending counts, status and task definition name."""
        response = self.ecs.describe_services(
            cluster=self.cluster_name,
            services=[service_name]
        )
        service = response['services'][0]
        return {
            'desired': service['desiredCount'],
            'running': service['runningCount'],
            'pending': service['pendingCount'],
            'status': service['status'],
            'task_definition': service['taskDefinition'].split('/')[-1]
        }

    def get_memory_utilization(self, service_name):
        """Return the maximum MemoryUtilization of the last minute.

        Falls back to the cached value, then to 0, when there are no datapoints.
        """
        end_time = datetime.now(UTC)
        start_time = end_time - timedelta(minutes=1)

        response = self.cloudwatch.get_metric_statistics(
            Namespace='AWS/ECS',
            MetricName='MemoryUtilization',
            Dimensions=[
                {'Name': 'ServiceName', 'Value': service_name},
                {'Name': 'ClusterName', 'Value': self.cluster_name}
            ],
            StartTime=start_time,
            EndTime=end_time,
            Period=60,
            Statistics=['Maximum']
        )

        if response['Datapoints']:
            return max(dp['Maximum'] for dp in response['Datapoints'])
        if service_name in self.service_data:
            return self.service_data[service_name].get('memory', 0)
        return 0

    def get_cpu_history(self, service_name):
        """Return the cached CPU history of a service (empty list if none)."""
        return self.service_data.get(service_name, {}).get('cpu_history', [])

    def _describe_container_instances(self):
        """Return the description of every container instance in the cluster.

        Listing is paginated and descriptions are requested in batches.
        """
        arns = []
        paginator = self.ecs.get_paginator('list_container_instances')
        for page in paginator.paginate(cluster=self.cluster_name):
            arns.extend(page['containerInstanceArns'])

        instances = []
        for start in range(0, len(arns), self._DESCRIBE_BATCH_SIZE):
            response = self.ecs.describe_container_instances(
                cluster=self.cluster_name,
                containerInstances=arns[start:start + self._DESCRIBE_BATCH_SIZE]
            )
            instances.extend(response['containerInstances'])
        return instances

    def get_running_ec2_instances(self):
        """Return task counts, private IP and type for each running EC2 instance.

        Returns an empty dict on any error so the dashboard keeps rendering.
        """
        try:
            container_instances = self._describe_container_instances()
            if not container_instances:
                return {}

            # First registration wins if an instance ID appears more than once.
            by_ec2_id = {}
            for cont_inst in container_instances:
                by_ec2_id.setdefault(cont_inst['ec2InstanceId'], cont_inst)

            ec2_response = self.ec2.describe_instances(InstanceIds=list(by_ec2_id))

            running_instances = {}
            for reservation in ec2_response['Reservations']:
                for instance in reservation['Instances']:
                    if instance['State']['Name'] != 'running':
                        continue
                    instance_id = instance['InstanceId']
                    cont_inst = by_ec2_id.get(instance_id)
                    if cont_inst is None:
                        continue
                    running = cont_inst['runningTasksCount']
                    pending = cont_inst['pendingTasksCount']
                    running_instances[instance_id] = {
                        'running_tasks': running,
                        'pending_tasks': pending,
                        'total_tasks': running + pending,
                        'private_ip': instance.get('PrivateIpAddress', 'N/A'),
                        'instance_type': instance.get('InstanceType', 'N/A')
                    }
            return running_instances
        except Exception:
            # Deliberate best effort: a failed lookup must not stop the display.
            return {}

    def get_asg_info(self):
        """Return capacity and instance-state counts for each ASG backing the cluster.

        ASGs are discovered through the ``aws:autoscaling:groupName`` tag of
        the cluster's EC2 instances. Returns an empty dict on any error.
        """
        try:
            container_instances = self._describe_container_instances()
            instance_ids = [inst['ec2InstanceId'] for inst in container_instances]
            if not instance_ids:
                return {}

            ec2_response = self.ec2.describe_instances(InstanceIds=instance_ids)

            asg_data = {}
            for reservation in ec2_response['Reservations']:
                for instance in reservation['Instances']:
                    # Find the Auto Scaling group tag.
                    asg_name = next(
                        (tag['Value'] for tag in instance.get('Tags', [])
                         if tag['Key'] == 'aws:autoscaling:groupName'),
                        None
                    )
                    if not asg_name or asg_name in asg_data:
                        continue

                    asg_response = self.autoscaling.describe_auto_scaling_groups(
                        AutoScalingGroupNames=[asg_name]
                    )
                    if not asg_response['AutoScalingGroups']:
                        continue

                    asg = asg_response['AutoScalingGroups'][0]
                    states = [i['LifecycleState'] for i in asg['Instances']]
                    asg_data[asg_name] = {
                        'desired': asg['DesiredCapacity'],
                        'min': asg['MinSize'],
                        'max': asg['MaxSize'],
                        'instances': states.count('InService'),
                        'pending': states.count('Pending'),
                        'terminating': states.count('Terminating')
                    }
            return asg_data
        except Exception:
            # Deliberate best effort: a failed lookup must not stop the display.
            return {}

    def get_ec2_task_distribution(self):
        """Return running/pending/total task counts per EC2 instance ID."""
        try:
            return {
                inst['ec2InstanceId']: {
                    'running': inst['runningTasksCount'],
                    'pending': inst['pendingTasksCount'],
                    'total': inst['runningTasksCount'] + inst['pendingTasksCount']
                }
                for inst in self._describe_container_instances()
            }
        except Exception as e:
            print(f"Error getting EC2 task distribution: {e}")
            return {}

    def get_target_group_arn(self, service_name):
        """Look up (and cache) the ARN of ``{cluster_prefix}-{service_base}-tg``.

        ``cluster_prefix`` is the cluster name up to ``-cluster`` and
        ``service_base`` is the service name without trailing digits.
        Returns None when the target group cannot be resolved.
        """
        if '-cluster' in self.cluster_name:
            cluster_prefix = self.cluster_name.split('-cluster')[0]
        else:
            cluster_prefix = self.cluster_name

        service_base = _TRAILING_DIGITS.sub('', service_name)
        target_group_name = f"{cluster_prefix}-{service_base}-tg"

        if target_group_name in self.target_group_cache:
            return self.target_group_cache[target_group_name]

        try:
            response = self.elbv2.describe_target_groups(Names=[target_group_name])
            if response['TargetGroups']:
                arn = response['TargetGroups'][0]['TargetGroupArn']
                self.target_group_cache[target_group_name] = arn
                return arn
        except Exception:
            # Best effort: an unknown target group is reported as None.
            pass
        return None

    def get_task_latency_p90(self, service_name):
        """Return the P90 latency in ms parsed from the last 5 minutes of logs.

        Reads up to 100 events matching ``latency`` and extracts the first
        ``<number> ms|μs|us`` token of each message. Returns 0 when nothing
        can be parsed or the log query fails.
        """
        end_time = datetime.now(UTC)
        start_time = end_time - timedelta(minutes=5)
        log_group_name = f"{self.log_group_prefix}/{service_name}"

        try:
            response = self.logs.filter_log_events(
                logGroupName=log_group_name,
                startTime=int(start_time.timestamp() * 1000),
                endTime=int(end_time.timestamp() * 1000),
                filterPattern='latency',
                limit=100
            )

            latencies = []
            for event in response.get('events', []):
                match = _LATENCY_PATTERN.search(event['message'])
                if not match:
                    continue
                latency = float(match.group(1))
                # Convert microseconds to milliseconds.
                if match.group(2).lower() in ('μs', 'us'):
                    latency = latency / 1000
                latencies.append(latency)

            if latencies:
                latencies.sort()
                p90_index = int(len(latencies) * 0.9)
                return latencies[p90_index] if p90_index < len(latencies) else latencies[-1]
        except Exception:
            # Best effort: missing log groups simply show a latency of 0.
            pass
        return 0

    def get_task_log_requests(self, service_name):
        """Return ``(rps, success_rps, failure_rps)`` as integers.

        Counts log events in a 20-second window ending 10 seconds ago (to
        allow for CloudWatch Logs delay). An event counts as a success when
        its lower-cased message contains '200', '201', '202', 'success' or
        'ok'. Returns zeros when the log query fails.
        """
        window_seconds = 20
        log_group_name = f"{self.log_group_prefix}/{service_name}"
        end_time = int(time.time() * 1000) - 10000  # 10 seconds ago
        start_time = end_time - window_seconds * 1000

        try:
            response = self.logs.filter_log_events(
                logGroupName=log_group_name,
                startTime=start_time,
                endTime=end_time,
            )
            events = response.get("events", [])

            success_keywords = ('200', '201', '202', 'success', 'ok')
            success_requests = sum(
                1 for event in events
                if any(keyword in event['message'].lower() for keyword in success_keywords)
            )

            # Convert window totals to per-second rates.
            rps = len(events) / window_seconds
            success_rps = success_requests / window_seconds
            failure_rps = rps - success_rps
            return int(rps), int(success_rps), int(failure_rps)
        except Exception:
            return 0, 0, 0

    def format_number(self, num):
        """Format a number with a k/m/b suffix (one decimal) above 1,000."""
        if num >= 1_000_000_000:
            return f"{num/1_000_000_000:.1f}b"
        elif num >= 1_000_000:
            return f"{num/1_000_000:.1f}m"
        elif num >= 1_000:
            return f"{num/1_000:.1f}k"
        else:
            return f"{int(num)}"

    # ------------------------------------------------------------------
    # Scaling state
    # ------------------------------------------------------------------

    def get_scaling_progress(self, service_name, desired, running, pending):
        """Return ``(label, color)`` describing how far a scaling action has got."""
        if service_name not in self.scaling_history:
            return "Stable", "green"

        target = self.scaling_history[service_name]['target']

        if desired == running and pending == 0:
            if desired == target:
                return "Complete", "green"
            return "Stable", "green"
        if running < desired:
            progress = int((running / desired) * 100) if desired > 0 else 0
            if pending > 0:
                return f"Scaling {progress}%", "yellow"
            return f"Starting {progress}%", "bright_yellow"
        if running > desired:
            return "Draining", "red"
        return "In Progress", "yellow"

    def detect_manual_modification(self, service_name, current_desired):
        """Return True when the desired count changed without our scaling action.

        A change that equals our last target and happened within
        ``OWN_CHANGE_WINDOW_SECONDS`` of it is treated as our own. A detected
        manual change records the current time for the manual cooldown.
        """
        if service_name not in self.last_desired_count:
            self.last_desired_count[service_name] = current_desired
            return False

        if current_desired == self.last_desired_count[service_name]:
            return False

        self.last_desired_count[service_name] = current_desired

        history = self.scaling_history.get(service_name)
        if history is not None:
            age = (datetime.now() - history['timestamp']).total_seconds()
            if current_desired == history['target'] and age < self.OWN_CHANGE_WINDOW_SECONDS:
                return False

        self.manual_modification_time[service_name] = datetime.now()
        return True

    def is_in_manual_cooldown(self, service_name):
        """Return True while the manual-modification cooldown is still running."""
        if service_name not in self.manual_modification_time:
            return False
        elapsed = (datetime.now() - self.manual_modification_time[service_name]).total_seconds()
        return elapsed < self.MANUAL_COOLDOWN_SECONDS

    def can_scale_out(self, service_name):
        """Return True when the scale-out cooldown has elapsed (or never started)."""
        if service_name not in self.last_scale_out:
            return True
        elapsed = (datetime.now() - self.last_scale_out[service_name]).total_seconds()
        return elapsed >= self.SCALE_OUT_COOLDOWN_SECONDS

    def can_scale_in(self, service_name):
        """Return True when the scale-in cooldown has elapsed (or never started)."""
        if service_name not in self.last_scale_in:
            return True
        elapsed = (datetime.now() - self.last_scale_in[service_name]).total_seconds()
        return elapsed >= self.SCALE_IN_COOLDOWN_SECONDS

    def scale_service(self, service_name, desired_count, is_scale_out=False):
        """Set the service's desired count and remember it as our own target.

        ``is_scale_out`` is accepted for caller compatibility and not used.
        """
        self.ecs.update_service(
            cluster=self.cluster_name,
            service=service_name,
            desiredCount=desired_count
        )
        self.scaling_history[service_name] = {
            'target': desired_count,
            'timestamp': datetime.now()
        }

    def get_threshold(self, service_name):
        """Return the scale-out CPU threshold: 70% for stress services, else 50%."""
        return 70 if 'stress' in service_name else 50

    # ------------------------------------------------------------------
    # ASG scaling
    # ------------------------------------------------------------------

    def check_asg_scale_needed(self):
        """Return True when tasks are pending and every instance is full.

        Never requests more capacity while any cached service CPU is below
        the scale-in threshold.
        """
        try:
            total_pending = 0
            for service_name in self.get_services():
                total_pending += self.get_service_details(service_name)['pending']
                if (service_name in self.service_data
                        and self.get_cpu_value(service_name) < self.SCALE_IN_CPU_THRESHOLD):
                    # A service is about to scale in; do not add nodes.
                    return False

            if total_pending == 0:
                return False

            instance_tasks = self.get_ec2_task_distribution()
            if not instance_tasks:
                return False

            return all(
                tasks['total'] >= self.MAX_TASKS_PER_INSTANCE
                for tasks in instance_tasks.values()
            )
        except Exception as e:
            print(f"Error checking ASG scale need: {e}")
            return False

    def scale_asg_up(self):
        """Raise the desired capacity of the first matching ASG by one.

        Returns True when the capacity was increased, False when no ASG
        matches ``ASG_NAME_PREFIX``, the ASG is already at its maximum, or
        the API call fails.
        """
        try:
            response = self.autoscaling.describe_auto_scaling_groups()
            asg = next(
                (group for group in response['AutoScalingGroups']
                 if group['AutoScalingGroupName'].startswith(self.ASG_NAME_PREFIX)),
                None
            )
            if asg is None:
                print("No ASG found with apdev-node- pattern")
                return False

            asg_name = asg['AutoScalingGroupName']
            current_desired = asg['DesiredCapacity']
            max_size = asg['MaxSize']

            if current_desired >= max_size:
                print(f"ASG already at max capacity: {current_desired}/{max_size}")
                return False

            new_desired = current_desired + 1
            self.autoscaling.set_desired_capacity(
                AutoScalingGroupName=asg_name,
                DesiredCapacity=new_desired,
                HonorCooldown=False
            )
            print(f"Scaled ASG {asg_name} from {current_desired} to {new_desired}")
            return True
        except Exception as e:
            print(f"Error scaling ASG: {e}")
            return False

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def create_ec2_task_display(self):
        """Return one ``"<instance id> <task count>"`` line per running instance."""
        running_instances = self.get_running_ec2_instances()
        if not running_instances:
            return "No running EC2 instances found"
        return "\n".join(
            f"{instance_id} {data['running_tasks'] + data['pending_tasks']}"
            for instance_id, data in running_instances.items()
        )

    def create_node_table(self):
        """Build the Rich table summarizing the Auto Scaling groups."""
        table = Table(title=f"Auto Scaling Groups - Cluster: {self.cluster_name}")
        table.add_column("ASG Name", style="cyan", width=30)
        table.add_column("Capacity (D/Min/Max)", style="yellow", width=18)
        table.add_column("Instances (R/P/T)", style="green", width=15)
        table.add_column("Status", style="white", width=15)

        for asg_name, data in self.node_data.items():
            if data['pending'] > 0:
                status, status_color = "Scaling Out", "yellow"
            elif data['terminating'] > 0:
                status, status_color = "Scaling In", "red"
            elif data['instances'] != data['desired']:
                # "orange" is not a Rich color name; "orange1" is.
                status, status_color = "Adjusting", "orange1"
            else:
                status, status_color = "Stable", "green"

            table.add_row(
                asg_name,
                f"{data['desired']}/{data['min']}/{data['max']}",
                f"{data['instances']}/{data['pending']}/{data['terminating']}",
                f"[{status_color}]{status}[/{status_color}]"
            )
        return table

    def create_config_panel(self):
        """Return the Rich-markup text describing the scaling configuration."""
        if self.cpu_metric_mode == 'max':
            cpu_mode_display = "[bold green]CPU MAX[/bold green]"
        elif self.cpu_metric_mode == 'avg':
            cpu_mode_display = "[bold yellow]CPU AVG[/bold yellow]"
        else:  # 'off'
            cpu_mode_display = "[bold red]SCALING OFF[/bold red]"

        config_text = f"""[bold cyan]Scaling Configuration:[/bold cyan]
• CPU Metric Mode: {cpu_mode_display} (Press 'M' to toggle)
• CPU Scale-out Threshold: 50% (normal services), 70% (stress services)
• CPU Scale-in Threshold: 25%
• CPU Emergency Scale-in: < 10% (scale to {self.min_tasks} task immediately)
• Scale-out Cooldown: 120 seconds
• Scale-in Cooldown: 60 seconds
• Manual Modification Cooldown: 60 seconds

[bold yellow]🚀 Scale-out 동작:[/bold yellow]
• Normal 서비스 (임계치 초과 정도별):
  - 30% 이상 초과: 2개 필요
  - 20% 이상 초과: 1개 필요
  - 10% 이상 초과: 1개 필요
• Stress 서비스 (임계치 초과 정도별):
  - 30% 이상 초과: 3개 필요
  - 20% 이상 초과: 2개 필요
  - 10% 이상 초과: 1개 필요
• 120초 cooldown 적용
• Pending task 수만큼 필요 task에서 차감
• EC2 자동 확장: 모든 인스턴스가 4개 task로 가득 차면 EC2 1대 추가

[bold red]📉 Scale-in 동작:[/bold red]
• CPU 25% 미만: 1개씩 task 감소
• CPU 10% 이하: 즉시 {self.min_tasks}개 task로 긴급 축소
• Scale-in 중일 때는 EC2 확장 안함

[bold green]💡 스마트 기능:[/bold green]
• Pending task 고려하여 중복 scaling 방지
• CPU 사용률 기반 최적 task 수 계산
• t3.medium 기준 인스턴스당 최대 4개 task
• 수동 수정 감지 시 60초 자동 scaling 일시정지
• Scaling Off 모드: 모든 데이터 수집하지만 자동 스케일링 비활성화

[bold magenta]⌨️ Controls:[/bold magenta]
• Press 'M' to toggle CPU metric (Max/Avg/Off)
• Press 'Q' to quit"""
        return config_text

    def create_display(self):
        """Build the full dashboard layout from the cached service and node data."""
        table = Table(title=f"ECS Autoscaler - Cluster: {self.cluster_name}")
        table.add_column("Service", style="cyan", width=20)
        table.add_column("Status", style="green", width=10)
        table.add_column("Tasks (D/R/P)", style="yellow", width=12)
        table.add_column("Progress", style="white", width=12)
        table.add_column("CPU Max %", style="magenta", width=10)
        table.add_column("CPU Avg %", style="blue", width=10)
        table.add_column("Task P90 ms", style="red", width=10)
        table.add_column("RPS", style="cyan", width=10)
        table.add_column("Success %", style="green", width=10)
        table.add_column("Memory %", style="blue", width=10)
        table.add_column("Scaling Action", style="red", width=25)

        for service, data in self.service_data.items():
            threshold = self.get_threshold(service)
            cpu_max_color = "red" if data['cpu_max'] > threshold else "green"
            cpu_avg_color = "red" if data['cpu_avg'] < 40 else "green"
            task_p90_color = "red" if data['task_p90'] > 300 else "green"
            mem_color = "red" if data['memory'] > 80 else "green"

            success_rate = 0
            if data['total_requests'] > 0:
                success_rate = (data['success_requests'] / data['total_requests']) * 100
            success_color = "red" if success_rate < 95 else "green"

            progress_text, progress_color = self.get_scaling_progress(
                service, data['desired'], data['running'], data['pending']
            )

            table.add_row(
                service,
                data['status'],
                f"{data['desired']}/{data['running']}/{data['pending']}",
                f"[{progress_color}]{progress_text}[/{progress_color}]",
                f"[{cpu_max_color}]{data['cpu_max']:.1f}%[/{cpu_max_color}]",
                f"[{cpu_avg_color}]{data['cpu_avg']:.1f}%[/{cpu_avg_color}]",
                f"[{task_p90_color}]{data['task_p90']:.0f}[/{task_p90_color}]",
                f"{data['total_requests']}",  # RPS as a plain number
                f"[{success_color}]{success_rate:.1f}%[/{success_color}]",
                f"[{mem_color}]{data['memory']:.1f}%[/{mem_color}]",
                data['scaling_action']
            )

        ec2_task_display = self.create_ec2_task_display()

        layout = Layout()
        layout.split_column(
            Layout(Panel(table, title="ECS CPU Autoscaler", border_style="bright_blue"), name="main"),
            Layout(Panel(self.create_node_table(), title="Cluster Nodes", border_style="bright_green"), name="nodes"),
            Layout(Panel(ec2_task_display, title="EC2 Task Distribution", border_style="bright_cyan"), name="ec2_tasks"),
            Layout(Panel(self.create_config_panel(), title="Configuration", border_style="bright_magenta"), name="config"),
            Layout(Panel(f"Last Update: {self.last_update}", border_style="dim"), name="bottom")
        )
        layout["main"].size = 15
        layout["nodes"].size = 10
        layout["ec2_tasks"].size = 12
        layout["config"].size = 8
        layout["bottom"].size = 3

        return layout

    # ------------------------------------------------------------------
    # Main loop
    # ------------------------------------------------------------------

    @staticmethod
    def _tasks_needed(service_name, cpu_diff):
        """Return how many tasks to add for a CPU excess of ``cpu_diff`` points."""
        if 'stress' in service_name:
            if cpu_diff >= 30:
                return 3
            if cpu_diff >= 20:
                return 2
            return 1
        return 2 if cpu_diff >= 30 else 1

    def _decide_scaling(self, service_name, service_details, cpu_max, cpu_avg, has_cpu_data):
        """Apply the scaling rules to one service and return the action label.

        The decision uses the CPU values fetched in this cycle. Reading the
        cache instead would act on the previous cycle's value, and on 0 for a
        service seen for the first time, which triggers an emergency scale-in.
        """
        current_desired = service_details['desired']
        mode = self.cpu_metric_mode  # Snapshot: the key listener may change it.

        manual_modified = self.detect_manual_modification(service_name, current_desired)
        if self.is_in_manual_cooldown(service_name):
            elapsed = (datetime.now() - self.manual_modification_time[service_name]).total_seconds()
            remaining_time = self.MANUAL_COOLDOWN_SECONDS - elapsed
            return f"Manual cooldown ({remaining_time:.0f}s remaining)"
        if manual_modified:
            return "Manual modification detected"
        if mode == 'off':
            return "Scaling disabled"
        if not has_cpu_data:
            # No fresh datapoints: never scale on missing or stale readings.
            return "No CPU data"

        cpu_value = cpu_max if mode == 'max' else cpu_avg
        up_threshold = self.get_threshold(service_name)

        if cpu_value > up_threshold:
            if not self.can_scale_out(service_name):
                return "CPU Cooldown"

            needed_tasks = self._tasks_needed(service_name, cpu_value - up_threshold)
            # Tasks that are already starting count towards the need.
            pending_tasks = service_details['pending']
            actual_scale_amount = max(0, needed_tasks - pending_tasks)
            if actual_scale_amount == 0:
                return f"No scale needed (pending: {pending_tasks} >= needed: {needed_tasks})"

            self.scale_service(service_name, current_desired + actual_scale_amount, is_scale_out=True)
            self.last_scale_out[service_name] = datetime.now()
            return (f"Scale-out +{actual_scale_amount} (needed: {needed_tasks}, "
                    f"pending: {pending_tasks}) [{mode.upper()}]")

        if cpu_value < self.SCALE_IN_CPU_THRESHOLD:
            if not self.can_scale_in(service_name):
                return "Scale-in cooldown"
            if current_desired > self.min_tasks:
                if cpu_value <= self.EMERGENCY_CPU_THRESHOLD:
                    self.scale_service(service_name, self.min_tasks)
                    self.last_scale_in[service_name] = datetime.now()
                    return (f"Emergency scale to {self.min_tasks} task "
                            f"(CPU: {cpu_value:.1f}%) [{mode.upper()}]")

                self.scale_service(service_name, max(self.min_tasks, current_desired - 1))
                self.last_scale_in[service_name] = datetime.now()
                return f"Scale-in -1 task (CPU: {cpu_value:.1f}%) [{mode.upper()}]"

        return "Normal"

    def _refresh_service(self, service_name):
        """Collect all metrics of one service, scale it, and cache the result."""
        cpu_max, cpu_avg, cpu_history = self.get_cpu_data(service_name)
        memory_util = self.get_memory_utilization(service_name)
        task_p90 = self.get_task_latency_p90(service_name)
        total_requests, success_requests, failure_requests = self.get_task_log_requests(service_name)
        service_details = self.get_service_details(service_name)

        scaling_action = self._decide_scaling(
            service_name, service_details, cpu_max, cpu_avg, bool(cpu_history)
        )

        self.service_data[service_name] = {
            'cpu_max': cpu_max,
            'cpu_avg': cpu_avg,
            'cpu_history': cpu_history,
            'memory': memory_util,
            'task_p90': task_p90,
            'total_requests': total_requests,
            'success_requests': success_requests,
            'failure_requests': failure_requests,
            'desired': service_details['desired'],
            'running': service_details['running'],
            'pending': service_details['pending'],
            'status': service_details['status'],
            'scaling_action': scaling_action
        }

    def _maybe_scale_asg(self):
        """Check ASG capacity at most once per interval; skipped while scaling is off."""
        if self.cpu_metric_mode == 'off':
            return
        current_time = datetime.now()
        if (self.last_asg_check is not None
                and (current_time - self.last_asg_check).total_seconds() < self.ASG_CHECK_INTERVAL_SECONDS):
            return
        if self.check_asg_scale_needed():
            self.scale_asg_up()
        self.last_asg_check = current_time

    def run(self):
        """Run the monitor/scale/render loop until the user presses 'Q'."""
        self.start_keyboard_listener()
        try:
            with Live(self.create_display(), refresh_per_second=1) as live:
                while not self.stop_requested.is_set():
                    services = self.get_services()
                    self.node_data = self.get_asg_info()
                    self.last_update = datetime.now().strftime("%H:%M:%S")

                    for service_name in services:
                        self._refresh_service(service_name)

                    # ASG check runs after all services have been processed.
                    self._maybe_scale_asg()

                    live.update(self.create_display())
                    # Sleeps one second but wakes immediately on quit.
                    self.stop_requested.wait(1)
        finally:
            # Let the listener restore the terminal settings before exiting.
            self.stop_requested.set()
            if self.keyboard_thread is not None:
                self.keyboard_thread.join(timeout=1)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='ECS CPU Autoscaler')
    parser.add_argument('--cluster', required=True, help='ECS cluster name')
    parser.add_argument('--region', default='ap-northeast-2', help='AWS region')
    parser.add_argument('--log-group-prefix', help='Log group prefix (default: /aws/ecs/{cluster_name})')
    parser.add_argument('--min-tasks', type=int, default=1,
                        help='Minimum task count scale-in may reach (default: 1)')

    args = parser.parse_args()

    scaler = ECSAutoscaler(args.cluster, args.region, args.log_group_prefix, args.min_tasks)
    try:
        scaler.run()
    except KeyboardInterrupt:
        # Ctrl-C is a normal way to stop the dashboard.
        pass
Python
복사