Files
2026-06-25 12:19:20 +09:00

82 KiB

엣지 AIoT 3-tier 멀티 에이전트 통신 프레임워크 — 구현 설계서

문서 버전: v0.1.0
최종 수정: 2026-06-07
관련 문서: FINAL_REPORT.md (설계 근거 및 사용 사례 분석)


목차

  1. 기술 스택 선정 근거
  2. 프로젝트 디렉터리 구조
  3. Proto 파일 설계
  4. T2 Edge Gateway 구현 설계
  5. T1 Orchestrator 구현 설계
  6. T3 Field Agent 구현 설계
  7. SPIFFE/SPIRE 통합
  8. OpenTelemetry 통합
  9. Docker Compose 실험 환경 설계
  10. 구현 단계별 계획 (Phase)
  11. 테스트 전략
  12. 예상 성능 지표 및 측정 방법

1. 기술 스택 선정 근거

1.1 핵심 언어 선정

언어 적용 컴포넌트 선정 이유
Go T2 Gateway, T3 Field Agent, gRPC 서버/클라이언트 단일 바이너리 배포, 낮은 메모리 푸트프린트, 고루틴 기반 고동시성, quic-go 네이티브 지원, 크로스 컴파일(ARM/MIPS/x86)
Python T1 Orchestrator LLM 에이전트, A2A 클라이언트 LangGraph/AutoGen 등 LLM 프레임워크 생태계, 빠른 프로토타이핑, grpcio 지원

1.2 전송 계층 라이브러리

quic-go (github.com/quic-go/quic-go)

  • Go 네이티브 QUIC RFC 9000 구현 — CGO 의존 없음
  • net/http 인터페이스 호환 → HTTP/3 서버를 표준 핸들러로 운영 가능
  • Connection migration 내장: 핸드오버(IP 변경) 시 세션 유지
  • QUIC Datagram 확장(RFC 9221) 지원 → 저지연 V2X 메시지에 활용 가능
  • gRPC-Go 트랜스포트 레이어 교체 방식으로 통합 (WithContextDialer + quic.DialAddr)

gRPC-Go (google.golang.org/grpc)

  • 공식 Google Go gRPC 구현 — Interceptor Chain, Health Checking, Service Config(retry policy) 내장
  • Protobuf codec 교체 가능 → 커스텀 직렬화 확장 가능
  • grpc.WithTransportCredentials + quic-go TLS 설정을 연동하여 mTLS over QUIC 구현

Eclipse Paho Go (github.com/eclipse/paho.mqtt.golang)

  • MQTT 3.1.1 / 5.0 지원 Go 클라이언트
  • Reconnect 자동 처리 + Message Queue(오프라인 버퍼)
  • T3 현장 디바이스와의 Pub/Sub 인터페이스

1.3 에이전트 프레임워크

프레임워크 선정 이유
LangGraph (Python) 상태 머신 기반 에이전트 그래프, 체크포인팅·재시도 내장, T1 Orchestrator의 복잡한 오케스트레이션 로직에 적합
A2A Protocol Google 주도 에이전트 간 통신 표준, HTTP/JSON 기반으로 언어 무관 상호운용성, Task/Artifact 개념으로 장기 실행 작업 모델링

1.4 보안 및 관측성

라이브러리 선정 이유
SPIFFE/SPIRE Go SDK (github.com/spiffe/go-spiffe/v2) X.509-SVID 자동 갱신, gRPC credentials.TransportCredentials 구현 제공, TPM attestation 플러그인 체인
OpenTelemetry Go SDK (go.opentelemetry.io/otel) 벤더 중립 trace/metric/log, gRPC 자동 계측(otelgrpc), W3C TraceContext 전파
Prometheus Go 네이티브 exporter, 엣지 환경 경량 스크랩

2. 프로젝트 디렉터리 구조

edge-aiot-mas/
├── proto/                          # Protobuf 스키마 (언어 무관 공유)
│   ├── agent/
│   │   └── v1/
│   │       └── agent.proto         # AgentService (Unary/Stream 4모드)
│   ├── telemetry/
│   │   └── v1/
│   │       └── telemetry.proto     # TelemetryService
│   ├── control/
│   │   └── v1/
│   │       └── control.proto       # ControlService (OTA, Config)
│   └── buf.yaml                    # Buf 스키마 설정
│
├── gen/                            # proto 컴파일 출력 (생성 파일, git 추적)
│   ├── go/
│   │   ├── agent/v1/
│   │   ├── telemetry/v1/
│   │   └── control/v1/
│   └── python/
│       ├── agent/v1/
│       └── telemetry/v1/
│
├── go.mod                          # Go 모듈 루트
├── go.sum
│
├── cmd/
│   ├── gateway/
│   │   └── main.go                 # T2 Edge Gateway 진입점
│   ├── field-agent/
│   │   └── main.go                 # T3 Field Agent 진입점
│   └── cloud-server/
│       └── main.go                 # T1 Go gRPC 서버 진입점
│
├── internal/
│   ├── gateway/
│   │   ├── server.go               # gRPC over QUIC 서버 초기화
│   │   ├── mqtt_client.go          # MQTT 브로커 클라이언트
│   │   ├── translator.go           # MQTT topic → gRPC method 변환 엔진
│   │   ├── resume_token.go         # Resume Token 관리자
│   │   ├── a2a_handler.go          # A2A HTTP/3 엔드포인트
│   │   └── interceptors/
│   │       ├── resume.go           # Resume Token gRPC Interceptor
│   │       ├── deadline.go         # Deadline 강제 Interceptor
│   │       └── otel.go             # OTel trace 전파 Interceptor
│   │
│   ├── field/
│   │   ├── mqtt_publisher.go       # MQTT 텔레메트리 발행
│   │   ├── offline_queue.go        # 오프라인 큐 (BoltDB)
│   │   └── resume_client.go        # Resume Token 클라이언트
│   │
│   ├── cloud/
│   │   ├── agent_service.go        # AgentService 서버 구현
│   │   └── telemetry_service.go    # TelemetryService 서버 구현
│   │
│   ├── security/
│   │   ├── spiffe.go               # SPIFFE/SPIRE 클라이언트 래퍼
│   │   └── credentials.go          # gRPC TransportCredentials 생성
│   │
│   └── observability/
│       ├── tracer.go               # OTel TracerProvider 초기화
│       └── metrics.go              # Prometheus 메트릭 등록
│
├── python/
│   ├── pyproject.toml
│   ├── orchestrator/
│   │   ├── __init__.py
│   │   ├── agent.py                # LangGraph 오케스트레이터 에이전트
│   │   ├── a2a_client.py           # A2A 클라이언트
│   │   ├── grpc_client.py          # gRPC-over-QUIC 클라이언트 (aioquic 또는 subprocess)
│   │   └── tools/
│   │       ├── telemetry_tool.py   # 텔레메트리 조회 도구
│   │       └── control_tool.py     # 제어 명령 도구
│   └── tests/
│       └── test_agent.py
│
├── deployments/
│   ├── docker-compose.yml          # 전체 실험 환경
│   ├── docker-compose.spire.yml    # SPIRE 오버레이
│   ├── spire/
│   │   ├── server.conf
│   │   └── agent.conf
│   ├── mosquitto/
│   │   └── mosquitto.conf
│   └── otel/
│       └── otel-collector-config.yaml
│
├── configs/
│   ├── gateway.yaml                # T2 Gateway 설정
│   ├── field-agent.yaml            # T3 Field Agent 설정
│   └── cloud-server.yaml           # T1 서버 설정
│
└── tests/
    ├── integration/
    │   ├── gateway_test.go
    │   └── e2e_test.go
    └── perf/
        └── benchmark_test.go

3. Proto 파일 설계

3.1 AgentService (proto/agent/v1/agent.proto)

syntax = "proto3";

package agent.v1;

option go_package = "github.com/your-org/edge-aiot-mas/gen/go/agent/v1;agentv1";

import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";

// 에이전트 태스크 요청
message AgentTask {
  string task_id        = 1;  // UUID
  string agent_id       = 2;  // 발신 에이전트 ID
  string target_agent   = 3;  // 수신 에이전트 ID
  string task_type      = 4;  // "infer", "control", "query", "ota"
  bytes  payload        = 5;  // Protobuf Any 직렬화
  map<string, string> metadata = 6;  // X-Tenant-ID, X-Device-Class 등
  google.protobuf.Timestamp deadline = 7;
}

// 에이전트 태스크 응답
message AgentResult {
  string task_id   = 1;
  string agent_id  = 2;
  Status status    = 3;
  bytes  payload   = 4;
  string resume_token = 5;  // 스트리밍 재개 토큰
  map<string, string> metadata = 6;

  enum Status {
    STATUS_UNSPECIFIED = 0;
    STATUS_OK          = 1;
    STATUS_PARTIAL     = 2;  // 스트림 중간 청크
    STATUS_ERROR       = 3;
  }
}

// 스트리밍 청크 (Server/Client/Bidi Streaming 공용)
message StreamChunk {
  string stream_id    = 1;
  uint64 sequence_num = 2;
  bytes  data         = 3;
  bool   is_last      = 4;
  string resume_token = 5;
}

// 4대 RPC 모드를 모두 지원하는 AgentService
service AgentService {
  // Unary: 단일 명령/조회 (OTA 명령, 상태 조회)
  rpc Execute(AgentTask) returns (AgentResult);

  // Server Streaming: T1→T2 모델 업데이트, 토큰 스트리밍
  rpc Subscribe(AgentTask) returns (stream StreamChunk);

  // Client Streaming: T3→T2 배치 센서 데이터 업로드
  rpc Upload(stream StreamChunk) returns (AgentResult);

  // Bidi Streaming: T2↔T2 엣지 합의, 로봇 실시간 협업
  rpc Collaborate(stream AgentTask) returns (stream AgentResult);
}

3.2 TelemetryService (proto/telemetry/v1/telemetry.proto)

syntax = "proto3";

package telemetry.v1;

option go_package = "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1;telemetryv1";

import "google/protobuf/timestamp.proto";

// 단일 센서 데이터 포인트
message DataPoint {
  string device_id  = 1;
  string metric     = 2;   // "temperature", "vibration", "voltage" 등
  double value      = 3;
  string unit       = 4;
  google.protobuf.Timestamp ts = 5;
  map<string, string> labels   = 6;  // site, line, zone 등 레이블
}

// 배치 텔레메트리 (다수 디바이스 팬인)
message TelemetryBatch {
  string source_gateway = 1;
  repeated DataPoint points = 2;
  string resume_token       = 3;  // 마지막 처리 위치
}

// 텔레메트리 수신 확인
message TelemetryAck {
  string resume_token  = 1;  // 서버가 발급한 다음 재개 위치
  uint64 accepted      = 2;
  uint64 rejected      = 3;
  string error_message = 4;
}

