Skip to content

[FEATURE] LangGraph 기반 워크플로우 전환 및 Cloud Control API 개선 #9

Description

@yubin425

개요

EC2 Agent를 기존의 단순한 클래스 기반 구조에서 LangGraph 기반의 구조화된 워크플로우로 전환했습니다. 이를 통해 위험 작업 확인, 파라미터 검증, 결과 검증 등의 기능을 체계적으로 처리할 수 있게 되었습니다. 또한 EC2 인스턴스 생성 시 Cloud Control API를 우선적으로 사용하도록 개선하고, 비동기 작업을 완료까지 대기하여 문제를 해결했습니다.

아키텍처 개요

기존 아키텍처

EC2Agent 클래스 구조:

class EC2Agent:
    """LangChain을 사용한 EC2 Mini Agent"""
    
    def __init__(self, settings, ...):
        self.llm = ChatBedrock(...)
        self.aws_tool = AWSCCTool(...)
        self.tools = [self.aws_tool]
    
    async def process_request(self, user_request: str, context: Dict[str, Any] = None):
        """사용자 요청을 처리하는 메인 메서드"""
        # 1. LLM 기반 요청 분석
        action_data = await self._analyze_request_llm_based(user_request)
        
        # 2. AWS API 호출
        aws_result = await self._execute_aws_action(action_data)
        
        # 3. 응답 생성
        return self._create_success_response(aws_result, action_data)

특징:

  • 단순한 클래스 구조
  • 직접적인 메서드 호출 체인
  • LLM 기반 요청 분석 + 규칙 기반 폴백
  • 위험 작업 확인 없음
  • 결과 검증 없음
  • 동기/비동기 혼용 (async def 사용)

처리 흐름:

사용자 요청 
  → LLM 분석 (_analyze_request_llm_based)
  → AWS API 호출 (_execute_aws_action)
  → 응답 생성 (_create_success_response)

현재 아키텍처 (BaseAgent + LangGraph)

BaseAgent 상속 구조:

