# 엣지 AIoT 3-tier 멀티 에이전트 통신 프레임워크 — 구현 설계서 **문서 버전**: v0.1.0 **최종 수정**: 2026-06-07 **관련 문서**: `FINAL_REPORT.md` (설계 근거 및 사용 사례 분석) --- ## 목차 1. [기술 스택 선정 근거](#1-기술-스택-선정-근거) 2. [프로젝트 디렉터리 구조](#2-프로젝트-디렉터리-구조) 3. [Proto 파일 설계](#3-proto-파일-설계) 4. [T2 Edge Gateway 구현 설계](#4-t2-edge-gateway-구현-설계) 5. [T1 Orchestrator 구현 설계](#5-t1-orchestrator-구현-설계) 6. [T3 Field Agent 구현 설계](#6-t3-field-agent-구현-설계) 7. [SPIFFE/SPIRE 통합](#7-spiffespire-통합) 8. [OpenTelemetry 통합](#8-opentelemetry-통합) 9. [Docker Compose 실험 환경 설계](#9-docker-compose-실험-환경-설계) 10. [구현 단계별 계획 (Phase)](#10-구현-단계별-계획-phase) 11. [테스트 전략](#11-테스트-전략) 12. [예상 성능 지표 및 측정 방법](#12-예상-성능-지표-및-측정-방법) --- ## 1. 기술 스택 선정 근거 ### 1.1 핵심 언어 선정 | 언어 | 적용 컴포넌트 | 선정 이유 | |------|-------------|-----------| | **Go** | T2 Gateway, T3 Field Agent, gRPC 서버/클라이언트 | 단일 바이너리 배포, 낮은 메모리 푸트프린트, 고루틴 기반 고동시성, quic-go 네이티브 지원, 크로스 컴파일(ARM/MIPS/x86) | | **Python** | T1 Orchestrator LLM 에이전트, A2A 클라이언트 | LangGraph/AutoGen 등 LLM 프레임워크 생태계, 빠른 프로토타이핑, grpcio 지원 | ### 1.2 전송 계층 라이브러리 #### quic-go (`github.com/quic-go/quic-go`) - Go 네이티브 QUIC RFC 9000 구현 — CGO 의존 없음 - `net/http` 인터페이스 호환 → HTTP/3 서버를 표준 핸들러로 운영 가능 - Connection migration 내장: 핸드오버(IP 변경) 시 세션 유지 - QUIC Datagram 확장(RFC 9221) 지원 → 저지연 V2X 메시지에 활용 가능 - gRPC-Go 트랜스포트 레이어 교체 방식으로 통합 (`WithContextDialer` + `quic.DialAddr`) #### gRPC-Go (`google.golang.org/grpc`) - 공식 Google Go gRPC 구현 — Interceptor Chain, Health Checking, Service Config(retry policy) 내장 - Protobuf codec 교체 가능 → 커스텀 직렬화 확장 가능 - `grpc.WithTransportCredentials` + quic-go TLS 설정을 연동하여 mTLS over QUIC 구현 #### Eclipse Paho Go (`github.com/eclipse/paho.mqtt.golang`) - MQTT 3.1.1 / 5.0 지원 Go 클라이언트 - Reconnect 자동 처리 + Message Queue(오프라인 버퍼) - T3 현장 디바이스와의 Pub/Sub 인터페이스 ### 1.3 에이전트 프레임워크 | 프레임워크 | 선정 이유 | |-----------|-----------| | **LangGraph** (Python) | 상태 머신 기반 에이전트 그래프, 체크포인팅·재시도 내장, T1 Orchestrator의 복잡한 오케스트레이션 로직에 적합 | | **A2A Protocol** | Google 주도 에이전트 간 통신 표준, HTTP/JSON 기반으로 언어 무관 상호운용성, Task/Artifact 개념으로 장기 실행 작업 모델링 | ### 1.4 보안 및 관측성 | 라이브러리 | 선정 이유 | |-----------|-----------| | **SPIFFE/SPIRE Go SDK** (`github.com/spiffe/go-spiffe/v2`) | X.509-SVID 자동 갱신, gRPC `credentials.TransportCredentials` 구현 제공, TPM attestation 플러그인 체인 | | **OpenTelemetry Go SDK** (`go.opentelemetry.io/otel`) | 벤더 중립 trace/metric/log, gRPC 자동 계측(`otelgrpc`), W3C TraceContext 전파 | | **Prometheus** | Go 네이티브 exporter, 엣지 환경 경량 스크랩 | --- ## 2. 프로젝트 디렉터리 구조 ``` edge-aiot-mas/ ├── proto/ # Protobuf 스키마 (언어 무관 공유) │ ├── agent/ │ │ └── v1/ │ │ └── agent.proto # AgentService (Unary/Stream 4모드) │ ├── telemetry/ │ │ └── v1/ │ │ └── telemetry.proto # TelemetryService │ ├── control/ │ │ └── v1/ │ │ └── control.proto # ControlService (OTA, Config) │ └── buf.yaml # Buf 스키마 설정 │ ├── gen/ # proto 컴파일 출력 (생성 파일, git 추적) │ ├── go/ │ │ ├── agent/v1/ │ │ ├── telemetry/v1/ │ │ └── control/v1/ │ └── python/ │ ├── agent/v1/ │ └── telemetry/v1/ │ ├── go.mod # Go 모듈 루트 ├── go.sum │ ├── cmd/ │ ├── gateway/ │ │ └── main.go # T2 Edge Gateway 진입점 │ ├── field-agent/ │ │ └── main.go # T3 Field Agent 진입점 │ └── cloud-server/ │ └── main.go # T1 Go gRPC 서버 진입점 │ ├── internal/ │ ├── gateway/ │ │ ├── server.go # gRPC over QUIC 서버 초기화 │ │ ├── mqtt_client.go # MQTT 브로커 클라이언트 │ │ ├── translator.go # MQTT topic → gRPC method 변환 엔진 │ │ ├── resume_token.go # Resume Token 관리자 │ │ ├── a2a_handler.go # A2A HTTP/3 엔드포인트 │ │ └── interceptors/ │ │ ├── resume.go # Resume Token gRPC Interceptor │ │ ├── deadline.go # Deadline 강제 Interceptor │ │ └── otel.go # OTel trace 전파 Interceptor │ │ │ ├── field/ │ │ ├── mqtt_publisher.go # MQTT 텔레메트리 발행 │ │ ├── offline_queue.go # 오프라인 큐 (BoltDB) │ │ └── resume_client.go # Resume Token 클라이언트 │ │ │ ├── cloud/ │ │ ├── agent_service.go # AgentService 서버 구현 │ │ └── telemetry_service.go # TelemetryService 서버 구현 │ │ │ ├── security/ │ │ ├── spiffe.go # SPIFFE/SPIRE 클라이언트 래퍼 │ │ └── credentials.go # gRPC TransportCredentials 생성 │ │ │ └── observability/ │ ├── tracer.go # OTel TracerProvider 초기화 │ └── metrics.go # Prometheus 메트릭 등록 │ ├── python/ │ ├── pyproject.toml │ ├── orchestrator/ │ │ ├── __init__.py │ │ ├── agent.py # LangGraph 오케스트레이터 에이전트 │ │ ├── a2a_client.py # A2A 클라이언트 │ │ ├── grpc_client.py # gRPC-over-QUIC 클라이언트 (aioquic 또는 subprocess) │ │ └── tools/ │ │ ├── telemetry_tool.py # 텔레메트리 조회 도구 │ │ └── control_tool.py # 제어 명령 도구 │ └── tests/ │ └── test_agent.py │ ├── deployments/ │ ├── docker-compose.yml # 전체 실험 환경 │ ├── docker-compose.spire.yml # SPIRE 오버레이 │ ├── spire/ │ │ ├── server.conf │ │ └── agent.conf │ ├── mosquitto/ │ │ └── mosquitto.conf │ └── otel/ │ └── otel-collector-config.yaml │ ├── configs/ │ ├── gateway.yaml # T2 Gateway 설정 │ ├── field-agent.yaml # T3 Field Agent 설정 │ └── cloud-server.yaml # T1 서버 설정 │ └── tests/ ├── integration/ │ ├── gateway_test.go │ └── e2e_test.go └── perf/ └── benchmark_test.go ``` --- ## 3. Proto 파일 설계 ### 3.1 AgentService (`proto/agent/v1/agent.proto`) ```protobuf syntax = "proto3"; package agent.v1; option go_package = "github.com/your-org/edge-aiot-mas/gen/go/agent/v1;agentv1"; import "google/protobuf/timestamp.proto"; import "google/protobuf/any.proto"; // 에이전트 태스크 요청 message AgentTask { string task_id = 1; // UUID string agent_id = 2; // 발신 에이전트 ID string target_agent = 3; // 수신 에이전트 ID string task_type = 4; // "infer", "control", "query", "ota" bytes payload = 5; // Protobuf Any 직렬화 map metadata = 6; // X-Tenant-ID, X-Device-Class 등 google.protobuf.Timestamp deadline = 7; } // 에이전트 태스크 응답 message AgentResult { string task_id = 1; string agent_id = 2; Status status = 3; bytes payload = 4; string resume_token = 5; // 스트리밍 재개 토큰 map metadata = 6; enum Status { STATUS_UNSPECIFIED = 0; STATUS_OK = 1; STATUS_PARTIAL = 2; // 스트림 중간 청크 STATUS_ERROR = 3; } } // 스트리밍 청크 (Server/Client/Bidi Streaming 공용) message StreamChunk { string stream_id = 1; uint64 sequence_num = 2; bytes data = 3; bool is_last = 4; string resume_token = 5; } // 4대 RPC 모드를 모두 지원하는 AgentService service AgentService { // Unary: 단일 명령/조회 (OTA 명령, 상태 조회) rpc Execute(AgentTask) returns (AgentResult); // Server Streaming: T1→T2 모델 업데이트, 토큰 스트리밍 rpc Subscribe(AgentTask) returns (stream StreamChunk); // Client Streaming: T3→T2 배치 센서 데이터 업로드 rpc Upload(stream StreamChunk) returns (AgentResult); // Bidi Streaming: T2↔T2 엣지 합의, 로봇 실시간 협업 rpc Collaborate(stream AgentTask) returns (stream AgentResult); } ``` ### 3.2 TelemetryService (`proto/telemetry/v1/telemetry.proto`) ```protobuf syntax = "proto3"; package telemetry.v1; option go_package = "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1;telemetryv1"; import "google/protobuf/timestamp.proto"; // 단일 센서 데이터 포인트 message DataPoint { string device_id = 1; string metric = 2; // "temperature", "vibration", "voltage" 등 double value = 3; string unit = 4; google.protobuf.Timestamp ts = 5; map labels = 6; // site, line, zone 등 레이블 } // 배치 텔레메트리 (다수 디바이스 팬인) message TelemetryBatch { string source_gateway = 1; repeated DataPoint points = 2; string resume_token = 3; // 마지막 처리 위치 } // 텔레메트리 수신 확인 message TelemetryAck { string resume_token = 1; // 서버가 발급한 다음 재개 위치 uint64 accepted = 2; uint64 rejected = 3; string error_message = 4; } // 텔레메트리 쿼리 message TelemetryQuery { string device_id = 1; string metric = 2; google.protobuf.Timestamp from = 3; google.protobuf.Timestamp to = 4; int32 limit = 5; } service TelemetryService { // Client Streaming: T3 다수 센서 배치 업로드 rpc PushBatch(stream DataPoint) returns (TelemetryAck); // Unary: 텔레메트리 배치 전송 rpc Push(TelemetryBatch) returns (TelemetryAck); // Server Streaming: 실시간 텔레메트리 구독 rpc Stream(TelemetryQuery) returns (stream DataPoint); } ``` ### 3.3 ControlService (`proto/control/v1/control.proto`) ```protobuf syntax = "proto3"; package control.v1; option go_package = "github.com/your-org/edge-aiot-mas/gen/go/control/v1;controlv1"; message OTARequest { string device_id = 1; string firmware_url = 2; string expected_sha256 = 3; string version = 4; } message OTAStatus { string device_id = 1; State state = 2; int32 progress = 3; // 0-100 string error = 4; enum State { STATE_UNSPECIFIED = 0; STATE_DOWNLOADING = 1; STATE_VERIFYING = 2; STATE_APPLYING = 3; STATE_COMPLETE = 4; STATE_FAILED = 5; } } message ConfigUpdate { string device_id = 1; bytes config = 2; // JSON or Protobuf Any string version = 3; } message ConfigAck { bool applied = 1; string version = 2; string error = 3; } service ControlService { // Server Streaming: OTA 다운로드 진행 상황 푸시 rpc OTAUpdate(OTARequest) returns (stream OTAStatus); // Unary: 설정 업데이트 rpc UpdateConfig(ConfigUpdate) returns (ConfigAck); } ``` ### 3.4 Buf 설정 (`proto/buf.yaml`) ```yaml version: v2 modules: - path: . lint: use: - STANDARD breaking: use: - FILE deps: - buf.build/googleapis/googleapis ``` --- ## 4. T2 Edge Gateway 구현 설계 ### 4.1 quic-go + gRPC 통합 T2 게이트웨이는 T1 방향으로 gRPC over QUIC 서버를 운영하고, T3 방향으로 MQTT 클라이언트를 유지한다. #### `internal/gateway/server.go` ```go package gateway import ( "context" "crypto/tls" "fmt" "net" "net/http" "github.com/quic-go/quic-go" "github.com/quic-go/quic-go/http3" "google.golang.org/grpc" "google.golang.org/grpc/credentials" agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1" telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1" "github.com/your-org/edge-aiot-mas/internal/gateway/interceptors" "github.com/your-org/edge-aiot-mas/internal/security" ) // GatewayConfig는 T2 게이트웨이 설정을 담는다. type GatewayConfig struct { GRPCAddr string // T1 방향 gRPC over QUIC 수신 주소 (예: ":9443") A2AAddr string // A2A HTTP/3 수신 주소 (예: ":8443") MQTTBroker string // T3 방향 MQTT 브로커 주소 SPIFFESocket string // SPIRE Agent 소켓 경로 } // Gateway는 T2 엣지 게이트웨이의 핵심 컴포넌트다. type Gateway struct { cfg GatewayConfig grpcServer *grpc.Server http3Server *http3.Server mqttClient *MQTTClient translator *Translator tokenMgr *ResumeTokenManager } // New는 Gateway를 초기화하고 반환한다. func New(ctx context.Context, cfg GatewayConfig) (*Gateway, error) { // 1. SPIFFE/SPIRE에서 TLS 자격증명 획득 creds, err := security.NewSPIFFECredentials(ctx, cfg.SPIFFESocket) if err != nil { return nil, fmt.Errorf("SPIFFE credentials: %w", err) } // 2. Resume Token 관리자 초기화 tokenMgr := NewResumeTokenManager() // 3. gRPC 서버 인터셉터 체인 구성 srv := grpc.NewServer( grpc.Creds(creds), grpc.ChainUnaryInterceptor( interceptors.DeadlineEnforcer(defaultDeadlines), interceptors.ResumeTokenUnary(tokenMgr), interceptors.OTelUnary(), ), grpc.ChainStreamInterceptor( interceptors.ResumeTokenStream(tokenMgr), interceptors.OTelStream(), ), ) // 4. 서비스 등록 agentSvc := NewAgentServiceServer(tokenMgr) telemetrySvc := NewTelemetryServiceServer(tokenMgr) agentv1.RegisterAgentServiceServer(srv, agentSvc) telemetryv1.RegisterTelemetryServiceServer(srv, telemetrySvc) // 5. MQTT 클라이언트 초기화 mqttClient, err := NewMQTTClient(cfg.MQTTBroker) if err != nil { return nil, fmt.Errorf("MQTT client: %w", err) } return &Gateway{ cfg: cfg, grpcServer: srv, mqttClient: mqttClient, translator: NewTranslator(agentSvc, telemetrySvc), tokenMgr: tokenMgr, }, nil } // ServeGRPCOverQUIC는 quic-go 리스너 위에서 gRPC 서버를 구동한다. func (g *Gateway) ServeGRPCOverQUIC(ctx context.Context) error { tlsCfg := g.grpcServer.GetServiceInfo() // SPIFFE creds에서 TLS config 추출 _ = tlsCfg // quic-go 리스너 생성 quicListener, err := quic.ListenAddr(g.cfg.GRPCAddr, &tls.Config{ NextProtos: []string{"h2", "h3"}, // gRPC는 h2, HTTP/3는 h3 MinVersion: tls.VersionTLS13, // GetCertificate는 SPIFFE SVID에서 동적으로 제공 }, &quic.Config{ EnableDatagrams: true, MaxIdleTimeout: 30 * 60 * 1e9, // 30분 }) if err != nil { return fmt.Errorf("quic listen: %w", err) } defer quicListener.Close() // quic.Listener를 net.Listener 인터페이스로 래핑 netListener := &quicNetListener{inner: quicListener} // gRPC 서버를 quic-go 리스너 위에서 구동 go func() { <-ctx.Done() g.grpcServer.GracefulStop() }() return g.grpcServer.Serve(netListener) } // quicNetListener는 quic.Listener를 net.Listener로 래핑한다. type quicNetListener struct { inner *quic.Listener } func (l *quicNetListener) Accept() (net.Conn, error) { conn, err := l.inner.Accept(context.Background()) if err != nil { return nil, err } return &quicConn{conn: conn}, nil } func (l *quicNetListener) Close() error { return l.inner.Close() } func (l *quicNetListener) Addr() net.Addr { return l.inner.Addr() } // quicConn은 quic.Connection을 net.Conn으로 래핑한다. // gRPC-Go는 net.Conn 인터페이스를 사용하므로 첫 번째 스트림을 단일 연결로 노출. type quicConn struct { conn quic.Connection stream quic.Stream } func (c *quicConn) Read(b []byte) (int, error) { if c.stream == nil { var err error c.stream, err = c.conn.AcceptStream(context.Background()) if err != nil { return 0, err } } return c.stream.Read(b) } func (c *quicConn) Write(b []byte) (int, error) { if c.stream == nil { return 0, fmt.Errorf("stream not initialized") } return c.stream.Write(b) } func (c *quicConn) Close() error { return c.conn.CloseWithError(0, "closed") } func (c *quicConn) LocalAddr() net.Addr { return c.conn.LocalAddr() } func (c *quicConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *quicConn) SetDeadline(t interface{}) error { return nil } func (c *quicConn) SetReadDeadline(t interface{}) error { return nil } func (c *quicConn) SetWriteDeadline(t interface{}) error { return nil } // defaultDeadlines는 디바이스 클래스별 기본 deadline 정책이다. var defaultDeadlines = map[string]int64{ "agv": 50, // ms "robot": 100, "sensor": 5000, "default": 3000, } ``` > **참고**: 실제 quic-go와 gRPC-Go 통합 시 `grpc.WithContextDialer`를 사용하여 클라이언트 측에서 QUIC 다이얼러를 주입하는 방식이 더 실용적이다. 위 코드는 서버 측 리스너 래핑 패턴을 보여준다. 프로덕션에서는 `quic-go/http3` 패키지의 `ServeListener` 방식 또는 별도 gRPC-QUIC 어댑터 라이브러리(`github.com/open-telemetry/opentelemetry-collector-contrib` 참조)를 활용한다. ### 4.2 MQTT→gRPC 변환 엔진 #### MQTT Topic 네이밍 규칙 ``` telemetry/{site}/{device_id}/{metric} → TelemetryService.Push control/{site}/{device_id}/ota → ControlService.OTAUpdate agent/{site}/{device_id}/task → AgentService.Execute ``` #### `internal/gateway/translator.go` ```go package gateway import ( "context" "encoding/json" "fmt" "strings" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "google.golang.org/protobuf/proto" agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1" telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) // TopicRoute는 MQTT topic 패턴과 핸들러 함수의 매핑이다. type TopicRoute struct { Pattern string Handler func(ctx context.Context, topic string, payload []byte) error } // Translator는 MQTT 메시지를 gRPC 호출로 변환한다. type Translator struct { agentSvc agentv1.AgentServiceServer telemetrySvc telemetryv1.TelemetryServiceServer routes []TopicRoute propagator propagation.TextMapPropagator } // NewTranslator는 변환 엔진을 초기화한다. func NewTranslator( agentSvc agentv1.AgentServiceServer, telemetrySvc telemetryv1.TelemetryServiceServer, ) *Translator { t := &Translator{ agentSvc: agentSvc, telemetrySvc: telemetrySvc, propagator: otel.GetTextMapPropagator(), } t.registerRoutes() return t } // registerRoutes는 MQTT topic 패턴별 핸들러를 등록한다. func (t *Translator) registerRoutes() { t.routes = []TopicRoute{ { Pattern: "telemetry/+/+/+", Handler: t.handleTelemetry, }, { Pattern: "agent/+/+/task", Handler: t.handleAgentTask, }, } } // HandleMessage는 MQTT 메시지를 수신하고 라우팅한다. func (t *Translator) HandleMessage(_ mqtt.Client, msg mqtt.Message) { topic := msg.Topic() payload := msg.Payload() // MQTT UserProperties에서 OTel trace context 추출 ctx := context.Background() // MQTT 5.0 UserProperties를 통한 traceparent 전파는 // paho.mqtt.golang v2(MQTT 5.0 지원)에서 msg.Properties()로 접근 for _, route := range t.routes { if matchTopic(route.Pattern, topic) { if err := route.Handler(ctx, topic, payload); err != nil { // 에러 로깅 (OTel span에 기록) fmt.Printf("translation error [%s]: %v\n", topic, err) } return } } } // handleTelemetry는 telemetry/{site}/{device}/{metric} 토픽을 처리한다. func (t *Translator) handleTelemetry(ctx context.Context, topic string, payload []byte) error { parts := strings.Split(topic, "/") if len(parts) != 4 { return fmt.Errorf("invalid telemetry topic: %s", topic) } site, deviceID, metric := parts[1], parts[2], parts[3] // JSON 페이로드 파싱 (T3 디바이스는 JSON으로 발행) var raw struct { Value float64 `json:"v"` Unit string `json:"u"` TS int64 `json:"ts"` // Unix milliseconds } if err := json.Unmarshal(payload, &raw); err != nil { return fmt.Errorf("json unmarshal: %w", err) } // Protobuf DataPoint 생성 dp := &telemetryv1.DataPoint{ DeviceId: deviceID, Metric: metric, Value: raw.Value, Unit: raw.Unit, Labels: map[string]string{"site": site}, } if raw.TS > 0 { ts := time.UnixMilli(raw.TS) _ = ts // timestamppb 변환 후 dp.Ts에 할당 } // TelemetryService.Push를 내부 호출 batch := &telemetryv1.TelemetryBatch{ SourceGateway: "t2-gateway", Points: []*telemetryv1.DataPoint{dp}, } _, err := t.telemetrySvc.(*TelemetryServiceServer).pushBatch(ctx, batch) return err } // handleAgentTask는 agent/{site}/{device}/task 토픽을 처리한다. func (t *Translator) handleAgentTask(ctx context.Context, topic string, payload []byte) error { parts := strings.Split(topic, "/") if len(parts) != 4 { return fmt.Errorf("invalid agent topic: %s", topic) } deviceID := parts[2] var task agentv1.AgentTask if err := proto.Unmarshal(payload, &task); err != nil { // JSON fallback if err2 := json.Unmarshal(payload, &task); err2 != nil { return fmt.Errorf("unmarshal agent task: %w", err) } } task.AgentId = deviceID _, err := t.agentSvc.Execute(ctx, &task) return err } // matchTopic는 MQTT 와일드카드(+, #) 패턴 매칭을 수행한다. func matchTopic(pattern, topic string) bool { pp := strings.Split(pattern, "/") tp := strings.Split(topic, "/") if len(pp) != len(tp) { if len(pp) > 0 && pp[len(pp)-1] == "#" { return strings.HasPrefix(topic, strings.Join(pp[:len(pp)-1], "/")) } return false } for i, p := range pp { if p != "+" && p != tp[i] { return false } } return true } ``` ### 4.3 Resume Token 구현 Resume Token은 스트리밍 RPC가 중단되었을 때 재연결 후 중단 위치부터 이어받기 위한 서버 발급 토큰이다. #### `internal/gateway/resume_token.go` ```go package gateway import ( "crypto/hmac" "crypto/sha256" "encoding/base64" "encoding/json" "fmt" "sync" "time" ) // ResumeState는 스트리밍 재개 상태를 표현한다. type ResumeState struct { StreamID string `json:"sid"` SequenceNum uint64 `json:"seq"` DeviceID string `json:"did"` Timestamp int64 `json:"ts"` TraceParent string `json:"tp,omitempty"` // W3C traceparent Extra map[string]string `json:"extra,omitempty"` } // ResumeTokenManager는 Resume Token을 발급하고 검증한다. type ResumeTokenManager struct { secret []byte store sync.Map // streamID → *ResumeState } // NewResumeTokenManager는 새 관리자를 초기화한다. func NewResumeTokenManager() *ResumeTokenManager { // 프로덕션에서는 SPIFFE SVID 또는 환경 변수에서 시크릿 로드 return &ResumeTokenManager{ secret: []byte("change-me-in-production"), } } // Issue는 현재 스트리밍 상태를 HMAC 서명된 토큰으로 발급한다. func (m *ResumeTokenManager) Issue(state ResumeState) (string, error) { state.Timestamp = time.Now().Unix() data, err := json.Marshal(state) if err != nil { return "", fmt.Errorf("marshal state: %w", err) } // HMAC-SHA256 서명 mac := hmac.New(sha256.New, m.secret) mac.Write(data) sig := mac.Sum(nil) // base64url(data) + "." + base64url(sig) token := base64.RawURLEncoding.EncodeToString(data) + "." + base64.RawURLEncoding.EncodeToString(sig) // 상태 저장 (메모리 캐시) m.store.Store(state.StreamID, &state) return token, nil } // Verify는 토큰을 검증하고 ResumeState를 반환한다. func (m *ResumeTokenManager) Verify(token string) (*ResumeState, error) { parts := splitOnLast(token, ".") if len(parts) != 2 { return nil, fmt.Errorf("invalid token format") } data, err := base64.RawURLEncoding.DecodeString(parts[0]) if err != nil { return nil, fmt.Errorf("decode data: %w", err) } sig, err := base64.RawURLEncoding.DecodeString(parts[1]) if err != nil { return nil, fmt.Errorf("decode sig: %w", err) } // HMAC 검증 mac := hmac.New(sha256.New, m.secret) mac.Write(data) expected := mac.Sum(nil) if !hmac.Equal(sig, expected) { return nil, fmt.Errorf("invalid token signature") } var state ResumeState if err := json.Unmarshal(data, &state); err != nil { return nil, fmt.Errorf("unmarshal state: %w", err) } // 24시간 만료 체크 if time.Now().Unix()-state.Timestamp > 86400 { return nil, fmt.Errorf("token expired") } return &state, nil } func splitOnLast(s, sep string) []string { idx := len(s) - 1 for idx >= 0 && string(s[idx]) != sep { idx-- } if idx < 0 { return []string{s} } return []string{s[:idx], s[idx+1:]} } ``` #### Resume Token gRPC Interceptor (`internal/gateway/interceptors/resume.go`) ```go package interceptors import ( "context" "google.golang.org/grpc" "google.golang.org/grpc/metadata" "github.com/your-org/edge-aiot-mas/internal/gateway" ) const resumeTokenMetaKey = "x-resume-token" // ResumeTokenUnary는 Unary RPC에서 Resume Token을 처리하는 인터셉터다. func ResumeTokenUnary(mgr *gateway.ResumeTokenManager) grpc.UnaryServerInterceptor { return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { ctx = injectResumeState(ctx, mgr) resp, err := handler(ctx, req) return resp, err } } // ResumeTokenStream는 Streaming RPC에서 Resume Token을 처리하는 인터셉터다. func ResumeTokenStream(mgr *gateway.ResumeTokenManager) grpc.StreamServerInterceptor { return func( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) error { // 클라이언트가 재연결 시 보내는 Resume Token 검증 ctx := ss.Context() if md, ok := metadata.FromIncomingContext(ctx); ok { if tokens := md.Get(resumeTokenMetaKey); len(tokens) > 0 { state, err := mgr.Verify(tokens[0]) if err == nil { // 검증된 상태를 context에 주입 ctx = context.WithValue(ctx, resumeStateKey{}, state) ss = &resumeServerStream{ServerStream: ss, ctx: ctx} } } } return handler(srv, ss) } } // injectResumeState는 incoming metadata에서 Resume Token을 추출하고 context에 주입한다. func injectResumeState(ctx context.Context, mgr *gateway.ResumeTokenManager) context.Context { md, ok := metadata.FromIncomingContext(ctx) if !ok { return ctx } tokens := md.Get(resumeTokenMetaKey) if len(tokens) == 0 { return ctx } state, err := mgr.Verify(tokens[0]) if err != nil { return ctx } return context.WithValue(ctx, resumeStateKey{}, state) } // GetResumeState는 context에서 ResumeState를 추출한다. func GetResumeState(ctx context.Context) (*gateway.ResumeState, bool) { state, ok := ctx.Value(resumeStateKey{}).(*gateway.ResumeState) return state, ok } type resumeStateKey struct{} // resumeServerStream은 context를 오버라이드한 ServerStream 래퍼다. type resumeServerStream struct { grpc.ServerStream ctx context.Context } func (s *resumeServerStream) Context() context.Context { return s.ctx } ``` ### 4.4 A2A HTTP/3 엔드포인트 #### `internal/gateway/a2a_handler.go` ```go package gateway import ( "encoding/json" "net/http" "github.com/quic-go/quic-go/http3" ) // A2ATask는 Google A2A 프로토콜의 태스크 요청 구조체다. type A2ATask struct { ID string `json:"id"` Message A2AMessage `json:"message"` Metadata map[string]any `json:"metadata,omitempty"` } // A2AMessage는 A2A 메시지 구조체다. type A2AMessage struct { Role string `json:"role"` Parts []A2APart `json:"parts"` } // A2APart는 A2A 메시지의 단일 파트(텍스트, 데이터 등)다. type A2APart struct { Type string `json:"type"` Text string `json:"text,omitempty"` MimeType string `json:"mimeType,omitempty"` Data []byte `json:"data,omitempty"` } // ServeA2AHTTP3는 A2A HTTP/3 서버를 구동한다. func (g *Gateway) ServeA2AHTTP3() error { mux := http.NewServeMux() // A2A 에이전트 카드 (에이전트 능력 공개) mux.HandleFunc("/.well-known/agent.json", g.handleAgentCard) // A2A 태스크 수신 엔드포인트 mux.HandleFunc("/tasks/send", g.handleTaskSend) mux.HandleFunc("/tasks/sendSubscribe", g.handleTaskSendSubscribe) mux.HandleFunc("/tasks/get", g.handleTaskGet) server := &http3.Server{ Addr: g.cfg.A2AAddr, Handler: mux, } return server.ListenAndServeTLS("", "") // TLS config는 SPIFFE에서 주입 } // handleAgentCard는 에이전트의 능력을 JSON으로 반환한다. func (g *Gateway) handleAgentCard(w http.ResponseWriter, r *http.Request) { card := map[string]any{ "name": "T2-Edge-Gateway", "description": "엣지 AIoT T2 게이트웨이 에이전트", "url": "https://" + g.cfg.A2AAddr, "version": "1.0.0", "capabilities": map[string]any{ "streaming": true, "pushNotifications": false, }, "skills": []map[string]any{ { "id": "telemetry_query", "name": "텔레메트리 조회", "description": "T3 디바이스 텔레메트리 데이터를 조회한다", }, { "id": "device_control", "name": "디바이스 제어", "description": "T3 디바이스에 제어 명령을 전달한다", }, }, } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(card) } // handleTaskSend는 A2A 태스크를 수신하고 처리한다. func (g *Gateway) handleTaskSend(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "method not allowed", http.StatusMethodNotAllowed) return } var task A2ATask if err := json.NewDecoder(r.Body).Decode(&task); err != nil { http.Error(w, "invalid request body", http.StatusBadRequest) return } // 태스크를 gRPC AgentService.Execute로 위임 ctx := r.Context() result, err := g.processA2ATask(ctx, &task) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) return } w.Header().Set("Content-Type", "application/json") json.NewEncoder(w).Encode(result) } // handleTaskSendSubscribe는 SSE 스트리밍으로 태스크 상태를 푸시한다. func (g *Gateway) handleTaskSendSubscribe(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") // SSE 스트리밍 구현 (생략) flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming not supported", http.StatusInternalServerError) return } _ = flusher // TODO: gRPC Bidi 스트림 → SSE 변환 } func (g *Gateway) processA2ATask(ctx interface{}, task *A2ATask) (map[string]any, error) { // 구현: task.Message.Parts에서 요청 추출 → gRPC 호출 → 결과 반환 return map[string]any{ "id": task.ID, "status": map[string]string{"state": "completed"}, }, nil } ``` --- ## 5. T1 Orchestrator 구현 설계 ### 5.1 Python LangGraph 에이전트 + A2A 클라이언트 #### `python/orchestrator/agent.py` ```python from __future__ import annotations import asyncio import json import os from typing import Annotated, TypedDict import httpx from langchain_core.messages import AIMessage, HumanMessage, SystemMessage from langchain_openai import ChatOpenAI from langgraph.checkpoint.memory import MemorySaver from langgraph.graph import END, StateGraph from langgraph.graph.message import add_messages from .a2a_client import A2AClient from .tools.telemetry_tool import TelemetryQueryTool from .tools.control_tool import DeviceControlTool class OrchestratorState(TypedDict): """오케스트레이터 에이전트 상태.""" messages: Annotated[list, add_messages] site_id: str active_tasks: dict[str, str] # task_id → status # 에이전트가 사용할 도구 목록 tools = [TelemetryQueryTool(), DeviceControlTool()] # LLM 초기화 (모델은 환경 변수로 교체 가능) llm = ChatOpenAI( model=os.getenv("LLM_MODEL", "gpt-4o"), temperature=0, ).bind_tools(tools) def should_continue(state: OrchestratorState) -> str: """도구 호출 여부에 따라 다음 노드를 결정한다.""" last = state["messages"][-1] if isinstance(last, AIMessage) and last.tool_calls: return "tools" return END async def call_model(state: OrchestratorState) -> OrchestratorState: """LLM을 호출하고 응답을 상태에 추가한다.""" system = SystemMessage(content=( "당신은 엣지 AIoT 오케스트레이터입니다. " "T2 게이트웨이 에이전트를 통해 현장 디바이스를 조율합니다. " f"현재 사이트: {state['site_id']}" )) messages = [system] + state["messages"] response = await llm.ainvoke(messages) return {"messages": [response]} async def call_tools(state: OrchestratorState) -> OrchestratorState: """도구를 실행하고 결과를 상태에 추가한다.""" from langchain_core.messages import ToolMessage last = state["messages"][-1] results = [] for tool_call in last.tool_calls: # 도구 이름으로 실제 도구 인스턴스 검색 tool = next((t for t in tools if t.name == tool_call["name"]), None) if tool is None: content = f"Unknown tool: {tool_call['name']}" else: try: content = await tool.arun(tool_call["args"]) except Exception as e: content = f"Tool error: {e}" results.append(ToolMessage( content=str(content), tool_call_id=tool_call["id"], )) return {"messages": results} def build_graph() -> StateGraph: """LangGraph 오케스트레이터 그래프를 빌드한다.""" builder = StateGraph(OrchestratorState) builder.add_node("agent", call_model) builder.add_node("tools", call_tools) builder.set_entry_point("agent") builder.add_conditional_edges("agent", should_continue) builder.add_edge("tools", "agent") return builder.compile(checkpointer=MemorySaver()) # 그래프 싱글턴 graph = build_graph() async def run_orchestrator(user_input: str, site_id: str, thread_id: str) -> str: """오케스트레이터를 실행하고 최종 응답을 반환한다.""" config = {"configurable": {"thread_id": thread_id}} initial_state: OrchestratorState = { "messages": [HumanMessage(content=user_input)], "site_id": site_id, "active_tasks": {}, } final_state = await graph.ainvoke(initial_state, config=config) last_message = final_state["messages"][-1] return last_message.content if hasattr(last_message, "content") else str(last_message) ``` #### `python/orchestrator/a2a_client.py` ```python from __future__ import annotations import asyncio import json import uuid from typing import AsyncIterator import httpx class A2AClient: """T2 Edge Gateway의 A2A HTTP/3 엔드포인트를 호출하는 클라이언트.""" def __init__(self, base_url: str, timeout: float = 30.0): self.base_url = base_url.rstrip("/") # HTTP/3(QUIC) 지원을 위해 httpx + h3 백엔드 사용 # 실험 환경에서는 HTTP/2로 폴백 가능 self._client = httpx.AsyncClient( http2=True, timeout=timeout, verify=False, # 개발 환경 — 프로덕션에서는 SPIFFE SVID CA 적용 ) async def send_task(self, message: str, skill_id: str | None = None) -> dict: """태스크를 T2 게이트웨이로 전송하고 결과를 반환한다.""" task = { "id": str(uuid.uuid4()), "message": { "role": "user", "parts": [{"type": "text", "text": message}], }, "metadata": {"skill_id": skill_id} if skill_id else {}, } resp = await self._client.post( f"{self.base_url}/tasks/send", json=task, headers={"Content-Type": "application/json"}, ) resp.raise_for_status() return resp.json() async def send_task_streaming( self, message: str, skill_id: str | None = None ) -> AsyncIterator[dict]: """SSE 스트리밍으로 태스크 진행 상태를 수신한다.""" task = { "id": str(uuid.uuid4()), "message": { "role": "user", "parts": [{"type": "text", "text": message}], }, } async with self._client.stream( "POST", f"{self.base_url}/tasks/sendSubscribe", json=task, ) as resp: resp.raise_for_status() async for line in resp.aiter_lines(): if line.startswith("data: "): data = json.loads(line[6:]) yield data async def get_agent_card(self) -> dict: """T2 게이트웨이의 에이전트 카드를 조회한다.""" resp = await self._client.get(f"{self.base_url}/.well-known/agent.json") resp.raise_for_status() return resp.json() async def close(self): await self._client.aclose() ``` ### 5.2 Go gRPC-over-QUIC 클라이언트 코드 #### `internal/cloud/grpc_quic_client.go` ```go package cloud import ( "context" "crypto/tls" "fmt" "net" "github.com/quic-go/quic-go" "google.golang.org/grpc" "google.golang.org/grpc/credentials" agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1" telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1" ) // QUICDialer는 quic-go를 사용하여 gRPC 연결을 수립하는 다이얼러다. type QUICDialer struct { tlsConfig *tls.Config } // NewQUICDialer는 TLS 설정으로 QUIC 다이얼러를 초기화한다. func NewQUICDialer(tlsConfig *tls.Config) *QUICDialer { return &QUICDialer{tlsConfig: tlsConfig} } // DialContext는 gRPC의 WithContextDialer에 전달될 다이얼 함수다. func (d *QUICDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) { // QUIC connection 수립 conn, err := quic.DialAddr(ctx, addr, d.tlsConfig, &quic.Config{ EnableDatagrams: true, MaxIdleTimeout: 30 * 60 * 1e9, }) if err != nil { return nil, fmt.Errorf("quic dial %s: %w", addr, err) } // 첫 번째 스트림 개방 (gRPC framing에 사용) stream, err := conn.OpenStreamSync(ctx) if err != nil { conn.CloseWithError(0, "stream open failed") return nil, fmt.Errorf("open quic stream: %w", err) } return &quicStreamConn{ conn: conn, stream: stream, }, nil } // quicStreamConn은 QUIC 스트림을 net.Conn으로 노출한다. type quicStreamConn struct { conn quic.Connection stream quic.Stream } func (c *quicStreamConn) Read(b []byte) (int, error) { return c.stream.Read(b) } func (c *quicStreamConn) Write(b []byte) (int, error) { return c.stream.Write(b) } func (c *quicStreamConn) Close() error { c.stream.Close() return c.conn.CloseWithError(0, "client closed") } func (c *quicStreamConn) LocalAddr() net.Addr { return c.conn.LocalAddr() } func (c *quicStreamConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() } func (c *quicStreamConn) SetDeadline(t interface{}) error { return nil } func (c *quicStreamConn) SetReadDeadline(t interface{}) error { return nil } func (c *quicStreamConn) SetWriteDeadline(t interface{}) error { return nil } // T1Client는 T1 클라우드에서 T2 게이트웨이로 연결하는 gRPC 클라이언트다. type T1Client struct { conn *grpc.ClientConn agentClient agentv1.AgentServiceClient telemetryClient telemetryv1.TelemetryServiceClient } // NewT1Client는 T2 게이트웨이에 gRPC over QUIC 연결을 수립한다. func NewT1Client(ctx context.Context, gatewayAddr string, tlsConfig *tls.Config) (*T1Client, error) { dialer := NewQUICDialer(tlsConfig) conn, err := grpc.DialContext( ctx, gatewayAddr, grpc.WithContextDialer(dialer.DialContext), grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)), // Retry policy: 무선 손실 시 지수 백오프 grpc.WithDefaultServiceConfig(`{ "methodConfig": [{ "name": [{}], "retryPolicy": { "maxAttempts": 5, "initialBackoff": "0.5s", "maxBackoff": "30s", "backoffMultiplier": 2, "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"] } }] }`), ) if err != nil { return nil, fmt.Errorf("grpc dial: %w", err) } return &T1Client{ conn: conn, agentClient: agentv1.NewAgentServiceClient(conn), telemetryClient: telemetryv1.NewTelemetryServiceClient(conn), }, nil } // ExecuteTask는 T2 게이트웨이에 AgentTask를 전송한다. func (c *T1Client) ExecuteTask(ctx context.Context, task *agentv1.AgentTask) (*agentv1.AgentResult, error) { return c.agentClient.Execute(ctx, task) } // StreamTelemetry는 T2 게이트웨이에서 텔레메트리 스트림을 구독한다. func (c *T1Client) StreamTelemetry( ctx context.Context, query *telemetryv1.TelemetryQuery, handler func(*telemetryv1.DataPoint) error, ) error { stream, err := c.telemetryClient.Stream(ctx, query) if err != nil { return fmt.Errorf("stream start: %w", err) } for { dp, err := stream.Recv() if err != nil { return fmt.Errorf("stream recv: %w", err) } if err := handler(dp); err != nil { return err } } } // Close는 gRPC 연결을 종료한다. func (c *T1Client) Close() error { return c.conn.Close() } ``` --- ## 6. T3 Field Agent 구현 설계 ### 6.1 MQTT 텔레메트리 발행 #### `internal/field/mqtt_publisher.go` ```go package field import ( "context" "encoding/json" "fmt" "time" mqtt "github.com/eclipse/paho.mqtt.golang" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/propagation" ) // TelemetryPayload는 T3 디바이스가 발행하는 텔레메트리 JSON 구조체다. type TelemetryPayload struct { Value float64 `json:"v"` Unit string `json:"u"` TS int64 `json:"ts"` // Unix milliseconds Props map[string]string `json:"p,omitempty"` // MQTT 5.0 UserProperties 대체 } // FieldConfig는 T3 Field Agent 설정을 담는다. type FieldConfig struct { DeviceID string SiteID string MQTTBroker string MQTTPort int OfflineDB string // BoltDB 파일 경로 } // FieldAgent는 T3 현장 에이전트다. type FieldAgent struct { cfg FieldConfig mqttClient mqtt.Client offlineQueue *OfflineQueue tracer interface{} // OTel tracer } // NewFieldAgent는 Field Agent를 초기화한다. func NewFieldAgent(cfg FieldConfig) (*FieldAgent, error) { // MQTT 클라이언트 옵션 설정 opts := mqtt.NewClientOptions(). AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTBroker, cfg.MQTTPort)). SetClientID(fmt.Sprintf("field-%s", cfg.DeviceID)). SetAutoReconnect(true). SetConnectRetry(true). SetConnectRetryInterval(5 * time.Second). SetMaxReconnectInterval(60 * time.Second). SetCleanSession(false). // 오프라인 중 QoS 1 메시지 보존 SetResumeSubs(true) // 연결 로스트 핸들러 opts.SetConnectionLostHandler(func(client mqtt.Client, err error) { fmt.Printf("[%s] MQTT connection lost: %v\n", cfg.DeviceID, err) }) // 재연결 핸들러 opts.SetOnConnectHandler(func(client mqtt.Client) { fmt.Printf("[%s] MQTT reconnected\n", cfg.DeviceID) }) client := mqtt.NewClient(opts) if token := client.Connect(); token.Wait() && token.Error() != nil { return nil, fmt.Errorf("mqtt connect: %w", token.Error()) } offlineQueue, err := NewOfflineQueue(cfg.OfflineDB) if err != nil { return nil, fmt.Errorf("offline queue: %w", err) } return &FieldAgent{ cfg: cfg, mqttClient: client, offlineQueue: offlineQueue, }, nil } // PublishTelemetry는 텔레메트리 데이터를 MQTT로 발행한다. // 오프라인 상태라면 로컬 큐에 저장한다. func (a *FieldAgent) PublishTelemetry(ctx context.Context, metric string, value float64, unit string) error { topic := fmt.Sprintf("telemetry/%s/%s/%s", a.cfg.SiteID, a.cfg.DeviceID, metric) // OTel trace context를 페이로드 Props에 포함 carrier := propagation.MapCarrier{} otel.GetTextMapPropagator().Inject(ctx, carrier) payload := TelemetryPayload{ Value: value, Unit: unit, TS: time.Now().UnixMilli(), Props: carrier, // traceparent 등 포함 } data, err := json.Marshal(payload) if err != nil { return fmt.Errorf("marshal payload: %w", err) } // MQTT 연결 상태 확인 if !a.mqttClient.IsConnected() { // 오프라인 큐에 저장 return a.offlineQueue.Enqueue(topic, data) } // QoS 1로 발행 (전달 보장) token := a.mqttClient.Publish(topic, 1, false, data) token.Wait() if err := token.Error(); err != nil { // 발행 실패 시 오프라인 큐로 폴백 if qErr := a.offlineQueue.Enqueue(topic, data); qErr != nil { return fmt.Errorf("publish failed and queue failed: %v, %v", err, qErr) } return nil // 큐에 저장 성공 } return nil } // FlushOfflineQueue는 재연결 후 오프라인 큐를 드레인한다. func (a *FieldAgent) FlushOfflineQueue(ctx context.Context) error { if !a.mqttClient.IsConnected() { return fmt.Errorf("not connected") } return a.offlineQueue.Drain(func(topic string, data []byte) error { token := a.mqttClient.Publish(topic, 1, false, data) token.Wait() return token.Error() }) } ``` ### 6.2 오프라인 큐 (BoltDB) #### `internal/field/offline_queue.go` ```go package field import ( "encoding/binary" "fmt" "time" bolt "go.etcd.io/bbolt" ) var queueBucket = []byte("offline_queue") // QueueEntry는 오프라인 큐의 단일 항목이다. type QueueEntry struct { Topic string Payload []byte EnqueuedAt time.Time } // OfflineQueue는 BoltDB 기반 오프라인 메시지 큐다. type OfflineQueue struct { db *bolt.DB } // NewOfflineQueue는 BoltDB 파일을 열고 큐 버킷을 초기화한다. func NewOfflineQueue(path string) (*OfflineQueue, error) { db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second}) if err != nil { return nil, fmt.Errorf("bolt open %s: %w", path, err) } if err := db.Update(func(tx *bolt.Tx) error { _, err := tx.CreateBucketIfNotExists(queueBucket) return err }); err != nil { return nil, fmt.Errorf("create bucket: %w", err) } return &OfflineQueue{db: db}, nil } // Enqueue는 메시지를 큐에 추가한다. func (q *OfflineQueue) Enqueue(topic string, payload []byte) error { return q.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(queueBucket) // 자동 증가 시퀀스 키 생성 seq, err := b.NextSequence() if err != nil { return fmt.Errorf("next sequence: %w", err) } key := make([]byte, 8) binary.BigEndian.PutUint64(key, seq) // topic + null + payload 직렬화 value := append([]byte(topic+"\x00"), payload...) return b.Put(key, value) }) } // Drain은 큐의 모든 항목을 순서대로 처리하고 성공한 항목을 삭제한다. func (q *OfflineQueue) Drain(handler func(topic string, payload []byte) error) error { return q.db.Update(func(tx *bolt.Tx) error { b := tx.Bucket(queueBucket) var toDelete [][]byte if err := b.ForEach(func(k, v []byte) error { // topic 분리 for i, c := range v { if c == 0x00 { topic := string(v[:i]) payload := v[i+1:] if err := handler(topic, payload); err != nil { return err // 실패 시 드레인 중단 } toDelete = append(toDelete, append([]byte{}, k...)) return nil } } return fmt.Errorf("invalid queue entry") }); err != nil { return err } // 성공한 항목 삭제 for _, key := range toDelete { if err := b.Delete(key); err != nil { return err } } return nil }) } // Size는 큐에 저장된 항목 수를 반환한다. func (q *OfflineQueue) Size() (int, error) { var count int err := q.db.View(func(tx *bolt.Tx) error { b := tx.Bucket(queueBucket) count = b.Stats().KeyN return nil }) return count, err } // Close는 BoltDB를 닫는다. func (q *OfflineQueue) Close() error { return q.db.Close() } ``` --- ## 7. SPIFFE/SPIRE 통합 ### 7.1 아키텍처 개요 ``` SPIRE Server (T1) ├── Registration Entry: T1 에이전트 (k8s SA) ├── Registration Entry: T2 게이트웨이 (HW serial) └── Registration Entry: T3 디바이스 (TPM attestation) SPIRE Agent (각 노드에 DaemonSet/프로세스로 실행) └── Workload API (Unix socket) └── Go Workload (SVID 자동 갱신) ``` ### 7.2 Go SDK 사용법 #### `internal/security/spiffe.go` ```go package security import ( "context" "fmt" "github.com/spiffe/go-spiffe/v2/spiffeid" "github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig" "github.com/spiffe/go-spiffe/v2/workloadapi" "google.golang.org/grpc/credentials" ) // SPIFFECredentials는 SPIRE Workload API에서 SVID를 가져와 // gRPC TransportCredentials를 생성한다. type SPIFFECredentials struct { source *workloadapi.X509Source } // NewSPIFFECredentials는 SPIRE Agent 소켓에 연결하고 // X.509 SVID 소스를 초기화한다. func NewSPIFFECredentials(ctx context.Context, socketPath string) (*SPIFFECredentials, error) { // SPIRE Agent Workload API에 연결 source, err := workloadapi.NewX509Source( ctx, workloadapi.WithClientOptions( workloadapi.WithAddr("unix://" + socketPath), ), ) if err != nil { return nil, fmt.Errorf("workload API connect: %w", err) } return &SPIFFECredentials{source: source}, nil } // ServerCredentials는 gRPC 서버용 mTLS 자격증명을 반환한다. // 클라이언트 SVID를 trustDomain으로 검증한다. func (s *SPIFFECredentials) ServerCredentials(trustDomain string) (credentials.TransportCredentials, error) { td, err := spiffeid.TrustDomainFromString(trustDomain) if err != nil { return nil, fmt.Errorf("trust domain: %w", err) } tlsCfg := tlsconfig.MTLSServerConfig( s.source, s.source, tlsconfig.AuthorizeMemberOf(td), ) return credentials.NewTLS(tlsCfg), nil } // ClientCredentials는 gRPC 클라이언트용 mTLS 자격증명을 반환한다. // 서버 SPIFFE ID를 serverID로 검증한다. func (s *SPIFFECredentials) ClientCredentials(serverSpiffeID string) (credentials.TransportCredentials, error) { serverID, err := spiffeid.IDFromString(serverSpiffeID) if err != nil { return nil, fmt.Errorf("server SPIFFE ID: %w", err) } tlsCfg := tlsconfig.MTLSClientConfig( s.source, s.source, tlsconfig.AuthorizeID(serverID), ) return credentials.NewTLS(tlsCfg), nil } // Close는 X509Source를 닫는다. func (s *SPIFFECredentials) Close() error { return s.source.Close() } ``` ### 7.3 SVID 발급 흐름 ``` T3 Field Agent 부팅 │ ├─► TPM Quote 생성 (PCR 측정값) │ ├─► SPIRE Agent에 Attestation 요청 (TPM 플러그인) │ └── SPIRE Agent → SPIRE Server 검증 │ ├─► SPIRE Server: Registration Entry 매칭 │ └── spiffe://iot/{org}/{factory}/device/{serial} │ ├─► X.509-SVID 발급 (TTL: 1시간) │ ├── Subject: spiffe://iot/... │ └── SAN: URI = SPIFFE ID │ └─► Workload API를 통해 Go 워크로드에 SVID 제공 └── go-spiffe SDK가 자동 갱신 (만료 10분 전) ``` ### 7.4 SPIRE Registration Entry 예시 ```bash # T2 게이트웨이 등록 (하드웨어 시리얼 기반) spire-server entry create \ -spiffeID spiffe://edge-aiot/t2/gateway/GW-001 \ -parentID spiffe://edge-aiot/spire-agent \ -selector unix:uid:1000 \ -selector aws_iid:instance-id:i-0abc123 # T3 Field Agent 등록 (TPM attestation) spire-server entry create \ -spiffeID spiffe://edge-aiot/t3/factory-a/device/SENSOR-001 \ -parentID spiffe://edge-aiot/spire-agent \ -selector tpm:pub_hash:sha256:abcdef... ``` --- ## 8. OpenTelemetry 통합 ### 8.1 TracerProvider 초기화 #### `internal/observability/tracer.go` ```go package observability import ( "context" "fmt" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.26.0" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) // InitTracer는 OTel TracerProvider를 초기화하고 글로벌에 등록한다. // otlpEndpoint: OTel Collector OTLP gRPC 엔드포인트 (예: "otel-collector:4317") func InitTracer(ctx context.Context, serviceName, otlpEndpoint string) (func(context.Context) error, error) { // OTLP gRPC exporter 생성 conn, err := grpc.DialContext(ctx, otlpEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), ) if err != nil { return nil, fmt.Errorf("grpc dial otlp: %w", err) } exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) if err != nil { return nil, fmt.Errorf("otlp exporter: %w", err) } // 리소스 속성 (서비스 이름, 버전, 배포 환경) res, err := resource.New(ctx, resource.WithAttributes( semconv.ServiceName(serviceName), semconv.ServiceVersion("0.1.0"), ), resource.WithOS(), resource.WithHost(), ) if err != nil { return nil, fmt.Errorf("resource: %w", err) } // TracerProvider 생성 tp := sdktrace.NewTracerProvider( sdktrace.WithBatcher(exporter), sdktrace.WithResource(res), sdktrace.WithSampler(sdktrace.AlwaysSample()), // 실험 환경 — 프로덕션은 ParentBased ) // 글로벌 TracerProvider 및 Propagator 설정 otel.SetTracerProvider(tp) otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator( propagation.TraceContext{}, // W3C traceparent/tracestate propagation.Baggage{}, )) // 종료 함수 반환 (defer로 호출) return tp.Shutdown, nil } ``` ### 8.2 gRPC Interceptor에서 Trace Propagation #### `internal/gateway/interceptors/otel.go` ```go package interceptors import ( "context" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/metadata" ) // OTelUnary는 Unary RPC에 OTel trace를 주입하는 인터셉터다. // otelgrpc.UnaryServerInterceptor()를 래핑하여 Resume Token trace 연속성을 추가한다. func OTelUnary() grpc.UnaryServerInterceptor { base := otelgrpc.UnaryServerInterceptor() return func( ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, ) (interface{}, error) { // Resume Token에 포함된 traceparent로 trace 연속성 복원 ctx = restoreTraceFromResumeToken(ctx) return base(ctx, req, info, handler) } } // OTelStream는 Streaming RPC에 OTel trace를 주입하는 인터셉터다. func OTelStream() grpc.StreamServerInterceptor { base := otelgrpc.StreamServerInterceptor() return func( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler, ) error { ctx := restoreTraceFromResumeToken(ss.Context()) wrapped := &otelServerStream{ServerStream: ss, ctx: ctx} return base(srv, wrapped, info, handler) } } // restoreTraceFromResumeToken은 Resume Token에 포함된 traceparent를 // context에 복원하여 단절-재연결 후에도 trace 체인이 유지되도록 한다. func restoreTraceFromResumeToken(ctx context.Context) context.Context { state, ok := GetResumeState(ctx) if !ok || state.TraceParent == "" { return ctx } // traceparent를 metadata carrier로 변환하여 propagate carrier := propagation.MapCarrier{ "traceparent": state.TraceParent, } return otel.GetTextMapPropagator().Extract(ctx, carrier) } // InjectResumeTraceParent는 현재 span의 traceparent를 ResumeState에 저장한다. // Resume Token 발급 시 호출한다. func InjectResumeTraceParent(ctx context.Context) string { carrier := propagation.MapCarrier{} otel.GetTextMapPropagator().Inject(ctx, carrier) return carrier.Get("traceparent") } // T2FanOutSpan은 T3 디바이스에서 전달된 trace를 T2 게이트웨이에서 // fan-out 지점으로 처리하는 span을 생성한다. func T2FanOutSpan(ctx context.Context, deviceID, metric string) (context.Context, trace.Span) { tracer := otel.Tracer("t2-gateway") return tracer.Start(ctx, "t2.fanout", trace.WithAttributes( attribute.String("device.id", deviceID), attribute.String("telemetry.metric", metric), ), ) } // RecordError는 gRPC 에러를 span에 기록한다. func RecordError(span trace.Span, err error) { if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) } } type otelServerStream struct { grpc.ServerStream ctx context.Context } func (s *otelServerStream) Context() context.Context { return s.ctx } ``` ### 8.3 MQTT→gRPC Trace 연속성 패턴 T3 디바이스가 MQTT로 발행할 때 UserProperties(MQTT 5.0) 또는 페이로드 필드로 `traceparent`를 포함시키고, T2 게이트웨이의 `Translator`가 이를 gRPC metadata로 변환하여 T1까지 trace 체인을 연결한다. ```go // MQTT 페이로드의 Props 필드에서 traceparent 추출 후 context 복원 예시 func extractTraceFromMQTTPayload(payload TelemetryPayload) context.Context { ctx := context.Background() if len(payload.Props) == 0 { return ctx } carrier := propagation.MapCarrier(payload.Props) return otel.GetTextMapPropagator().Extract(ctx, carrier) } ``` --- ## 9. Docker Compose 실험 환경 설계 ### `deployments/docker-compose.yml` ```yaml version: "3.9" networks: t1-net: driver: bridge t2-net: driver: bridge t3-net: driver: bridge obs-net: driver: bridge volumes: spire-data: prometheus-data: grafana-data: services: # ───────────────────────────────────────────────────── # T1: 클라우드 계층 # ───────────────────────────────────────────────────── cloud-server: build: context: .. dockerfile: deployments/Dockerfile.go args: CMD: cmd/cloud-server image: edge-aiot/cloud-server:dev container_name: t1-cloud-server networks: [t1-net, obs-net] ports: - "9443:9443" # gRPC over QUIC environment: - GRPC_ADDR=:9443 - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 - SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock volumes: - /run/spire/t1/sockets:/run/spire/sockets:ro depends_on: - spire-server - otel-collector t1-orchestrator: build: context: ../python dockerfile: ../deployments/Dockerfile.python image: edge-aiot/orchestrator:dev container_name: t1-orchestrator networks: [t1-net, obs-net] ports: - "8080:8080" # FastAPI 관리 인터페이스 environment: - GATEWAY_A2A_URL=https://t2-gateway:8443 - CLOUD_GRPC_ADDR=cloud-server:9443 - LLM_MODEL=gpt-4o - OPENAI_API_KEY=${OPENAI_API_KEY} - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 depends_on: - cloud-server # ───────────────────────────────────────────────────── # T2: 엣지 계층 # ───────────────────────────────────────────────────── t2-gateway: build: context: .. dockerfile: deployments/Dockerfile.go args: CMD: cmd/gateway image: edge-aiot/gateway:dev container_name: t2-gateway networks: [t1-net, t2-net, t3-net, obs-net] ports: - "19443:9443" # gRPC over QUIC (T1 방향) - "18443:8443" # A2A HTTP/3 environment: - GRPC_ADDR=:9443 - A2A_ADDR=:8443 - MQTT_BROKER=mqtt-broker - MQTT_PORT=1883 - T1_GRPC_ADDR=cloud-server:9443 - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 - SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock volumes: - /run/spire/t2/sockets:/run/spire/sockets:ro depends_on: - mqtt-broker - spire-server - otel-collector # ───────────────────────────────────────────────────── # T3: 현장 계층 (복수 인스턴스) # ───────────────────────────────────────────────────── t3-field-agent-1: build: context: .. dockerfile: deployments/Dockerfile.go args: CMD: cmd/field-agent image: edge-aiot/field-agent:dev container_name: t3-field-agent-1 networks: [t3-net, obs-net] environment: - DEVICE_ID=SENSOR-001 - SITE_ID=factory-a - MQTT_BROKER=mqtt-broker - MQTT_PORT=1883 - OFFLINE_DB=/data/queue.db - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317 volumes: - ./data/agent-1:/data depends_on: - mqtt-broker t3-field-agent-2: extends: service: t3-field-agent-1 container_name: t3-field-agent-2 environment: - DEVICE_ID=SENSOR-002 - SITE_ID=factory-a - MQTT_BROKER=mqtt-broker - MQTT_PORT=1883 - OFFLINE_DB=/data/queue.db volumes: - ./data/agent-2:/data t3-field-agent-3: extends: service: t3-field-agent-1 container_name: t3-field-agent-3 environment: - DEVICE_ID=AGV-001 - SITE_ID=factory-a - MQTT_BROKER=mqtt-broker - MQTT_PORT=1883 - OFFLINE_DB=/data/queue.db volumes: - ./data/agent-3:/data # ───────────────────────────────────────────────────── # 인프라: MQTT 브로커 # ───────────────────────────────────────────────────── mqtt-broker: image: eclipse-mosquitto:2.0 container_name: mqtt-broker networks: [t2-net, t3-net] ports: - "1883:1883" - "9001:9001" # WebSocket volumes: - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro # ───────────────────────────────────────────────────── # 보안: SPIRE # ───────────────────────────────────────────────────── spire-server: image: ghcr.io/spiffe/spire-server:1.9.0 container_name: spire-server networks: [t1-net] ports: - "8081:8081" volumes: - ./spire/server.conf:/etc/spire/server/server.conf:ro - spire-data:/var/lib/spire command: ["-config", "/etc/spire/server/server.conf"] spire-agent-t1: image: ghcr.io/spiffe/spire-agent:1.9.0 container_name: spire-agent-t1 networks: [t1-net] volumes: - ./spire/agent.conf:/etc/spire/agent/agent.conf:ro - /run/spire/t1/sockets:/run/spire/sockets depends_on: [spire-server] command: ["-config", "/etc/spire/agent/agent.conf"] spire-agent-t2: image: ghcr.io/spiffe/spire-agent:1.9.0 container_name: spire-agent-t2 networks: [t2-net] volumes: - ./spire/agent.conf:/etc/spire/agent/agent.conf:ro - /run/spire/t2/sockets:/run/spire/sockets depends_on: [spire-server] command: ["-config", "/etc/spire/agent/agent.conf"] # ───────────────────────────────────────────────────── # 관측성: OTel Collector + Prometheus + Grafana + Jaeger # ───────────────────────────────────────────────────── otel-collector: image: otel/opentelemetry-collector-contrib:0.100.0 container_name: otel-collector networks: [t1-net, t2-net, t3-net, obs-net] ports: - "4317:4317" # OTLP gRPC - "4318:4318" # OTLP HTTP - "8889:8889" # Prometheus metrics exporter volumes: - ./otel/otel-collector-config.yaml:/etc/otelcol/config.yaml:ro command: ["--config", "/etc/otelcol/config.yaml"] jaeger: image: jaegertracing/all-in-one:1.57 container_name: jaeger networks: [obs-net] ports: - "16686:16686" # Jaeger UI - "14250:14250" # gRPC (OTel Collector → Jaeger) prometheus: image: prom/prometheus:v2.51.0 container_name: prometheus networks: [obs-net] ports: - "9090:9090" volumes: - ./otel/prometheus.yml:/etc/prometheus/prometheus.yml:ro - prometheus-data:/prometheus grafana: image: grafana/grafana:10.4.0 container_name: grafana networks: [obs-net] ports: - "3000:3000" environment: - GF_SECURITY_ADMIN_PASSWORD=admin - GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/edge-aiot.json volumes: - grafana-data:/var/lib/grafana - ./grafana/dashboards:/var/lib/grafana/dashboards:ro depends_on: - prometheus - jaeger ``` ### `deployments/Dockerfile.go` ```dockerfile # 멀티 스테이지 빌드 — 최소 런타임 이미지 FROM golang:1.22-alpine AS builder WORKDIR /app COPY go.mod go.sum ./ RUN go mod download COPY . . ARG CMD=cmd/gateway RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/app ./${CMD}/... FROM gcr.io/distroless/static-debian12 COPY --from=builder /bin/app /app ENTRYPOINT ["/app"] ``` ### `deployments/otel/otel-collector-config.yaml` (핵심 스니펫) ```yaml receivers: otlp: protocols: grpc: endpoint: 0.0.0.0:4317 http: endpoint: 0.0.0.0:4318 processors: batch: timeout: 1s send_batch_size: 1024 memory_limiter: check_interval: 1s limit_mib: 512 exporters: jaeger: endpoint: jaeger:14250 tls: insecure: true prometheus: endpoint: "0.0.0.0:8889" service: pipelines: traces: receivers: [otlp] processors: [memory_limiter, batch] exporters: [jaeger] metrics: receivers: [otlp] processors: [memory_limiter, batch] exporters: [prometheus] ``` --- ## 10. 구현 단계별 계획 (Phase) ### Phase 1: Proto 정의 + 기본 gRPC over QUIC (2주) **목표**: 컴파일 가능한 Proto + quic-go 위에서 gRPC 서버/클라이언트 통신 검증 | 주 | 작업 항목 | 산출물 | |----|---------|--------| | 1 | Proto 파일 설계 및 `buf generate` 설정 | `proto/`, `gen/` 디렉터리 | | 1 | Go 모듈 초기화, 의존성 설치 | `go.mod`, `go.sum` | | 2 | quic-go + gRPC-Go 통합 (QuicNetListener 구현) | `internal/gateway/server.go` | | 2 | T1 gRPC-over-QUIC 클라이언트 구현 | `internal/cloud/grpc_quic_client.go` | | 2 | 기본 AgentService Unary RPC 동작 검증 | `tests/integration/basic_grpc_test.go` | **완료 기준**: `go test ./tests/integration/... -run TestBasicUnaryRPC` 통과 ### Phase 2: MQTT 변환 게이트웨이 (2주) **목표**: T3 MQTT 발행 → T2 변환 → T1 gRPC 전달 end-to-end 동작 | 주 | 작업 항목 | 산출물 | |----|---------|--------| | 3 | MQTT 클라이언트 구현 + Topic 라우팅 | `internal/gateway/mqtt_client.go` | | 3 | MQTT→gRPC 변환 엔진 (`Translator`) | `internal/gateway/translator.go` | | 4 | T3 Field Agent (MQTT 발행 + BoltDB 오프라인 큐) | `internal/field/` 패키지 | | 4 | Docker Compose 기본 환경 (T1+T2+T3+Mosquitto) | `deployments/docker-compose.yml` | | 4 | 오프라인 시나리오 테스트 (MQTT 브로커 중단 후 재연결) | `tests/integration/offline_test.go` | **완료 기준**: T3 → MQTT → T2 Translator → gRPC → T1 텔레메트리 수신 확인 ### Phase 3: A2A 통합 + Resume Token (2주) **목표**: A2A HTTP/3 엔드포인트 + Resume Token 기반 스트리밍 재개 동작 | 주 | 작업 항목 | 산출물 | |----|---------|--------| | 5 | Resume Token Manager + gRPC Interceptor | `internal/gateway/resume_token.go`, `interceptors/resume.go` | | 5 | A2A HTTP/3 핸들러 (quic-go http3 패키지) | `internal/gateway/a2a_handler.go` | | 6 | Python A2A 클라이언트 + LangGraph 에이전트 | `python/orchestrator/` | | 6 | 스트리밍 중단-재개 통합 테스트 | `tests/integration/resume_test.go` | **완료 기준**: 스트리밍 중 네트워크 차단 → 재연결 → Resume Token으로 이어받기 검증 ### Phase 4: SPIFFE/SPIRE + OTel (2주) **목표**: mTLS(SPIFFE SVID) + 분산 트레이싱 end-to-end 동작 | 주 | 작업 항목 | 산출물 | |----|---------|--------| | 7 | SPIRE Server/Agent Docker Compose 통합 | `deployments/docker-compose.spire.yml` | | 7 | go-spiffe SDK 래퍼 + gRPC credentials 교체 | `internal/security/spiffe.go` | | 8 | OTel TracerProvider + otelgrpc 인터셉터 | `internal/observability/tracer.go` | | 8 | MQTT→gRPC trace propagation (traceparent in payload Props) | `internal/gateway/interceptors/otel.go` | | 8 | Jaeger에서 T3→T2→T1 trace 체인 시각화 확인 | - | **완료 기준**: Jaeger에서 단일 MQTT 발행의 전체 트레이스(T3→T2→T1) 확인 ### Phase 5: 실험 검증 (2주) **목표**: 성능 지표 측정, 단절 내성 검증, 문서화 | 주 | 작업 항목 | 산출물 | |----|---------|--------| | 9 | 성능 벤치마크 (gRPC vs REST 비교, latency/throughput) | `tests/perf/benchmark_test.go` | | 9 | 네트워크 파티션 시나리오 (tc netem으로 손실·지연 주입) | chaos 테스트 스크립트 | | 10 | Grafana 대시보드 구성 (latency, throughput, error rate) | `deployments/grafana/dashboards/` | | 10 | 실험 결과 분석 및 구현 설계서 업데이트 | 본 문서 v1.0 | **완료 기준**: 모든 KPI 측정값 수집 완료, Grafana 대시보드 정상 동작 --- ## 11. 테스트 전략 ### 11.1 단위 테스트 ``` 대상 패키지 테스트 항목 ───────────────────────────────── ──────────────────────────────────── internal/gateway/resume_token.go 토큰 발급/검증, 서명 위조 감지, 만료 처리 internal/gateway/translator.go MQTT 토픽 패턴 매칭, JSON→Protobuf 변환 internal/field/offline_queue.go Enqueue/Drain 순서 보장, 동시 접근 internal/security/spiffe.go SVID 갱신 모킹, 인증 실패 처리 ``` ```go // 예시: Resume Token 단위 테스트 func TestResumeTokenRoundTrip(t *testing.T) { mgr := NewResumeTokenManager() state := ResumeState{ StreamID: "stream-abc", SequenceNum: 42, DeviceID: "SENSOR-001", TraceParent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01", } token, err := mgr.Issue(state) require.NoError(t, err) require.NotEmpty(t, token) restored, err := mgr.Verify(token) require.NoError(t, err) assert.Equal(t, state.StreamID, restored.StreamID) assert.Equal(t, state.SequenceNum, restored.SequenceNum) assert.Equal(t, state.TraceParent, restored.TraceParent) } func TestResumeTokenTampering(t *testing.T) { mgr := NewResumeTokenManager() state := ResumeState{StreamID: "s1", SequenceNum: 1, DeviceID: "d1"} token, _ := mgr.Issue(state) // 마지막 바이트 변조 tampered := token[:len(token)-2] + "ZZ" _, err := mgr.Verify(tampered) assert.Error(t, err) } ``` ### 11.2 통합 테스트 ```go // 예시: MQTT→gRPC 변환 통합 테스트 (testcontainers-go 사용) func TestMQTTToGRPCTranslation(t *testing.T) { ctx := context.Background() // Mosquitto 컨테이너 기동 mosquitto, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ ContainerRequest: testcontainers.ContainerRequest{ Image: "eclipse-mosquitto:2.0", ExposedPorts: []string{"1883/tcp"}, WaitingFor: wait.ForListeningPort("1883/tcp"), }, Started: true, }, ) require.NoError(t, err) defer mosquitto.Terminate(ctx) host, _ := mosquitto.Host(ctx) port, _ := mosquitto.MappedPort(ctx, "1883") brokerAddr := fmt.Sprintf("%s:%s", host, port.Port()) // Gateway 초기화 gw, err := gateway.New(ctx, gateway.GatewayConfig{ GRPCAddr: ":0", MQTTBroker: brokerAddr, }) require.NoError(t, err) // 텔레메트리 수신 채널 received := make(chan *telemetryv1.DataPoint, 1) gw.SetTelemetryHandler(func(dp *telemetryv1.DataPoint) { received <- dp }) // T3 역할: MQTT 발행 publisher, _ := newTestMQTTClient(brokerAddr) payload := `{"v":42.5,"u":"celsius","ts":1234567890000}` publisher.Publish("telemetry/factory-a/SENSOR-001/temperature", 1, false, payload) // 변환 결과 검증 select { case dp := <-received: assert.Equal(t, "SENSOR-001", dp.DeviceId) assert.Equal(t, "temperature", dp.Metric) assert.InDelta(t, 42.5, dp.Value, 0.001) case <-time.After(5 * time.Second): t.Fatal("timeout waiting for telemetry") } } ``` ### 11.3 성능 테스트 ```go // tests/perf/benchmark_test.go func BenchmarkUnaryRPCOverQUIC(b *testing.B) { // 환경: Docker Compose로 T1+T2 기동 후 실행 client, err := cloud.NewT1Client( context.Background(), "localhost:9443", testTLSConfig(), ) require.NoError(b, err) defer client.Close() task := &agentv1.AgentTask{ TaskId: "bench", AgentId: "t1", TaskType: "query", Payload: make([]byte, 256), // 256B 페이로드 } b.ResetTimer() b.RunParallel(func(pb *testing.PB) { for pb.Next() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) _, err := client.ExecuteTask(ctx, task) cancel() if err != nil { b.Errorf("RPC error: %v", err) } } }) } func BenchmarkTelemetryFanIn(b *testing.B) { // 1000개 T3 에이전트에서 동시 텔레메트리 발행 b.SetParallelism(1000) // 구현 생략 (위와 유사한 패턴) } ``` --- ## 12. 예상 성능 지표 및 측정 방법 ### 12.1 목표 성능 지표 (KPI) | 지표 | 목표값 | 측정 방법 | |------|--------|---------| | T1↔T2 gRPC Unary p50 latency | < 5ms (LAN) | `go test -bench=BenchmarkUnaryRPC` + `otelgrpc` 자동 계측 | | T1↔T2 gRPC Unary p99 latency | < 20ms (LAN) | Jaeger → Grafana 히스토그램 | | T2↔T3 MQTT→gRPC 변환 지연 | < 10ms | MQTT 발행 TS → gRPC 수신 TS 차분 | | T3 텔레메트리 동시 처리 | 1,000 노드 @ 1Hz | `BenchmarkTelemetryFanIn` | | Resume Token 재연결 후 재개 지연 | < 500ms | 네트워크 차단 → 재연결 시간 측정 | | gRPC over QUIC vs TCP 핸드오버 | QUIC -50% RTT | `tc netem`으로 IP 변경 시뮬레이션 | | SPIFFE SVID 갱신 오버헤드 | < 1ms/RPC | `pprof` + Prometheus `grpc_server_handling_seconds` | ### 12.2 측정 방법 상세 #### Prometheus 메트릭 수집 ```yaml # deployments/otel/prometheus.yml scrape_configs: - job_name: 'cloud-server' static_configs: - targets: ['cloud-server:2112'] scrape_interval: 5s - job_name: 't2-gateway' static_configs: - targets: ['t2-gateway:2112'] - job_name: 'otel-collector' static_configs: - targets: ['otel-collector:8889'] ``` #### 주요 모니터링 메트릭 ``` # gRPC 레이턴시 히스토그램 (otelgrpc 자동 생성) grpc_server_handling_seconds_bucket{grpc_method="Execute",grpc_service="agent.v1.AgentService"} # MQTT 메시지 처리량 mqtt_messages_received_total{topic="telemetry"} mqtt_translation_duration_seconds_bucket # Resume Token 재연결 resume_token_reconnect_total resume_token_reconnect_latency_seconds # 오프라인 큐 크기 offline_queue_size{device_id="SENSOR-001"} ``` #### 네트워크 장애 주입 (tc netem) ```bash # T2 게이트웨이에서 T1으로 가는 경로에 30% 패킷 손실 + 100ms 지연 주입 docker exec t2-gateway tc qdisc add dev eth0 root netem loss 30% delay 100ms # 30초 후 정상 복구 sleep 30 docker exec t2-gateway tc qdisc del dev eth0 root # IP 변경 시뮬레이션 (QUIC connection migration 테스트) docker network disconnect t1-net t2-gateway docker network connect t1-net t2-gateway ``` #### 핵심 그래프 (Grafana 대시보드) 1. **레이턴시 p50/p95/p99 시계열**: tier별 gRPC 호출 지연 추이 2. **처리량 시계열**: 초당 RPC 수, MQTT 메시지 수 3. **에러율**: gRPC 상태 코드별 에러율 (UNAVAILABLE, DEADLINE_EXCEEDED 등) 4. **Resume Token 이벤트**: 단절/재연결 빈도 및 재개 지연 5. **오프라인 큐 크기**: T3 디바이스별 버퍼 누적 현황 6. **Jaeger 트레이스 뷰**: T3→T2→T1 E2E 호출 그래프 --- ## 부록 A. Go 모듈 의존성 (`go.mod` 핵심 부분) ```go module github.com/your-org/edge-aiot-mas go 1.22 require ( // gRPC google.golang.org/grpc v1.64.0 google.golang.org/protobuf v1.34.1 // QUIC github.com/quic-go/quic-go v0.44.0 // MQTT github.com/eclipse/paho.mqtt.golang v1.4.3 // SPIFFE/SPIRE github.com/spiffe/go-spiffe/v2 v2.3.0 // OpenTelemetry go.opentelemetry.io/otel v1.27.0 go.opentelemetry.io/otel/sdk v1.27.0 go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0 go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0 // BoltDB (오프라인 큐) go.etcd.io/bbolt v1.3.10 // 테스트 github.com/testcontainers/testcontainers-go v0.31.0 github.com/stretchr/testify v1.9.0 ) ``` ## 부록 B. Python 의존성 (`python/pyproject.toml` 핵심 부분) ```toml [project] name = "edge-aiot-orchestrator" version = "0.1.0" requires-python = ">=3.11" dependencies = [ # LangGraph 에이전트 "langgraph>=0.1.0", "langchain-openai>=0.1.0", "langchain-core>=0.2.0", # gRPC "grpcio>=1.64.0", "grpcio-tools>=1.64.0", "protobuf>=5.27.0", # HTTP/A2A 클라이언트 "httpx[http2]>=0.27.0", # 관측성 "opentelemetry-sdk>=1.27.0", "opentelemetry-exporter-otlp>=1.27.0", "opentelemetry-instrumentation-grpc>=0.48b0", ] [project.optional-dependencies] dev = [ "pytest>=8.0", "pytest-asyncio>=0.23", "pytest-cov>=5.0", ] ```