// 텔레메트리 쿼리
message TelemetryQuery {
  string device_id = 1;
  string metric    = 2;
  google.protobuf.Timestamp from = 3;
  google.protobuf.Timestamp to   = 4;
  int32  limit     = 5;
}

service TelemetryService {
  // Client Streaming: T3 다수 센서 배치 업로드
  rpc PushBatch(stream DataPoint) returns (TelemetryAck);

  // Unary: 텔레메트리 배치 전송
  rpc Push(TelemetryBatch) returns (TelemetryAck);

  // Server Streaming: 실시간 텔레메트리 구독
  rpc Stream(TelemetryQuery) returns (stream DataPoint);
}

3.3 ControlService (proto/control/v1/control.proto)

syntax = "proto3";

package control.v1;

option go_package = "github.com/your-org/edge-aiot-mas/gen/go/control/v1;controlv1";

message OTARequest {
  string device_id      = 1;
  string firmware_url   = 2;
  string expected_sha256 = 3;
  string version        = 4;
}

message OTAStatus {
  string device_id = 1;
  State  state     = 2;
  int32  progress  = 3;  // 0-100
  string error     = 4;

  enum State {
    STATE_UNSPECIFIED  = 0;
    STATE_DOWNLOADING  = 1;
    STATE_VERIFYING    = 2;
    STATE_APPLYING     = 3;
    STATE_COMPLETE     = 4;
    STATE_FAILED       = 5;
  }
}

message ConfigUpdate {
  string device_id = 1;
  bytes  config    = 2;  // JSON or Protobuf Any
  string version   = 3;
}

message ConfigAck {
  bool   applied  = 1;
  string version  = 2;
  string error    = 3;
}

service ControlService {
  // Server Streaming: OTA 다운로드 진행 상황 푸시
  rpc OTAUpdate(OTARequest) returns (stream OTAStatus);

  // Unary: 설정 업데이트
  rpc UpdateConfig(ConfigUpdate) returns (ConfigAck);
}

3.4 Buf 설정 (proto/buf.yaml)

version: v2
modules:
  - path: .
lint:
  use:
    - STANDARD
breaking:
  use:
    - FILE
deps:
  - buf.build/googleapis/googleapis

4. T2 Edge Gateway 구현 설계

4.1 quic-go + gRPC 통합

T2 게이트웨이는 T1 방향으로 gRPC over QUIC 서버를 운영하고, T3 방향으로 MQTT 클라이언트를 유지한다.

internal/gateway/server.go

package gateway

import (
    "context"
    "crypto/tls"
    "fmt"
    "net"
    "net/http"

    "github.com/quic-go/quic-go"
    "github.com/quic-go/quic-go/http3"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"

    agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
    telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
    "github.com/your-org/edge-aiot-mas/internal/gateway/interceptors"
    "github.com/your-org/edge-aiot-mas/internal/security"
)

// GatewayConfig는 T2 게이트웨이 설정을 담는다.
type GatewayConfig struct {
    GRPCAddr    string // T1 방향 gRPC over QUIC 수신 주소 (예: ":9443")
    A2AAddr     string // A2A HTTP/3 수신 주소 (예: ":8443")
    MQTTBroker  string // T3 방향 MQTT 브로커 주소
    SPIFFESocket string // SPIRE Agent 소켓 경로
}

// Gateway는 T2 엣지 게이트웨이의 핵심 컴포넌트다.
type Gateway struct {
    cfg         GatewayConfig
    grpcServer  *grpc.Server
    http3Server *http3.Server
    mqttClient  *MQTTClient
    translator  *Translator
    tokenMgr    *ResumeTokenManager
}

// New는 Gateway를 초기화하고 반환한다.
func New(ctx context.Context, cfg GatewayConfig) (*Gateway, error) {
    // 1. SPIFFE/SPIRE에서 TLS 자격증명 획득
    creds, err := security.NewSPIFFECredentials(ctx, cfg.SPIFFESocket)
    if err != nil {
        return nil, fmt.Errorf("SPIFFE credentials: %w", err)
    }

    // 2. Resume Token 관리자 초기화
    tokenMgr := NewResumeTokenManager()

    // 3. gRPC 서버 인터셉터 체인 구성
    srv := grpc.NewServer(
        grpc.Creds(creds),
        grpc.ChainUnaryInterceptor(
            interceptors.DeadlineEnforcer(defaultDeadlines),
            interceptors.ResumeTokenUnary(tokenMgr),
            interceptors.OTelUnary(),
        ),
        grpc.ChainStreamInterceptor(
            interceptors.ResumeTokenStream(tokenMgr),
            interceptors.OTelStream(),
        ),
    )

    // 4. 서비스 등록
    agentSvc := NewAgentServiceServer(tokenMgr)
    telemetrySvc := NewTelemetryServiceServer(tokenMgr)
    agentv1.RegisterAgentServiceServer(srv, agentSvc)
    telemetryv1.RegisterTelemetryServiceServer(srv, telemetrySvc)

    // 5. MQTT 클라이언트 초기화
    mqttClient, err := NewMQTTClient(cfg.MQTTBroker)
    if err != nil {
        return nil, fmt.Errorf("MQTT client: %w", err)
    }

    return &Gateway{
        cfg:        cfg,
        grpcServer: srv,
        mqttClient: mqttClient,
        translator: NewTranslator(agentSvc, telemetrySvc),
        tokenMgr:   tokenMgr,
    }, nil
}

// ServeGRPCOverQUIC는 quic-go 리스너 위에서 gRPC 서버를 구동한다.
func (g *Gateway) ServeGRPCOverQUIC(ctx context.Context) error {
    tlsCfg := g.grpcServer.GetServiceInfo() // SPIFFE creds에서 TLS config 추출
    _ = tlsCfg

    // quic-go 리스너 생성
    quicListener, err := quic.ListenAddr(g.cfg.GRPCAddr, &tls.Config{
        NextProtos: []string{"h2", "h3"}, // gRPC는 h2, HTTP/3는 h3
        MinVersion: tls.VersionTLS13,
        // GetCertificate는 SPIFFE SVID에서 동적으로 제공
    }, &quic.Config{
        EnableDatagrams: true,
        MaxIdleTimeout:  30 * 60 * 1e9, // 30분
    })
    if err != nil {
        return fmt.Errorf("quic listen: %w", err)
    }
    defer quicListener.Close()

    // quic.Listener를 net.Listener 인터페이스로 래핑
    netListener := &quicNetListener{inner: quicListener}

    // gRPC 서버를 quic-go 리스너 위에서 구동
    go func() {
        <-ctx.Done()
        g.grpcServer.GracefulStop()
    }()

    return g.grpcServer.Serve(netListener)
}

// quicNetListener는 quic.Listener를 net.Listener로 래핑한다.
type quicNetListener struct {
    inner *quic.Listener
}

func (l *quicNetListener) Accept() (net.Conn, error) {
    conn, err := l.inner.Accept(context.Background())
    if err != nil {
        return nil, err
    }
    return &quicConn{conn: conn}, nil
}

func (l *quicNetListener) Close() error   { return l.inner.Close() }
func (l *quicNetListener) Addr() net.Addr { return l.inner.Addr() }

// quicConn은 quic.Connection을 net.Conn으로 래핑한다.
// gRPC-Go는 net.Conn 인터페이스를 사용하므로 첫 번째 스트림을 단일 연결로 노출.
type quicConn struct {
    conn   quic.Connection
    stream quic.Stream
}

func (c *quicConn) Read(b []byte) (int, error) {
    if c.stream == nil {
        var err error
        c.stream, err = c.conn.AcceptStream(context.Background())
        if err != nil {
            return 0, err
        }
    }
    return c.stream.Read(b)
}

func (c *quicConn) Write(b []byte) (int, error) {
    if c.stream == nil {
        return 0, fmt.Errorf("stream not initialized")
    }
    return c.stream.Write(b)
}

func (c *quicConn) Close() error                       { return c.conn.CloseWithError(0, "closed") }
func (c *quicConn) LocalAddr() net.Addr                { return c.conn.LocalAddr() }
func (c *quicConn) RemoteAddr() net.Addr               { return c.conn.RemoteAddr() }
func (c *quicConn) SetDeadline(t interface{}) error    { return nil }
func (c *quicConn) SetReadDeadline(t interface{}) error  { return nil }
func (c *quicConn) SetWriteDeadline(t interface{}) error { return nil }

// defaultDeadlines는 디바이스 클래스별 기본 deadline 정책이다.
var defaultDeadlines = map[string]int64{
    "agv":      50,    // ms
    "robot":    100,
    "sensor":   5000,
    "default":  3000,
}

참고: 실제 quic-go와 gRPC-Go 통합 시 grpc.WithContextDialer를 사용하여 클라이언트 측에서 QUIC 다이얼러를 주입하는 방식이 더 실용적이다. 위 코드는 서버 측 리스너 래핑 패턴을 보여준다. 프로덕션에서는 quic-go/http3 패키지의 ServeListener 방식 또는 별도 gRPC-QUIC 어댑터 라이브러리(github.com/open-telemetry/opentelemetry-collector-contrib 참조)를 활용한다.

4.2 MQTT→gRPC 변환 엔진

MQTT Topic 네이밍 규칙

telemetry/{site}/{device_id}/{metric}   → TelemetryService.Push
control/{site}/{device_id}/ota          → ControlService.OTAUpdate
agent/{site}/{device_id}/task           → AgentService.Execute

internal/gateway/translator.go

package gateway

import (
    "context"
    "encoding/json"
    "fmt"
    "strings"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "google.golang.org/protobuf/proto"

    agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
    telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
)

// TopicRoute는 MQTT topic 패턴과 핸들러 함수의 매핑이다.
type TopicRoute struct {
    Pattern string
    Handler func(ctx context.Context, topic string, payload []byte) error
}

// Translator는 MQTT 메시지를 gRPC 호출로 변환한다.
type Translator struct {
    agentSvc     agentv1.AgentServiceServer
    telemetrySvc telemetryv1.TelemetryServiceServer
    routes       []TopicRoute
    propagator   propagation.TextMapPropagator
}

// NewTranslator는 변환 엔진을 초기화한다.
func NewTranslator(
    agentSvc agentv1.AgentServiceServer,
    telemetrySvc telemetryv1.TelemetryServiceServer,
) *Translator {
    t := &Translator{
        agentSvc:     agentSvc,
        telemetrySvc: telemetrySvc,
        propagator:   otel.GetTextMapPropagator(),
    }
    t.registerRoutes()
    return t
}

// registerRoutes는 MQTT topic 패턴별 핸들러를 등록한다.
func (t *Translator) registerRoutes() {
    t.routes = []TopicRoute{
        {
            Pattern: "telemetry/+/+/+",
            Handler: t.handleTelemetry,
        },
        {
            Pattern: "agent/+/+/task",
            Handler: t.handleAgentTask,
        },
    }
}