class EC2Agent(BaseAgent):
    """LangChain을 사용한 EC2 Mini Agent (LangGraph 호환)"""
    
    def __init__(self, settings, ...):
        super().__init__(settings, ...)  # BaseAgent 초기화
        self.aws_tool = AWSCCTool(...)
        self.tools = [self.aws_tool]
    
    def get_agent_type(self) -> str:
        return "ec2"
    
    def get_system_prompt(self) -> str:
        return "..."  # EC2 전문 프롬프트
    
    def get_required_parameters(self, action: str) -> List[str]:
        return [...]  # 액션별 필수 파라미터
    
    def get_dangerous_actions(self) -> List[str]:
        return ["terminate_instance"]  # 위험 작업 목록
    
    def execute_action(self, action: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        # 실제 AWS API 호출
        return self.aws_tool._run(...)
    
    def verify_action_result(self, action: str, parameters: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        # 결과 검증 로직
        return {"passed": True, ...}

LangGraph 워크플로우:

  • BaseAgent가 자동으로 LangGraph 워크플로우를 구성
  • State 기반으로 데이터 전달
  • 각 노드가 독립적으로 실행

처리 흐름:

START
  → think_node (사고 과정)
  → extract_parameters_node (파라미터 추출)
  → check_parameters_node (파라미터 검증)
  → [재질문 필요 시] reask_node
  → check_risk_node (위험 작업 확인)
  → [위험 작업 시] confirm_dangerous_action_node
  → execute_action_node (작업 실행) ← Cloud Control API 호출
  → verify_result_node (결과 검증)
  → generate_response_node (응답 생성)
  → END

SubAgentState 구조

LangGraph에서 사용하는 State는 TypedDict로 정의되어 있으며, 모든 노드 간에 공유됩니다:

class SubAgentState(TypedDict):
    """서브 에이전트 상태를 관리하는 TypedDict (LangGraph 호환)"""
    user_request: str                          # 사용자 요청
    context: Dict[str, Any]                     # 컨텍스트 정보
    thinking_result: Optional[Dict[str, Any]]    # think_node 결과
    extracted_parameters: Optional[Dict[str, Any]]  # extract_parameters_node 결과
    missing_parameters: List[str]               # 부족한 파라미터 목록
    reask_message: Optional[str]                 # 재질문 메시지
    is_dangerous_action: bool                   # 위험 작업 여부
    dangerous_action_confirmed: bool             # 위험 작업 확인 여부
    confirmation_message: Optional[str]          # 확인 메시지
    action_result: Optional[Dict[str, Any]]      # execute_action_node 결과
    verification_result: Optional[Dict[str, Any]]  # verify_result_node 결과
    verification_passed: bool                     # 검증 통과 여부
    final_response: Optional[str]                # 최종 응답
    needs_clarification: bool                    # 재질문 필요 여부
    action: Optional[str]                        # 추출된 액션
    messages: List[BaseMessage]                  # LLM 메시지 히스토리

State의 역할:

  • 각 노드가 State를 읽고 수정하여 다음 노드로 전달
  • 상태 기반으로 조건부 라우팅 결정
  • 전체 워크플로우의 컨텍스트 유지

BaseAgent 워크플로우 노드 상세

1. think_node

  • 역할: 사용자 요청을 분석하고 사고 과정 생성
  • 입력: user_request
  • 출력: thinking_result
  • 특징: LLM을 사용하여 요청의 의도를 파악

2. extract_parameters_node

  • 역할: 사고 결과에서 액션과 파라미터 추출
  • 입력: thinking_result
  • 출력: action, extracted_parameters
  • 특징: JSON 파싱 및 검증

3. check_parameters_node

  • 역할: 필수 파라미터 검증
  • 입력: action, extracted_parameters
  • 출력: needs_clarification, missing_parameters
  • 특징: 각 에이전트별 필수 파라미터 확인

4. reask_node

  • 역할: 부족한 파라미터에 대한 재질문 생성
  • 입력: missing_parameters
  • 출력: reask_message
  • 조건: needs_clarification == True일 때만 실행

5. check_risk_node

  • 역할: 위험 작업 감지 및 확인 메시지 생성
  • 입력: action, extracted_parameters
  • 출력: is_dangerous_action, confirmation_message
  • 특징: 각 에이전트별 위험 작업 목록 확인

6. confirm_dangerous_action_node

  • 역할: 위험 작업 사용자 확인 처리
  • 입력: dangerous_action_confirmed
  • 출력: 상태 확인
  • 조건: is_dangerous_action == True일 때만 실행

7. execute_action_node

  • 역할: 실제 AWS API 호출
  • 입력: action, extracted_parameters
  • 출력: action_result
  • 특징: 각 에이전트의 execute_action 메서드 호출
  • 중요: 여기서 Cloud Control API 호출이 발생

8. verify_result_node

  • 역할: 작업 결과 검증
  • 입력: action, extracted_parameters, action_result
  • 출력: verification_result, verification_passed
  • 특징: 각 에이전트의 verify_action_result 메서드 호출
  • 예시: EC2 인스턴스 생성 후 실제로 존재하는지 확인

9. retry_or_rollback_node

  • 역할: 재시도 또는 롤백 처리
  • 입력: verification_result
  • 출력: 상태 업데이트
  • 조건: verification_passed == False일 때 실행

10. generate_response_node

  • 역할: 최종 사용자 친화적 응답 생성
  • 입력: action_result, verification_passed
  • 출력: final_response
  • 특징: LLM을 사용하여 간결한 응답 생성

변경 후 구조 (현재)

전체 아키텍처:

class EC2Agent(BaseAgent):
    """BaseAgent를 상속받는 EC2 Agent"""
    
    def __init__(self, settings, ...):
        super().__init__(settings, ...)  # LangGraph 워크플로우 자동 구성
        self.aws_tool = AWSCCTool(...)
    
    def execute_action(self, action: str, parameters: Dict[str, Any]) -> Dict[str, Any]:
        """BaseAgent의 execute_action_node에서 호출됨"""
        if action == 'create_instance':
            result = self.aws_tool._run(json.dumps({
                'action': action,
                'parameters': parameters
            }))
            return json.loads(result)
    
    def verify_action_result(self, action: str, parameters: Dict[str, Any], result: Dict[str, Any]) -> Dict[str, Any]:
        """BaseAgent의 verify_result_node에서 호출됨"""
        if action == 'create_instance' and result.get('success'):
            # 인스턴스가 실제로 생성되었는지 확인
            instance_id = result.get('instance_id')
            # ... 검증 로직 ...
            return {"passed": True}

_create_instance 메서드 (AWSCCTool 내부):

def _create_instance(self, parameters: Dict[str, Any]) -> str:
    """EC2 인스턴스 생성 (Cloud Control API 우선 사용, 비동기 작업 완료 대기)"""
    try:
        # 1. 파라미터 검증 및 정리
        ami_id = self._validate_and_get_ami_id(parameters)
        instance_type = self._validate_and_get_instance_type(parameters)
        
        # 2. Cloud Control API 호출
        response = self._cloudcontrol_client.create_resource(
            TypeName='AWS::EC2::Instance',
            DesiredState=json.dumps(resource_model)
        )
        
        # 3. 응답 처리
        if 'ResourceDescription' in response:
            # 동기 응답 처리
            instance_id = extract_from_resource_description(response)
        elif 'ProgressEvent' in response:
            # 비동기 응답 처리
            if operation_status == 'IN_PROGRESS' and not instance_id:
                instance_id = self._wait_for_async_operation(request_token)
        
        # 4. 실패 시 EC2 API로 폴백
        if not instance_id:
            return self._create_instance_ec2_fallback(parameters, ami_id)
            
    except Exception as e:
        # 에러 발생 시 EC2 API로 폴백
        return self._create_instance_ec2_fallback(parameters, ami_id)

특징:

  • BaseAgent 상속 구조로 LangGraph 워크플로우 활용
  • State 기반 데이터 전달
  • 구조화된 노드 기반 처리
  • Cloud Control API 우선 사용
  • 비동기 작업 완료까지 대기
  • 실패 시에만 EC2 API로 폴백
  • 다양한 응답 구조 처리
  • 위험 작업 확인 기능 포함
  • 결과 검증 기능 포함
  • 파라미터 검증 및 재질문 기능 포함

Cloud Control API 비동기 작업 처리 개선

개요

EC2 인스턴스 생성 시 Cloud Control API를 우선적으로 사용하도록 개선하고, 비동기 작업을 완료까지 대기하여 중복 생성 문제를 해결했습니다.

변경사항

  • Cloud Control API를 먼저 시도
  • 비동기 작업 완료까지 최대 60초 대기
  • 실패/타임아웃 시에만 EC2 API로 폴백

비동기 작업 처리 방식

_wait_for_async_operation 메서드를 통해 비동기 작업을 처리합니다:

def _wait_for_async_operation(
    self, request_token: str, parameters: Dict[str, Any], 
    ami_id: str, max_wait_time: int = 60
) -> Optional[str]:
    """비동기 Cloud Control API 작업 완료 대기"""
    wait_interval = 2  # 2초마다 확인
    waited_time = 0
    
    while waited_time < max_wait_time:
        status_response = self._cloudcontrol_client.get_resource_request_status(
            RequestToken=request_token
        )
        
        status_event = status_response.get('ProgressEvent', {})
        current_status = status_event.get('OperationStatus')
        
        if current_status == 'SUCCESS':
            return extract_instance_id(status_event)
        elif current_status in ['FAILED', 'CANCEL_IN_PROGRESS', 'CANCEL_COMPLETE']:
            return None
        elif current_status == 'IN_PROGRESS':
            time.sleep(wait_interval)
            waited_time += wait_interval
    
    return None  # 타임아웃

동작 흐름:

  1. create_resource 호출 후 ProgressEvent 반환
  2. RequestToken 추출
  3. get_resource_request_status로 주기적 상태 확인 (2초 간격)
  4. SUCCESS 상태가 되면 인스턴스 ID 추출
  5. 최대 60초 대기 후 타임아웃 시 None 반환

다양한 응답 구조 처리

Cloud Control API는 다양한 응답 구조를 반환할 수 있으며, 모든 구조를 처리하도록 구현했습니다:

  • ResourceDescription: 동기 응답
  • ProgressEvent: 비동기 응답
  • 직접 Identifier: 간단한 응답

4. 에러 처리 및 로깅 강화

  • 각 단계별 상세한 로깅 추가
  • ClientError와 일반 예외 구분 처리
  • 스택 트레이스 로깅
  • 에러 코드 및 메시지 상세 기록

잠재적 문제점 및 고려사항

1. 응답 시간 증가

문제:

  • 비동기 작업 완료까지 최대 60초 대기
  • 사용자 경험 저하 가능성

완화 방안:

  • 대기 시간을 설정 가능하게 만들기 (환경 변수 등)
  • 진행 상황을 사용자에게 알리기 (웹소켓, 폴링 등)

권장사항:

# 환경 변수로 대기 시간 설정 가능하게
max_wait_time = int(os.getenv('CLOUD_CONTROL_MAX_WAIT_TIME', '60'))

2. 타임아웃 처리

문제:

  • 60초 내에 완료되지 않으면 EC2 API로 폴백
  • Cloud Control API 작업이 백그라운드에서 계속 진행될 수 있음

현재 해결책:

  • 타임아웃 시 EC2 API로 폴백하여 즉시 인스턴스 생성
  • Cloud Control API 작업은 백그라운드에서 계속 진행되지만, EC2 API로 생성된 인스턴스를 사용

개선 가능성:

  • Cloud Control API 작업 취소 기능 추가 (AWS API 지원 시)
  • 작업 상태를 추적하여 나중에 확인 가능하게

3. 네트워크 오류 처리

문제:

  • get_resource_request_status 호출 중 네트워크 오류 발생 가능
  • 일시적 오류와 영구적 오류 구분 필요

현재 처리:

  • ClientError는 일시적 오류로 간주하여 재시도
  • 일반 예외는 즉시 실패 처리

개선 가능성:

  • 재시도 로직 개선 (지수 백오프 등)
  • 네트워크 오류와 API 오류 구분

4. 리소스 모델 형식

문제:

  • Cloud Control API는 CloudFormation 형식의 리소스 모델을 요구
  • EC2 API와 파라미터 형식이 다름

현재 처리:

  • EC2 API 파라미터를 CloudFormation 형식으로 변환
  • 선택적 파라미터 처리

주의사항:

  • 일부 EC2 API 파라미터가 Cloud Control API에서 지원되지 않을 수 있음
  • 새로운 파라미터 추가 시 양쪽 API 모두 확인 필요

5. 비용 및 성능

장점:

  • Cloud Control API는 통합 API로 여러 서비스 관리 가능
  • 일관된 인터페이스 제공

단점:

  • 비동기 작업으로 인한 지연 시간
  • 추가 API 호출 (get_resource_request_status)로 인한 비용 증가

비용 분석:

  • Cloud Control API: 무료 (일반적인 사용량 기준)
  • EC2 API: 무료
  • 추가 API 호출로 인한 비용 증가는 미미함

테스트 시나리오

LangGraph 워크플로우 테스트

  • 각 노드별 단위 테스트
  • State 전달 및 업데이트 테스트
  • 조건부 라우팅 테스트
  • 위험 작업 확인 플로우 테스트

Cloud Control API 테스트

  • 동기 응답 처리 테스트
  • 비동기 응답 처리 테스트 (60초 내 완료)
  • 타임아웃 시 EC2 API 폴백 테스트
  • 실패 시 EC2 API 폴백 테스트

모니터링 및 디버깅

로그 레벨별 정보

INFO 레벨:

  • Cloud Control API 호출 시작/완료
  • 비동기 작업 상태 변경
  • 인스턴스 ID 추출 성공

DEBUG 레벨:

  • Cloud Control API 응답 전체 내용
  • 작업 상태 확인 상세 정보

WARNING 레벨:

  • 비동기 작업 실패
  • 타임아웃 발생
  • EC2 API로 폴백

ERROR 레벨:

  • 예외 발생
  • 스택 트레이스

주요 로그 메시지

INFO: Cloud Control API를 사용하여 인스턴스 생성 시도 - AMI: ami-xxx, Type: t2.micro
INFO: Cloud Control API create_resource 호출 시작
INFO: Cloud Control API create_resource 호출 성공
INFO: Cloud Control API가 비동기 응답을 반환했습니다. Status: IN_PROGRESS
INFO: 비동기 작업 완료 대기 시작. RequestToken: xxx, 최대 대기 시간: 60초
INFO: 작업 상태 확인 (2초 경과): Status=IN_PROGRESS
INFO: 작업 상태 확인 (4초 경과): Status=SUCCESS
INFO: 작업 완료 - 인스턴스 ID: i-1234567890abcdef0

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions