Files
2026-06-25 12:19:20 +09:00

2661 lines
82 KiB
Markdown

# 엣지 AIoT 3-tier 멀티 에이전트 통신 프레임워크 — 구현 설계서
**문서 버전**: v0.1.0
**최종 수정**: 2026-06-07
**관련 문서**: `FINAL_REPORT.md` (설계 근거 및 사용 사례 분석)
---
## 목차
1. [기술 스택 선정 근거](#1-기술-스택-선정-근거)
2. [프로젝트 디렉터리 구조](#2-프로젝트-디렉터리-구조)
3. [Proto 파일 설계](#3-proto-파일-설계)
4. [T2 Edge Gateway 구현 설계](#4-t2-edge-gateway-구현-설계)
5. [T1 Orchestrator 구현 설계](#5-t1-orchestrator-구현-설계)
6. [T3 Field Agent 구현 설계](#6-t3-field-agent-구현-설계)
7. [SPIFFE/SPIRE 통합](#7-spiffespire-통합)
8. [OpenTelemetry 통합](#8-opentelemetry-통합)
9. [Docker Compose 실험 환경 설계](#9-docker-compose-실험-환경-설계)
10. [구현 단계별 계획 (Phase)](#10-구현-단계별-계획-phase)
11. [테스트 전략](#11-테스트-전략)
12. [예상 성능 지표 및 측정 방법](#12-예상-성능-지표-및-측정-방법)
---
## 1. 기술 스택 선정 근거
### 1.1 핵심 언어 선정
| 언어 | 적용 컴포넌트 | 선정 이유 |
|------|-------------|-----------|
| **Go** | T2 Gateway, T3 Field Agent, gRPC 서버/클라이언트 | 단일 바이너리 배포, 낮은 메모리 푸트프린트, 고루틴 기반 고동시성, quic-go 네이티브 지원, 크로스 컴파일(ARM/MIPS/x86) |
| **Python** | T1 Orchestrator LLM 에이전트, A2A 클라이언트 | LangGraph/AutoGen 등 LLM 프레임워크 생태계, 빠른 프로토타이핑, grpcio 지원 |
### 1.2 전송 계층 라이브러리
#### quic-go (`github.com/quic-go/quic-go`)
- Go 네이티브 QUIC RFC 9000 구현 — CGO 의존 없음
- `net/http` 인터페이스 호환 → HTTP/3 서버를 표준 핸들러로 운영 가능
- Connection migration 내장: 핸드오버(IP 변경) 시 세션 유지
- QUIC Datagram 확장(RFC 9221) 지원 → 저지연 V2X 메시지에 활용 가능
- gRPC-Go 트랜스포트 레이어 교체 방식으로 통합 (`WithContextDialer` + `quic.DialAddr`)
#### gRPC-Go (`google.golang.org/grpc`)
- 공식 Google Go gRPC 구현 — Interceptor Chain, Health Checking, Service Config(retry policy) 내장
- Protobuf codec 교체 가능 → 커스텀 직렬화 확장 가능
- `grpc.WithTransportCredentials` + quic-go TLS 설정을 연동하여 mTLS over QUIC 구현
#### Eclipse Paho Go (`github.com/eclipse/paho.mqtt.golang`)
- MQTT 3.1.1 / 5.0 지원 Go 클라이언트
- Reconnect 자동 처리 + Message Queue(오프라인 버퍼)
- T3 현장 디바이스와의 Pub/Sub 인터페이스
### 1.3 에이전트 프레임워크
| 프레임워크 | 선정 이유 |
|-----------|-----------|
| **LangGraph** (Python) | 상태 머신 기반 에이전트 그래프, 체크포인팅·재시도 내장, T1 Orchestrator의 복잡한 오케스트레이션 로직에 적합 |
| **A2A Protocol** | Google 주도 에이전트 간 통신 표준, HTTP/JSON 기반으로 언어 무관 상호운용성, Task/Artifact 개념으로 장기 실행 작업 모델링 |
### 1.4 보안 및 관측성
| 라이브러리 | 선정 이유 |
|-----------|-----------|
| **SPIFFE/SPIRE Go SDK** (`github.com/spiffe/go-spiffe/v2`) | X.509-SVID 자동 갱신, gRPC `credentials.TransportCredentials` 구현 제공, TPM attestation 플러그인 체인 |
| **OpenTelemetry Go SDK** (`go.opentelemetry.io/otel`) | 벤더 중립 trace/metric/log, gRPC 자동 계측(`otelgrpc`), W3C TraceContext 전파 |
| **Prometheus** | Go 네이티브 exporter, 엣지 환경 경량 스크랩 |
---
## 2. 프로젝트 디렉터리 구조
```
edge-aiot-mas/
├── proto/ # Protobuf 스키마 (언어 무관 공유)
│ ├── agent/
│ │ └── v1/
│ │ └── agent.proto # AgentService (Unary/Stream 4모드)
│ ├── telemetry/
│ │ └── v1/
│ │ └── telemetry.proto # TelemetryService
│ ├── control/
│ │ └── v1/
│ │ └── control.proto # ControlService (OTA, Config)
│ └── buf.yaml # Buf 스키마 설정
├── gen/ # proto 컴파일 출력 (생성 파일, git 추적)
│ ├── go/
│ │ ├── agent/v1/
│ │ ├── telemetry/v1/
│ │ └── control/v1/
│ └── python/
│ ├── agent/v1/
│ └── telemetry/v1/
├── go.mod # Go 모듈 루트
├── go.sum
├── cmd/
│ ├── gateway/
│ │ └── main.go # T2 Edge Gateway 진입점
│ ├── field-agent/
│ │ └── main.go # T3 Field Agent 진입점
│ └── cloud-server/
│ └── main.go # T1 Go gRPC 서버 진입점
├── internal/
│ ├── gateway/
│ │ ├── server.go # gRPC over QUIC 서버 초기화
│ │ ├── mqtt_client.go # MQTT 브로커 클라이언트
│ │ ├── translator.go # MQTT topic → gRPC method 변환 엔진
│ │ ├── resume_token.go # Resume Token 관리자
│ │ ├── a2a_handler.go # A2A HTTP/3 엔드포인트
│ │ └── interceptors/
│ │ ├── resume.go # Resume Token gRPC Interceptor
│ │ ├── deadline.go # Deadline 강제 Interceptor
│ │ └── otel.go # OTel trace 전파 Interceptor
│ │
│ ├── field/
│ │ ├── mqtt_publisher.go # MQTT 텔레메트리 발행
│ │ ├── offline_queue.go # 오프라인 큐 (BoltDB)
│ │ └── resume_client.go # Resume Token 클라이언트
│ │
│ ├── cloud/
│ │ ├── agent_service.go # AgentService 서버 구현
│ │ └── telemetry_service.go # TelemetryService 서버 구현
│ │
│ ├── security/
│ │ ├── spiffe.go # SPIFFE/SPIRE 클라이언트 래퍼
│ │ └── credentials.go # gRPC TransportCredentials 생성
│ │
│ └── observability/
│ ├── tracer.go # OTel TracerProvider 초기화
│ └── metrics.go # Prometheus 메트릭 등록
├── python/
│ ├── pyproject.toml
│ ├── orchestrator/
│ │ ├── __init__.py
│ │ ├── agent.py # LangGraph 오케스트레이터 에이전트
│ │ ├── a2a_client.py # A2A 클라이언트
│ │ ├── grpc_client.py # gRPC-over-QUIC 클라이언트 (aioquic 또는 subprocess)
│ │ └── tools/
│ │ ├── telemetry_tool.py # 텔레메트리 조회 도구
│ │ └── control_tool.py # 제어 명령 도구
│ └── tests/
│ └── test_agent.py
├── deployments/
│ ├── docker-compose.yml # 전체 실험 환경
│ ├── docker-compose.spire.yml # SPIRE 오버레이
│ ├── spire/
│ │ ├── server.conf
│ │ └── agent.conf
│ ├── mosquitto/
│ │ └── mosquitto.conf
│ └── otel/
│ └── otel-collector-config.yaml
├── configs/
│ ├── gateway.yaml # T2 Gateway 설정
│ ├── field-agent.yaml # T3 Field Agent 설정
│ └── cloud-server.yaml # T1 서버 설정
└── tests/
├── integration/
│ ├── gateway_test.go
│ └── e2e_test.go
└── perf/
└── benchmark_test.go
```
---
## 3. Proto 파일 설계
### 3.1 AgentService (`proto/agent/v1/agent.proto`)
```protobuf
syntax = "proto3";
package agent.v1;
option go_package = "github.com/your-org/edge-aiot-mas/gen/go/agent/v1;agentv1";
import "google/protobuf/timestamp.proto";
import "google/protobuf/any.proto";
// 에이전트 태스크 요청
message AgentTask {
string task_id = 1; // UUID
string agent_id = 2; // 발신 에이전트 ID
string target_agent = 3; // 수신 에이전트 ID
string task_type = 4; // "infer", "control", "query", "ota"
bytes payload = 5; // Protobuf Any 직렬화
map<string, string> metadata = 6; // X-Tenant-ID, X-Device-Class 등
google.protobuf.Timestamp deadline = 7;
}
// 에이전트 태스크 응답
message AgentResult {
string task_id = 1;
string agent_id = 2;
Status status = 3;
bytes payload = 4;
string resume_token = 5; // 스트리밍 재개 토큰
map<string, string> metadata = 6;
enum Status {
STATUS_UNSPECIFIED = 0;
STATUS_OK = 1;
STATUS_PARTIAL = 2; // 스트림 중간 청크
STATUS_ERROR = 3;
}
}
// 스트리밍 청크 (Server/Client/Bidi Streaming 공용)
message StreamChunk {
string stream_id = 1;
uint64 sequence_num = 2;
bytes data = 3;
bool is_last = 4;
string resume_token = 5;
}
// 4대 RPC 모드를 모두 지원하는 AgentService
service AgentService {
// Unary: 단일 명령/조회 (OTA 명령, 상태 조회)
rpc Execute(AgentTask) returns (AgentResult);
// Server Streaming: T1→T2 모델 업데이트, 토큰 스트리밍
rpc Subscribe(AgentTask) returns (stream StreamChunk);
// Client Streaming: T3→T2 배치 센서 데이터 업로드
rpc Upload(stream StreamChunk) returns (AgentResult);
// Bidi Streaming: T2↔T2 엣지 합의, 로봇 실시간 협업
rpc Collaborate(stream AgentTask) returns (stream AgentResult);
}
```
### 3.2 TelemetryService (`proto/telemetry/v1/telemetry.proto`)
```protobuf
syntax = "proto3";
package telemetry.v1;
option go_package = "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1;telemetryv1";
import "google/protobuf/timestamp.proto";
// 단일 센서 데이터 포인트
message DataPoint {
string device_id = 1;
string metric = 2; // "temperature", "vibration", "voltage" 등
double value = 3;
string unit = 4;
google.protobuf.Timestamp ts = 5;
map<string, string> labels = 6; // site, line, zone 등 레이블
}
// 배치 텔레메트리 (다수 디바이스 팬인)
message TelemetryBatch {
string source_gateway = 1;
repeated DataPoint points = 2;
string resume_token = 3; // 마지막 처리 위치
}
// 텔레메트리 수신 확인
message TelemetryAck {
string resume_token = 1; // 서버가 발급한 다음 재개 위치
uint64 accepted = 2;
uint64 rejected = 3;
string error_message = 4;
}
// 텔레메트리 쿼리
message TelemetryQuery {
string device_id = 1;
string metric = 2;
google.protobuf.Timestamp from = 3;
google.protobuf.Timestamp to = 4;
int32 limit = 5;
}
service TelemetryService {
// Client Streaming: T3 다수 센서 배치 업로드
rpc PushBatch(stream DataPoint) returns (TelemetryAck);
// Unary: 텔레메트리 배치 전송
rpc Push(TelemetryBatch) returns (TelemetryAck);
// Server Streaming: 실시간 텔레메트리 구독
rpc Stream(TelemetryQuery) returns (stream DataPoint);
}
```
### 3.3 ControlService (`proto/control/v1/control.proto`)
```protobuf
syntax = "proto3";
package control.v1;
option go_package = "github.com/your-org/edge-aiot-mas/gen/go/control/v1;controlv1";
message OTARequest {
string device_id = 1;
string firmware_url = 2;
string expected_sha256 = 3;
string version = 4;
}
message OTAStatus {
string device_id = 1;
State state = 2;
int32 progress = 3; // 0-100
string error = 4;
enum State {
STATE_UNSPECIFIED = 0;
STATE_DOWNLOADING = 1;
STATE_VERIFYING = 2;
STATE_APPLYING = 3;
STATE_COMPLETE = 4;
STATE_FAILED = 5;
}
}
message ConfigUpdate {
string device_id = 1;
bytes config = 2; // JSON or Protobuf Any
string version = 3;
}
message ConfigAck {
bool applied = 1;
string version = 2;
string error = 3;
}
service ControlService {
// Server Streaming: OTA 다운로드 진행 상황 푸시
rpc OTAUpdate(OTARequest) returns (stream OTAStatus);
// Unary: 설정 업데이트
rpc UpdateConfig(ConfigUpdate) returns (ConfigAck);
}
```
### 3.4 Buf 설정 (`proto/buf.yaml`)
```yaml
version: v2
modules:
- path: .
lint:
use:
- STANDARD
breaking:
use:
- FILE
deps:
- buf.build/googleapis/googleapis
```
---
## 4. T2 Edge Gateway 구현 설계
### 4.1 quic-go + gRPC 통합
T2 게이트웨이는 T1 방향으로 gRPC over QUIC 서버를 운영하고, T3 방향으로 MQTT 클라이언트를 유지한다.
#### `internal/gateway/server.go`
```go
package gateway
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"github.com/quic-go/quic-go"
"github.com/quic-go/quic-go/http3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
"github.com/your-org/edge-aiot-mas/internal/gateway/interceptors"
"github.com/your-org/edge-aiot-mas/internal/security"
)
// GatewayConfig는 T2 게이트웨이 설정을 담는다.
type GatewayConfig struct {
GRPCAddr string // T1 방향 gRPC over QUIC 수신 주소 (예: ":9443")
A2AAddr string // A2A HTTP/3 수신 주소 (예: ":8443")
MQTTBroker string // T3 방향 MQTT 브로커 주소
SPIFFESocket string // SPIRE Agent 소켓 경로
}
// Gateway는 T2 엣지 게이트웨이의 핵심 컴포넌트다.
type Gateway struct {
cfg GatewayConfig
grpcServer *grpc.Server
http3Server *http3.Server
mqttClient *MQTTClient
translator *Translator
tokenMgr *ResumeTokenManager
}
// New는 Gateway를 초기화하고 반환한다.
func New(ctx context.Context, cfg GatewayConfig) (*Gateway, error) {
// 1. SPIFFE/SPIRE에서 TLS 자격증명 획득
creds, err := security.NewSPIFFECredentials(ctx, cfg.SPIFFESocket)
if err != nil {
return nil, fmt.Errorf("SPIFFE credentials: %w", err)
}
// 2. Resume Token 관리자 초기화
tokenMgr := NewResumeTokenManager()
// 3. gRPC 서버 인터셉터 체인 구성
srv := grpc.NewServer(
grpc.Creds(creds),
grpc.ChainUnaryInterceptor(
interceptors.DeadlineEnforcer(defaultDeadlines),
interceptors.ResumeTokenUnary(tokenMgr),
interceptors.OTelUnary(),
),
grpc.ChainStreamInterceptor(
interceptors.ResumeTokenStream(tokenMgr),
interceptors.OTelStream(),
),
)
// 4. 서비스 등록
agentSvc := NewAgentServiceServer(tokenMgr)
telemetrySvc := NewTelemetryServiceServer(tokenMgr)
agentv1.RegisterAgentServiceServer(srv, agentSvc)
telemetryv1.RegisterTelemetryServiceServer(srv, telemetrySvc)
// 5. MQTT 클라이언트 초기화
mqttClient, err := NewMQTTClient(cfg.MQTTBroker)
if err != nil {
return nil, fmt.Errorf("MQTT client: %w", err)
}
return &Gateway{
cfg: cfg,
grpcServer: srv,
mqttClient: mqttClient,
translator: NewTranslator(agentSvc, telemetrySvc),
tokenMgr: tokenMgr,
}, nil
}
// ServeGRPCOverQUIC는 quic-go 리스너 위에서 gRPC 서버를 구동한다.
func (g *Gateway) ServeGRPCOverQUIC(ctx context.Context) error {
tlsCfg := g.grpcServer.GetServiceInfo() // SPIFFE creds에서 TLS config 추출
_ = tlsCfg
// quic-go 리스너 생성
quicListener, err := quic.ListenAddr(g.cfg.GRPCAddr, &tls.Config{
NextProtos: []string{"h2", "h3"}, // gRPC는 h2, HTTP/3는 h3
MinVersion: tls.VersionTLS13,
// GetCertificate는 SPIFFE SVID에서 동적으로 제공
}, &quic.Config{
EnableDatagrams: true,
MaxIdleTimeout: 30 * 60 * 1e9, // 30분
})
if err != nil {
return fmt.Errorf("quic listen: %w", err)
}
defer quicListener.Close()
// quic.Listener를 net.Listener 인터페이스로 래핑
netListener := &quicNetListener{inner: quicListener}
// gRPC 서버를 quic-go 리스너 위에서 구동
go func() {
<-ctx.Done()
g.grpcServer.GracefulStop()
}()
return g.grpcServer.Serve(netListener)
}
// quicNetListener는 quic.Listener를 net.Listener로 래핑한다.
type quicNetListener struct {
inner *quic.Listener
}
func (l *quicNetListener) Accept() (net.Conn, error) {
conn, err := l.inner.Accept(context.Background())
if err != nil {
return nil, err
}
return &quicConn{conn: conn}, nil
}
func (l *quicNetListener) Close() error { return l.inner.Close() }
func (l *quicNetListener) Addr() net.Addr { return l.inner.Addr() }
// quicConn은 quic.Connection을 net.Conn으로 래핑한다.
// gRPC-Go는 net.Conn 인터페이스를 사용하므로 첫 번째 스트림을 단일 연결로 노출.
type quicConn struct {
conn quic.Connection
stream quic.Stream
}
func (c *quicConn) Read(b []byte) (int, error) {
if c.stream == nil {
var err error
c.stream, err = c.conn.AcceptStream(context.Background())
if err != nil {
return 0, err
}
}
return c.stream.Read(b)
}
func (c *quicConn) Write(b []byte) (int, error) {
if c.stream == nil {
return 0, fmt.Errorf("stream not initialized")
}
return c.stream.Write(b)
}
func (c *quicConn) Close() error { return c.conn.CloseWithError(0, "closed") }
func (c *quicConn) LocalAddr() net.Addr { return c.conn.LocalAddr() }
func (c *quicConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() }
func (c *quicConn) SetDeadline(t interface{}) error { return nil }
func (c *quicConn) SetReadDeadline(t interface{}) error { return nil }
func (c *quicConn) SetWriteDeadline(t interface{}) error { return nil }
// defaultDeadlines는 디바이스 클래스별 기본 deadline 정책이다.
var defaultDeadlines = map[string]int64{
"agv": 50, // ms
"robot": 100,
"sensor": 5000,
"default": 3000,
}
```
> **참고**: 실제 quic-go와 gRPC-Go 통합 시 `grpc.WithContextDialer`를 사용하여 클라이언트 측에서 QUIC 다이얼러를 주입하는 방식이 더 실용적이다. 위 코드는 서버 측 리스너 래핑 패턴을 보여준다. 프로덕션에서는 `quic-go/http3` 패키지의 `ServeListener` 방식 또는 별도 gRPC-QUIC 어댑터 라이브러리(`github.com/open-telemetry/opentelemetry-collector-contrib` 참조)를 활용한다.
### 4.2 MQTT→gRPC 변환 엔진
#### MQTT Topic 네이밍 규칙
```
telemetry/{site}/{device_id}/{metric} → TelemetryService.Push
control/{site}/{device_id}/ota → ControlService.OTAUpdate
agent/{site}/{device_id}/task → AgentService.Execute
```
#### `internal/gateway/translator.go`
```go
package gateway
import (
"context"
"encoding/json"
"fmt"
"strings"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"google.golang.org/protobuf/proto"
agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// TopicRoute는 MQTT topic 패턴과 핸들러 함수의 매핑이다.
type TopicRoute struct {
Pattern string
Handler func(ctx context.Context, topic string, payload []byte) error
}
// Translator는 MQTT 메시지를 gRPC 호출로 변환한다.
type Translator struct {
agentSvc agentv1.AgentServiceServer
telemetrySvc telemetryv1.TelemetryServiceServer
routes []TopicRoute
propagator propagation.TextMapPropagator
}
// NewTranslator는 변환 엔진을 초기화한다.
func NewTranslator(
agentSvc agentv1.AgentServiceServer,
telemetrySvc telemetryv1.TelemetryServiceServer,
) *Translator {
t := &Translator{
agentSvc: agentSvc,
telemetrySvc: telemetrySvc,
propagator: otel.GetTextMapPropagator(),
}
t.registerRoutes()
return t
}
// registerRoutes는 MQTT topic 패턴별 핸들러를 등록한다.
func (t *Translator) registerRoutes() {
t.routes = []TopicRoute{
{
Pattern: "telemetry/+/+/+",
Handler: t.handleTelemetry,
},
{
Pattern: "agent/+/+/task",
Handler: t.handleAgentTask,
},
}
}
// HandleMessage는 MQTT 메시지를 수신하고 라우팅한다.
func (t *Translator) HandleMessage(_ mqtt.Client, msg mqtt.Message) {
topic := msg.Topic()
payload := msg.Payload()
// MQTT UserProperties에서 OTel trace context 추출
ctx := context.Background()
// MQTT 5.0 UserProperties를 통한 traceparent 전파는
// paho.mqtt.golang v2(MQTT 5.0 지원)에서 msg.Properties()로 접근
for _, route := range t.routes {
if matchTopic(route.Pattern, topic) {
if err := route.Handler(ctx, topic, payload); err != nil {
// 에러 로깅 (OTel span에 기록)
fmt.Printf("translation error [%s]: %v\n", topic, err)
}
return
}
}
}
// handleTelemetry는 telemetry/{site}/{device}/{metric} 토픽을 처리한다.
func (t *Translator) handleTelemetry(ctx context.Context, topic string, payload []byte) error {
parts := strings.Split(topic, "/")
if len(parts) != 4 {
return fmt.Errorf("invalid telemetry topic: %s", topic)
}
site, deviceID, metric := parts[1], parts[2], parts[3]
// JSON 페이로드 파싱 (T3 디바이스는 JSON으로 발행)
var raw struct {
Value float64 `json:"v"`
Unit string `json:"u"`
TS int64 `json:"ts"` // Unix milliseconds
}
if err := json.Unmarshal(payload, &raw); err != nil {
return fmt.Errorf("json unmarshal: %w", err)
}
// Protobuf DataPoint 생성
dp := &telemetryv1.DataPoint{
DeviceId: deviceID,
Metric: metric,
Value: raw.Value,
Unit: raw.Unit,
Labels: map[string]string{"site": site},
}
if raw.TS > 0 {
ts := time.UnixMilli(raw.TS)
_ = ts // timestamppb 변환 후 dp.Ts에 할당
}
// TelemetryService.Push를 내부 호출
batch := &telemetryv1.TelemetryBatch{
SourceGateway: "t2-gateway",
Points: []*telemetryv1.DataPoint{dp},
}
_, err := t.telemetrySvc.(*TelemetryServiceServer).pushBatch(ctx, batch)
return err
}
// handleAgentTask는 agent/{site}/{device}/task 토픽을 처리한다.
func (t *Translator) handleAgentTask(ctx context.Context, topic string, payload []byte) error {
parts := strings.Split(topic, "/")
if len(parts) != 4 {
return fmt.Errorf("invalid agent topic: %s", topic)
}
deviceID := parts[2]
var task agentv1.AgentTask
if err := proto.Unmarshal(payload, &task); err != nil {
// JSON fallback
if err2 := json.Unmarshal(payload, &task); err2 != nil {
return fmt.Errorf("unmarshal agent task: %w", err)
}
}
task.AgentId = deviceID
_, err := t.agentSvc.Execute(ctx, &task)
return err
}
// matchTopic는 MQTT 와일드카드(+, #) 패턴 매칭을 수행한다.
func matchTopic(pattern, topic string) bool {
pp := strings.Split(pattern, "/")
tp := strings.Split(topic, "/")
if len(pp) != len(tp) {
if len(pp) > 0 && pp[len(pp)-1] == "#" {
return strings.HasPrefix(topic, strings.Join(pp[:len(pp)-1], "/"))
}
return false
}
for i, p := range pp {
if p != "+" && p != tp[i] {
return false
}
}
return true
}
```
### 4.3 Resume Token 구현
Resume Token은 스트리밍 RPC가 중단되었을 때 재연결 후 중단 위치부터 이어받기 위한 서버 발급 토큰이다.
#### `internal/gateway/resume_token.go`
```go
package gateway
import (
"crypto/hmac"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"fmt"
"sync"
"time"
)
// ResumeState는 스트리밍 재개 상태를 표현한다.
type ResumeState struct {
StreamID string `json:"sid"`
SequenceNum uint64 `json:"seq"`
DeviceID string `json:"did"`
Timestamp int64 `json:"ts"`
TraceParent string `json:"tp,omitempty"` // W3C traceparent
Extra map[string]string `json:"extra,omitempty"`
}
// ResumeTokenManager는 Resume Token을 발급하고 검증한다.
type ResumeTokenManager struct {
secret []byte
store sync.Map // streamID → *ResumeState
}
// NewResumeTokenManager는 새 관리자를 초기화한다.
func NewResumeTokenManager() *ResumeTokenManager {
// 프로덕션에서는 SPIFFE SVID 또는 환경 변수에서 시크릿 로드
return &ResumeTokenManager{
secret: []byte("change-me-in-production"),
}
}
// Issue는 현재 스트리밍 상태를 HMAC 서명된 토큰으로 발급한다.
func (m *ResumeTokenManager) Issue(state ResumeState) (string, error) {
state.Timestamp = time.Now().Unix()
data, err := json.Marshal(state)
if err != nil {
return "", fmt.Errorf("marshal state: %w", err)
}
// HMAC-SHA256 서명
mac := hmac.New(sha256.New, m.secret)
mac.Write(data)
sig := mac.Sum(nil)
// base64url(data) + "." + base64url(sig)
token := base64.RawURLEncoding.EncodeToString(data) +
"." +
base64.RawURLEncoding.EncodeToString(sig)
// 상태 저장 (메모리 캐시)
m.store.Store(state.StreamID, &state)
return token, nil
}
// Verify는 토큰을 검증하고 ResumeState를 반환한다.
func (m *ResumeTokenManager) Verify(token string) (*ResumeState, error) {
parts := splitOnLast(token, ".")
if len(parts) != 2 {
return nil, fmt.Errorf("invalid token format")
}
data, err := base64.RawURLEncoding.DecodeString(parts[0])
if err != nil {
return nil, fmt.Errorf("decode data: %w", err)
}
sig, err := base64.RawURLEncoding.DecodeString(parts[1])
if err != nil {
return nil, fmt.Errorf("decode sig: %w", err)
}
// HMAC 검증
mac := hmac.New(sha256.New, m.secret)
mac.Write(data)
expected := mac.Sum(nil)
if !hmac.Equal(sig, expected) {
return nil, fmt.Errorf("invalid token signature")
}
var state ResumeState
if err := json.Unmarshal(data, &state); err != nil {
return nil, fmt.Errorf("unmarshal state: %w", err)
}
// 24시간 만료 체크
if time.Now().Unix()-state.Timestamp > 86400 {
return nil, fmt.Errorf("token expired")
}
return &state, nil
}
func splitOnLast(s, sep string) []string {
idx := len(s) - 1
for idx >= 0 && string(s[idx]) != sep {
idx--
}
if idx < 0 {
return []string{s}
}
return []string{s[:idx], s[idx+1:]}
}
```
#### Resume Token gRPC Interceptor (`internal/gateway/interceptors/resume.go`)
```go
package interceptors
import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/your-org/edge-aiot-mas/internal/gateway"
)
const resumeTokenMetaKey = "x-resume-token"
// ResumeTokenUnary는 Unary RPC에서 Resume Token을 처리하는 인터셉터다.
func ResumeTokenUnary(mgr *gateway.ResumeTokenManager) grpc.UnaryServerInterceptor {
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
ctx = injectResumeState(ctx, mgr)
resp, err := handler(ctx, req)
return resp, err
}
}
// ResumeTokenStream는 Streaming RPC에서 Resume Token을 처리하는 인터셉터다.
func ResumeTokenStream(mgr *gateway.ResumeTokenManager) grpc.StreamServerInterceptor {
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
// 클라이언트가 재연결 시 보내는 Resume Token 검증
ctx := ss.Context()
if md, ok := metadata.FromIncomingContext(ctx); ok {
if tokens := md.Get(resumeTokenMetaKey); len(tokens) > 0 {
state, err := mgr.Verify(tokens[0])
if err == nil {
// 검증된 상태를 context에 주입
ctx = context.WithValue(ctx, resumeStateKey{}, state)
ss = &resumeServerStream{ServerStream: ss, ctx: ctx}
}
}
}
return handler(srv, ss)
}
}
// injectResumeState는 incoming metadata에서 Resume Token을 추출하고 context에 주입한다.
func injectResumeState(ctx context.Context, mgr *gateway.ResumeTokenManager) context.Context {
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return ctx
}
tokens := md.Get(resumeTokenMetaKey)
if len(tokens) == 0 {
return ctx
}
state, err := mgr.Verify(tokens[0])
if err != nil {
return ctx
}
return context.WithValue(ctx, resumeStateKey{}, state)
}
// GetResumeState는 context에서 ResumeState를 추출한다.
func GetResumeState(ctx context.Context) (*gateway.ResumeState, bool) {
state, ok := ctx.Value(resumeStateKey{}).(*gateway.ResumeState)
return state, ok
}
type resumeStateKey struct{}
// resumeServerStream은 context를 오버라이드한 ServerStream 래퍼다.
type resumeServerStream struct {
grpc.ServerStream
ctx context.Context
}
func (s *resumeServerStream) Context() context.Context { return s.ctx }
```
### 4.4 A2A HTTP/3 엔드포인트
#### `internal/gateway/a2a_handler.go`
```go
package gateway
import (
"encoding/json"
"net/http"
"github.com/quic-go/quic-go/http3"
)
// A2ATask는 Google A2A 프로토콜의 태스크 요청 구조체다.
type A2ATask struct {
ID string `json:"id"`
Message A2AMessage `json:"message"`
Metadata map[string]any `json:"metadata,omitempty"`
}
// A2AMessage는 A2A 메시지 구조체다.
type A2AMessage struct {
Role string `json:"role"`
Parts []A2APart `json:"parts"`
}
// A2APart는 A2A 메시지의 단일 파트(텍스트, 데이터 등)다.
type A2APart struct {
Type string `json:"type"`
Text string `json:"text,omitempty"`
MimeType string `json:"mimeType,omitempty"`
Data []byte `json:"data,omitempty"`
}
// ServeA2AHTTP3는 A2A HTTP/3 서버를 구동한다.
func (g *Gateway) ServeA2AHTTP3() error {
mux := http.NewServeMux()
// A2A 에이전트 카드 (에이전트 능력 공개)
mux.HandleFunc("/.well-known/agent.json", g.handleAgentCard)
// A2A 태스크 수신 엔드포인트
mux.HandleFunc("/tasks/send", g.handleTaskSend)
mux.HandleFunc("/tasks/sendSubscribe", g.handleTaskSendSubscribe)
mux.HandleFunc("/tasks/get", g.handleTaskGet)
server := &http3.Server{
Addr: g.cfg.A2AAddr,
Handler: mux,
}
return server.ListenAndServeTLS("", "") // TLS config는 SPIFFE에서 주입
}
// handleAgentCard는 에이전트의 능력을 JSON으로 반환한다.
func (g *Gateway) handleAgentCard(w http.ResponseWriter, r *http.Request) {
card := map[string]any{
"name": "T2-Edge-Gateway",
"description": "엣지 AIoT T2 게이트웨이 에이전트",
"url": "https://" + g.cfg.A2AAddr,
"version": "1.0.0",
"capabilities": map[string]any{
"streaming": true,
"pushNotifications": false,
},
"skills": []map[string]any{
{
"id": "telemetry_query",
"name": "텔레메트리 조회",
"description": "T3 디바이스 텔레메트리 데이터를 조회한다",
},
{
"id": "device_control",
"name": "디바이스 제어",
"description": "T3 디바이스에 제어 명령을 전달한다",
},
},
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(card)
}
// handleTaskSend는 A2A 태스크를 수신하고 처리한다.
func (g *Gateway) handleTaskSend(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
return
}
var task A2ATask
if err := json.NewDecoder(r.Body).Decode(&task); err != nil {
http.Error(w, "invalid request body", http.StatusBadRequest)
return
}
// 태스크를 gRPC AgentService.Execute로 위임
ctx := r.Context()
result, err := g.processA2ATask(ctx, &task)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}
// handleTaskSendSubscribe는 SSE 스트리밍으로 태스크 상태를 푸시한다.
func (g *Gateway) handleTaskSendSubscribe(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
// SSE 스트리밍 구현 (생략)
flusher, ok := w.(http.Flusher)
if !ok {
http.Error(w, "streaming not supported", http.StatusInternalServerError)
return
}
_ = flusher
// TODO: gRPC Bidi 스트림 → SSE 변환
}
func (g *Gateway) processA2ATask(ctx interface{}, task *A2ATask) (map[string]any, error) {
// 구현: task.Message.Parts에서 요청 추출 → gRPC 호출 → 결과 반환
return map[string]any{
"id": task.ID,
"status": map[string]string{"state": "completed"},
}, nil
}
```
---
## 5. T1 Orchestrator 구현 설계
### 5.1 Python LangGraph 에이전트 + A2A 클라이언트
#### `python/orchestrator/agent.py`
```python
from __future__ import annotations
import asyncio
import json
import os
from typing import Annotated, TypedDict
import httpx
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import MemorySaver
from langgraph.graph import END, StateGraph
from langgraph.graph.message import add_messages
from .a2a_client import A2AClient
from .tools.telemetry_tool import TelemetryQueryTool
from .tools.control_tool import DeviceControlTool
class OrchestratorState(TypedDict):
"""오케스트레이터 에이전트 상태."""
messages: Annotated[list, add_messages]
site_id: str
active_tasks: dict[str, str] # task_id → status
# 에이전트가 사용할 도구 목록
tools = [TelemetryQueryTool(), DeviceControlTool()]
# LLM 초기화 (모델은 환경 변수로 교체 가능)
llm = ChatOpenAI(
model=os.getenv("LLM_MODEL", "gpt-4o"),
temperature=0,
).bind_tools(tools)
def should_continue(state: OrchestratorState) -> str:
"""도구 호출 여부에 따라 다음 노드를 결정한다."""
last = state["messages"][-1]
if isinstance(last, AIMessage) and last.tool_calls:
return "tools"
return END
async def call_model(state: OrchestratorState) -> OrchestratorState:
"""LLM을 호출하고 응답을 상태에 추가한다."""
system = SystemMessage(content=(
"당신은 엣지 AIoT 오케스트레이터입니다. "
"T2 게이트웨이 에이전트를 통해 현장 디바이스를 조율합니다. "
f"현재 사이트: {state['site_id']}"
))
messages = [system] + state["messages"]
response = await llm.ainvoke(messages)
return {"messages": [response]}
async def call_tools(state: OrchestratorState) -> OrchestratorState:
"""도구를 실행하고 결과를 상태에 추가한다."""
from langchain_core.messages import ToolMessage
last = state["messages"][-1]
results = []
for tool_call in last.tool_calls:
# 도구 이름으로 실제 도구 인스턴스 검색
tool = next((t for t in tools if t.name == tool_call["name"]), None)
if tool is None:
content = f"Unknown tool: {tool_call['name']}"
else:
try:
content = await tool.arun(tool_call["args"])
except Exception as e:
content = f"Tool error: {e}"
results.append(ToolMessage(
content=str(content),
tool_call_id=tool_call["id"],
))
return {"messages": results}
def build_graph() -> StateGraph:
"""LangGraph 오케스트레이터 그래프를 빌드한다."""
builder = StateGraph(OrchestratorState)
builder.add_node("agent", call_model)
builder.add_node("tools", call_tools)
builder.set_entry_point("agent")
builder.add_conditional_edges("agent", should_continue)
builder.add_edge("tools", "agent")
return builder.compile(checkpointer=MemorySaver())
# 그래프 싱글턴
graph = build_graph()
async def run_orchestrator(user_input: str, site_id: str, thread_id: str) -> str:
"""오케스트레이터를 실행하고 최종 응답을 반환한다."""
config = {"configurable": {"thread_id": thread_id}}
initial_state: OrchestratorState = {
"messages": [HumanMessage(content=user_input)],
"site_id": site_id,
"active_tasks": {},
}
final_state = await graph.ainvoke(initial_state, config=config)
last_message = final_state["messages"][-1]
return last_message.content if hasattr(last_message, "content") else str(last_message)
```
#### `python/orchestrator/a2a_client.py`
```python
from __future__ import annotations
import asyncio
import json
import uuid
from typing import AsyncIterator
import httpx
class A2AClient:
"""T2 Edge Gateway의 A2A HTTP/3 엔드포인트를 호출하는 클라이언트."""
def __init__(self, base_url: str, timeout: float = 30.0):
self.base_url = base_url.rstrip("/")
# HTTP/3(QUIC) 지원을 위해 httpx + h3 백엔드 사용
# 실험 환경에서는 HTTP/2로 폴백 가능
self._client = httpx.AsyncClient(
http2=True,
timeout=timeout,
verify=False, # 개발 환경 — 프로덕션에서는 SPIFFE SVID CA 적용
)
async def send_task(self, message: str, skill_id: str | None = None) -> dict:
"""태스크를 T2 게이트웨이로 전송하고 결과를 반환한다."""
task = {
"id": str(uuid.uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}],
},
"metadata": {"skill_id": skill_id} if skill_id else {},
}
resp = await self._client.post(
f"{self.base_url}/tasks/send",
json=task,
headers={"Content-Type": "application/json"},
)
resp.raise_for_status()
return resp.json()
async def send_task_streaming(
self, message: str, skill_id: str | None = None
) -> AsyncIterator[dict]:
"""SSE 스트리밍으로 태스크 진행 상태를 수신한다."""
task = {
"id": str(uuid.uuid4()),
"message": {
"role": "user",
"parts": [{"type": "text", "text": message}],
},
}
async with self._client.stream(
"POST",
f"{self.base_url}/tasks/sendSubscribe",
json=task,
) as resp:
resp.raise_for_status()
async for line in resp.aiter_lines():
if line.startswith("data: "):
data = json.loads(line[6:])
yield data
async def get_agent_card(self) -> dict:
"""T2 게이트웨이의 에이전트 카드를 조회한다."""
resp = await self._client.get(f"{self.base_url}/.well-known/agent.json")
resp.raise_for_status()
return resp.json()
async def close(self):
await self._client.aclose()
```
### 5.2 Go gRPC-over-QUIC 클라이언트 코드
#### `internal/cloud/grpc_quic_client.go`
```go
package cloud
import (
"context"
"crypto/tls"
"fmt"
"net"
"github.com/quic-go/quic-go"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
)
// QUICDialer는 quic-go를 사용하여 gRPC 연결을 수립하는 다이얼러다.
type QUICDialer struct {
tlsConfig *tls.Config
}
// NewQUICDialer는 TLS 설정으로 QUIC 다이얼러를 초기화한다.
func NewQUICDialer(tlsConfig *tls.Config) *QUICDialer {
return &QUICDialer{tlsConfig: tlsConfig}
}
// DialContext는 gRPC의 WithContextDialer에 전달될 다이얼 함수다.
func (d *QUICDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
// QUIC connection 수립
conn, err := quic.DialAddr(ctx, addr, d.tlsConfig, &quic.Config{
EnableDatagrams: true,
MaxIdleTimeout: 30 * 60 * 1e9,
})
if err != nil {
return nil, fmt.Errorf("quic dial %s: %w", addr, err)
}
// 첫 번째 스트림 개방 (gRPC framing에 사용)
stream, err := conn.OpenStreamSync(ctx)
if err != nil {
conn.CloseWithError(0, "stream open failed")
return nil, fmt.Errorf("open quic stream: %w", err)
}
return &quicStreamConn{
conn: conn,
stream: stream,
}, nil
}
// quicStreamConn은 QUIC 스트림을 net.Conn으로 노출한다.
type quicStreamConn struct {
conn quic.Connection
stream quic.Stream
}
func (c *quicStreamConn) Read(b []byte) (int, error) { return c.stream.Read(b) }
func (c *quicStreamConn) Write(b []byte) (int, error) { return c.stream.Write(b) }
func (c *quicStreamConn) Close() error {
c.stream.Close()
return c.conn.CloseWithError(0, "client closed")
}
func (c *quicStreamConn) LocalAddr() net.Addr { return c.conn.LocalAddr() }
func (c *quicStreamConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() }
func (c *quicStreamConn) SetDeadline(t interface{}) error { return nil }
func (c *quicStreamConn) SetReadDeadline(t interface{}) error { return nil }
func (c *quicStreamConn) SetWriteDeadline(t interface{}) error { return nil }
// T1Client는 T1 클라우드에서 T2 게이트웨이로 연결하는 gRPC 클라이언트다.
type T1Client struct {
conn *grpc.ClientConn
agentClient agentv1.AgentServiceClient
telemetryClient telemetryv1.TelemetryServiceClient
}
// NewT1Client는 T2 게이트웨이에 gRPC over QUIC 연결을 수립한다.
func NewT1Client(ctx context.Context, gatewayAddr string, tlsConfig *tls.Config) (*T1Client, error) {
dialer := NewQUICDialer(tlsConfig)
conn, err := grpc.DialContext(
ctx,
gatewayAddr,
grpc.WithContextDialer(dialer.DialContext),
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
// Retry policy: 무선 손실 시 지수 백오프
grpc.WithDefaultServiceConfig(`{
"methodConfig": [{
"name": [{}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.5s",
"maxBackoff": "30s",
"backoffMultiplier": 2,
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
}
}]
}`),
)
if err != nil {
return nil, fmt.Errorf("grpc dial: %w", err)
}
return &T1Client{
conn: conn,
agentClient: agentv1.NewAgentServiceClient(conn),
telemetryClient: telemetryv1.NewTelemetryServiceClient(conn),
}, nil
}
// ExecuteTask는 T2 게이트웨이에 AgentTask를 전송한다.
func (c *T1Client) ExecuteTask(ctx context.Context, task *agentv1.AgentTask) (*agentv1.AgentResult, error) {
return c.agentClient.Execute(ctx, task)
}
// StreamTelemetry는 T2 게이트웨이에서 텔레메트리 스트림을 구독한다.
func (c *T1Client) StreamTelemetry(
ctx context.Context,
query *telemetryv1.TelemetryQuery,
handler func(*telemetryv1.DataPoint) error,
) error {
stream, err := c.telemetryClient.Stream(ctx, query)
if err != nil {
return fmt.Errorf("stream start: %w", err)
}
for {
dp, err := stream.Recv()
if err != nil {
return fmt.Errorf("stream recv: %w", err)
}
if err := handler(dp); err != nil {
return err
}
}
}
// Close는 gRPC 연결을 종료한다.
func (c *T1Client) Close() error { return c.conn.Close() }
```
---
## 6. T3 Field Agent 구현 설계
### 6.1 MQTT 텔레메트리 발행
#### `internal/field/mqtt_publisher.go`
```go
package field
import (
"context"
"encoding/json"
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
)
// TelemetryPayload는 T3 디바이스가 발행하는 텔레메트리 JSON 구조체다.
type TelemetryPayload struct {
Value float64 `json:"v"`
Unit string `json:"u"`
TS int64 `json:"ts"` // Unix milliseconds
Props map[string]string `json:"p,omitempty"` // MQTT 5.0 UserProperties 대체
}
// FieldConfig는 T3 Field Agent 설정을 담는다.
type FieldConfig struct {
DeviceID string
SiteID string
MQTTBroker string
MQTTPort int
OfflineDB string // BoltDB 파일 경로
}
// FieldAgent는 T3 현장 에이전트다.
type FieldAgent struct {
cfg FieldConfig
mqttClient mqtt.Client
offlineQueue *OfflineQueue
tracer interface{} // OTel tracer
}
// NewFieldAgent는 Field Agent를 초기화한다.
func NewFieldAgent(cfg FieldConfig) (*FieldAgent, error) {
// MQTT 클라이언트 옵션 설정
opts := mqtt.NewClientOptions().
AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTBroker, cfg.MQTTPort)).
SetClientID(fmt.Sprintf("field-%s", cfg.DeviceID)).
SetAutoReconnect(true).
SetConnectRetry(true).
SetConnectRetryInterval(5 * time.Second).
SetMaxReconnectInterval(60 * time.Second).
SetCleanSession(false). // 오프라인 중 QoS 1 메시지 보존
SetResumeSubs(true)
// 연결 로스트 핸들러
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
fmt.Printf("[%s] MQTT connection lost: %v\n", cfg.DeviceID, err)
})
// 재연결 핸들러
opts.SetOnConnectHandler(func(client mqtt.Client) {
fmt.Printf("[%s] MQTT reconnected\n", cfg.DeviceID)
})
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
return nil, fmt.Errorf("mqtt connect: %w", token.Error())
}
offlineQueue, err := NewOfflineQueue(cfg.OfflineDB)
if err != nil {
return nil, fmt.Errorf("offline queue: %w", err)
}
return &FieldAgent{
cfg: cfg,
mqttClient: client,
offlineQueue: offlineQueue,
}, nil
}
// PublishTelemetry는 텔레메트리 데이터를 MQTT로 발행한다.
// 오프라인 상태라면 로컬 큐에 저장한다.
func (a *FieldAgent) PublishTelemetry(ctx context.Context, metric string, value float64, unit string) error {
topic := fmt.Sprintf("telemetry/%s/%s/%s", a.cfg.SiteID, a.cfg.DeviceID, metric)
// OTel trace context를 페이로드 Props에 포함
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
payload := TelemetryPayload{
Value: value,
Unit: unit,
TS: time.Now().UnixMilli(),
Props: carrier, // traceparent 등 포함
}
data, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal payload: %w", err)
}
// MQTT 연결 상태 확인
if !a.mqttClient.IsConnected() {
// 오프라인 큐에 저장
return a.offlineQueue.Enqueue(topic, data)
}
// QoS 1로 발행 (전달 보장)
token := a.mqttClient.Publish(topic, 1, false, data)
token.Wait()
if err := token.Error(); err != nil {
// 발행 실패 시 오프라인 큐로 폴백
if qErr := a.offlineQueue.Enqueue(topic, data); qErr != nil {
return fmt.Errorf("publish failed and queue failed: %v, %v", err, qErr)
}
return nil // 큐에 저장 성공
}
return nil
}
// FlushOfflineQueue는 재연결 후 오프라인 큐를 드레인한다.
func (a *FieldAgent) FlushOfflineQueue(ctx context.Context) error {
if !a.mqttClient.IsConnected() {
return fmt.Errorf("not connected")
}
return a.offlineQueue.Drain(func(topic string, data []byte) error {
token := a.mqttClient.Publish(topic, 1, false, data)
token.Wait()
return token.Error()
})
}
```
### 6.2 오프라인 큐 (BoltDB)
#### `internal/field/offline_queue.go`
```go
package field
import (
"encoding/binary"
"fmt"
"time"
bolt "go.etcd.io/bbolt"
)
var queueBucket = []byte("offline_queue")
// QueueEntry는 오프라인 큐의 단일 항목이다.
type QueueEntry struct {
Topic string
Payload []byte
EnqueuedAt time.Time
}
// OfflineQueue는 BoltDB 기반 오프라인 메시지 큐다.
type OfflineQueue struct {
db *bolt.DB
}
// NewOfflineQueue는 BoltDB 파일을 열고 큐 버킷을 초기화한다.
func NewOfflineQueue(path string) (*OfflineQueue, error) {
db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second})
if err != nil {
return nil, fmt.Errorf("bolt open %s: %w", path, err)
}
if err := db.Update(func(tx *bolt.Tx) error {
_, err := tx.CreateBucketIfNotExists(queueBucket)
return err
}); err != nil {
return nil, fmt.Errorf("create bucket: %w", err)
}
return &OfflineQueue{db: db}, nil
}
// Enqueue는 메시지를 큐에 추가한다.
func (q *OfflineQueue) Enqueue(topic string, payload []byte) error {
return q.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(queueBucket)
// 자동 증가 시퀀스 키 생성
seq, err := b.NextSequence()
if err != nil {
return fmt.Errorf("next sequence: %w", err)
}
key := make([]byte, 8)
binary.BigEndian.PutUint64(key, seq)
// topic + null + payload 직렬화
value := append([]byte(topic+"\x00"), payload...)
return b.Put(key, value)
})
}
// Drain은 큐의 모든 항목을 순서대로 처리하고 성공한 항목을 삭제한다.
func (q *OfflineQueue) Drain(handler func(topic string, payload []byte) error) error {
return q.db.Update(func(tx *bolt.Tx) error {
b := tx.Bucket(queueBucket)
var toDelete [][]byte
if err := b.ForEach(func(k, v []byte) error {
// topic 분리
for i, c := range v {
if c == 0x00 {
topic := string(v[:i])
payload := v[i+1:]
if err := handler(topic, payload); err != nil {
return err // 실패 시 드레인 중단
}
toDelete = append(toDelete, append([]byte{}, k...))
return nil
}
}
return fmt.Errorf("invalid queue entry")
}); err != nil {
return err
}
// 성공한 항목 삭제
for _, key := range toDelete {
if err := b.Delete(key); err != nil {
return err
}
}
return nil
})
}
// Size는 큐에 저장된 항목 수를 반환한다.
func (q *OfflineQueue) Size() (int, error) {
var count int
err := q.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket(queueBucket)
count = b.Stats().KeyN
return nil
})
return count, err
}
// Close는 BoltDB를 닫는다.
func (q *OfflineQueue) Close() error { return q.db.Close() }
```
---
## 7. SPIFFE/SPIRE 통합
### 7.1 아키텍처 개요
```
SPIRE Server (T1)
├── Registration Entry: T1 에이전트 (k8s SA)
├── Registration Entry: T2 게이트웨이 (HW serial)
└── Registration Entry: T3 디바이스 (TPM attestation)
SPIRE Agent (각 노드에 DaemonSet/프로세스로 실행)
└── Workload API (Unix socket)
└── Go Workload (SVID 자동 갱신)
```
### 7.2 Go SDK 사용법
#### `internal/security/spiffe.go`
```go
package security
import (
"context"
"fmt"
"github.com/spiffe/go-spiffe/v2/spiffeid"
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
"github.com/spiffe/go-spiffe/v2/workloadapi"
"google.golang.org/grpc/credentials"
)
// SPIFFECredentials는 SPIRE Workload API에서 SVID를 가져와
// gRPC TransportCredentials를 생성한다.
type SPIFFECredentials struct {
source *workloadapi.X509Source
}
// NewSPIFFECredentials는 SPIRE Agent 소켓에 연결하고
// X.509 SVID 소스를 초기화한다.
func NewSPIFFECredentials(ctx context.Context, socketPath string) (*SPIFFECredentials, error) {
// SPIRE Agent Workload API에 연결
source, err := workloadapi.NewX509Source(
ctx,
workloadapi.WithClientOptions(
workloadapi.WithAddr("unix://" + socketPath),
),
)
if err != nil {
return nil, fmt.Errorf("workload API connect: %w", err)
}
return &SPIFFECredentials{source: source}, nil
}
// ServerCredentials는 gRPC 서버용 mTLS 자격증명을 반환한다.
// 클라이언트 SVID를 trustDomain으로 검증한다.
func (s *SPIFFECredentials) ServerCredentials(trustDomain string) (credentials.TransportCredentials, error) {
td, err := spiffeid.TrustDomainFromString(trustDomain)
if err != nil {
return nil, fmt.Errorf("trust domain: %w", err)
}
tlsCfg := tlsconfig.MTLSServerConfig(
s.source,
s.source,
tlsconfig.AuthorizeMemberOf(td),
)
return credentials.NewTLS(tlsCfg), nil
}
// ClientCredentials는 gRPC 클라이언트용 mTLS 자격증명을 반환한다.
// 서버 SPIFFE ID를 serverID로 검증한다.
func (s *SPIFFECredentials) ClientCredentials(serverSpiffeID string) (credentials.TransportCredentials, error) {
serverID, err := spiffeid.IDFromString(serverSpiffeID)
if err != nil {
return nil, fmt.Errorf("server SPIFFE ID: %w", err)
}
tlsCfg := tlsconfig.MTLSClientConfig(
s.source,
s.source,
tlsconfig.AuthorizeID(serverID),
)
return credentials.NewTLS(tlsCfg), nil
}
// Close는 X509Source를 닫는다.
func (s *SPIFFECredentials) Close() error { return s.source.Close() }
```
### 7.3 SVID 발급 흐름
```
T3 Field Agent 부팅
├─► TPM Quote 생성 (PCR 측정값)
├─► SPIRE Agent에 Attestation 요청 (TPM 플러그인)
│ └── SPIRE Agent → SPIRE Server 검증
├─► SPIRE Server: Registration Entry 매칭
│ └── spiffe://iot/{org}/{factory}/device/{serial}
├─► X.509-SVID 발급 (TTL: 1시간)
│ ├── Subject: spiffe://iot/...
│ └── SAN: URI = SPIFFE ID
└─► Workload API를 통해 Go 워크로드에 SVID 제공
└── go-spiffe SDK가 자동 갱신 (만료 10분 전)
```
### 7.4 SPIRE Registration Entry 예시
```bash
# T2 게이트웨이 등록 (하드웨어 시리얼 기반)
spire-server entry create \
-spiffeID spiffe://edge-aiot/t2/gateway/GW-001 \
-parentID spiffe://edge-aiot/spire-agent \
-selector unix:uid:1000 \
-selector aws_iid:instance-id:i-0abc123
# T3 Field Agent 등록 (TPM attestation)
spire-server entry create \
-spiffeID spiffe://edge-aiot/t3/factory-a/device/SENSOR-001 \
-parentID spiffe://edge-aiot/spire-agent \
-selector tpm:pub_hash:sha256:abcdef...
```
---
## 8. OpenTelemetry 통합
### 8.1 TracerProvider 초기화
#### `internal/observability/tracer.go`
```go
package observability
import (
"context"
"fmt"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
// InitTracer는 OTel TracerProvider를 초기화하고 글로벌에 등록한다.
// otlpEndpoint: OTel Collector OTLP gRPC 엔드포인트 (예: "otel-collector:4317")
func InitTracer(ctx context.Context, serviceName, otlpEndpoint string) (func(context.Context) error, error) {
// OTLP gRPC exporter 생성
conn, err := grpc.DialContext(ctx, otlpEndpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, fmt.Errorf("grpc dial otlp: %w", err)
}
exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
return nil, fmt.Errorf("otlp exporter: %w", err)
}
// 리소스 속성 (서비스 이름, 버전, 배포 환경)
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceName(serviceName),
semconv.ServiceVersion("0.1.0"),
),
resource.WithOS(),
resource.WithHost(),
)
if err != nil {
return nil, fmt.Errorf("resource: %w", err)
}
// TracerProvider 생성
tp := sdktrace.NewTracerProvider(
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(res),
sdktrace.WithSampler(sdktrace.AlwaysSample()), // 실험 환경 — 프로덕션은 ParentBased
)
// 글로벌 TracerProvider 및 Propagator 설정
otel.SetTracerProvider(tp)
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
propagation.TraceContext{}, // W3C traceparent/tracestate
propagation.Baggage{},
))
// 종료 함수 반환 (defer로 호출)
return tp.Shutdown, nil
}
```
### 8.2 gRPC Interceptor에서 Trace Propagation
#### `internal/gateway/interceptors/otel.go`
```go
package interceptors
import (
"context"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)
// OTelUnary는 Unary RPC에 OTel trace를 주입하는 인터셉터다.
// otelgrpc.UnaryServerInterceptor()를 래핑하여 Resume Token trace 연속성을 추가한다.
func OTelUnary() grpc.UnaryServerInterceptor {
base := otelgrpc.UnaryServerInterceptor()
return func(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// Resume Token에 포함된 traceparent로 trace 연속성 복원
ctx = restoreTraceFromResumeToken(ctx)
return base(ctx, req, info, handler)
}
}
// OTelStream는 Streaming RPC에 OTel trace를 주입하는 인터셉터다.
func OTelStream() grpc.StreamServerInterceptor {
base := otelgrpc.StreamServerInterceptor()
return func(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctx := restoreTraceFromResumeToken(ss.Context())
wrapped := &otelServerStream{ServerStream: ss, ctx: ctx}
return base(srv, wrapped, info, handler)
}
}
// restoreTraceFromResumeToken은 Resume Token에 포함된 traceparent를
// context에 복원하여 단절-재연결 후에도 trace 체인이 유지되도록 한다.
func restoreTraceFromResumeToken(ctx context.Context) context.Context {
state, ok := GetResumeState(ctx)
if !ok || state.TraceParent == "" {
return ctx
}
// traceparent를 metadata carrier로 변환하여 propagate
carrier := propagation.MapCarrier{
"traceparent": state.TraceParent,
}
return otel.GetTextMapPropagator().Extract(ctx, carrier)
}
// InjectResumeTraceParent는 현재 span의 traceparent를 ResumeState에 저장한다.
// Resume Token 발급 시 호출한다.
func InjectResumeTraceParent(ctx context.Context) string {
carrier := propagation.MapCarrier{}
otel.GetTextMapPropagator().Inject(ctx, carrier)
return carrier.Get("traceparent")
}
// T2FanOutSpan은 T3 디바이스에서 전달된 trace를 T2 게이트웨이에서
// fan-out 지점으로 처리하는 span을 생성한다.
func T2FanOutSpan(ctx context.Context, deviceID, metric string) (context.Context, trace.Span) {
tracer := otel.Tracer("t2-gateway")
return tracer.Start(ctx, "t2.fanout",
trace.WithAttributes(
attribute.String("device.id", deviceID),
attribute.String("telemetry.metric", metric),
),
)
}
// RecordError는 gRPC 에러를 span에 기록한다.
func RecordError(span trace.Span, err error) {
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}
}
type otelServerStream struct {
grpc.ServerStream
ctx context.Context
}
func (s *otelServerStream) Context() context.Context { return s.ctx }
```
### 8.3 MQTT→gRPC Trace 연속성 패턴
T3 디바이스가 MQTT로 발행할 때 UserProperties(MQTT 5.0) 또는 페이로드 필드로 `traceparent`를 포함시키고, T2 게이트웨이의 `Translator`가 이를 gRPC metadata로 변환하여 T1까지 trace 체인을 연결한다.
```go
// MQTT 페이로드의 Props 필드에서 traceparent 추출 후 context 복원 예시
func extractTraceFromMQTTPayload(payload TelemetryPayload) context.Context {
ctx := context.Background()
if len(payload.Props) == 0 {
return ctx
}
carrier := propagation.MapCarrier(payload.Props)
return otel.GetTextMapPropagator().Extract(ctx, carrier)
}
```
---
## 9. Docker Compose 실험 환경 설계
### `deployments/docker-compose.yml`
```yaml
version: "3.9"
networks:
t1-net:
driver: bridge
t2-net:
driver: bridge
t3-net:
driver: bridge
obs-net:
driver: bridge
volumes:
spire-data:
prometheus-data:
grafana-data:
services:
# ─────────────────────────────────────────────────────
# T1: 클라우드 계층
# ─────────────────────────────────────────────────────
cloud-server:
build:
context: ..
dockerfile: deployments/Dockerfile.go
args:
CMD: cmd/cloud-server
image: edge-aiot/cloud-server:dev
container_name: t1-cloud-server
networks: [t1-net, obs-net]
ports:
- "9443:9443" # gRPC over QUIC
environment:
- GRPC_ADDR=:9443
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
- SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock
volumes:
- /run/spire/t1/sockets:/run/spire/sockets:ro
depends_on:
- spire-server
- otel-collector
t1-orchestrator:
build:
context: ../python
dockerfile: ../deployments/Dockerfile.python
image: edge-aiot/orchestrator:dev
container_name: t1-orchestrator
networks: [t1-net, obs-net]
ports:
- "8080:8080" # FastAPI 관리 인터페이스
environment:
- GATEWAY_A2A_URL=https://t2-gateway:8443
- CLOUD_GRPC_ADDR=cloud-server:9443
- LLM_MODEL=gpt-4o
- OPENAI_API_KEY=${OPENAI_API_KEY}
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
depends_on:
- cloud-server
# ─────────────────────────────────────────────────────
# T2: 엣지 계층
# ─────────────────────────────────────────────────────
t2-gateway:
build:
context: ..
dockerfile: deployments/Dockerfile.go
args:
CMD: cmd/gateway
image: edge-aiot/gateway:dev
container_name: t2-gateway
networks: [t1-net, t2-net, t3-net, obs-net]
ports:
- "19443:9443" # gRPC over QUIC (T1 방향)
- "18443:8443" # A2A HTTP/3
environment:
- GRPC_ADDR=:9443
- A2A_ADDR=:8443
- MQTT_BROKER=mqtt-broker
- MQTT_PORT=1883
- T1_GRPC_ADDR=cloud-server:9443
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
- SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock
volumes:
- /run/spire/t2/sockets:/run/spire/sockets:ro
depends_on:
- mqtt-broker
- spire-server
- otel-collector
# ─────────────────────────────────────────────────────
# T3: 현장 계층 (복수 인스턴스)
# ─────────────────────────────────────────────────────
t3-field-agent-1:
build:
context: ..
dockerfile: deployments/Dockerfile.go
args:
CMD: cmd/field-agent
image: edge-aiot/field-agent:dev
container_name: t3-field-agent-1
networks: [t3-net, obs-net]
environment:
- DEVICE_ID=SENSOR-001
- SITE_ID=factory-a
- MQTT_BROKER=mqtt-broker
- MQTT_PORT=1883
- OFFLINE_DB=/data/queue.db
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
volumes:
- ./data/agent-1:/data
depends_on:
- mqtt-broker
t3-field-agent-2:
extends:
service: t3-field-agent-1
container_name: t3-field-agent-2
environment:
- DEVICE_ID=SENSOR-002
- SITE_ID=factory-a
- MQTT_BROKER=mqtt-broker
- MQTT_PORT=1883
- OFFLINE_DB=/data/queue.db
volumes:
- ./data/agent-2:/data
t3-field-agent-3:
extends:
service: t3-field-agent-1
container_name: t3-field-agent-3
environment:
- DEVICE_ID=AGV-001
- SITE_ID=factory-a
- MQTT_BROKER=mqtt-broker
- MQTT_PORT=1883
- OFFLINE_DB=/data/queue.db
volumes:
- ./data/agent-3:/data
# ─────────────────────────────────────────────────────
# 인프라: MQTT 브로커
# ─────────────────────────────────────────────────────
mqtt-broker:
image: eclipse-mosquitto:2.0
container_name: mqtt-broker
networks: [t2-net, t3-net]
ports:
- "1883:1883"
- "9001:9001" # WebSocket
volumes:
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
# ─────────────────────────────────────────────────────
# 보안: SPIRE
# ─────────────────────────────────────────────────────
spire-server:
image: ghcr.io/spiffe/spire-server:1.9.0
container_name: spire-server
networks: [t1-net]
ports:
- "8081:8081"
volumes:
- ./spire/server.conf:/etc/spire/server/server.conf:ro
- spire-data:/var/lib/spire
command: ["-config", "/etc/spire/server/server.conf"]
spire-agent-t1:
image: ghcr.io/spiffe/spire-agent:1.9.0
container_name: spire-agent-t1
networks: [t1-net]
volumes:
- ./spire/agent.conf:/etc/spire/agent/agent.conf:ro
- /run/spire/t1/sockets:/run/spire/sockets
depends_on: [spire-server]
command: ["-config", "/etc/spire/agent/agent.conf"]
spire-agent-t2:
image: ghcr.io/spiffe/spire-agent:1.9.0
container_name: spire-agent-t2
networks: [t2-net]
volumes:
- ./spire/agent.conf:/etc/spire/agent/agent.conf:ro
- /run/spire/t2/sockets:/run/spire/sockets
depends_on: [spire-server]
command: ["-config", "/etc/spire/agent/agent.conf"]
# ─────────────────────────────────────────────────────
# 관측성: OTel Collector + Prometheus + Grafana + Jaeger
# ─────────────────────────────────────────────────────
otel-collector:
image: otel/opentelemetry-collector-contrib:0.100.0
container_name: otel-collector
networks: [t1-net, t2-net, t3-net, obs-net]
ports:
- "4317:4317" # OTLP gRPC
- "4318:4318" # OTLP HTTP
- "8889:8889" # Prometheus metrics exporter
volumes:
- ./otel/otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
command: ["--config", "/etc/otelcol/config.yaml"]
jaeger:
image: jaegertracing/all-in-one:1.57
container_name: jaeger
networks: [obs-net]
ports:
- "16686:16686" # Jaeger UI
- "14250:14250" # gRPC (OTel Collector → Jaeger)
prometheus:
image: prom/prometheus:v2.51.0
container_name: prometheus
networks: [obs-net]
ports:
- "9090:9090"
volumes:
- ./otel/prometheus.yml:/etc/prometheus/prometheus.yml:ro
- prometheus-data:/prometheus
grafana:
image: grafana/grafana:10.4.0
container_name: grafana
networks: [obs-net]
ports:
- "3000:3000"
environment:
- GF_SECURITY_ADMIN_PASSWORD=admin
- GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/edge-aiot.json
volumes:
- grafana-data:/var/lib/grafana
- ./grafana/dashboards:/var/lib/grafana/dashboards:ro
depends_on:
- prometheus
- jaeger
```
### `deployments/Dockerfile.go`
```dockerfile
# 멀티 스테이지 빌드 — 최소 런타임 이미지
FROM golang:1.22-alpine AS builder
WORKDIR /app
COPY go.mod go.sum ./
RUN go mod download
COPY . .
ARG CMD=cmd/gateway
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/app ./${CMD}/...
FROM gcr.io/distroless/static-debian12
COPY --from=builder /bin/app /app
ENTRYPOINT ["/app"]
```
### `deployments/otel/otel-collector-config.yaml` (핵심 스니펫)
```yaml
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
timeout: 1s
send_batch_size: 1024
memory_limiter:
check_interval: 1s
limit_mib: 512
exporters:
jaeger:
endpoint: jaeger:14250
tls:
insecure: true
prometheus:
endpoint: "0.0.0.0:8889"
service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [jaeger]
metrics:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [prometheus]
```
---
## 10. 구현 단계별 계획 (Phase)
### Phase 1: Proto 정의 + 기본 gRPC over QUIC (2주)
**목표**: 컴파일 가능한 Proto + quic-go 위에서 gRPC 서버/클라이언트 통신 검증
| 주 | 작업 항목 | 산출물 |
|----|---------|--------|
| 1 | Proto 파일 설계 및 `buf generate` 설정 | `proto/`, `gen/` 디렉터리 |
| 1 | Go 모듈 초기화, 의존성 설치 | `go.mod`, `go.sum` |
| 2 | quic-go + gRPC-Go 통합 (QuicNetListener 구현) | `internal/gateway/server.go` |
| 2 | T1 gRPC-over-QUIC 클라이언트 구현 | `internal/cloud/grpc_quic_client.go` |
| 2 | 기본 AgentService Unary RPC 동작 검증 | `tests/integration/basic_grpc_test.go` |
**완료 기준**: `go test ./tests/integration/... -run TestBasicUnaryRPC` 통과
### Phase 2: MQTT 변환 게이트웨이 (2주)
**목표**: T3 MQTT 발행 → T2 변환 → T1 gRPC 전달 end-to-end 동작
| 주 | 작업 항목 | 산출물 |
|----|---------|--------|
| 3 | MQTT 클라이언트 구현 + Topic 라우팅 | `internal/gateway/mqtt_client.go` |
| 3 | MQTT→gRPC 변환 엔진 (`Translator`) | `internal/gateway/translator.go` |
| 4 | T3 Field Agent (MQTT 발행 + BoltDB 오프라인 큐) | `internal/field/` 패키지 |
| 4 | Docker Compose 기본 환경 (T1+T2+T3+Mosquitto) | `deployments/docker-compose.yml` |
| 4 | 오프라인 시나리오 테스트 (MQTT 브로커 중단 후 재연결) | `tests/integration/offline_test.go` |
**완료 기준**: T3 → MQTT → T2 Translator → gRPC → T1 텔레메트리 수신 확인
### Phase 3: A2A 통합 + Resume Token (2주)
**목표**: A2A HTTP/3 엔드포인트 + Resume Token 기반 스트리밍 재개 동작
| 주 | 작업 항목 | 산출물 |
|----|---------|--------|
| 5 | Resume Token Manager + gRPC Interceptor | `internal/gateway/resume_token.go`, `interceptors/resume.go` |
| 5 | A2A HTTP/3 핸들러 (quic-go http3 패키지) | `internal/gateway/a2a_handler.go` |
| 6 | Python A2A 클라이언트 + LangGraph 에이전트 | `python/orchestrator/` |
| 6 | 스트리밍 중단-재개 통합 테스트 | `tests/integration/resume_test.go` |
**완료 기준**: 스트리밍 중 네트워크 차단 → 재연결 → Resume Token으로 이어받기 검증
### Phase 4: SPIFFE/SPIRE + OTel (2주)
**목표**: mTLS(SPIFFE SVID) + 분산 트레이싱 end-to-end 동작
| 주 | 작업 항목 | 산출물 |
|----|---------|--------|
| 7 | SPIRE Server/Agent Docker Compose 통합 | `deployments/docker-compose.spire.yml` |
| 7 | go-spiffe SDK 래퍼 + gRPC credentials 교체 | `internal/security/spiffe.go` |
| 8 | OTel TracerProvider + otelgrpc 인터셉터 | `internal/observability/tracer.go` |
| 8 | MQTT→gRPC trace propagation (traceparent in payload Props) | `internal/gateway/interceptors/otel.go` |
| 8 | Jaeger에서 T3→T2→T1 trace 체인 시각화 확인 | - |
**완료 기준**: Jaeger에서 단일 MQTT 발행의 전체 트레이스(T3→T2→T1) 확인
### Phase 5: 실험 검증 (2주)
**목표**: 성능 지표 측정, 단절 내성 검증, 문서화
| 주 | 작업 항목 | 산출물 |
|----|---------|--------|
| 9 | 성능 벤치마크 (gRPC vs REST 비교, latency/throughput) | `tests/perf/benchmark_test.go` |
| 9 | 네트워크 파티션 시나리오 (tc netem으로 손실·지연 주입) | chaos 테스트 스크립트 |
| 10 | Grafana 대시보드 구성 (latency, throughput, error rate) | `deployments/grafana/dashboards/` |
| 10 | 실험 결과 분석 및 구현 설계서 업데이트 | 본 문서 v1.0 |
**완료 기준**: 모든 KPI 측정값 수집 완료, Grafana 대시보드 정상 동작
---
## 11. 테스트 전략
### 11.1 단위 테스트
```
대상 패키지 테스트 항목
───────────────────────────────── ────────────────────────────────────
internal/gateway/resume_token.go 토큰 발급/검증, 서명 위조 감지, 만료 처리
internal/gateway/translator.go MQTT 토픽 패턴 매칭, JSON→Protobuf 변환
internal/field/offline_queue.go Enqueue/Drain 순서 보장, 동시 접근
internal/security/spiffe.go SVID 갱신 모킹, 인증 실패 처리
```
```go
// 예시: Resume Token 단위 테스트
func TestResumeTokenRoundTrip(t *testing.T) {
mgr := NewResumeTokenManager()
state := ResumeState{
StreamID: "stream-abc",
SequenceNum: 42,
DeviceID: "SENSOR-001",
TraceParent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
}
token, err := mgr.Issue(state)
require.NoError(t, err)
require.NotEmpty(t, token)
restored, err := mgr.Verify(token)
require.NoError(t, err)
assert.Equal(t, state.StreamID, restored.StreamID)
assert.Equal(t, state.SequenceNum, restored.SequenceNum)
assert.Equal(t, state.TraceParent, restored.TraceParent)
}
func TestResumeTokenTampering(t *testing.T) {
mgr := NewResumeTokenManager()
state := ResumeState{StreamID: "s1", SequenceNum: 1, DeviceID: "d1"}
token, _ := mgr.Issue(state)
// 마지막 바이트 변조
tampered := token[:len(token)-2] + "ZZ"
_, err := mgr.Verify(tampered)
assert.Error(t, err)
}
```
### 11.2 통합 테스트
```go
// 예시: MQTT→gRPC 변환 통합 테스트 (testcontainers-go 사용)
func TestMQTTToGRPCTranslation(t *testing.T) {
ctx := context.Background()
// Mosquitto 컨테이너 기동
mosquitto, err := testcontainers.GenericContainer(ctx,
testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "eclipse-mosquitto:2.0",
ExposedPorts: []string{"1883/tcp"},
WaitingFor: wait.ForListeningPort("1883/tcp"),
},
Started: true,
},
)
require.NoError(t, err)
defer mosquitto.Terminate(ctx)
host, _ := mosquitto.Host(ctx)
port, _ := mosquitto.MappedPort(ctx, "1883")
brokerAddr := fmt.Sprintf("%s:%s", host, port.Port())
// Gateway 초기화
gw, err := gateway.New(ctx, gateway.GatewayConfig{
GRPCAddr: ":0",
MQTTBroker: brokerAddr,
})
require.NoError(t, err)
// 텔레메트리 수신 채널
received := make(chan *telemetryv1.DataPoint, 1)
gw.SetTelemetryHandler(func(dp *telemetryv1.DataPoint) {
received <- dp
})
// T3 역할: MQTT 발행
publisher, _ := newTestMQTTClient(brokerAddr)
payload := `{"v":42.5,"u":"celsius","ts":1234567890000}`
publisher.Publish("telemetry/factory-a/SENSOR-001/temperature", 1, false, payload)
// 변환 결과 검증
select {
case dp := <-received:
assert.Equal(t, "SENSOR-001", dp.DeviceId)
assert.Equal(t, "temperature", dp.Metric)
assert.InDelta(t, 42.5, dp.Value, 0.001)
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for telemetry")
}
}
```
### 11.3 성능 테스트
```go
// tests/perf/benchmark_test.go
func BenchmarkUnaryRPCOverQUIC(b *testing.B) {
// 환경: Docker Compose로 T1+T2 기동 후 실행
client, err := cloud.NewT1Client(
context.Background(),
"localhost:9443",
testTLSConfig(),
)
require.NoError(b, err)
defer client.Close()
task := &agentv1.AgentTask{
TaskId: "bench",
AgentId: "t1",
TaskType: "query",
Payload: make([]byte, 256), // 256B 페이로드
}
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, err := client.ExecuteTask(ctx, task)
cancel()
if err != nil {
b.Errorf("RPC error: %v", err)
}
}
})
}
func BenchmarkTelemetryFanIn(b *testing.B) {
// 1000개 T3 에이전트에서 동시 텔레메트리 발행
b.SetParallelism(1000)
// 구현 생략 (위와 유사한 패턴)
}
```
---
## 12. 예상 성능 지표 및 측정 방법
### 12.1 목표 성능 지표 (KPI)
| 지표 | 목표값 | 측정 방법 |
|------|--------|---------|
| T1↔T2 gRPC Unary p50 latency | < 5ms (LAN) | `go test -bench=BenchmarkUnaryRPC` + `otelgrpc` 자동 계측 |
| T1↔T2 gRPC Unary p99 latency | < 20ms (LAN) | Jaeger → Grafana 히스토그램 |
| T2↔T3 MQTT→gRPC 변환 지연 | < 10ms | MQTT 발행 TS → gRPC 수신 TS 차분 |
| T3 텔레메트리 동시 처리 | 1,000 노드 @ 1Hz | `BenchmarkTelemetryFanIn` |
| Resume Token 재연결 후 재개 지연 | < 500ms | 네트워크 차단 → 재연결 시간 측정 |
| gRPC over QUIC vs TCP 핸드오버 | QUIC -50% RTT | `tc netem`으로 IP 변경 시뮬레이션 |
| SPIFFE SVID 갱신 오버헤드 | < 1ms/RPC | `pprof` + Prometheus `grpc_server_handling_seconds` |
### 12.2 측정 방법 상세
#### Prometheus 메트릭 수집
```yaml
# deployments/otel/prometheus.yml
scrape_configs:
- job_name: 'cloud-server'
static_configs:
- targets: ['cloud-server:2112']
scrape_interval: 5s
- job_name: 't2-gateway'
static_configs:
- targets: ['t2-gateway:2112']
- job_name: 'otel-collector'
static_configs:
- targets: ['otel-collector:8889']
```
#### 주요 모니터링 메트릭
```
# gRPC 레이턴시 히스토그램 (otelgrpc 자동 생성)
grpc_server_handling_seconds_bucket{grpc_method="Execute",grpc_service="agent.v1.AgentService"}
# MQTT 메시지 처리량
mqtt_messages_received_total{topic="telemetry"}
mqtt_translation_duration_seconds_bucket
# Resume Token 재연결
resume_token_reconnect_total
resume_token_reconnect_latency_seconds
# 오프라인 큐 크기
offline_queue_size{device_id="SENSOR-001"}
```
#### 네트워크 장애 주입 (tc netem)
```bash
# T2 게이트웨이에서 T1으로 가는 경로에 30% 패킷 손실 + 100ms 지연 주입
docker exec t2-gateway tc qdisc add dev eth0 root netem loss 30% delay 100ms
# 30초 후 정상 복구
sleep 30
docker exec t2-gateway tc qdisc del dev eth0 root
# IP 변경 시뮬레이션 (QUIC connection migration 테스트)
docker network disconnect t1-net t2-gateway
docker network connect t1-net t2-gateway
```
#### 핵심 그래프 (Grafana 대시보드)
1. **레이턴시 p50/p95/p99 시계열**: tier별 gRPC 호출 지연 추이
2. **처리량 시계열**: 초당 RPC 수, MQTT 메시지 수
3. **에러율**: gRPC 상태 코드별 에러율 (UNAVAILABLE, DEADLINE_EXCEEDED 등)
4. **Resume Token 이벤트**: 단절/재연결 빈도 및 재개 지연
5. **오프라인 큐 크기**: T3 디바이스별 버퍼 누적 현황
6. **Jaeger 트레이스 뷰**: T3→T2→T1 E2E 호출 그래프
---
## 부록 A. Go 모듈 의존성 (`go.mod` 핵심 부분)
```go
module github.com/your-org/edge-aiot-mas
go 1.22
require (
// gRPC
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
// QUIC
github.com/quic-go/quic-go v0.44.0
// MQTT
github.com/eclipse/paho.mqtt.golang v1.4.3
// SPIFFE/SPIRE
github.com/spiffe/go-spiffe/v2 v2.3.0
// OpenTelemetry
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/sdk v1.27.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
// BoltDB (오프라인 큐)
go.etcd.io/bbolt v1.3.10
// 테스트
github.com/testcontainers/testcontainers-go v0.31.0
github.com/stretchr/testify v1.9.0
)
```
## 부록 B. Python 의존성 (`python/pyproject.toml` 핵심 부분)
```toml
[project]
name = "edge-aiot-orchestrator"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
# LangGraph 에이전트
"langgraph>=0.1.0",
"langchain-openai>=0.1.0",
"langchain-core>=0.2.0",
# gRPC
"grpcio>=1.64.0",
"grpcio-tools>=1.64.0",
"protobuf>=5.27.0",
# HTTP/A2A 클라이언트
"httpx[http2]>=0.27.0",
# 관측성
"opentelemetry-sdk>=1.27.0",
"opentelemetry-exporter-otlp>=1.27.0",
"opentelemetry-instrumentation-grpc>=0.48b0",
]
[project.optional-dependencies]
dev = [
"pytest>=8.0",
"pytest-asyncio>=0.23",
"pytest-cov>=5.0",
]
```