// HandleMessage는 MQTT 메시지를 수신하고 라우팅한다.
func (t *Translator) HandleMessage(_ mqtt.Client, msg mqtt.Message) {
    topic := msg.Topic()
    payload := msg.Payload()

    // MQTT UserProperties에서 OTel trace context 추출
    ctx := context.Background()
    // MQTT 5.0 UserProperties를 통한 traceparent 전파는
    // paho.mqtt.golang v2(MQTT 5.0 지원)에서 msg.Properties()로 접근

    for _, route := range t.routes {
        if matchTopic(route.Pattern, topic) {
            if err := route.Handler(ctx, topic, payload); err != nil {
                // 에러 로깅 (OTel span에 기록)
                fmt.Printf("translation error [%s]: %v\n", topic, err)
            }
            return
        }
    }
}

// handleTelemetry는 telemetry/{site}/{device}/{metric} 토픽을 처리한다.
func (t *Translator) handleTelemetry(ctx context.Context, topic string, payload []byte) error {
    parts := strings.Split(topic, "/")
    if len(parts) != 4 {
        return fmt.Errorf("invalid telemetry topic: %s", topic)
    }
    site, deviceID, metric := parts[1], parts[2], parts[3]

    // JSON 페이로드 파싱 (T3 디바이스는 JSON으로 발행)
    var raw struct {
        Value float64 `json:"v"`
        Unit  string  `json:"u"`
        TS    int64   `json:"ts"` // Unix milliseconds
    }
    if err := json.Unmarshal(payload, &raw); err != nil {
        return fmt.Errorf("json unmarshal: %w", err)
    }

    // Protobuf DataPoint 생성
    dp := &telemetryv1.DataPoint{
        DeviceId: deviceID,
        Metric:   metric,
        Value:    raw.Value,
        Unit:     raw.Unit,
        Labels:   map[string]string{"site": site},
    }
    if raw.TS > 0 {
        ts := time.UnixMilli(raw.TS)
        _ = ts // timestamppb 변환 후 dp.Ts에 할당
    }

    // TelemetryService.Push를 내부 호출
    batch := &telemetryv1.TelemetryBatch{
        SourceGateway: "t2-gateway",
        Points:        []*telemetryv1.DataPoint{dp},
    }
    _, err := t.telemetrySvc.(*TelemetryServiceServer).pushBatch(ctx, batch)
    return err
}

// handleAgentTask는 agent/{site}/{device}/task 토픽을 처리한다.
func (t *Translator) handleAgentTask(ctx context.Context, topic string, payload []byte) error {
    parts := strings.Split(topic, "/")
    if len(parts) != 4 {
        return fmt.Errorf("invalid agent topic: %s", topic)
    }
    deviceID := parts[2]

    var task agentv1.AgentTask
    if err := proto.Unmarshal(payload, &task); err != nil {
        // JSON fallback
        if err2 := json.Unmarshal(payload, &task); err2 != nil {
            return fmt.Errorf("unmarshal agent task: %w", err)
        }
    }
    task.AgentId = deviceID

    _, err := t.agentSvc.Execute(ctx, &task)
    return err
}

// matchTopic는 MQTT 와일드카드(+, #) 패턴 매칭을 수행한다.
func matchTopic(pattern, topic string) bool {
    pp := strings.Split(pattern, "/")
    tp := strings.Split(topic, "/")
    if len(pp) != len(tp) {
        if len(pp) > 0 && pp[len(pp)-1] == "#" {
            return strings.HasPrefix(topic, strings.Join(pp[:len(pp)-1], "/"))
        }
        return false
    }
    for i, p := range pp {
        if p != "+" && p != tp[i] {
            return false
        }
    }
    return true
}

4.3 Resume Token 구현

Resume Token은 스트리밍 RPC가 중단되었을 때 재연결 후 중단 위치부터 이어받기 위한 서버 발급 토큰이다.

internal/gateway/resume_token.go

package gateway

import (
    "crypto/hmac"
    "crypto/sha256"
    "encoding/base64"
    "encoding/json"
    "fmt"
    "sync"
    "time"
)

// ResumeState는 스트리밍 재개 상태를 표현한다.
type ResumeState struct {
    StreamID    string            `json:"sid"`
    SequenceNum uint64            `json:"seq"`
    DeviceID    string            `json:"did"`
    Timestamp   int64             `json:"ts"`
    TraceParent string            `json:"tp,omitempty"` // W3C traceparent
    Extra       map[string]string `json:"extra,omitempty"`
}

// ResumeTokenManager는 Resume Token을 발급하고 검증한다.
type ResumeTokenManager struct {
    secret []byte
    store  sync.Map // streamID → *ResumeState
}

// NewResumeTokenManager는 새 관리자를 초기화한다.
func NewResumeTokenManager() *ResumeTokenManager {
    // 프로덕션에서는 SPIFFE SVID 또는 환경 변수에서 시크릿 로드
    return &ResumeTokenManager{
        secret: []byte("change-me-in-production"),
    }
}

// Issue는 현재 스트리밍 상태를 HMAC 서명된 토큰으로 발급한다.
func (m *ResumeTokenManager) Issue(state ResumeState) (string, error) {
    state.Timestamp = time.Now().Unix()
    data, err := json.Marshal(state)
    if err != nil {
        return "", fmt.Errorf("marshal state: %w", err)
    }

    // HMAC-SHA256 서명
    mac := hmac.New(sha256.New, m.secret)
    mac.Write(data)
    sig := mac.Sum(nil)

    // base64url(data) + "." + base64url(sig)
    token := base64.RawURLEncoding.EncodeToString(data) +
        "." +
        base64.RawURLEncoding.EncodeToString(sig)

    // 상태 저장 (메모리 캐시)
    m.store.Store(state.StreamID, &state)

    return token, nil
}

// Verify는 토큰을 검증하고 ResumeState를 반환한다.
func (m *ResumeTokenManager) Verify(token string) (*ResumeState, error) {
    parts := splitOnLast(token, ".")
    if len(parts) != 2 {
        return nil, fmt.Errorf("invalid token format")
    }

    data, err := base64.RawURLEncoding.DecodeString(parts[0])
    if err != nil {
        return nil, fmt.Errorf("decode data: %w", err)
    }
    sig, err := base64.RawURLEncoding.DecodeString(parts[1])
    if err != nil {
        return nil, fmt.Errorf("decode sig: %w", err)
    }

    // HMAC 검증
    mac := hmac.New(sha256.New, m.secret)
    mac.Write(data)
    expected := mac.Sum(nil)
    if !hmac.Equal(sig, expected) {
        return nil, fmt.Errorf("invalid token signature")
    }

    var state ResumeState
    if err := json.Unmarshal(data, &state); err != nil {
        return nil, fmt.Errorf("unmarshal state: %w", err)
    }

    // 24시간 만료 체크
    if time.Now().Unix()-state.Timestamp > 86400 {
        return nil, fmt.Errorf("token expired")
    }

    return &state, nil
}

func splitOnLast(s, sep string) []string {
    idx := len(s) - 1
    for idx >= 0 && string(s[idx]) != sep {
        idx--
    }
    if idx < 0 {
        return []string{s}
    }
    return []string{s[:idx], s[idx+1:]}
}

Resume Token gRPC Interceptor (internal/gateway/interceptors/resume.go)

package interceptors

import (
    "context"

    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"

    "github.com/your-org/edge-aiot-mas/internal/gateway"
)

const resumeTokenMetaKey = "x-resume-token"

// ResumeTokenUnary는 Unary RPC에서 Resume Token을 처리하는 인터셉터다.
func ResumeTokenUnary(mgr *gateway.ResumeTokenManager) grpc.UnaryServerInterceptor {
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        ctx = injectResumeState(ctx, mgr)
        resp, err := handler(ctx, req)
        return resp, err
    }
}

// ResumeTokenStream는 Streaming RPC에서 Resume Token을 처리하는 인터셉터다.
func ResumeTokenStream(mgr *gateway.ResumeTokenManager) grpc.StreamServerInterceptor {
    return func(
        srv interface{},
        ss grpc.ServerStream,
        info *grpc.StreamServerInfo,
        handler grpc.StreamHandler,
    ) error {
        // 클라이언트가 재연결 시 보내는 Resume Token 검증
        ctx := ss.Context()
        if md, ok := metadata.FromIncomingContext(ctx); ok {
            if tokens := md.Get(resumeTokenMetaKey); len(tokens) > 0 {
                state, err := mgr.Verify(tokens[0])
                if err == nil {
                    // 검증된 상태를 context에 주입
                    ctx = context.WithValue(ctx, resumeStateKey{}, state)
                    ss = &resumeServerStream{ServerStream: ss, ctx: ctx}
                }
            }
        }
        return handler(srv, ss)
    }
}

// injectResumeState는 incoming metadata에서 Resume Token을 추출하고 context에 주입한다.
func injectResumeState(ctx context.Context, mgr *gateway.ResumeTokenManager) context.Context {
    md, ok := metadata.FromIncomingContext(ctx)
    if !ok {
        return ctx
    }
    tokens := md.Get(resumeTokenMetaKey)
    if len(tokens) == 0 {
        return ctx
    }
    state, err := mgr.Verify(tokens[0])
    if err != nil {
        return ctx
    }
    return context.WithValue(ctx, resumeStateKey{}, state)
}

// GetResumeState는 context에서 ResumeState를 추출한다.
func GetResumeState(ctx context.Context) (*gateway.ResumeState, bool) {
    state, ok := ctx.Value(resumeStateKey{}).(*gateway.ResumeState)
    return state, ok
}

type resumeStateKey struct{}

// resumeServerStream은 context를 오버라이드한 ServerStream 래퍼다.
type resumeServerStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (s *resumeServerStream) Context() context.Context { return s.ctx }

4.4 A2A HTTP/3 엔드포인트

internal/gateway/a2a_handler.go

package gateway

import (
    "encoding/json"
    "net/http"

    "github.com/quic-go/quic-go/http3"
)

// A2ATask는 Google A2A 프로토콜의 태스크 요청 구조체다.
type A2ATask struct {
    ID       string          `json:"id"`
    Message  A2AMessage      `json:"message"`
    Metadata map[string]any  `json:"metadata,omitempty"`
}

// A2AMessage는 A2A 메시지 구조체다.
type A2AMessage struct {
    Role    string      `json:"role"`
    Parts   []A2APart   `json:"parts"`
}

// A2APart는 A2A 메시지의 단일 파트(텍스트, 데이터 등)다.
type A2APart struct {
    Type    string `json:"type"`
    Text    string `json:"text,omitempty"`
    MimeType string `json:"mimeType,omitempty"`
    Data    []byte `json:"data,omitempty"`
}

// ServeA2AHTTP3는 A2A HTTP/3 서버를 구동한다.
func (g *Gateway) ServeA2AHTTP3() error {
    mux := http.NewServeMux()

    // A2A 에이전트 카드 (에이전트 능력 공개)
    mux.HandleFunc("/.well-known/agent.json", g.handleAgentCard)

    // A2A 태스크 수신 엔드포인트
    mux.HandleFunc("/tasks/send", g.handleTaskSend)
    mux.HandleFunc("/tasks/sendSubscribe", g.handleTaskSendSubscribe)
    mux.HandleFunc("/tasks/get", g.handleTaskGet)

    server := &http3.Server{
        Addr:    g.cfg.A2AAddr,
        Handler: mux,
    }

    return server.ListenAndServeTLS("", "") // TLS config는 SPIFFE에서 주입
}

// handleAgentCard는 에이전트의 능력을 JSON으로 반환한다.
func (g *Gateway) handleAgentCard(w http.ResponseWriter, r *http.Request) {
    card := map[string]any{
        "name":        "T2-Edge-Gateway",
        "description": "엣지 AIoT T2 게이트웨이 에이전트",
        "url":         "https://" + g.cfg.A2AAddr,
        "version":     "1.0.0",
        "capabilities": map[string]any{
            "streaming":         true,
            "pushNotifications": false,
        },
        "skills": []map[string]any{
            {
                "id":          "telemetry_query",
                "name":        "텔레메트리 조회",
                "description": "T3 디바이스 텔레메트리 데이터를 조회한다",
            },
            {
                "id":          "device_control",
                "name":        "디바이스 제어",
                "description": "T3 디바이스에 제어 명령을 전달한다",
            },
        },
    }
    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(card)
}

// handleTaskSend는 A2A 태스크를 수신하고 처리한다.
func (g *Gateway) handleTaskSend(w http.ResponseWriter, r *http.Request) {
    if r.Method != http.MethodPost {
        http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
        return
    }

    var task A2ATask
    if err := json.NewDecoder(r.Body).Decode(&task); err != nil {
        http.Error(w, "invalid request body", http.StatusBadRequest)
        return
    }

    // 태스크를 gRPC AgentService.Execute로 위임
    ctx := r.Context()
    result, err := g.processA2ATask(ctx, &task)
    if err != nil {
        http.Error(w, err.Error(), http.StatusInternalServerError)
        return
    }

    w.Header().Set("Content-Type", "application/json")
    json.NewEncoder(w).Encode(result)
}

// handleTaskSendSubscribe는 SSE 스트리밍으로 태스크 상태를 푸시한다.
func (g *Gateway) handleTaskSendSubscribe(w http.ResponseWriter, r *http.Request) {
    w.Header().Set("Content-Type", "text/event-stream")
    w.Header().Set("Cache-Control", "no-cache")

    // SSE 스트리밍 구현 (생략)
    flusher, ok := w.(http.Flusher)
    if !ok {
        http.Error(w, "streaming not supported", http.StatusInternalServerError)
        return
    }
    _ = flusher
    // TODO: gRPC Bidi 스트림 → SSE 변환
}

func (g *Gateway) processA2ATask(ctx interface{}, task *A2ATask) (map[string]any, error) {
    // 구현: task.Message.Parts에서 요청 추출 → gRPC 호출 → 결과 반환
    return map[string]any{
        "id":     task.ID,
        "status": map[string]string{"state": "completed"},
    }, nil
}

5. T1 Orchestrator 구현 설계

5.1 Python LangGraph 에이전트 + A2A 클라이언트

python/orchestrator/agent.py

from __future__ import annotations

import asyncio
import json
import os
from typing import Annotated, TypedDict

import httpx
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages

from .a2a_client import A2AClient
from .tools.telemetry_tool import TelemetryQueryTool
from .tools.control_tool import DeviceControlTool


class OrchestratorState(TypedDict):
    """오케스트레이터 에이전트 상태."""
    messages: Annotated[list, add_messages]
    site_id: str
    active_tasks: dict[str, str]  # task_id → status


# 에이전트가 사용할 도구 목록
tools = [TelemetryQueryTool(), DeviceControlTool()]

# LLM 초기화 (모델은 환경 변수로 교체 가능)
llm = ChatOpenAI(
    model=os.getenv("LLM_MODEL", "gpt-4o"),
    temperature=0,
).bind_tools(tools)


def should_continue(state: OrchestratorState) -> str:
    """도구 호출 여부에 따라 다음 노드를 결정한다."""
    last = state["messages"][-1]
    if isinstance(last, AIMessage) and last.tool_calls:
        return "tools"
    return END


async def call_model(state: OrchestratorState) -> OrchestratorState:
    """LLM을 호출하고 응답을 상태에 추가한다."""
    system = SystemMessage(content=(
        "당신은 엣지 AIoT 오케스트레이터입니다. "
        "T2 게이트웨이 에이전트를 통해 현장 디바이스를 조율합니다. "
        f"현재 사이트: {state['site_id']}"
    ))
    messages = [system] + state["messages"]
    response = await llm.ainvoke(messages)
    return {"messages": [response]}


async def call_tools(state: OrchestratorState) -> OrchestratorState:
    """도구를 실행하고 결과를 상태에 추가한다."""
    from langchain_core.messages import ToolMessage

    last = state["messages"][-1]
    results = []
    for tool_call in last.tool_calls:
        # 도구 이름으로 실제 도구 인스턴스 검색
        tool = next((t for t in tools if t.name == tool_call["name"]), None)
        if tool is None:
            content = f"Unknown tool: {tool_call['name']}"
        else:
            try:
                content = await tool.arun(tool_call["args"])
            except Exception as e:
                content = f"Tool error: {e}"

        results.append(ToolMessage(
            content=str(content),
            tool_call_id=tool_call["id"],
        ))
    return {"messages": results}


def build_graph() -> StateGraph:
    """LangGraph 오케스트레이터 그래프를 빌드한다."""
    builder = StateGraph(OrchestratorState)

    builder.add_node("agent", call_model)
    builder.add_node("tools", call_tools)

    builder.set_entry_point("agent")
    builder.add_conditional_edges("agent", should_continue)
    builder.add_edge("tools", "agent")

    return builder.compile(checkpointer=MemorySaver())


# 그래프 싱글턴
graph = build_graph()


async def run_orchestrator(user_input: str, site_id: str, thread_id: str) -> str:
    """오케스트레이터를 실행하고 최종 응답을 반환한다."""
    config = {"configurable": {"thread_id": thread_id}}
    initial_state: OrchestratorState = {
        "messages": [HumanMessage(content=user_input)],
        "site_id": site_id,
        "active_tasks": {},
    }
    final_state = await graph.ainvoke(initial_state, config=config)
    last_message = final_state["messages"][-1]
    return last_message.content if hasattr(last_message, "content") else str(last_message)

python/orchestrator/a2a_client.py

from __future__ import annotations

import asyncio
import json
import uuid
from typing import AsyncIterator

import httpx


class A2AClient:
    """T2 Edge Gateway의 A2A HTTP/3 엔드포인트를 호출하는 클라이언트."""

    def __init__(self, base_url: str, timeout: float = 30.0):
        self.base_url = base_url.rstrip("/")
        # HTTP/3(QUIC) 지원을 위해 httpx + h3 백엔드 사용
        # 실험 환경에서는 HTTP/2로 폴백 가능
        self._client = httpx.AsyncClient(
            http2=True,
            timeout=timeout,
            verify=False,  # 개발 환경 — 프로덕션에서는 SPIFFE SVID CA 적용
        )

    async def send_task(self, message: str, skill_id: str | None = None) -> dict:
        """태스크를 T2 게이트웨이로 전송하고 결과를 반환한다."""
        task = {
            "id": str(uuid.uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}],
            },
            "metadata": {"skill_id": skill_id} if skill_id else {},
        }

        resp = await self._client.post(
            f"{self.base_url}/tasks/send",
            json=task,
            headers={"Content-Type": "application/json"},
        )
        resp.raise_for_status()
        return resp.json()

    async def send_task_streaming(
        self, message: str, skill_id: str | None = None
    ) -> AsyncIterator[dict]:
        """SSE 스트리밍으로 태스크 진행 상태를 수신한다."""
        task = {
            "id": str(uuid.uuid4()),
            "message": {
                "role": "user",
                "parts": [{"type": "text", "text": message}],
            },
        }

        async with self._client.stream(
            "POST",
            f"{self.base_url}/tasks/sendSubscribe",
            json=task,
        ) as resp:
            resp.raise_for_status()
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    data = json.loads(line[6:])
                    yield data

    async def get_agent_card(self) -> dict:
        """T2 게이트웨이의 에이전트 카드를 조회한다."""
        resp = await self._client.get(f"{self.base_url}/.well-known/agent.json")
        resp.raise_for_status()
        return resp.json()

    async def close(self):
        await self._client.aclose()

5.2 Go gRPC-over-QUIC 클라이언트 코드

internal/cloud/grpc_quic_client.go

package cloud

import (
    "context"
    "crypto/tls"
    "fmt"
    "net"

    "github.com/quic-go/quic-go"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials"

    agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
    telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
)

// QUICDialer는 quic-go를 사용하여 gRPC 연결을 수립하는 다이얼러다.
type QUICDialer struct {
    tlsConfig *tls.Config
}

// NewQUICDialer는 TLS 설정으로 QUIC 다이얼러를 초기화한다.
func NewQUICDialer(tlsConfig *tls.Config) *QUICDialer {
    return &QUICDialer{tlsConfig: tlsConfig}
}

// DialContext는 gRPC의 WithContextDialer에 전달될 다이얼 함수다.
func (d *QUICDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
    // QUIC connection 수립
    conn, err := quic.DialAddr(ctx, addr, d.tlsConfig, &quic.Config{
        EnableDatagrams: true,
        MaxIdleTimeout:  30 * 60 * 1e9,
    })
    if err != nil {
        return nil, fmt.Errorf("quic dial %s: %w", addr, err)
    }

    // 첫 번째 스트림 개방 (gRPC framing에 사용)
    stream, err := conn.OpenStreamSync(ctx)
    if err != nil {
        conn.CloseWithError(0, "stream open failed")
        return nil, fmt.Errorf("open quic stream: %w", err)
    }

    return &quicStreamConn{
        conn:   conn,
        stream: stream,
    }, nil
}

// quicStreamConn은 QUIC 스트림을 net.Conn으로 노출한다.
type quicStreamConn struct {
    conn   quic.Connection
    stream quic.Stream
}

func (c *quicStreamConn) Read(b []byte) (int, error)  { return c.stream.Read(b) }
func (c *quicStreamConn) Write(b []byte) (int, error) { return c.stream.Write(b) }
func (c *quicStreamConn) Close() error {
    c.stream.Close()
    return c.conn.CloseWithError(0, "client closed")
}
func (c *quicStreamConn) LocalAddr() net.Addr  { return c.conn.LocalAddr() }
func (c *quicStreamConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() }
func (c *quicStreamConn) SetDeadline(t interface{}) error      { return nil }
func (c *quicStreamConn) SetReadDeadline(t interface{}) error  { return nil }
func (c *quicStreamConn) SetWriteDeadline(t interface{}) error { return nil }

// T1Client는 T1 클라우드에서 T2 게이트웨이로 연결하는 gRPC 클라이언트다.
type T1Client struct {
    conn         *grpc.ClientConn
    agentClient  agentv1.AgentServiceClient
    telemetryClient telemetryv1.TelemetryServiceClient
}

// NewT1Client는 T2 게이트웨이에 gRPC over QUIC 연결을 수립한다.
func NewT1Client(ctx context.Context, gatewayAddr string, tlsConfig *tls.Config) (*T1Client, error) {
    dialer := NewQUICDialer(tlsConfig)

    conn, err := grpc.DialContext(
        ctx,
        gatewayAddr,
        grpc.WithContextDialer(dialer.DialContext),
        grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
        // Retry policy: 무선 손실 시 지수 백오프
        grpc.WithDefaultServiceConfig(`{
            "methodConfig": [{
                "name": [{}],
                "retryPolicy": {
                    "maxAttempts": 5,
                    "initialBackoff": "0.5s",
                    "maxBackoff": "30s",
                    "backoffMultiplier": 2,
                    "retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
                }
            }]
        }`),
    )
    if err != nil {
        return nil, fmt.Errorf("grpc dial: %w", err)
    }

    return &T1Client{
        conn:            conn,
        agentClient:     agentv1.NewAgentServiceClient(conn),
        telemetryClient: telemetryv1.NewTelemetryServiceClient(conn),
    }, nil
}

// ExecuteTask는 T2 게이트웨이에 AgentTask를 전송한다.
func (c *T1Client) ExecuteTask(ctx context.Context, task *agentv1.AgentTask) (*agentv1.AgentResult, error) {
    return c.agentClient.Execute(ctx, task)
}

// StreamTelemetry는 T2 게이트웨이에서 텔레메트리 스트림을 구독한다.
func (c *T1Client) StreamTelemetry(
    ctx context.Context,
    query *telemetryv1.TelemetryQuery,
    handler func(*telemetryv1.DataPoint) error,
) error {
    stream, err := c.telemetryClient.Stream(ctx, query)
    if err != nil {
        return fmt.Errorf("stream start: %w", err)
    }

    for {
        dp, err := stream.Recv()
        if err != nil {
            return fmt.Errorf("stream recv: %w", err)
        }
        if err := handler(dp); err != nil {
            return err
        }
    }
}

// Close는 gRPC 연결을 종료한다.
func (c *T1Client) Close() error { return c.conn.Close() }

6. T3 Field Agent 구현 설계

6.1 MQTT 텔레메트리 발행

internal/field/mqtt_publisher.go

package field

import (
    "context"
    "encoding/json"
    "fmt"
    "time"

    mqtt "github.com/eclipse/paho.mqtt.golang"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/propagation"
)

// TelemetryPayload는 T3 디바이스가 발행하는 텔레메트리 JSON 구조체다.
type TelemetryPayload struct {
    Value float64           `json:"v"`
    Unit  string            `json:"u"`
    TS    int64             `json:"ts"` // Unix milliseconds
    Props map[string]string `json:"p,omitempty"` // MQTT 5.0 UserProperties 대체
}

// FieldConfig는 T3 Field Agent 설정을 담는다.
type FieldConfig struct {
    DeviceID   string
    SiteID     string
    MQTTBroker string
    MQTTPort   int
    OfflineDB  string // BoltDB 파일 경로
}

// FieldAgent는 T3 현장 에이전트다.
type FieldAgent struct {
    cfg          FieldConfig
    mqttClient   mqtt.Client
    offlineQueue *OfflineQueue
    tracer       interface{} // OTel tracer
}

// NewFieldAgent는 Field Agent를 초기화한다.
func NewFieldAgent(cfg FieldConfig) (*FieldAgent, error) {
    // MQTT 클라이언트 옵션 설정
    opts := mqtt.NewClientOptions().
        AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTBroker, cfg.MQTTPort)).
        SetClientID(fmt.Sprintf("field-%s", cfg.DeviceID)).
        SetAutoReconnect(true).
        SetConnectRetry(true).
        SetConnectRetryInterval(5 * time.Second).
        SetMaxReconnectInterval(60 * time.Second).
        SetCleanSession(false). // 오프라인 중 QoS 1 메시지 보존
        SetResumeSubs(true)

    // 연결 로스트 핸들러
    opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
        fmt.Printf("[%s] MQTT connection lost: %v\n", cfg.DeviceID, err)
    })

    // 재연결 핸들러
    opts.SetOnConnectHandler(func(client mqtt.Client) {
        fmt.Printf("[%s] MQTT reconnected\n", cfg.DeviceID)
    })

    client := mqtt.NewClient(opts)
    if token := client.Connect(); token.Wait() && token.Error() != nil {
        return nil, fmt.Errorf("mqtt connect: %w", token.Error())
    }

    offlineQueue, err := NewOfflineQueue(cfg.OfflineDB)
    if err != nil {
        return nil, fmt.Errorf("offline queue: %w", err)
    }

    return &FieldAgent{
        cfg:          cfg,
        mqttClient:   client,
        offlineQueue: offlineQueue,
    }, nil
}

// PublishTelemetry는 텔레메트리 데이터를 MQTT로 발행한다.
// 오프라인 상태라면 로컬 큐에 저장한다.
func (a *FieldAgent) PublishTelemetry(ctx context.Context, metric string, value float64, unit string) error {
    topic := fmt.Sprintf("telemetry/%s/%s/%s", a.cfg.SiteID, a.cfg.DeviceID, metric)

    // OTel trace context를 페이로드 Props에 포함
    carrier := propagation.MapCarrier{}
    otel.GetTextMapPropagator().Inject(ctx, carrier)

    payload := TelemetryPayload{
        Value: value,
        Unit:  unit,
        TS:    time.Now().UnixMilli(),
        Props: carrier, // traceparent 등 포함
    }

    data, err := json.Marshal(payload)
    if err != nil {
        return fmt.Errorf("marshal payload: %w", err)
    }

    // MQTT 연결 상태 확인
    if !a.mqttClient.IsConnected() {
        // 오프라인 큐에 저장
        return a.offlineQueue.Enqueue(topic, data)
    }

    // QoS 1로 발행 (전달 보장)
    token := a.mqttClient.Publish(topic, 1, false, data)
    token.Wait()
    if err := token.Error(); err != nil {
        // 발행 실패 시 오프라인 큐로 폴백
        if qErr := a.offlineQueue.Enqueue(topic, data); qErr != nil {
            return fmt.Errorf("publish failed and queue failed: %v, %v", err, qErr)
        }
        return nil // 큐에 저장 성공
    }

    return nil
}

// FlushOfflineQueue는 재연결 후 오프라인 큐를 드레인한다.
func (a *FieldAgent) FlushOfflineQueue(ctx context.Context) error {
    if !a.mqttClient.IsConnected() {
        return fmt.Errorf("not connected")
    }

    return a.offlineQueue.Drain(func(topic string, data []byte) error {
        token := a.mqttClient.Publish(topic, 1, false, data)
        token.Wait()
        return token.Error()
    })
}

6.2 오프라인 큐 (BoltDB)

internal/field/offline_queue.go

package field

import (
    "encoding/binary"
    "fmt"
    "time"

    bolt "go.etcd.io/bbolt"
)

var queueBucket = []byte("offline_queue")

// QueueEntry는 오프라인 큐의 단일 항목이다.
type QueueEntry struct {
    Topic    string
    Payload  []byte
    EnqueuedAt time.Time
}

// OfflineQueue는 BoltDB 기반 오프라인 메시지 큐다.
type OfflineQueue struct {
    db *bolt.DB
}

// NewOfflineQueue는 BoltDB 파일을 열고 큐 버킷을 초기화한다.
func NewOfflineQueue(path string) (*OfflineQueue, error) {
    db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second})
    if err != nil {
        return nil, fmt.Errorf("bolt open %s: %w", path, err)
    }

    if err := db.Update(func(tx *bolt.Tx) error {
        _, err := tx.CreateBucketIfNotExists(queueBucket)
        return err
    }); err != nil {
        return nil, fmt.Errorf("create bucket: %w", err)
    }

    return &OfflineQueue{db: db}, nil
}

// Enqueue는 메시지를 큐에 추가한다.
func (q *OfflineQueue) Enqueue(topic string, payload []byte) error {
    return q.db.Update(func(tx *bolt.Tx) error {
        b := tx.Bucket(queueBucket)

        // 자동 증가 시퀀스 키 생성
        seq, err := b.NextSequence()
        if err != nil {
            return fmt.Errorf("next sequence: %w", err)
        }

        key := make([]byte, 8)
        binary.BigEndian.PutUint64(key, seq)

        // topic + null + payload 직렬화
        value := append([]byte(topic+"\x00"), payload...)

        return b.Put(key, value)
    })
}

// Drain은 큐의 모든 항목을 순서대로 처리하고 성공한 항목을 삭제한다.
func (q *OfflineQueue) Drain(handler func(topic string, payload []byte) error) error {
    return q.db.Update(func(tx *bolt.Tx) error {
        b := tx.Bucket(queueBucket)

        var toDelete [][]byte

        if err := b.ForEach(func(k, v []byte) error {
            // topic 분리
            for i, c := range v {
                if c == 0x00 {
                    topic := string(v[:i])
                    payload := v[i+1:]
                    if err := handler(topic, payload); err != nil {
                        return err // 실패 시 드레인 중단
                    }
                    toDelete = append(toDelete, append([]byte{}, k...))
                    return nil
                }
            }
            return fmt.Errorf("invalid queue entry")
        }); err != nil {
            return err
        }

        // 성공한 항목 삭제
        for _, key := range toDelete {
            if err := b.Delete(key); err != nil {
                return err
            }
        }

        return nil
    })
}

// Size는 큐에 저장된 항목 수를 반환한다.
func (q *OfflineQueue) Size() (int, error) {
    var count int
    err := q.db.View(func(tx *bolt.Tx) error {
        b := tx.Bucket(queueBucket)
        count = b.Stats().KeyN
        return nil
    })
    return count, err
}

// Close는 BoltDB를 닫는다.
func (q *OfflineQueue) Close() error { return q.db.Close() }

7. SPIFFE/SPIRE 통합

7.1 아키텍처 개요

SPIRE Server (T1)
    ├── Registration Entry: T1 에이전트 (k8s SA)
    ├── Registration Entry: T2 게이트웨이 (HW serial)
    └── Registration Entry: T3 디바이스 (TPM attestation)

SPIRE Agent (각 노드에 DaemonSet/프로세스로 실행)
    └── Workload API (Unix socket)
        └── Go Workload (SVID 자동 갱신)

7.2 Go SDK 사용법

internal/security/spiffe.go

package security

import (
    "context"
    "fmt"

    "github.com/spiffe/go-spiffe/v2/spiffeid"
    "github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
    "github.com/spiffe/go-spiffe/v2/workloadapi"
    "google.golang.org/grpc/credentials"
)

// SPIFFECredentials는 SPIRE Workload API에서 SVID를 가져와
// gRPC TransportCredentials를 생성한다.
type SPIFFECredentials struct {
    source *workloadapi.X509Source
}

// NewSPIFFECredentials는 SPIRE Agent 소켓에 연결하고
// X.509 SVID 소스를 초기화한다.
func NewSPIFFECredentials(ctx context.Context, socketPath string) (*SPIFFECredentials, error) {
    // SPIRE Agent Workload API에 연결
    source, err := workloadapi.NewX509Source(
        ctx,
        workloadapi.WithClientOptions(
            workloadapi.WithAddr("unix://" + socketPath),
        ),
    )
    if err != nil {
        return nil, fmt.Errorf("workload API connect: %w", err)
    }

    return &SPIFFECredentials{source: source}, nil
}

// ServerCredentials는 gRPC 서버용 mTLS 자격증명을 반환한다.
// 클라이언트 SVID를 trustDomain으로 검증한다.
func (s *SPIFFECredentials) ServerCredentials(trustDomain string) (credentials.TransportCredentials, error) {
    td, err := spiffeid.TrustDomainFromString(trustDomain)
    if err != nil {
        return nil, fmt.Errorf("trust domain: %w", err)
    }

    tlsCfg := tlsconfig.MTLSServerConfig(
        s.source,
        s.source,
        tlsconfig.AuthorizeMemberOf(td),
    )
    return credentials.NewTLS(tlsCfg), nil
}

// ClientCredentials는 gRPC 클라이언트용 mTLS 자격증명을 반환한다.
// 서버 SPIFFE ID를 serverID로 검증한다.
func (s *SPIFFECredentials) ClientCredentials(serverSpiffeID string) (credentials.TransportCredentials, error) {
    serverID, err := spiffeid.IDFromString(serverSpiffeID)
    if err != nil {
        return nil, fmt.Errorf("server SPIFFE ID: %w", err)
    }

    tlsCfg := tlsconfig.MTLSClientConfig(
        s.source,
        s.source,
        tlsconfig.AuthorizeID(serverID),
    )
    return credentials.NewTLS(tlsCfg), nil
}

// Close는 X509Source를 닫는다.
func (s *SPIFFECredentials) Close() error { return s.source.Close() }

7.3 SVID 발급 흐름

T3 Field Agent 부팅
    │
    ├─► TPM Quote 생성 (PCR 측정값)
    │
    ├─► SPIRE Agent에 Attestation 요청 (TPM 플러그인)
    │       └── SPIRE Agent → SPIRE Server 검증
    │
    ├─► SPIRE Server: Registration Entry 매칭
    │       └── spiffe://iot/{org}/{factory}/device/{serial}
    │
    ├─► X.509-SVID 발급 (TTL: 1시간)
    │       ├── Subject: spiffe://iot/...
    │       └── SAN: URI = SPIFFE ID
    │
    └─► Workload API를 통해 Go 워크로드에 SVID 제공
            └── go-spiffe SDK가 자동 갱신 (만료 10분 전)

7.4 SPIRE Registration Entry 예시

# T2 게이트웨이 등록 (하드웨어 시리얼 기반)
spire-server entry create \
  -spiffeID spiffe://edge-aiot/t2/gateway/GW-001 \
  -parentID spiffe://edge-aiot/spire-agent \
  -selector unix:uid:1000 \
  -selector aws_iid:instance-id:i-0abc123

# T3 Field Agent 등록 (TPM attestation)
spire-server entry create \
  -spiffeID spiffe://edge-aiot/t3/factory-a/device/SENSOR-001 \
  -parentID spiffe://edge-aiot/spire-agent \
  -selector tpm:pub_hash:sha256:abcdef...

8. OpenTelemetry 통합

8.1 TracerProvider 초기화

internal/observability/tracer.go

package observability

import (
    "context"
    "fmt"

    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/sdk/resource"
    sdktrace "go.opentelemetry.io/otel/sdk/trace"
    semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
    "google.golang.org/grpc"
    "google.golang.org/grpc/credentials/insecure"
)

// InitTracer는 OTel TracerProvider를 초기화하고 글로벌에 등록한다.
// otlpEndpoint: OTel Collector OTLP gRPC 엔드포인트 (예: "otel-collector:4317")
func InitTracer(ctx context.Context, serviceName, otlpEndpoint string) (func(context.Context) error, error) {
    // OTLP gRPC exporter 생성
    conn, err := grpc.DialContext(ctx, otlpEndpoint,
        grpc.WithTransportCredentials(insecure.NewCredentials()),
    )
    if err != nil {
        return nil, fmt.Errorf("grpc dial otlp: %w", err)
    }

    exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
    if err != nil {
        return nil, fmt.Errorf("otlp exporter: %w", err)
    }

    // 리소스 속성 (서비스 이름, 버전, 배포 환경)
    res, err := resource.New(ctx,
        resource.WithAttributes(
            semconv.ServiceName(serviceName),
            semconv.ServiceVersion("0.1.0"),
        ),
        resource.WithOS(),
        resource.WithHost(),
    )
    if err != nil {
        return nil, fmt.Errorf("resource: %w", err)
    }

    // TracerProvider 생성
    tp := sdktrace.NewTracerProvider(
        sdktrace.WithBatcher(exporter),
        sdktrace.WithResource(res),
        sdktrace.WithSampler(sdktrace.AlwaysSample()), // 실험 환경 — 프로덕션은 ParentBased
    )

    // 글로벌 TracerProvider 및 Propagator 설정
    otel.SetTracerProvider(tp)
    otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
        propagation.TraceContext{}, // W3C traceparent/tracestate
        propagation.Baggage{},
    ))

    // 종료 함수 반환 (defer로 호출)
    return tp.Shutdown, nil
}

8.2 gRPC Interceptor에서 Trace Propagation

internal/gateway/interceptors/otel.go

package interceptors

import (
    "context"

    "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    "go.opentelemetry.io/otel"
    "go.opentelemetry.io/otel/attribute"
    "go.opentelemetry.io/otel/codes"
    "go.opentelemetry.io/otel/propagation"
    "go.opentelemetry.io/otel/trace"
    "google.golang.org/grpc"
    "google.golang.org/grpc/metadata"
)

// OTelUnary는 Unary RPC에 OTel trace를 주입하는 인터셉터다.
// otelgrpc.UnaryServerInterceptor()를 래핑하여 Resume Token trace 연속성을 추가한다.
func OTelUnary() grpc.UnaryServerInterceptor {
    base := otelgrpc.UnaryServerInterceptor()
    return func(
        ctx context.Context,
        req interface{},
        info *grpc.UnaryServerInfo,
        handler grpc.UnaryHandler,
    ) (interface{}, error) {
        // Resume Token에 포함된 traceparent로 trace 연속성 복원
        ctx = restoreTraceFromResumeToken(ctx)
        return base(ctx, req, info, handler)
    }
}

// OTelStream는 Streaming RPC에 OTel trace를 주입하는 인터셉터다.
func OTelStream() grpc.StreamServerInterceptor {
    base := otelgrpc.StreamServerInterceptor()
    return func(
        srv interface{},
        ss grpc.ServerStream,
        info *grpc.StreamServerInfo,
        handler grpc.StreamHandler,
    ) error {
        ctx := restoreTraceFromResumeToken(ss.Context())
        wrapped := &otelServerStream{ServerStream: ss, ctx: ctx}
        return base(srv, wrapped, info, handler)
    }
}

// restoreTraceFromResumeToken은 Resume Token에 포함된 traceparent를
// context에 복원하여 단절-재연결 후에도 trace 체인이 유지되도록 한다.
func restoreTraceFromResumeToken(ctx context.Context) context.Context {
    state, ok := GetResumeState(ctx)
    if !ok || state.TraceParent == "" {
        return ctx
    }

    // traceparent를 metadata carrier로 변환하여 propagate
    carrier := propagation.MapCarrier{
        "traceparent": state.TraceParent,
    }
    return otel.GetTextMapPropagator().Extract(ctx, carrier)
}

// InjectResumeTraceParent는 현재 span의 traceparent를 ResumeState에 저장한다.
// Resume Token 발급 시 호출한다.
func InjectResumeTraceParent(ctx context.Context) string {
    carrier := propagation.MapCarrier{}
    otel.GetTextMapPropagator().Inject(ctx, carrier)
    return carrier.Get("traceparent")
}

// T2FanOutSpan은 T3 디바이스에서 전달된 trace를 T2 게이트웨이에서
// fan-out 지점으로 처리하는 span을 생성한다.
func T2FanOutSpan(ctx context.Context, deviceID, metric string) (context.Context, trace.Span) {
    tracer := otel.Tracer("t2-gateway")
    return tracer.Start(ctx, "t2.fanout",
        trace.WithAttributes(
            attribute.String("device.id", deviceID),
            attribute.String("telemetry.metric", metric),
        ),
    )
}

// RecordError는 gRPC 에러를 span에 기록한다.
func RecordError(span trace.Span, err error) {
    if err != nil {
        span.RecordError(err)
        span.SetStatus(codes.Error, err.Error())
    }
}

type otelServerStream struct {
    grpc.ServerStream
    ctx context.Context
}

func (s *otelServerStream) Context() context.Context { return s.ctx }

8.3 MQTT→gRPC Trace 연속성 패턴

T3 디바이스가 MQTT로 발행할 때 UserProperties(MQTT 5.0) 또는 페이로드 필드로 traceparent를 포함시키고, T2 게이트웨이의 Translator가 이를 gRPC metadata로 변환하여 T1까지 trace 체인을 연결한다.

// MQTT 페이로드의 Props 필드에서 traceparent 추출 후 context 복원 예시
func extractTraceFromMQTTPayload(payload TelemetryPayload) context.Context {
    ctx := context.Background()
    if len(payload.Props) == 0 {
        return ctx
    }
    carrier := propagation.MapCarrier(payload.Props)
    return otel.GetTextMapPropagator().Extract(ctx, carrier)
}

9. Docker Compose 실험 환경 설계

deployments/docker-compose.yml

version: "3.9"

networks:
  t1-net:
    driver: bridge
  t2-net:
    driver: bridge
  t3-net:
    driver: bridge
  obs-net:
    driver: bridge

volumes:
  spire-data:
  prometheus-data:
  grafana-data:

services:
  # ─────────────────────────────────────────────────────
  # T1: 클라우드 계층
  # ─────────────────────────────────────────────────────
  cloud-server:
    build:
      context: ..
      dockerfile: deployments/Dockerfile.go
      args:
        CMD: cmd/cloud-server
    image: edge-aiot/cloud-server:dev
    container_name: t1-cloud-server
    networks: [t1-net, obs-net]
    ports:
      - "9443:9443"   # gRPC over QUIC
    environment:
      - GRPC_ADDR=:9443
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
      - SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock
    volumes:
      - /run/spire/t1/sockets:/run/spire/sockets:ro
    depends_on:
      - spire-server
      - otel-collector

  t1-orchestrator:
    build:
      context: ../python
      dockerfile: ../deployments/Dockerfile.python
    image: edge-aiot/orchestrator:dev
    container_name: t1-orchestrator
    networks: [t1-net, obs-net]
    ports:
      - "8080:8080"   # FastAPI 관리 인터페이스
    environment:
      - GATEWAY_A2A_URL=https://t2-gateway:8443
      - CLOUD_GRPC_ADDR=cloud-server:9443
      - LLM_MODEL=gpt-4o
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    depends_on:
      - cloud-server

  # ─────────────────────────────────────────────────────
  # T2: 엣지 계층
  # ─────────────────────────────────────────────────────
  t2-gateway:
    build:
      context: ..
      dockerfile: deployments/Dockerfile.go
      args:
        CMD: cmd/gateway
    image: edge-aiot/gateway:dev
    container_name: t2-gateway
    networks: [t1-net, t2-net, t3-net, obs-net]
    ports:
      - "19443:9443"  # gRPC over QUIC (T1 방향)
      - "18443:8443"  # A2A HTTP/3
    environment:
      - GRPC_ADDR=:9443
      - A2A_ADDR=:8443
      - MQTT_BROKER=mqtt-broker
      - MQTT_PORT=1883
      - T1_GRPC_ADDR=cloud-server:9443
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
      - SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock
    volumes:
      - /run/spire/t2/sockets:/run/spire/sockets:ro
    depends_on:
      - mqtt-broker
      - spire-server
      - otel-collector

  # ─────────────────────────────────────────────────────
  # T3: 현장 계층 (복수 인스턴스)
  # ─────────────────────────────────────────────────────
  t3-field-agent-1:
    build:
      context: ..
      dockerfile: deployments/Dockerfile.go
      args:
        CMD: cmd/field-agent
    image: edge-aiot/field-agent:dev
    container_name: t3-field-agent-1
    networks: [t3-net, obs-net]
    environment:
      - DEVICE_ID=SENSOR-001
      - SITE_ID=factory-a
      - MQTT_BROKER=mqtt-broker
      - MQTT_PORT=1883
      - OFFLINE_DB=/data/queue.db
      - OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
    volumes:
      - ./data/agent-1:/data
    depends_on:
      - mqtt-broker

  t3-field-agent-2:
    extends:
      service: t3-field-agent-1
    container_name: t3-field-agent-2
    environment:
      - DEVICE_ID=SENSOR-002
      - SITE_ID=factory-a
      - MQTT_BROKER=mqtt-broker
      - MQTT_PORT=1883
      - OFFLINE_DB=/data/queue.db
    volumes:
      - ./data/agent-2:/data

  t3-field-agent-3:
    extends:
      service: t3-field-agent-1
    container_name: t3-field-agent-3
    environment:
      - DEVICE_ID=AGV-001
      - SITE_ID=factory-a
      - MQTT_BROKER=mqtt-broker
      - MQTT_PORT=1883
      - OFFLINE_DB=/data/queue.db
    volumes:
      - ./data/agent-3:/data

  # ─────────────────────────────────────────────────────
  # 인프라: MQTT 브로커
  # ─────────────────────────────────────────────────────
  mqtt-broker:
    image: eclipse-mosquitto:2.0
    container_name: mqtt-broker
    networks: [t2-net, t3-net]
    ports:
      - "1883:1883"
      - "9001:9001"   # WebSocket
    volumes:
      - ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro

  # ─────────────────────────────────────────────────────
  # 보안: SPIRE
  # ─────────────────────────────────────────────────────
  spire-server:
    image: ghcr.io/spiffe/spire-server:1.9.0
    container_name: spire-server
    networks: [t1-net]
    ports:
      - "8081:8081"
    volumes:
      - ./spire/server.conf:/etc/spire/server/server.conf:ro
      - spire-data:/var/lib/spire
    command: ["-config", "/etc/spire/server/server.conf"]

  spire-agent-t1:
    image: ghcr.io/spiffe/spire-agent:1.9.0
    container_name: spire-agent-t1
    networks: [t1-net]
    volumes:
      - ./spire/agent.conf:/etc/spire/agent/agent.conf:ro
      - /run/spire/t1/sockets:/run/spire/sockets
    depends_on: [spire-server]
    command: ["-config", "/etc/spire/agent/agent.conf"]

  spire-agent-t2:
    image: ghcr.io/spiffe/spire-agent:1.9.0
    container_name: spire-agent-t2
    networks: [t2-net]
    volumes:
      - ./spire/agent.conf:/etc/spire/agent/agent.conf:ro
      - /run/spire/t2/sockets:/run/spire/sockets
    depends_on: [spire-server]
    command: ["-config", "/etc/spire/agent/agent.conf"]

  # ─────────────────────────────────────────────────────
  # 관측성: OTel Collector + Prometheus + Grafana + Jaeger
  # ─────────────────────────────────────────────────────
  otel-collector:
    image: otel/opentelemetry-collector-contrib:0.100.0
    container_name: otel-collector
    networks: [t1-net, t2-net, t3-net, obs-net]
    ports:
      - "4317:4317"   # OTLP gRPC
      - "4318:4318"   # OTLP HTTP
      - "8889:8889"   # Prometheus metrics exporter
    volumes:
      - ./otel/otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
    command: ["--config", "/etc/otelcol/config.yaml"]

  jaeger:
    image: jaegertracing/all-in-one:1.57
    container_name: jaeger
    networks: [obs-net]
    ports:
      - "16686:16686" # Jaeger UI
      - "14250:14250" # gRPC (OTel Collector → Jaeger)

  prometheus:
    image: prom/prometheus:v2.51.0
    container_name: prometheus
    networks: [obs-net]
    ports:
      - "9090:9090"
    volumes:
      - ./otel/prometheus.yml:/etc/prometheus/prometheus.yml:ro
      - prometheus-data:/prometheus

  grafana:
    image: grafana/grafana:10.4.0
    container_name: grafana
    networks: [obs-net]
    ports:
      - "3000:3000"
    environment:
      - GF_SECURITY_ADMIN_PASSWORD=admin
      - GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/edge-aiot.json
    volumes:
      - grafana-data:/var/lib/grafana
      - ./grafana/dashboards:/var/lib/grafana/dashboards:ro
    depends_on:
      - prometheus
      - jaeger

deployments/Dockerfile.go

# 멀티 스테이지 빌드 — 최소 런타임 이미지
FROM golang:1.22-alpine AS builder

WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download

COPY . .

ARG CMD=cmd/gateway
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/app ./${CMD}/...

FROM gcr.io/distroless/static-debian12
COPY --from=builder /bin/app /app
ENTRYPOINT ["/app"]

deployments/otel/otel-collector-config.yaml (핵심 스니펫)

receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  batch:
    timeout: 1s
    send_batch_size: 1024
  memory_limiter:
    check_interval: 1s
    limit_mib: 512

exporters:
  jaeger:
    endpoint: jaeger:14250
    tls:
      insecure: true
  prometheus:
    endpoint: "0.0.0.0:8889"

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [jaeger]
    metrics:
      receivers: [otlp]
      processors: [memory_limiter, batch]
      exporters: [prometheus]

10. 구현 단계별 계획 (Phase)

Phase 1: Proto 정의 + 기본 gRPC over QUIC (2주)

목표: 컴파일 가능한 Proto + quic-go 위에서 gRPC 서버/클라이언트 통신 검증

작업 항목 산출물
1 Proto 파일 설계 및 buf generate 설정 proto/, gen/ 디렉터리
1 Go 모듈 초기화, 의존성 설치 go.mod, go.sum
2 quic-go + gRPC-Go 통합 (QuicNetListener 구현) internal/gateway/server.go
2 T1 gRPC-over-QUIC 클라이언트 구현 internal/cloud/grpc_quic_client.go
2 기본 AgentService Unary RPC 동작 검증 tests/integration/basic_grpc_test.go

완료 기준: go test ./tests/integration/... -run TestBasicUnaryRPC 통과

Phase 2: MQTT 변환 게이트웨이 (2주)

목표: T3 MQTT 발행 → T2 변환 → T1 gRPC 전달 end-to-end 동작

작업 항목 산출물
3 MQTT 클라이언트 구현 + Topic 라우팅 internal/gateway/mqtt_client.go
3 MQTT→gRPC 변환 엔진 (Translator) internal/gateway/translator.go
4 T3 Field Agent (MQTT 발행 + BoltDB 오프라인 큐) internal/field/ 패키지
4 Docker Compose 기본 환경 (T1+T2+T3+Mosquitto) deployments/docker-compose.yml
4 오프라인 시나리오 테스트 (MQTT 브로커 중단 후 재연결) tests/integration/offline_test.go

완료 기준: T3 → MQTT → T2 Translator → gRPC → T1 텔레메트리 수신 확인

Phase 3: A2A 통합 + Resume Token (2주)

목표: A2A HTTP/3 엔드포인트 + Resume Token 기반 스트리밍 재개 동작

작업 항목 산출물
5 Resume Token Manager + gRPC Interceptor internal/gateway/resume_token.go, interceptors/resume.go
5 A2A HTTP/3 핸들러 (quic-go http3 패키지) internal/gateway/a2a_handler.go
6 Python A2A 클라이언트 + LangGraph 에이전트 python/orchestrator/
6 스트리밍 중단-재개 통합 테스트 tests/integration/resume_test.go

완료 기준: 스트리밍 중 네트워크 차단 → 재연결 → Resume Token으로 이어받기 검증

Phase 4: SPIFFE/SPIRE + OTel (2주)

목표: mTLS(SPIFFE SVID) + 분산 트레이싱 end-to-end 동작

작업 항목 산출물
7 SPIRE Server/Agent Docker Compose 통합 deployments/docker-compose.spire.yml
7 go-spiffe SDK 래퍼 + gRPC credentials 교체 internal/security/spiffe.go
8 OTel TracerProvider + otelgrpc 인터셉터 internal/observability/tracer.go
8 MQTT→gRPC trace propagation (traceparent in payload Props) internal/gateway/interceptors/otel.go
8 Jaeger에서 T3→T2→T1 trace 체인 시각화 확인 -

완료 기준: Jaeger에서 단일 MQTT 발행의 전체 트레이스(T3→T2→T1) 확인

Phase 5: 실험 검증 (2주)

목표: 성능 지표 측정, 단절 내성 검증, 문서화

작업 항목 산출물
9 성능 벤치마크 (gRPC vs REST 비교, latency/throughput) tests/perf/benchmark_test.go
9 네트워크 파티션 시나리오 (tc netem으로 손실·지연 주입) chaos 테스트 스크립트
10 Grafana 대시보드 구성 (latency, throughput, error rate) deployments/grafana/dashboards/
10 실험 결과 분석 및 구현 설계서 업데이트 본 문서 v1.0

완료 기준: 모든 KPI 측정값 수집 완료, Grafana 대시보드 정상 동작


11. 테스트 전략

11.1 단위 테스트

대상 패키지                        테스트 항목
─────────────────────────────────  ────────────────────────────────────
internal/gateway/resume_token.go   토큰 발급/검증, 서명 위조 감지, 만료 처리
internal/gateway/translator.go     MQTT 토픽 패턴 매칭, JSON→Protobuf 변환
internal/field/offline_queue.go    Enqueue/Drain 순서 보장, 동시 접근
internal/security/spiffe.go        SVID 갱신 모킹, 인증 실패 처리
// 예시: Resume Token 단위 테스트
func TestResumeTokenRoundTrip(t *testing.T) {
    mgr := NewResumeTokenManager()
    state := ResumeState{
        StreamID:    "stream-abc",
        SequenceNum: 42,
        DeviceID:    "SENSOR-001",
        TraceParent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
    }

    token, err := mgr.Issue(state)
    require.NoError(t, err)
    require.NotEmpty(t, token)

    restored, err := mgr.Verify(token)
    require.NoError(t, err)
    assert.Equal(t, state.StreamID, restored.StreamID)
    assert.Equal(t, state.SequenceNum, restored.SequenceNum)
    assert.Equal(t, state.TraceParent, restored.TraceParent)
}

func TestResumeTokenTampering(t *testing.T) {
    mgr := NewResumeTokenManager()
    state := ResumeState{StreamID: "s1", SequenceNum: 1, DeviceID: "d1"}

    token, _ := mgr.Issue(state)
    // 마지막 바이트 변조
    tampered := token[:len(token)-2] + "ZZ"

    _, err := mgr.Verify(tampered)
    assert.Error(t, err)
}

11.2 통합 테스트

// 예시: MQTT→gRPC 변환 통합 테스트 (testcontainers-go 사용)
func TestMQTTToGRPCTranslation(t *testing.T) {
    ctx := context.Background()

    // Mosquitto 컨테이너 기동
    mosquitto, err := testcontainers.GenericContainer(ctx,
        testcontainers.GenericContainerRequest{
            ContainerRequest: testcontainers.ContainerRequest{
                Image:        "eclipse-mosquitto:2.0",
                ExposedPorts: []string{"1883/tcp"},
                WaitingFor:   wait.ForListeningPort("1883/tcp"),
            },
            Started: true,
        },
    )
    require.NoError(t, err)
    defer mosquitto.Terminate(ctx)

    host, _ := mosquitto.Host(ctx)
    port, _ := mosquitto.MappedPort(ctx, "1883")
    brokerAddr := fmt.Sprintf("%s:%s", host, port.Port())

    // Gateway 초기화
    gw, err := gateway.New(ctx, gateway.GatewayConfig{
        GRPCAddr:   ":0",
        MQTTBroker: brokerAddr,
    })
    require.NoError(t, err)

    // 텔레메트리 수신 채널
    received := make(chan *telemetryv1.DataPoint, 1)
    gw.SetTelemetryHandler(func(dp *telemetryv1.DataPoint) {
        received <- dp
    })

    // T3 역할: MQTT 발행
    publisher, _ := newTestMQTTClient(brokerAddr)
    payload := `{"v":42.5,"u":"celsius","ts":1234567890000}`
    publisher.Publish("telemetry/factory-a/SENSOR-001/temperature", 1, false, payload)

    // 변환 결과 검증
    select {
    case dp := <-received:
        assert.Equal(t, "SENSOR-001", dp.DeviceId)
        assert.Equal(t, "temperature", dp.Metric)
        assert.InDelta(t, 42.5, dp.Value, 0.001)
    case <-time.After(5 * time.Second):
        t.Fatal("timeout waiting for telemetry")
    }
}

11.3 성능 테스트

// tests/perf/benchmark_test.go
func BenchmarkUnaryRPCOverQUIC(b *testing.B) {
    // 환경: Docker Compose로 T1+T2 기동 후 실행
    client, err := cloud.NewT1Client(
        context.Background(),
        "localhost:9443",
        testTLSConfig(),
    )
    require.NoError(b, err)
    defer client.Close()

    task := &agentv1.AgentTask{
        TaskId:    "bench",
        AgentId:   "t1",
        TaskType:  "query",
        Payload:   make([]byte, 256), // 256B 페이로드
    }

    b.ResetTimer()
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
            _, err := client.ExecuteTask(ctx, task)
            cancel()
            if err != nil {
                b.Errorf("RPC error: %v", err)
            }
        }
    })
}

func BenchmarkTelemetryFanIn(b *testing.B) {
    // 1000개 T3 에이전트에서 동시 텔레메트리 발행
    b.SetParallelism(1000)
    // 구현 생략 (위와 유사한 패턴)
}

12. 예상 성능 지표 및 측정 방법

12.1 목표 성능 지표 (KPI)

지표 목표값 측정 방법
T1↔T2 gRPC Unary p50 latency < 5ms (LAN) go test -bench=BenchmarkUnaryRPC + otelgrpc 자동 계측
T1↔T2 gRPC Unary p99 latency < 20ms (LAN) Jaeger → Grafana 히스토그램
T2↔T3 MQTT→gRPC 변환 지연 < 10ms MQTT 발행 TS → gRPC 수신 TS 차분
T3 텔레메트리 동시 처리 1,000 노드 @ 1Hz BenchmarkTelemetryFanIn
Resume Token 재연결 후 재개 지연 < 500ms 네트워크 차단 → 재연결 시간 측정
gRPC over QUIC vs TCP 핸드오버 QUIC -50% RTT tc netem으로 IP 변경 시뮬레이션
SPIFFE SVID 갱신 오버헤드 < 1ms/RPC pprof + Prometheus grpc_server_handling_seconds

12.2 측정 방법 상세

Prometheus 메트릭 수집

# deployments/otel/prometheus.yml
scrape_configs:
  - job_name: 'cloud-server'
    static_configs:
      - targets: ['cloud-server:2112']
    scrape_interval: 5s

  - job_name: 't2-gateway'
    static_configs:
      - targets: ['t2-gateway:2112']

  - job_name: 'otel-collector'
    static_configs:
      - targets: ['otel-collector:8889']

주요 모니터링 메트릭

# gRPC 레이턴시 히스토그램 (otelgrpc 자동 생성)
grpc_server_handling_seconds_bucket{grpc_method="Execute",grpc_service="agent.v1.AgentService"}

# MQTT 메시지 처리량
mqtt_messages_received_total{topic="telemetry"}
mqtt_translation_duration_seconds_bucket

# Resume Token 재연결
resume_token_reconnect_total
resume_token_reconnect_latency_seconds

# 오프라인 큐 크기
offline_queue_size{device_id="SENSOR-001"}

네트워크 장애 주입 (tc netem)

# T2 게이트웨이에서 T1으로 가는 경로에 30% 패킷 손실 + 100ms 지연 주입
docker exec t2-gateway tc qdisc add dev eth0 root netem loss 30% delay 100ms

# 30초 후 정상 복구
sleep 30
docker exec t2-gateway tc qdisc del dev eth0 root

# IP 변경 시뮬레이션 (QUIC connection migration 테스트)
docker network disconnect t1-net t2-gateway
docker network connect t1-net t2-gateway

핵심 그래프 (Grafana 대시보드)

  1. 레이턴시 p50/p95/p99 시계열: tier별 gRPC 호출 지연 추이
  2. 처리량 시계열: 초당 RPC 수, MQTT 메시지 수
  3. 에러율: gRPC 상태 코드별 에러율 (UNAVAILABLE, DEADLINE_EXCEEDED 등)
  4. Resume Token 이벤트: 단절/재연결 빈도 및 재개 지연
  5. 오프라인 큐 크기: T3 디바이스별 버퍼 누적 현황
  6. Jaeger 트레이스 뷰: T3→T2→T1 E2E 호출 그래프

부록 A. Go 모듈 의존성 (go.mod 핵심 부분)

module github.com/your-org/edge-aiot-mas

go 1.22

require (
    // gRPC
    google.golang.org/grpc v1.64.0
    google.golang.org/protobuf v1.34.1

    // QUIC
    github.com/quic-go/quic-go v0.44.0

    // MQTT
    github.com/eclipse/paho.mqtt.golang v1.4.3

    // SPIFFE/SPIRE
    github.com/spiffe/go-spiffe/v2 v2.3.0

    // OpenTelemetry
    go.opentelemetry.io/otel v1.27.0
    go.opentelemetry.io/otel/sdk v1.27.0
    go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
    go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0

    // BoltDB (오프라인 큐)
    go.etcd.io/bbolt v1.3.10

    // 테스트
    github.com/testcontainers/testcontainers-go v0.31.0
    github.com/stretchr/testify v1.9.0
)

부록 B. Python 의존성 (python/pyproject.toml 핵심 부분)

[project]
name = "edge-aiot-orchestrator"
version = "0.1.0"
requires-python = ">=3.11"

dependencies = [
    # LangGraph 에이전트
    "langgraph>=0.1.0",
    "langchain-openai>=0.1.0",
    "langchain-core>=0.2.0",

    # gRPC
    "grpcio>=1.64.0",
    "grpcio-tools>=1.64.0",
    "protobuf>=5.27.0",

    # HTTP/A2A 클라이언트
    "httpx[http2]>=0.27.0",

    # 관측성
    "opentelemetry-sdk>=1.27.0",
    "opentelemetry-exporter-otlp>=1.27.0",
    "opentelemetry-instrumentation-grpc>=0.48b0",
]

[project.optional-dependencies]
dev = [
    "pytest>=8.0",
    "pytest-asyncio>=0.23",
    "pytest-cov>=5.0",
]