2661 lines
82 KiB
Markdown
2661 lines
82 KiB
Markdown
# 엣지 AIoT 3-tier 멀티 에이전트 통신 프레임워크 — 구현 설계서
|
|
|
|
**문서 버전**: v0.1.0
|
|
**최종 수정**: 2026-06-07
|
|
**관련 문서**: `FINAL_REPORT.md` (설계 근거 및 사용 사례 분석)
|
|
|
|
---
|
|
|
|
## 목차
|
|
|
|
1. [기술 스택 선정 근거](#1-기술-스택-선정-근거)
|
|
2. [프로젝트 디렉터리 구조](#2-프로젝트-디렉터리-구조)
|
|
3. [Proto 파일 설계](#3-proto-파일-설계)
|
|
4. [T2 Edge Gateway 구현 설계](#4-t2-edge-gateway-구현-설계)
|
|
5. [T1 Orchestrator 구현 설계](#5-t1-orchestrator-구현-설계)
|
|
6. [T3 Field Agent 구현 설계](#6-t3-field-agent-구현-설계)
|
|
7. [SPIFFE/SPIRE 통합](#7-spiffespire-통합)
|
|
8. [OpenTelemetry 통합](#8-opentelemetry-통합)
|
|
9. [Docker Compose 실험 환경 설계](#9-docker-compose-실험-환경-설계)
|
|
10. [구현 단계별 계획 (Phase)](#10-구현-단계별-계획-phase)
|
|
11. [테스트 전략](#11-테스트-전략)
|
|
12. [예상 성능 지표 및 측정 방법](#12-예상-성능-지표-및-측정-방법)
|
|
|
|
---
|
|
|
|
## 1. 기술 스택 선정 근거
|
|
|
|
### 1.1 핵심 언어 선정
|
|
|
|
| 언어 | 적용 컴포넌트 | 선정 이유 |
|
|
|------|-------------|-----------|
|
|
| **Go** | T2 Gateway, T3 Field Agent, gRPC 서버/클라이언트 | 단일 바이너리 배포, 낮은 메모리 푸트프린트, 고루틴 기반 고동시성, quic-go 네이티브 지원, 크로스 컴파일(ARM/MIPS/x86) |
|
|
| **Python** | T1 Orchestrator LLM 에이전트, A2A 클라이언트 | LangGraph/AutoGen 등 LLM 프레임워크 생태계, 빠른 프로토타이핑, grpcio 지원 |
|
|
|
|
### 1.2 전송 계층 라이브러리
|
|
|
|
#### quic-go (`github.com/quic-go/quic-go`)
|
|
- Go 네이티브 QUIC RFC 9000 구현 — CGO 의존 없음
|
|
- `net/http` 인터페이스 호환 → HTTP/3 서버를 표준 핸들러로 운영 가능
|
|
- Connection migration 내장: 핸드오버(IP 변경) 시 세션 유지
|
|
- QUIC Datagram 확장(RFC 9221) 지원 → 저지연 V2X 메시지에 활용 가능
|
|
- gRPC-Go 트랜스포트 레이어 교체 방식으로 통합 (`WithContextDialer` + `quic.DialAddr`)
|
|
|
|
#### gRPC-Go (`google.golang.org/grpc`)
|
|
- 공식 Google Go gRPC 구현 — Interceptor Chain, Health Checking, Service Config(retry policy) 내장
|
|
- Protobuf codec 교체 가능 → 커스텀 직렬화 확장 가능
|
|
- `grpc.WithTransportCredentials` + quic-go TLS 설정을 연동하여 mTLS over QUIC 구현
|
|
|
|
#### Eclipse Paho Go (`github.com/eclipse/paho.mqtt.golang`)
|
|
- MQTT 3.1.1 / 5.0 지원 Go 클라이언트
|
|
- Reconnect 자동 처리 + Message Queue(오프라인 버퍼)
|
|
- T3 현장 디바이스와의 Pub/Sub 인터페이스
|
|
|
|
### 1.3 에이전트 프레임워크
|
|
|
|
| 프레임워크 | 선정 이유 |
|
|
|-----------|-----------|
|
|
| **LangGraph** (Python) | 상태 머신 기반 에이전트 그래프, 체크포인팅·재시도 내장, T1 Orchestrator의 복잡한 오케스트레이션 로직에 적합 |
|
|
| **A2A Protocol** | Google 주도 에이전트 간 통신 표준, HTTP/JSON 기반으로 언어 무관 상호운용성, Task/Artifact 개념으로 장기 실행 작업 모델링 |
|
|
|
|
### 1.4 보안 및 관측성
|
|
|
|
| 라이브러리 | 선정 이유 |
|
|
|-----------|-----------|
|
|
| **SPIFFE/SPIRE Go SDK** (`github.com/spiffe/go-spiffe/v2`) | X.509-SVID 자동 갱신, gRPC `credentials.TransportCredentials` 구현 제공, TPM attestation 플러그인 체인 |
|
|
| **OpenTelemetry Go SDK** (`go.opentelemetry.io/otel`) | 벤더 중립 trace/metric/log, gRPC 자동 계측(`otelgrpc`), W3C TraceContext 전파 |
|
|
| **Prometheus** | Go 네이티브 exporter, 엣지 환경 경량 스크랩 |
|
|
|
|
---
|
|
|
|
## 2. 프로젝트 디렉터리 구조
|
|
|
|
```
|
|
edge-aiot-mas/
|
|
├── proto/ # Protobuf 스키마 (언어 무관 공유)
|
|
│ ├── agent/
|
|
│ │ └── v1/
|
|
│ │ └── agent.proto # AgentService (Unary/Stream 4모드)
|
|
│ ├── telemetry/
|
|
│ │ └── v1/
|
|
│ │ └── telemetry.proto # TelemetryService
|
|
│ ├── control/
|
|
│ │ └── v1/
|
|
│ │ └── control.proto # ControlService (OTA, Config)
|
|
│ └── buf.yaml # Buf 스키마 설정
|
|
│
|
|
├── gen/ # proto 컴파일 출력 (생성 파일, git 추적)
|
|
│ ├── go/
|
|
│ │ ├── agent/v1/
|
|
│ │ ├── telemetry/v1/
|
|
│ │ └── control/v1/
|
|
│ └── python/
|
|
│ ├── agent/v1/
|
|
│ └── telemetry/v1/
|
|
│
|
|
├── go.mod # Go 모듈 루트
|
|
├── go.sum
|
|
│
|
|
├── cmd/
|
|
│ ├── gateway/
|
|
│ │ └── main.go # T2 Edge Gateway 진입점
|
|
│ ├── field-agent/
|
|
│ │ └── main.go # T3 Field Agent 진입점
|
|
│ └── cloud-server/
|
|
│ └── main.go # T1 Go gRPC 서버 진입점
|
|
│
|
|
├── internal/
|
|
│ ├── gateway/
|
|
│ │ ├── server.go # gRPC over QUIC 서버 초기화
|
|
│ │ ├── mqtt_client.go # MQTT 브로커 클라이언트
|
|
│ │ ├── translator.go # MQTT topic → gRPC method 변환 엔진
|
|
│ │ ├── resume_token.go # Resume Token 관리자
|
|
│ │ ├── a2a_handler.go # A2A HTTP/3 엔드포인트
|
|
│ │ └── interceptors/
|
|
│ │ ├── resume.go # Resume Token gRPC Interceptor
|
|
│ │ ├── deadline.go # Deadline 강제 Interceptor
|
|
│ │ └── otel.go # OTel trace 전파 Interceptor
|
|
│ │
|
|
│ ├── field/
|
|
│ │ ├── mqtt_publisher.go # MQTT 텔레메트리 발행
|
|
│ │ ├── offline_queue.go # 오프라인 큐 (BoltDB)
|
|
│ │ └── resume_client.go # Resume Token 클라이언트
|
|
│ │
|
|
│ ├── cloud/
|
|
│ │ ├── agent_service.go # AgentService 서버 구현
|
|
│ │ └── telemetry_service.go # TelemetryService 서버 구현
|
|
│ │
|
|
│ ├── security/
|
|
│ │ ├── spiffe.go # SPIFFE/SPIRE 클라이언트 래퍼
|
|
│ │ └── credentials.go # gRPC TransportCredentials 생성
|
|
│ │
|
|
│ └── observability/
|
|
│ ├── tracer.go # OTel TracerProvider 초기화
|
|
│ └── metrics.go # Prometheus 메트릭 등록
|
|
│
|
|
├── python/
|
|
│ ├── pyproject.toml
|
|
│ ├── orchestrator/
|
|
│ │ ├── __init__.py
|
|
│ │ ├── agent.py # LangGraph 오케스트레이터 에이전트
|
|
│ │ ├── a2a_client.py # A2A 클라이언트
|
|
│ │ ├── grpc_client.py # gRPC-over-QUIC 클라이언트 (aioquic 또는 subprocess)
|
|
│ │ └── tools/
|
|
│ │ ├── telemetry_tool.py # 텔레메트리 조회 도구
|
|
│ │ └── control_tool.py # 제어 명령 도구
|
|
│ └── tests/
|
|
│ └── test_agent.py
|
|
│
|
|
├── deployments/
|
|
│ ├── docker-compose.yml # 전체 실험 환경
|
|
│ ├── docker-compose.spire.yml # SPIRE 오버레이
|
|
│ ├── spire/
|
|
│ │ ├── server.conf
|
|
│ │ └── agent.conf
|
|
│ ├── mosquitto/
|
|
│ │ └── mosquitto.conf
|
|
│ └── otel/
|
|
│ └── otel-collector-config.yaml
|
|
│
|
|
├── configs/
|
|
│ ├── gateway.yaml # T2 Gateway 설정
|
|
│ ├── field-agent.yaml # T3 Field Agent 설정
|
|
│ └── cloud-server.yaml # T1 서버 설정
|
|
│
|
|
└── tests/
|
|
├── integration/
|
|
│ ├── gateway_test.go
|
|
│ └── e2e_test.go
|
|
└── perf/
|
|
└── benchmark_test.go
|
|
```
|
|
|
|
---
|
|
|
|
## 3. Proto 파일 설계
|
|
|
|
### 3.1 AgentService (`proto/agent/v1/agent.proto`)
|
|
|
|
```protobuf
|
|
syntax = "proto3";
|
|
|
|
package agent.v1;
|
|
|
|
option go_package = "github.com/your-org/edge-aiot-mas/gen/go/agent/v1;agentv1";
|
|
|
|
import "google/protobuf/timestamp.proto";
|
|
import "google/protobuf/any.proto";
|
|
|
|
// 에이전트 태스크 요청
|
|
message AgentTask {
|
|
string task_id = 1; // UUID
|
|
string agent_id = 2; // 발신 에이전트 ID
|
|
string target_agent = 3; // 수신 에이전트 ID
|
|
string task_type = 4; // "infer", "control", "query", "ota"
|
|
bytes payload = 5; // Protobuf Any 직렬화
|
|
map<string, string> metadata = 6; // X-Tenant-ID, X-Device-Class 등
|
|
google.protobuf.Timestamp deadline = 7;
|
|
}
|
|
|
|
// 에이전트 태스크 응답
|
|
message AgentResult {
|
|
string task_id = 1;
|
|
string agent_id = 2;
|
|
Status status = 3;
|
|
bytes payload = 4;
|
|
string resume_token = 5; // 스트리밍 재개 토큰
|
|
map<string, string> metadata = 6;
|
|
|
|
enum Status {
|
|
STATUS_UNSPECIFIED = 0;
|
|
STATUS_OK = 1;
|
|
STATUS_PARTIAL = 2; // 스트림 중간 청크
|
|
STATUS_ERROR = 3;
|
|
}
|
|
}
|
|
|
|
// 스트리밍 청크 (Server/Client/Bidi Streaming 공용)
|
|
message StreamChunk {
|
|
string stream_id = 1;
|
|
uint64 sequence_num = 2;
|
|
bytes data = 3;
|
|
bool is_last = 4;
|
|
string resume_token = 5;
|
|
}
|
|
|
|
// 4대 RPC 모드를 모두 지원하는 AgentService
|
|
service AgentService {
|
|
// Unary: 단일 명령/조회 (OTA 명령, 상태 조회)
|
|
rpc Execute(AgentTask) returns (AgentResult);
|
|
|
|
// Server Streaming: T1→T2 모델 업데이트, 토큰 스트리밍
|
|
rpc Subscribe(AgentTask) returns (stream StreamChunk);
|
|
|
|
// Client Streaming: T3→T2 배치 센서 데이터 업로드
|
|
rpc Upload(stream StreamChunk) returns (AgentResult);
|
|
|
|
// Bidi Streaming: T2↔T2 엣지 합의, 로봇 실시간 협업
|
|
rpc Collaborate(stream AgentTask) returns (stream AgentResult);
|
|
}
|
|
```
|
|
|
|
### 3.2 TelemetryService (`proto/telemetry/v1/telemetry.proto`)
|
|
|
|
```protobuf
|
|
syntax = "proto3";
|
|
|
|
package telemetry.v1;
|
|
|
|
option go_package = "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1;telemetryv1";
|
|
|
|
import "google/protobuf/timestamp.proto";
|
|
|
|
// 단일 센서 데이터 포인트
|
|
message DataPoint {
|
|
string device_id = 1;
|
|
string metric = 2; // "temperature", "vibration", "voltage" 등
|
|
double value = 3;
|
|
string unit = 4;
|
|
google.protobuf.Timestamp ts = 5;
|
|
map<string, string> labels = 6; // site, line, zone 등 레이블
|
|
}
|
|
|
|
// 배치 텔레메트리 (다수 디바이스 팬인)
|
|
message TelemetryBatch {
|
|
string source_gateway = 1;
|
|
repeated DataPoint points = 2;
|
|
string resume_token = 3; // 마지막 처리 위치
|
|
}
|
|
|
|
// 텔레메트리 수신 확인
|
|
message TelemetryAck {
|
|
string resume_token = 1; // 서버가 발급한 다음 재개 위치
|
|
uint64 accepted = 2;
|
|
uint64 rejected = 3;
|
|
string error_message = 4;
|
|
}
|
|
|
|
// 텔레메트리 쿼리
|
|
message TelemetryQuery {
|
|
string device_id = 1;
|
|
string metric = 2;
|
|
google.protobuf.Timestamp from = 3;
|
|
google.protobuf.Timestamp to = 4;
|
|
int32 limit = 5;
|
|
}
|
|
|
|
service TelemetryService {
|
|
// Client Streaming: T3 다수 센서 배치 업로드
|
|
rpc PushBatch(stream DataPoint) returns (TelemetryAck);
|
|
|
|
// Unary: 텔레메트리 배치 전송
|
|
rpc Push(TelemetryBatch) returns (TelemetryAck);
|
|
|
|
// Server Streaming: 실시간 텔레메트리 구독
|
|
rpc Stream(TelemetryQuery) returns (stream DataPoint);
|
|
}
|
|
```
|
|
|
|
### 3.3 ControlService (`proto/control/v1/control.proto`)
|
|
|
|
```protobuf
|
|
syntax = "proto3";
|
|
|
|
package control.v1;
|
|
|
|
option go_package = "github.com/your-org/edge-aiot-mas/gen/go/control/v1;controlv1";
|
|
|
|
message OTARequest {
|
|
string device_id = 1;
|
|
string firmware_url = 2;
|
|
string expected_sha256 = 3;
|
|
string version = 4;
|
|
}
|
|
|
|
message OTAStatus {
|
|
string device_id = 1;
|
|
State state = 2;
|
|
int32 progress = 3; // 0-100
|
|
string error = 4;
|
|
|
|
enum State {
|
|
STATE_UNSPECIFIED = 0;
|
|
STATE_DOWNLOADING = 1;
|
|
STATE_VERIFYING = 2;
|
|
STATE_APPLYING = 3;
|
|
STATE_COMPLETE = 4;
|
|
STATE_FAILED = 5;
|
|
}
|
|
}
|
|
|
|
message ConfigUpdate {
|
|
string device_id = 1;
|
|
bytes config = 2; // JSON or Protobuf Any
|
|
string version = 3;
|
|
}
|
|
|
|
message ConfigAck {
|
|
bool applied = 1;
|
|
string version = 2;
|
|
string error = 3;
|
|
}
|
|
|
|
service ControlService {
|
|
// Server Streaming: OTA 다운로드 진행 상황 푸시
|
|
rpc OTAUpdate(OTARequest) returns (stream OTAStatus);
|
|
|
|
// Unary: 설정 업데이트
|
|
rpc UpdateConfig(ConfigUpdate) returns (ConfigAck);
|
|
}
|
|
```
|
|
|
|
### 3.4 Buf 설정 (`proto/buf.yaml`)
|
|
|
|
```yaml
|
|
version: v2
|
|
modules:
|
|
- path: .
|
|
lint:
|
|
use:
|
|
- STANDARD
|
|
breaking:
|
|
use:
|
|
- FILE
|
|
deps:
|
|
- buf.build/googleapis/googleapis
|
|
```
|
|
|
|
---
|
|
|
|
## 4. T2 Edge Gateway 구현 설계
|
|
|
|
### 4.1 quic-go + gRPC 통합
|
|
|
|
T2 게이트웨이는 T1 방향으로 gRPC over QUIC 서버를 운영하고, T3 방향으로 MQTT 클라이언트를 유지한다.
|
|
|
|
#### `internal/gateway/server.go`
|
|
|
|
```go
|
|
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net"
|
|
"net/http"
|
|
|
|
"github.com/quic-go/quic-go"
|
|
"github.com/quic-go/quic-go/http3"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
|
|
telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
|
|
"github.com/your-org/edge-aiot-mas/internal/gateway/interceptors"
|
|
"github.com/your-org/edge-aiot-mas/internal/security"
|
|
)
|
|
|
|
// GatewayConfig는 T2 게이트웨이 설정을 담는다.
|
|
type GatewayConfig struct {
|
|
GRPCAddr string // T1 방향 gRPC over QUIC 수신 주소 (예: ":9443")
|
|
A2AAddr string // A2A HTTP/3 수신 주소 (예: ":8443")
|
|
MQTTBroker string // T3 방향 MQTT 브로커 주소
|
|
SPIFFESocket string // SPIRE Agent 소켓 경로
|
|
}
|
|
|
|
// Gateway는 T2 엣지 게이트웨이의 핵심 컴포넌트다.
|
|
type Gateway struct {
|
|
cfg GatewayConfig
|
|
grpcServer *grpc.Server
|
|
http3Server *http3.Server
|
|
mqttClient *MQTTClient
|
|
translator *Translator
|
|
tokenMgr *ResumeTokenManager
|
|
}
|
|
|
|
// New는 Gateway를 초기화하고 반환한다.
|
|
func New(ctx context.Context, cfg GatewayConfig) (*Gateway, error) {
|
|
// 1. SPIFFE/SPIRE에서 TLS 자격증명 획득
|
|
creds, err := security.NewSPIFFECredentials(ctx, cfg.SPIFFESocket)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("SPIFFE credentials: %w", err)
|
|
}
|
|
|
|
// 2. Resume Token 관리자 초기화
|
|
tokenMgr := NewResumeTokenManager()
|
|
|
|
// 3. gRPC 서버 인터셉터 체인 구성
|
|
srv := grpc.NewServer(
|
|
grpc.Creds(creds),
|
|
grpc.ChainUnaryInterceptor(
|
|
interceptors.DeadlineEnforcer(defaultDeadlines),
|
|
interceptors.ResumeTokenUnary(tokenMgr),
|
|
interceptors.OTelUnary(),
|
|
),
|
|
grpc.ChainStreamInterceptor(
|
|
interceptors.ResumeTokenStream(tokenMgr),
|
|
interceptors.OTelStream(),
|
|
),
|
|
)
|
|
|
|
// 4. 서비스 등록
|
|
agentSvc := NewAgentServiceServer(tokenMgr)
|
|
telemetrySvc := NewTelemetryServiceServer(tokenMgr)
|
|
agentv1.RegisterAgentServiceServer(srv, agentSvc)
|
|
telemetryv1.RegisterTelemetryServiceServer(srv, telemetrySvc)
|
|
|
|
// 5. MQTT 클라이언트 초기화
|
|
mqttClient, err := NewMQTTClient(cfg.MQTTBroker)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("MQTT client: %w", err)
|
|
}
|
|
|
|
return &Gateway{
|
|
cfg: cfg,
|
|
grpcServer: srv,
|
|
mqttClient: mqttClient,
|
|
translator: NewTranslator(agentSvc, telemetrySvc),
|
|
tokenMgr: tokenMgr,
|
|
}, nil
|
|
}
|
|
|
|
// ServeGRPCOverQUIC는 quic-go 리스너 위에서 gRPC 서버를 구동한다.
|
|
func (g *Gateway) ServeGRPCOverQUIC(ctx context.Context) error {
|
|
tlsCfg := g.grpcServer.GetServiceInfo() // SPIFFE creds에서 TLS config 추출
|
|
_ = tlsCfg
|
|
|
|
// quic-go 리스너 생성
|
|
quicListener, err := quic.ListenAddr(g.cfg.GRPCAddr, &tls.Config{
|
|
NextProtos: []string{"h2", "h3"}, // gRPC는 h2, HTTP/3는 h3
|
|
MinVersion: tls.VersionTLS13,
|
|
// GetCertificate는 SPIFFE SVID에서 동적으로 제공
|
|
}, &quic.Config{
|
|
EnableDatagrams: true,
|
|
MaxIdleTimeout: 30 * 60 * 1e9, // 30분
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("quic listen: %w", err)
|
|
}
|
|
defer quicListener.Close()
|
|
|
|
// quic.Listener를 net.Listener 인터페이스로 래핑
|
|
netListener := &quicNetListener{inner: quicListener}
|
|
|
|
// gRPC 서버를 quic-go 리스너 위에서 구동
|
|
go func() {
|
|
<-ctx.Done()
|
|
g.grpcServer.GracefulStop()
|
|
}()
|
|
|
|
return g.grpcServer.Serve(netListener)
|
|
}
|
|
|
|
// quicNetListener는 quic.Listener를 net.Listener로 래핑한다.
|
|
type quicNetListener struct {
|
|
inner *quic.Listener
|
|
}
|
|
|
|
func (l *quicNetListener) Accept() (net.Conn, error) {
|
|
conn, err := l.inner.Accept(context.Background())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return &quicConn{conn: conn}, nil
|
|
}
|
|
|
|
func (l *quicNetListener) Close() error { return l.inner.Close() }
|
|
func (l *quicNetListener) Addr() net.Addr { return l.inner.Addr() }
|
|
|
|
// quicConn은 quic.Connection을 net.Conn으로 래핑한다.
|
|
// gRPC-Go는 net.Conn 인터페이스를 사용하므로 첫 번째 스트림을 단일 연결로 노출.
|
|
type quicConn struct {
|
|
conn quic.Connection
|
|
stream quic.Stream
|
|
}
|
|
|
|
func (c *quicConn) Read(b []byte) (int, error) {
|
|
if c.stream == nil {
|
|
var err error
|
|
c.stream, err = c.conn.AcceptStream(context.Background())
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
return c.stream.Read(b)
|
|
}
|
|
|
|
func (c *quicConn) Write(b []byte) (int, error) {
|
|
if c.stream == nil {
|
|
return 0, fmt.Errorf("stream not initialized")
|
|
}
|
|
return c.stream.Write(b)
|
|
}
|
|
|
|
func (c *quicConn) Close() error { return c.conn.CloseWithError(0, "closed") }
|
|
func (c *quicConn) LocalAddr() net.Addr { return c.conn.LocalAddr() }
|
|
func (c *quicConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() }
|
|
func (c *quicConn) SetDeadline(t interface{}) error { return nil }
|
|
func (c *quicConn) SetReadDeadline(t interface{}) error { return nil }
|
|
func (c *quicConn) SetWriteDeadline(t interface{}) error { return nil }
|
|
|
|
// defaultDeadlines는 디바이스 클래스별 기본 deadline 정책이다.
|
|
var defaultDeadlines = map[string]int64{
|
|
"agv": 50, // ms
|
|
"robot": 100,
|
|
"sensor": 5000,
|
|
"default": 3000,
|
|
}
|
|
```
|
|
|
|
> **참고**: 실제 quic-go와 gRPC-Go 통합 시 `grpc.WithContextDialer`를 사용하여 클라이언트 측에서 QUIC 다이얼러를 주입하는 방식이 더 실용적이다. 위 코드는 서버 측 리스너 래핑 패턴을 보여준다. 프로덕션에서는 `quic-go/http3` 패키지의 `ServeListener` 방식 또는 별도 gRPC-QUIC 어댑터 라이브러리(`github.com/open-telemetry/opentelemetry-collector-contrib` 참조)를 활용한다.
|
|
|
|
### 4.2 MQTT→gRPC 변환 엔진
|
|
|
|
#### MQTT Topic 네이밍 규칙
|
|
|
|
```
|
|
telemetry/{site}/{device_id}/{metric} → TelemetryService.Push
|
|
control/{site}/{device_id}/ota → ControlService.OTAUpdate
|
|
agent/{site}/{device_id}/task → AgentService.Execute
|
|
```
|
|
|
|
#### `internal/gateway/translator.go`
|
|
|
|
```go
|
|
package gateway
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strings"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"google.golang.org/protobuf/proto"
|
|
|
|
agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
|
|
telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
)
|
|
|
|
// TopicRoute는 MQTT topic 패턴과 핸들러 함수의 매핑이다.
|
|
type TopicRoute struct {
|
|
Pattern string
|
|
Handler func(ctx context.Context, topic string, payload []byte) error
|
|
}
|
|
|
|
// Translator는 MQTT 메시지를 gRPC 호출로 변환한다.
|
|
type Translator struct {
|
|
agentSvc agentv1.AgentServiceServer
|
|
telemetrySvc telemetryv1.TelemetryServiceServer
|
|
routes []TopicRoute
|
|
propagator propagation.TextMapPropagator
|
|
}
|
|
|
|
// NewTranslator는 변환 엔진을 초기화한다.
|
|
func NewTranslator(
|
|
agentSvc agentv1.AgentServiceServer,
|
|
telemetrySvc telemetryv1.TelemetryServiceServer,
|
|
) *Translator {
|
|
t := &Translator{
|
|
agentSvc: agentSvc,
|
|
telemetrySvc: telemetrySvc,
|
|
propagator: otel.GetTextMapPropagator(),
|
|
}
|
|
t.registerRoutes()
|
|
return t
|
|
}
|
|
|
|
// registerRoutes는 MQTT topic 패턴별 핸들러를 등록한다.
|
|
func (t *Translator) registerRoutes() {
|
|
t.routes = []TopicRoute{
|
|
{
|
|
Pattern: "telemetry/+/+/+",
|
|
Handler: t.handleTelemetry,
|
|
},
|
|
{
|
|
Pattern: "agent/+/+/task",
|
|
Handler: t.handleAgentTask,
|
|
},
|
|
}
|
|
}
|
|
|
|
// HandleMessage는 MQTT 메시지를 수신하고 라우팅한다.
|
|
func (t *Translator) HandleMessage(_ mqtt.Client, msg mqtt.Message) {
|
|
topic := msg.Topic()
|
|
payload := msg.Payload()
|
|
|
|
// MQTT UserProperties에서 OTel trace context 추출
|
|
ctx := context.Background()
|
|
// MQTT 5.0 UserProperties를 통한 traceparent 전파는
|
|
// paho.mqtt.golang v2(MQTT 5.0 지원)에서 msg.Properties()로 접근
|
|
|
|
for _, route := range t.routes {
|
|
if matchTopic(route.Pattern, topic) {
|
|
if err := route.Handler(ctx, topic, payload); err != nil {
|
|
// 에러 로깅 (OTel span에 기록)
|
|
fmt.Printf("translation error [%s]: %v\n", topic, err)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleTelemetry는 telemetry/{site}/{device}/{metric} 토픽을 처리한다.
|
|
func (t *Translator) handleTelemetry(ctx context.Context, topic string, payload []byte) error {
|
|
parts := strings.Split(topic, "/")
|
|
if len(parts) != 4 {
|
|
return fmt.Errorf("invalid telemetry topic: %s", topic)
|
|
}
|
|
site, deviceID, metric := parts[1], parts[2], parts[3]
|
|
|
|
// JSON 페이로드 파싱 (T3 디바이스는 JSON으로 발행)
|
|
var raw struct {
|
|
Value float64 `json:"v"`
|
|
Unit string `json:"u"`
|
|
TS int64 `json:"ts"` // Unix milliseconds
|
|
}
|
|
if err := json.Unmarshal(payload, &raw); err != nil {
|
|
return fmt.Errorf("json unmarshal: %w", err)
|
|
}
|
|
|
|
// Protobuf DataPoint 생성
|
|
dp := &telemetryv1.DataPoint{
|
|
DeviceId: deviceID,
|
|
Metric: metric,
|
|
Value: raw.Value,
|
|
Unit: raw.Unit,
|
|
Labels: map[string]string{"site": site},
|
|
}
|
|
if raw.TS > 0 {
|
|
ts := time.UnixMilli(raw.TS)
|
|
_ = ts // timestamppb 변환 후 dp.Ts에 할당
|
|
}
|
|
|
|
// TelemetryService.Push를 내부 호출
|
|
batch := &telemetryv1.TelemetryBatch{
|
|
SourceGateway: "t2-gateway",
|
|
Points: []*telemetryv1.DataPoint{dp},
|
|
}
|
|
_, err := t.telemetrySvc.(*TelemetryServiceServer).pushBatch(ctx, batch)
|
|
return err
|
|
}
|
|
|
|
// handleAgentTask는 agent/{site}/{device}/task 토픽을 처리한다.
|
|
func (t *Translator) handleAgentTask(ctx context.Context, topic string, payload []byte) error {
|
|
parts := strings.Split(topic, "/")
|
|
if len(parts) != 4 {
|
|
return fmt.Errorf("invalid agent topic: %s", topic)
|
|
}
|
|
deviceID := parts[2]
|
|
|
|
var task agentv1.AgentTask
|
|
if err := proto.Unmarshal(payload, &task); err != nil {
|
|
// JSON fallback
|
|
if err2 := json.Unmarshal(payload, &task); err2 != nil {
|
|
return fmt.Errorf("unmarshal agent task: %w", err)
|
|
}
|
|
}
|
|
task.AgentId = deviceID
|
|
|
|
_, err := t.agentSvc.Execute(ctx, &task)
|
|
return err
|
|
}
|
|
|
|
// matchTopic는 MQTT 와일드카드(+, #) 패턴 매칭을 수행한다.
|
|
func matchTopic(pattern, topic string) bool {
|
|
pp := strings.Split(pattern, "/")
|
|
tp := strings.Split(topic, "/")
|
|
if len(pp) != len(tp) {
|
|
if len(pp) > 0 && pp[len(pp)-1] == "#" {
|
|
return strings.HasPrefix(topic, strings.Join(pp[:len(pp)-1], "/"))
|
|
}
|
|
return false
|
|
}
|
|
for i, p := range pp {
|
|
if p != "+" && p != tp[i] {
|
|
return false
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
```
|
|
|
|
### 4.3 Resume Token 구현
|
|
|
|
Resume Token은 스트리밍 RPC가 중단되었을 때 재연결 후 중단 위치부터 이어받기 위한 서버 발급 토큰이다.
|
|
|
|
#### `internal/gateway/resume_token.go`
|
|
|
|
```go
|
|
package gateway
|
|
|
|
import (
|
|
"crypto/hmac"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// ResumeState는 스트리밍 재개 상태를 표현한다.
|
|
type ResumeState struct {
|
|
StreamID string `json:"sid"`
|
|
SequenceNum uint64 `json:"seq"`
|
|
DeviceID string `json:"did"`
|
|
Timestamp int64 `json:"ts"`
|
|
TraceParent string `json:"tp,omitempty"` // W3C traceparent
|
|
Extra map[string]string `json:"extra,omitempty"`
|
|
}
|
|
|
|
// ResumeTokenManager는 Resume Token을 발급하고 검증한다.
|
|
type ResumeTokenManager struct {
|
|
secret []byte
|
|
store sync.Map // streamID → *ResumeState
|
|
}
|
|
|
|
// NewResumeTokenManager는 새 관리자를 초기화한다.
|
|
func NewResumeTokenManager() *ResumeTokenManager {
|
|
// 프로덕션에서는 SPIFFE SVID 또는 환경 변수에서 시크릿 로드
|
|
return &ResumeTokenManager{
|
|
secret: []byte("change-me-in-production"),
|
|
}
|
|
}
|
|
|
|
// Issue는 현재 스트리밍 상태를 HMAC 서명된 토큰으로 발급한다.
|
|
func (m *ResumeTokenManager) Issue(state ResumeState) (string, error) {
|
|
state.Timestamp = time.Now().Unix()
|
|
data, err := json.Marshal(state)
|
|
if err != nil {
|
|
return "", fmt.Errorf("marshal state: %w", err)
|
|
}
|
|
|
|
// HMAC-SHA256 서명
|
|
mac := hmac.New(sha256.New, m.secret)
|
|
mac.Write(data)
|
|
sig := mac.Sum(nil)
|
|
|
|
// base64url(data) + "." + base64url(sig)
|
|
token := base64.RawURLEncoding.EncodeToString(data) +
|
|
"." +
|
|
base64.RawURLEncoding.EncodeToString(sig)
|
|
|
|
// 상태 저장 (메모리 캐시)
|
|
m.store.Store(state.StreamID, &state)
|
|
|
|
return token, nil
|
|
}
|
|
|
|
// Verify는 토큰을 검증하고 ResumeState를 반환한다.
|
|
func (m *ResumeTokenManager) Verify(token string) (*ResumeState, error) {
|
|
parts := splitOnLast(token, ".")
|
|
if len(parts) != 2 {
|
|
return nil, fmt.Errorf("invalid token format")
|
|
}
|
|
|
|
data, err := base64.RawURLEncoding.DecodeString(parts[0])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("decode data: %w", err)
|
|
}
|
|
sig, err := base64.RawURLEncoding.DecodeString(parts[1])
|
|
if err != nil {
|
|
return nil, fmt.Errorf("decode sig: %w", err)
|
|
}
|
|
|
|
// HMAC 검증
|
|
mac := hmac.New(sha256.New, m.secret)
|
|
mac.Write(data)
|
|
expected := mac.Sum(nil)
|
|
if !hmac.Equal(sig, expected) {
|
|
return nil, fmt.Errorf("invalid token signature")
|
|
}
|
|
|
|
var state ResumeState
|
|
if err := json.Unmarshal(data, &state); err != nil {
|
|
return nil, fmt.Errorf("unmarshal state: %w", err)
|
|
}
|
|
|
|
// 24시간 만료 체크
|
|
if time.Now().Unix()-state.Timestamp > 86400 {
|
|
return nil, fmt.Errorf("token expired")
|
|
}
|
|
|
|
return &state, nil
|
|
}
|
|
|
|
func splitOnLast(s, sep string) []string {
|
|
idx := len(s) - 1
|
|
for idx >= 0 && string(s[idx]) != sep {
|
|
idx--
|
|
}
|
|
if idx < 0 {
|
|
return []string{s}
|
|
}
|
|
return []string{s[:idx], s[idx+1:]}
|
|
}
|
|
```
|
|
|
|
#### Resume Token gRPC Interceptor (`internal/gateway/interceptors/resume.go`)
|
|
|
|
```go
|
|
package interceptors
|
|
|
|
import (
|
|
"context"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
|
|
"github.com/your-org/edge-aiot-mas/internal/gateway"
|
|
)
|
|
|
|
const resumeTokenMetaKey = "x-resume-token"
|
|
|
|
// ResumeTokenUnary는 Unary RPC에서 Resume Token을 처리하는 인터셉터다.
|
|
func ResumeTokenUnary(mgr *gateway.ResumeTokenManager) grpc.UnaryServerInterceptor {
|
|
return func(
|
|
ctx context.Context,
|
|
req interface{},
|
|
info *grpc.UnaryServerInfo,
|
|
handler grpc.UnaryHandler,
|
|
) (interface{}, error) {
|
|
ctx = injectResumeState(ctx, mgr)
|
|
resp, err := handler(ctx, req)
|
|
return resp, err
|
|
}
|
|
}
|
|
|
|
// ResumeTokenStream는 Streaming RPC에서 Resume Token을 처리하는 인터셉터다.
|
|
func ResumeTokenStream(mgr *gateway.ResumeTokenManager) grpc.StreamServerInterceptor {
|
|
return func(
|
|
srv interface{},
|
|
ss grpc.ServerStream,
|
|
info *grpc.StreamServerInfo,
|
|
handler grpc.StreamHandler,
|
|
) error {
|
|
// 클라이언트가 재연결 시 보내는 Resume Token 검증
|
|
ctx := ss.Context()
|
|
if md, ok := metadata.FromIncomingContext(ctx); ok {
|
|
if tokens := md.Get(resumeTokenMetaKey); len(tokens) > 0 {
|
|
state, err := mgr.Verify(tokens[0])
|
|
if err == nil {
|
|
// 검증된 상태를 context에 주입
|
|
ctx = context.WithValue(ctx, resumeStateKey{}, state)
|
|
ss = &resumeServerStream{ServerStream: ss, ctx: ctx}
|
|
}
|
|
}
|
|
}
|
|
return handler(srv, ss)
|
|
}
|
|
}
|
|
|
|
// injectResumeState는 incoming metadata에서 Resume Token을 추출하고 context에 주입한다.
|
|
func injectResumeState(ctx context.Context, mgr *gateway.ResumeTokenManager) context.Context {
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if !ok {
|
|
return ctx
|
|
}
|
|
tokens := md.Get(resumeTokenMetaKey)
|
|
if len(tokens) == 0 {
|
|
return ctx
|
|
}
|
|
state, err := mgr.Verify(tokens[0])
|
|
if err != nil {
|
|
return ctx
|
|
}
|
|
return context.WithValue(ctx, resumeStateKey{}, state)
|
|
}
|
|
|
|
// GetResumeState는 context에서 ResumeState를 추출한다.
|
|
func GetResumeState(ctx context.Context) (*gateway.ResumeState, bool) {
|
|
state, ok := ctx.Value(resumeStateKey{}).(*gateway.ResumeState)
|
|
return state, ok
|
|
}
|
|
|
|
type resumeStateKey struct{}
|
|
|
|
// resumeServerStream은 context를 오버라이드한 ServerStream 래퍼다.
|
|
type resumeServerStream struct {
|
|
grpc.ServerStream
|
|
ctx context.Context
|
|
}
|
|
|
|
func (s *resumeServerStream) Context() context.Context { return s.ctx }
|
|
```
|
|
|
|
### 4.4 A2A HTTP/3 엔드포인트
|
|
|
|
#### `internal/gateway/a2a_handler.go`
|
|
|
|
```go
|
|
package gateway
|
|
|
|
import (
|
|
"encoding/json"
|
|
"net/http"
|
|
|
|
"github.com/quic-go/quic-go/http3"
|
|
)
|
|
|
|
// A2ATask는 Google A2A 프로토콜의 태스크 요청 구조체다.
|
|
type A2ATask struct {
|
|
ID string `json:"id"`
|
|
Message A2AMessage `json:"message"`
|
|
Metadata map[string]any `json:"metadata,omitempty"`
|
|
}
|
|
|
|
// A2AMessage는 A2A 메시지 구조체다.
|
|
type A2AMessage struct {
|
|
Role string `json:"role"`
|
|
Parts []A2APart `json:"parts"`
|
|
}
|
|
|
|
// A2APart는 A2A 메시지의 단일 파트(텍스트, 데이터 등)다.
|
|
type A2APart struct {
|
|
Type string `json:"type"`
|
|
Text string `json:"text,omitempty"`
|
|
MimeType string `json:"mimeType,omitempty"`
|
|
Data []byte `json:"data,omitempty"`
|
|
}
|
|
|
|
// ServeA2AHTTP3는 A2A HTTP/3 서버를 구동한다.
|
|
func (g *Gateway) ServeA2AHTTP3() error {
|
|
mux := http.NewServeMux()
|
|
|
|
// A2A 에이전트 카드 (에이전트 능력 공개)
|
|
mux.HandleFunc("/.well-known/agent.json", g.handleAgentCard)
|
|
|
|
// A2A 태스크 수신 엔드포인트
|
|
mux.HandleFunc("/tasks/send", g.handleTaskSend)
|
|
mux.HandleFunc("/tasks/sendSubscribe", g.handleTaskSendSubscribe)
|
|
mux.HandleFunc("/tasks/get", g.handleTaskGet)
|
|
|
|
server := &http3.Server{
|
|
Addr: g.cfg.A2AAddr,
|
|
Handler: mux,
|
|
}
|
|
|
|
return server.ListenAndServeTLS("", "") // TLS config는 SPIFFE에서 주입
|
|
}
|
|
|
|
// handleAgentCard는 에이전트의 능력을 JSON으로 반환한다.
|
|
func (g *Gateway) handleAgentCard(w http.ResponseWriter, r *http.Request) {
|
|
card := map[string]any{
|
|
"name": "T2-Edge-Gateway",
|
|
"description": "엣지 AIoT T2 게이트웨이 에이전트",
|
|
"url": "https://" + g.cfg.A2AAddr,
|
|
"version": "1.0.0",
|
|
"capabilities": map[string]any{
|
|
"streaming": true,
|
|
"pushNotifications": false,
|
|
},
|
|
"skills": []map[string]any{
|
|
{
|
|
"id": "telemetry_query",
|
|
"name": "텔레메트리 조회",
|
|
"description": "T3 디바이스 텔레메트리 데이터를 조회한다",
|
|
},
|
|
{
|
|
"id": "device_control",
|
|
"name": "디바이스 제어",
|
|
"description": "T3 디바이스에 제어 명령을 전달한다",
|
|
},
|
|
},
|
|
}
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(card)
|
|
}
|
|
|
|
// handleTaskSend는 A2A 태스크를 수신하고 처리한다.
|
|
func (g *Gateway) handleTaskSend(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != http.MethodPost {
|
|
http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var task A2ATask
|
|
if err := json.NewDecoder(r.Body).Decode(&task); err != nil {
|
|
http.Error(w, "invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 태스크를 gRPC AgentService.Execute로 위임
|
|
ctx := r.Context()
|
|
result, err := g.processA2ATask(ctx, &task)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(result)
|
|
}
|
|
|
|
// handleTaskSendSubscribe는 SSE 스트리밍으로 태스크 상태를 푸시한다.
|
|
func (g *Gateway) handleTaskSendSubscribe(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "text/event-stream")
|
|
w.Header().Set("Cache-Control", "no-cache")
|
|
|
|
// SSE 스트리밍 구현 (생략)
|
|
flusher, ok := w.(http.Flusher)
|
|
if !ok {
|
|
http.Error(w, "streaming not supported", http.StatusInternalServerError)
|
|
return
|
|
}
|
|
_ = flusher
|
|
// TODO: gRPC Bidi 스트림 → SSE 변환
|
|
}
|
|
|
|
func (g *Gateway) processA2ATask(ctx interface{}, task *A2ATask) (map[string]any, error) {
|
|
// 구현: task.Message.Parts에서 요청 추출 → gRPC 호출 → 결과 반환
|
|
return map[string]any{
|
|
"id": task.ID,
|
|
"status": map[string]string{"state": "completed"},
|
|
}, nil
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## 5. T1 Orchestrator 구현 설계
|
|
|
|
### 5.1 Python LangGraph 에이전트 + A2A 클라이언트
|
|
|
|
#### `python/orchestrator/agent.py`
|
|
|
|
```python
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import os
|
|
from typing import Annotated, TypedDict
|
|
|
|
import httpx
|
|
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
|
|
from langchain_openai import ChatOpenAI
|
|
from langgraph.checkpoint.memory import MemorySaver
|
|
from langgraph.graph import END, StateGraph
|
|
from langgraph.graph.message import add_messages
|
|
|
|
from .a2a_client import A2AClient
|
|
from .tools.telemetry_tool import TelemetryQueryTool
|
|
from .tools.control_tool import DeviceControlTool
|
|
|
|
|
|
class OrchestratorState(TypedDict):
|
|
"""오케스트레이터 에이전트 상태."""
|
|
messages: Annotated[list, add_messages]
|
|
site_id: str
|
|
active_tasks: dict[str, str] # task_id → status
|
|
|
|
|
|
# 에이전트가 사용할 도구 목록
|
|
tools = [TelemetryQueryTool(), DeviceControlTool()]
|
|
|
|
# LLM 초기화 (모델은 환경 변수로 교체 가능)
|
|
llm = ChatOpenAI(
|
|
model=os.getenv("LLM_MODEL", "gpt-4o"),
|
|
temperature=0,
|
|
).bind_tools(tools)
|
|
|
|
|
|
def should_continue(state: OrchestratorState) -> str:
|
|
"""도구 호출 여부에 따라 다음 노드를 결정한다."""
|
|
last = state["messages"][-1]
|
|
if isinstance(last, AIMessage) and last.tool_calls:
|
|
return "tools"
|
|
return END
|
|
|
|
|
|
async def call_model(state: OrchestratorState) -> OrchestratorState:
|
|
"""LLM을 호출하고 응답을 상태에 추가한다."""
|
|
system = SystemMessage(content=(
|
|
"당신은 엣지 AIoT 오케스트레이터입니다. "
|
|
"T2 게이트웨이 에이전트를 통해 현장 디바이스를 조율합니다. "
|
|
f"현재 사이트: {state['site_id']}"
|
|
))
|
|
messages = [system] + state["messages"]
|
|
response = await llm.ainvoke(messages)
|
|
return {"messages": [response]}
|
|
|
|
|
|
async def call_tools(state: OrchestratorState) -> OrchestratorState:
|
|
"""도구를 실행하고 결과를 상태에 추가한다."""
|
|
from langchain_core.messages import ToolMessage
|
|
|
|
last = state["messages"][-1]
|
|
results = []
|
|
for tool_call in last.tool_calls:
|
|
# 도구 이름으로 실제 도구 인스턴스 검색
|
|
tool = next((t for t in tools if t.name == tool_call["name"]), None)
|
|
if tool is None:
|
|
content = f"Unknown tool: {tool_call['name']}"
|
|
else:
|
|
try:
|
|
content = await tool.arun(tool_call["args"])
|
|
except Exception as e:
|
|
content = f"Tool error: {e}"
|
|
|
|
results.append(ToolMessage(
|
|
content=str(content),
|
|
tool_call_id=tool_call["id"],
|
|
))
|
|
return {"messages": results}
|
|
|
|
|
|
def build_graph() -> StateGraph:
|
|
"""LangGraph 오케스트레이터 그래프를 빌드한다."""
|
|
builder = StateGraph(OrchestratorState)
|
|
|
|
builder.add_node("agent", call_model)
|
|
builder.add_node("tools", call_tools)
|
|
|
|
builder.set_entry_point("agent")
|
|
builder.add_conditional_edges("agent", should_continue)
|
|
builder.add_edge("tools", "agent")
|
|
|
|
return builder.compile(checkpointer=MemorySaver())
|
|
|
|
|
|
# 그래프 싱글턴
|
|
graph = build_graph()
|
|
|
|
|
|
async def run_orchestrator(user_input: str, site_id: str, thread_id: str) -> str:
|
|
"""오케스트레이터를 실행하고 최종 응답을 반환한다."""
|
|
config = {"configurable": {"thread_id": thread_id}}
|
|
initial_state: OrchestratorState = {
|
|
"messages": [HumanMessage(content=user_input)],
|
|
"site_id": site_id,
|
|
"active_tasks": {},
|
|
}
|
|
final_state = await graph.ainvoke(initial_state, config=config)
|
|
last_message = final_state["messages"][-1]
|
|
return last_message.content if hasattr(last_message, "content") else str(last_message)
|
|
```
|
|
|
|
#### `python/orchestrator/a2a_client.py`
|
|
|
|
```python
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import json
|
|
import uuid
|
|
from typing import AsyncIterator
|
|
|
|
import httpx
|
|
|
|
|
|
class A2AClient:
|
|
"""T2 Edge Gateway의 A2A HTTP/3 엔드포인트를 호출하는 클라이언트."""
|
|
|
|
def __init__(self, base_url: str, timeout: float = 30.0):
|
|
self.base_url = base_url.rstrip("/")
|
|
# HTTP/3(QUIC) 지원을 위해 httpx + h3 백엔드 사용
|
|
# 실험 환경에서는 HTTP/2로 폴백 가능
|
|
self._client = httpx.AsyncClient(
|
|
http2=True,
|
|
timeout=timeout,
|
|
verify=False, # 개발 환경 — 프로덕션에서는 SPIFFE SVID CA 적용
|
|
)
|
|
|
|
async def send_task(self, message: str, skill_id: str | None = None) -> dict:
|
|
"""태스크를 T2 게이트웨이로 전송하고 결과를 반환한다."""
|
|
task = {
|
|
"id": str(uuid.uuid4()),
|
|
"message": {
|
|
"role": "user",
|
|
"parts": [{"type": "text", "text": message}],
|
|
},
|
|
"metadata": {"skill_id": skill_id} if skill_id else {},
|
|
}
|
|
|
|
resp = await self._client.post(
|
|
f"{self.base_url}/tasks/send",
|
|
json=task,
|
|
headers={"Content-Type": "application/json"},
|
|
)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
async def send_task_streaming(
|
|
self, message: str, skill_id: str | None = None
|
|
) -> AsyncIterator[dict]:
|
|
"""SSE 스트리밍으로 태스크 진행 상태를 수신한다."""
|
|
task = {
|
|
"id": str(uuid.uuid4()),
|
|
"message": {
|
|
"role": "user",
|
|
"parts": [{"type": "text", "text": message}],
|
|
},
|
|
}
|
|
|
|
async with self._client.stream(
|
|
"POST",
|
|
f"{self.base_url}/tasks/sendSubscribe",
|
|
json=task,
|
|
) as resp:
|
|
resp.raise_for_status()
|
|
async for line in resp.aiter_lines():
|
|
if line.startswith("data: "):
|
|
data = json.loads(line[6:])
|
|
yield data
|
|
|
|
async def get_agent_card(self) -> dict:
|
|
"""T2 게이트웨이의 에이전트 카드를 조회한다."""
|
|
resp = await self._client.get(f"{self.base_url}/.well-known/agent.json")
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
async def close(self):
|
|
await self._client.aclose()
|
|
```
|
|
|
|
### 5.2 Go gRPC-over-QUIC 클라이언트 코드
|
|
|
|
#### `internal/cloud/grpc_quic_client.go`
|
|
|
|
```go
|
|
package cloud
|
|
|
|
import (
|
|
"context"
|
|
"crypto/tls"
|
|
"fmt"
|
|
"net"
|
|
|
|
"github.com/quic-go/quic-go"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials"
|
|
|
|
agentv1 "github.com/your-org/edge-aiot-mas/gen/go/agent/v1"
|
|
telemetryv1 "github.com/your-org/edge-aiot-mas/gen/go/telemetry/v1"
|
|
)
|
|
|
|
// QUICDialer는 quic-go를 사용하여 gRPC 연결을 수립하는 다이얼러다.
|
|
type QUICDialer struct {
|
|
tlsConfig *tls.Config
|
|
}
|
|
|
|
// NewQUICDialer는 TLS 설정으로 QUIC 다이얼러를 초기화한다.
|
|
func NewQUICDialer(tlsConfig *tls.Config) *QUICDialer {
|
|
return &QUICDialer{tlsConfig: tlsConfig}
|
|
}
|
|
|
|
// DialContext는 gRPC의 WithContextDialer에 전달될 다이얼 함수다.
|
|
func (d *QUICDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) {
|
|
// QUIC connection 수립
|
|
conn, err := quic.DialAddr(ctx, addr, d.tlsConfig, &quic.Config{
|
|
EnableDatagrams: true,
|
|
MaxIdleTimeout: 30 * 60 * 1e9,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("quic dial %s: %w", addr, err)
|
|
}
|
|
|
|
// 첫 번째 스트림 개방 (gRPC framing에 사용)
|
|
stream, err := conn.OpenStreamSync(ctx)
|
|
if err != nil {
|
|
conn.CloseWithError(0, "stream open failed")
|
|
return nil, fmt.Errorf("open quic stream: %w", err)
|
|
}
|
|
|
|
return &quicStreamConn{
|
|
conn: conn,
|
|
stream: stream,
|
|
}, nil
|
|
}
|
|
|
|
// quicStreamConn은 QUIC 스트림을 net.Conn으로 노출한다.
|
|
type quicStreamConn struct {
|
|
conn quic.Connection
|
|
stream quic.Stream
|
|
}
|
|
|
|
func (c *quicStreamConn) Read(b []byte) (int, error) { return c.stream.Read(b) }
|
|
func (c *quicStreamConn) Write(b []byte) (int, error) { return c.stream.Write(b) }
|
|
func (c *quicStreamConn) Close() error {
|
|
c.stream.Close()
|
|
return c.conn.CloseWithError(0, "client closed")
|
|
}
|
|
func (c *quicStreamConn) LocalAddr() net.Addr { return c.conn.LocalAddr() }
|
|
func (c *quicStreamConn) RemoteAddr() net.Addr { return c.conn.RemoteAddr() }
|
|
func (c *quicStreamConn) SetDeadline(t interface{}) error { return nil }
|
|
func (c *quicStreamConn) SetReadDeadline(t interface{}) error { return nil }
|
|
func (c *quicStreamConn) SetWriteDeadline(t interface{}) error { return nil }
|
|
|
|
// T1Client는 T1 클라우드에서 T2 게이트웨이로 연결하는 gRPC 클라이언트다.
|
|
type T1Client struct {
|
|
conn *grpc.ClientConn
|
|
agentClient agentv1.AgentServiceClient
|
|
telemetryClient telemetryv1.TelemetryServiceClient
|
|
}
|
|
|
|
// NewT1Client는 T2 게이트웨이에 gRPC over QUIC 연결을 수립한다.
|
|
func NewT1Client(ctx context.Context, gatewayAddr string, tlsConfig *tls.Config) (*T1Client, error) {
|
|
dialer := NewQUICDialer(tlsConfig)
|
|
|
|
conn, err := grpc.DialContext(
|
|
ctx,
|
|
gatewayAddr,
|
|
grpc.WithContextDialer(dialer.DialContext),
|
|
grpc.WithTransportCredentials(credentials.NewTLS(tlsConfig)),
|
|
// Retry policy: 무선 손실 시 지수 백오프
|
|
grpc.WithDefaultServiceConfig(`{
|
|
"methodConfig": [{
|
|
"name": [{}],
|
|
"retryPolicy": {
|
|
"maxAttempts": 5,
|
|
"initialBackoff": "0.5s",
|
|
"maxBackoff": "30s",
|
|
"backoffMultiplier": 2,
|
|
"retryableStatusCodes": ["UNAVAILABLE", "DEADLINE_EXCEEDED"]
|
|
}
|
|
}]
|
|
}`),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("grpc dial: %w", err)
|
|
}
|
|
|
|
return &T1Client{
|
|
conn: conn,
|
|
agentClient: agentv1.NewAgentServiceClient(conn),
|
|
telemetryClient: telemetryv1.NewTelemetryServiceClient(conn),
|
|
}, nil
|
|
}
|
|
|
|
// ExecuteTask는 T2 게이트웨이에 AgentTask를 전송한다.
|
|
func (c *T1Client) ExecuteTask(ctx context.Context, task *agentv1.AgentTask) (*agentv1.AgentResult, error) {
|
|
return c.agentClient.Execute(ctx, task)
|
|
}
|
|
|
|
// StreamTelemetry는 T2 게이트웨이에서 텔레메트리 스트림을 구독한다.
|
|
func (c *T1Client) StreamTelemetry(
|
|
ctx context.Context,
|
|
query *telemetryv1.TelemetryQuery,
|
|
handler func(*telemetryv1.DataPoint) error,
|
|
) error {
|
|
stream, err := c.telemetryClient.Stream(ctx, query)
|
|
if err != nil {
|
|
return fmt.Errorf("stream start: %w", err)
|
|
}
|
|
|
|
for {
|
|
dp, err := stream.Recv()
|
|
if err != nil {
|
|
return fmt.Errorf("stream recv: %w", err)
|
|
}
|
|
if err := handler(dp); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close는 gRPC 연결을 종료한다.
|
|
func (c *T1Client) Close() error { return c.conn.Close() }
|
|
```
|
|
|
|
---
|
|
|
|
## 6. T3 Field Agent 구현 설계
|
|
|
|
### 6.1 MQTT 텔레메트리 발행
|
|
|
|
#### `internal/field/mqtt_publisher.go`
|
|
|
|
```go
|
|
package field
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
)
|
|
|
|
// TelemetryPayload는 T3 디바이스가 발행하는 텔레메트리 JSON 구조체다.
|
|
type TelemetryPayload struct {
|
|
Value float64 `json:"v"`
|
|
Unit string `json:"u"`
|
|
TS int64 `json:"ts"` // Unix milliseconds
|
|
Props map[string]string `json:"p,omitempty"` // MQTT 5.0 UserProperties 대체
|
|
}
|
|
|
|
// FieldConfig는 T3 Field Agent 설정을 담는다.
|
|
type FieldConfig struct {
|
|
DeviceID string
|
|
SiteID string
|
|
MQTTBroker string
|
|
MQTTPort int
|
|
OfflineDB string // BoltDB 파일 경로
|
|
}
|
|
|
|
// FieldAgent는 T3 현장 에이전트다.
|
|
type FieldAgent struct {
|
|
cfg FieldConfig
|
|
mqttClient mqtt.Client
|
|
offlineQueue *OfflineQueue
|
|
tracer interface{} // OTel tracer
|
|
}
|
|
|
|
// NewFieldAgent는 Field Agent를 초기화한다.
|
|
func NewFieldAgent(cfg FieldConfig) (*FieldAgent, error) {
|
|
// MQTT 클라이언트 옵션 설정
|
|
opts := mqtt.NewClientOptions().
|
|
AddBroker(fmt.Sprintf("tcp://%s:%d", cfg.MQTTBroker, cfg.MQTTPort)).
|
|
SetClientID(fmt.Sprintf("field-%s", cfg.DeviceID)).
|
|
SetAutoReconnect(true).
|
|
SetConnectRetry(true).
|
|
SetConnectRetryInterval(5 * time.Second).
|
|
SetMaxReconnectInterval(60 * time.Second).
|
|
SetCleanSession(false). // 오프라인 중 QoS 1 메시지 보존
|
|
SetResumeSubs(true)
|
|
|
|
// 연결 로스트 핸들러
|
|
opts.SetConnectionLostHandler(func(client mqtt.Client, err error) {
|
|
fmt.Printf("[%s] MQTT connection lost: %v\n", cfg.DeviceID, err)
|
|
})
|
|
|
|
// 재연결 핸들러
|
|
opts.SetOnConnectHandler(func(client mqtt.Client) {
|
|
fmt.Printf("[%s] MQTT reconnected\n", cfg.DeviceID)
|
|
})
|
|
|
|
client := mqtt.NewClient(opts)
|
|
if token := client.Connect(); token.Wait() && token.Error() != nil {
|
|
return nil, fmt.Errorf("mqtt connect: %w", token.Error())
|
|
}
|
|
|
|
offlineQueue, err := NewOfflineQueue(cfg.OfflineDB)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("offline queue: %w", err)
|
|
}
|
|
|
|
return &FieldAgent{
|
|
cfg: cfg,
|
|
mqttClient: client,
|
|
offlineQueue: offlineQueue,
|
|
}, nil
|
|
}
|
|
|
|
// PublishTelemetry는 텔레메트리 데이터를 MQTT로 발행한다.
|
|
// 오프라인 상태라면 로컬 큐에 저장한다.
|
|
func (a *FieldAgent) PublishTelemetry(ctx context.Context, metric string, value float64, unit string) error {
|
|
topic := fmt.Sprintf("telemetry/%s/%s/%s", a.cfg.SiteID, a.cfg.DeviceID, metric)
|
|
|
|
// OTel trace context를 페이로드 Props에 포함
|
|
carrier := propagation.MapCarrier{}
|
|
otel.GetTextMapPropagator().Inject(ctx, carrier)
|
|
|
|
payload := TelemetryPayload{
|
|
Value: value,
|
|
Unit: unit,
|
|
TS: time.Now().UnixMilli(),
|
|
Props: carrier, // traceparent 등 포함
|
|
}
|
|
|
|
data, err := json.Marshal(payload)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal payload: %w", err)
|
|
}
|
|
|
|
// MQTT 연결 상태 확인
|
|
if !a.mqttClient.IsConnected() {
|
|
// 오프라인 큐에 저장
|
|
return a.offlineQueue.Enqueue(topic, data)
|
|
}
|
|
|
|
// QoS 1로 발행 (전달 보장)
|
|
token := a.mqttClient.Publish(topic, 1, false, data)
|
|
token.Wait()
|
|
if err := token.Error(); err != nil {
|
|
// 발행 실패 시 오프라인 큐로 폴백
|
|
if qErr := a.offlineQueue.Enqueue(topic, data); qErr != nil {
|
|
return fmt.Errorf("publish failed and queue failed: %v, %v", err, qErr)
|
|
}
|
|
return nil // 큐에 저장 성공
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// FlushOfflineQueue는 재연결 후 오프라인 큐를 드레인한다.
|
|
func (a *FieldAgent) FlushOfflineQueue(ctx context.Context) error {
|
|
if !a.mqttClient.IsConnected() {
|
|
return fmt.Errorf("not connected")
|
|
}
|
|
|
|
return a.offlineQueue.Drain(func(topic string, data []byte) error {
|
|
token := a.mqttClient.Publish(topic, 1, false, data)
|
|
token.Wait()
|
|
return token.Error()
|
|
})
|
|
}
|
|
```
|
|
|
|
### 6.2 오프라인 큐 (BoltDB)
|
|
|
|
#### `internal/field/offline_queue.go`
|
|
|
|
```go
|
|
package field
|
|
|
|
import (
|
|
"encoding/binary"
|
|
"fmt"
|
|
"time"
|
|
|
|
bolt "go.etcd.io/bbolt"
|
|
)
|
|
|
|
var queueBucket = []byte("offline_queue")
|
|
|
|
// QueueEntry는 오프라인 큐의 단일 항목이다.
|
|
type QueueEntry struct {
|
|
Topic string
|
|
Payload []byte
|
|
EnqueuedAt time.Time
|
|
}
|
|
|
|
// OfflineQueue는 BoltDB 기반 오프라인 메시지 큐다.
|
|
type OfflineQueue struct {
|
|
db *bolt.DB
|
|
}
|
|
|
|
// NewOfflineQueue는 BoltDB 파일을 열고 큐 버킷을 초기화한다.
|
|
func NewOfflineQueue(path string) (*OfflineQueue, error) {
|
|
db, err := bolt.Open(path, 0600, &bolt.Options{Timeout: 1 * time.Second})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("bolt open %s: %w", path, err)
|
|
}
|
|
|
|
if err := db.Update(func(tx *bolt.Tx) error {
|
|
_, err := tx.CreateBucketIfNotExists(queueBucket)
|
|
return err
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("create bucket: %w", err)
|
|
}
|
|
|
|
return &OfflineQueue{db: db}, nil
|
|
}
|
|
|
|
// Enqueue는 메시지를 큐에 추가한다.
|
|
func (q *OfflineQueue) Enqueue(topic string, payload []byte) error {
|
|
return q.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(queueBucket)
|
|
|
|
// 자동 증가 시퀀스 키 생성
|
|
seq, err := b.NextSequence()
|
|
if err != nil {
|
|
return fmt.Errorf("next sequence: %w", err)
|
|
}
|
|
|
|
key := make([]byte, 8)
|
|
binary.BigEndian.PutUint64(key, seq)
|
|
|
|
// topic + null + payload 직렬화
|
|
value := append([]byte(topic+"\x00"), payload...)
|
|
|
|
return b.Put(key, value)
|
|
})
|
|
}
|
|
|
|
// Drain은 큐의 모든 항목을 순서대로 처리하고 성공한 항목을 삭제한다.
|
|
func (q *OfflineQueue) Drain(handler func(topic string, payload []byte) error) error {
|
|
return q.db.Update(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(queueBucket)
|
|
|
|
var toDelete [][]byte
|
|
|
|
if err := b.ForEach(func(k, v []byte) error {
|
|
// topic 분리
|
|
for i, c := range v {
|
|
if c == 0x00 {
|
|
topic := string(v[:i])
|
|
payload := v[i+1:]
|
|
if err := handler(topic, payload); err != nil {
|
|
return err // 실패 시 드레인 중단
|
|
}
|
|
toDelete = append(toDelete, append([]byte{}, k...))
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf("invalid queue entry")
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
|
|
// 성공한 항목 삭제
|
|
for _, key := range toDelete {
|
|
if err := b.Delete(key); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// Size는 큐에 저장된 항목 수를 반환한다.
|
|
func (q *OfflineQueue) Size() (int, error) {
|
|
var count int
|
|
err := q.db.View(func(tx *bolt.Tx) error {
|
|
b := tx.Bucket(queueBucket)
|
|
count = b.Stats().KeyN
|
|
return nil
|
|
})
|
|
return count, err
|
|
}
|
|
|
|
// Close는 BoltDB를 닫는다.
|
|
func (q *OfflineQueue) Close() error { return q.db.Close() }
|
|
```
|
|
|
|
---
|
|
|
|
## 7. SPIFFE/SPIRE 통합
|
|
|
|
### 7.1 아키텍처 개요
|
|
|
|
```
|
|
SPIRE Server (T1)
|
|
├── Registration Entry: T1 에이전트 (k8s SA)
|
|
├── Registration Entry: T2 게이트웨이 (HW serial)
|
|
└── Registration Entry: T3 디바이스 (TPM attestation)
|
|
|
|
SPIRE Agent (각 노드에 DaemonSet/프로세스로 실행)
|
|
└── Workload API (Unix socket)
|
|
└── Go Workload (SVID 자동 갱신)
|
|
```
|
|
|
|
### 7.2 Go SDK 사용법
|
|
|
|
#### `internal/security/spiffe.go`
|
|
|
|
```go
|
|
package security
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/spiffe/go-spiffe/v2/spiffeid"
|
|
"github.com/spiffe/go-spiffe/v2/spiffetls/tlsconfig"
|
|
"github.com/spiffe/go-spiffe/v2/workloadapi"
|
|
"google.golang.org/grpc/credentials"
|
|
)
|
|
|
|
// SPIFFECredentials는 SPIRE Workload API에서 SVID를 가져와
|
|
// gRPC TransportCredentials를 생성한다.
|
|
type SPIFFECredentials struct {
|
|
source *workloadapi.X509Source
|
|
}
|
|
|
|
// NewSPIFFECredentials는 SPIRE Agent 소켓에 연결하고
|
|
// X.509 SVID 소스를 초기화한다.
|
|
func NewSPIFFECredentials(ctx context.Context, socketPath string) (*SPIFFECredentials, error) {
|
|
// SPIRE Agent Workload API에 연결
|
|
source, err := workloadapi.NewX509Source(
|
|
ctx,
|
|
workloadapi.WithClientOptions(
|
|
workloadapi.WithAddr("unix://" + socketPath),
|
|
),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("workload API connect: %w", err)
|
|
}
|
|
|
|
return &SPIFFECredentials{source: source}, nil
|
|
}
|
|
|
|
// ServerCredentials는 gRPC 서버용 mTLS 자격증명을 반환한다.
|
|
// 클라이언트 SVID를 trustDomain으로 검증한다.
|
|
func (s *SPIFFECredentials) ServerCredentials(trustDomain string) (credentials.TransportCredentials, error) {
|
|
td, err := spiffeid.TrustDomainFromString(trustDomain)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("trust domain: %w", err)
|
|
}
|
|
|
|
tlsCfg := tlsconfig.MTLSServerConfig(
|
|
s.source,
|
|
s.source,
|
|
tlsconfig.AuthorizeMemberOf(td),
|
|
)
|
|
return credentials.NewTLS(tlsCfg), nil
|
|
}
|
|
|
|
// ClientCredentials는 gRPC 클라이언트용 mTLS 자격증명을 반환한다.
|
|
// 서버 SPIFFE ID를 serverID로 검증한다.
|
|
func (s *SPIFFECredentials) ClientCredentials(serverSpiffeID string) (credentials.TransportCredentials, error) {
|
|
serverID, err := spiffeid.IDFromString(serverSpiffeID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("server SPIFFE ID: %w", err)
|
|
}
|
|
|
|
tlsCfg := tlsconfig.MTLSClientConfig(
|
|
s.source,
|
|
s.source,
|
|
tlsconfig.AuthorizeID(serverID),
|
|
)
|
|
return credentials.NewTLS(tlsCfg), nil
|
|
}
|
|
|
|
// Close는 X509Source를 닫는다.
|
|
func (s *SPIFFECredentials) Close() error { return s.source.Close() }
|
|
```
|
|
|
|
### 7.3 SVID 발급 흐름
|
|
|
|
```
|
|
T3 Field Agent 부팅
|
|
│
|
|
├─► TPM Quote 생성 (PCR 측정값)
|
|
│
|
|
├─► SPIRE Agent에 Attestation 요청 (TPM 플러그인)
|
|
│ └── SPIRE Agent → SPIRE Server 검증
|
|
│
|
|
├─► SPIRE Server: Registration Entry 매칭
|
|
│ └── spiffe://iot/{org}/{factory}/device/{serial}
|
|
│
|
|
├─► X.509-SVID 발급 (TTL: 1시간)
|
|
│ ├── Subject: spiffe://iot/...
|
|
│ └── SAN: URI = SPIFFE ID
|
|
│
|
|
└─► Workload API를 통해 Go 워크로드에 SVID 제공
|
|
└── go-spiffe SDK가 자동 갱신 (만료 10분 전)
|
|
```
|
|
|
|
### 7.4 SPIRE Registration Entry 예시
|
|
|
|
```bash
|
|
# T2 게이트웨이 등록 (하드웨어 시리얼 기반)
|
|
spire-server entry create \
|
|
-spiffeID spiffe://edge-aiot/t2/gateway/GW-001 \
|
|
-parentID spiffe://edge-aiot/spire-agent \
|
|
-selector unix:uid:1000 \
|
|
-selector aws_iid:instance-id:i-0abc123
|
|
|
|
# T3 Field Agent 등록 (TPM attestation)
|
|
spire-server entry create \
|
|
-spiffeID spiffe://edge-aiot/t3/factory-a/device/SENSOR-001 \
|
|
-parentID spiffe://edge-aiot/spire-agent \
|
|
-selector tpm:pub_hash:sha256:abcdef...
|
|
```
|
|
|
|
---
|
|
|
|
## 8. OpenTelemetry 통합
|
|
|
|
### 8.1 TracerProvider 초기화
|
|
|
|
#### `internal/observability/tracer.go`
|
|
|
|
```go
|
|
package observability
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
"go.opentelemetry.io/otel/sdk/resource"
|
|
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
|
semconv "go.opentelemetry.io/otel/semconv/v1.26.0"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
)
|
|
|
|
// InitTracer는 OTel TracerProvider를 초기화하고 글로벌에 등록한다.
|
|
// otlpEndpoint: OTel Collector OTLP gRPC 엔드포인트 (예: "otel-collector:4317")
|
|
func InitTracer(ctx context.Context, serviceName, otlpEndpoint string) (func(context.Context) error, error) {
|
|
// OTLP gRPC exporter 생성
|
|
conn, err := grpc.DialContext(ctx, otlpEndpoint,
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("grpc dial otlp: %w", err)
|
|
}
|
|
|
|
exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
|
|
if err != nil {
|
|
return nil, fmt.Errorf("otlp exporter: %w", err)
|
|
}
|
|
|
|
// 리소스 속성 (서비스 이름, 버전, 배포 환경)
|
|
res, err := resource.New(ctx,
|
|
resource.WithAttributes(
|
|
semconv.ServiceName(serviceName),
|
|
semconv.ServiceVersion("0.1.0"),
|
|
),
|
|
resource.WithOS(),
|
|
resource.WithHost(),
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("resource: %w", err)
|
|
}
|
|
|
|
// TracerProvider 생성
|
|
tp := sdktrace.NewTracerProvider(
|
|
sdktrace.WithBatcher(exporter),
|
|
sdktrace.WithResource(res),
|
|
sdktrace.WithSampler(sdktrace.AlwaysSample()), // 실험 환경 — 프로덕션은 ParentBased
|
|
)
|
|
|
|
// 글로벌 TracerProvider 및 Propagator 설정
|
|
otel.SetTracerProvider(tp)
|
|
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(
|
|
propagation.TraceContext{}, // W3C traceparent/tracestate
|
|
propagation.Baggage{},
|
|
))
|
|
|
|
// 종료 함수 반환 (defer로 호출)
|
|
return tp.Shutdown, nil
|
|
}
|
|
```
|
|
|
|
### 8.2 gRPC Interceptor에서 Trace Propagation
|
|
|
|
#### `internal/gateway/interceptors/otel.go`
|
|
|
|
```go
|
|
package interceptors
|
|
|
|
import (
|
|
"context"
|
|
|
|
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
|
|
"go.opentelemetry.io/otel"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
"go.opentelemetry.io/otel/codes"
|
|
"go.opentelemetry.io/otel/propagation"
|
|
"go.opentelemetry.io/otel/trace"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/metadata"
|
|
)
|
|
|
|
// OTelUnary는 Unary RPC에 OTel trace를 주입하는 인터셉터다.
|
|
// otelgrpc.UnaryServerInterceptor()를 래핑하여 Resume Token trace 연속성을 추가한다.
|
|
func OTelUnary() grpc.UnaryServerInterceptor {
|
|
base := otelgrpc.UnaryServerInterceptor()
|
|
return func(
|
|
ctx context.Context,
|
|
req interface{},
|
|
info *grpc.UnaryServerInfo,
|
|
handler grpc.UnaryHandler,
|
|
) (interface{}, error) {
|
|
// Resume Token에 포함된 traceparent로 trace 연속성 복원
|
|
ctx = restoreTraceFromResumeToken(ctx)
|
|
return base(ctx, req, info, handler)
|
|
}
|
|
}
|
|
|
|
// OTelStream는 Streaming RPC에 OTel trace를 주입하는 인터셉터다.
|
|
func OTelStream() grpc.StreamServerInterceptor {
|
|
base := otelgrpc.StreamServerInterceptor()
|
|
return func(
|
|
srv interface{},
|
|
ss grpc.ServerStream,
|
|
info *grpc.StreamServerInfo,
|
|
handler grpc.StreamHandler,
|
|
) error {
|
|
ctx := restoreTraceFromResumeToken(ss.Context())
|
|
wrapped := &otelServerStream{ServerStream: ss, ctx: ctx}
|
|
return base(srv, wrapped, info, handler)
|
|
}
|
|
}
|
|
|
|
// restoreTraceFromResumeToken은 Resume Token에 포함된 traceparent를
|
|
// context에 복원하여 단절-재연결 후에도 trace 체인이 유지되도록 한다.
|
|
func restoreTraceFromResumeToken(ctx context.Context) context.Context {
|
|
state, ok := GetResumeState(ctx)
|
|
if !ok || state.TraceParent == "" {
|
|
return ctx
|
|
}
|
|
|
|
// traceparent를 metadata carrier로 변환하여 propagate
|
|
carrier := propagation.MapCarrier{
|
|
"traceparent": state.TraceParent,
|
|
}
|
|
return otel.GetTextMapPropagator().Extract(ctx, carrier)
|
|
}
|
|
|
|
// InjectResumeTraceParent는 현재 span의 traceparent를 ResumeState에 저장한다.
|
|
// Resume Token 발급 시 호출한다.
|
|
func InjectResumeTraceParent(ctx context.Context) string {
|
|
carrier := propagation.MapCarrier{}
|
|
otel.GetTextMapPropagator().Inject(ctx, carrier)
|
|
return carrier.Get("traceparent")
|
|
}
|
|
|
|
// T2FanOutSpan은 T3 디바이스에서 전달된 trace를 T2 게이트웨이에서
|
|
// fan-out 지점으로 처리하는 span을 생성한다.
|
|
func T2FanOutSpan(ctx context.Context, deviceID, metric string) (context.Context, trace.Span) {
|
|
tracer := otel.Tracer("t2-gateway")
|
|
return tracer.Start(ctx, "t2.fanout",
|
|
trace.WithAttributes(
|
|
attribute.String("device.id", deviceID),
|
|
attribute.String("telemetry.metric", metric),
|
|
),
|
|
)
|
|
}
|
|
|
|
// RecordError는 gRPC 에러를 span에 기록한다.
|
|
func RecordError(span trace.Span, err error) {
|
|
if err != nil {
|
|
span.RecordError(err)
|
|
span.SetStatus(codes.Error, err.Error())
|
|
}
|
|
}
|
|
|
|
type otelServerStream struct {
|
|
grpc.ServerStream
|
|
ctx context.Context
|
|
}
|
|
|
|
func (s *otelServerStream) Context() context.Context { return s.ctx }
|
|
```
|
|
|
|
### 8.3 MQTT→gRPC Trace 연속성 패턴
|
|
|
|
T3 디바이스가 MQTT로 발행할 때 UserProperties(MQTT 5.0) 또는 페이로드 필드로 `traceparent`를 포함시키고, T2 게이트웨이의 `Translator`가 이를 gRPC metadata로 변환하여 T1까지 trace 체인을 연결한다.
|
|
|
|
```go
|
|
// MQTT 페이로드의 Props 필드에서 traceparent 추출 후 context 복원 예시
|
|
func extractTraceFromMQTTPayload(payload TelemetryPayload) context.Context {
|
|
ctx := context.Background()
|
|
if len(payload.Props) == 0 {
|
|
return ctx
|
|
}
|
|
carrier := propagation.MapCarrier(payload.Props)
|
|
return otel.GetTextMapPropagator().Extract(ctx, carrier)
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## 9. Docker Compose 실험 환경 설계
|
|
|
|
### `deployments/docker-compose.yml`
|
|
|
|
```yaml
|
|
version: "3.9"
|
|
|
|
networks:
|
|
t1-net:
|
|
driver: bridge
|
|
t2-net:
|
|
driver: bridge
|
|
t3-net:
|
|
driver: bridge
|
|
obs-net:
|
|
driver: bridge
|
|
|
|
volumes:
|
|
spire-data:
|
|
prometheus-data:
|
|
grafana-data:
|
|
|
|
services:
|
|
# ─────────────────────────────────────────────────────
|
|
# T1: 클라우드 계층
|
|
# ─────────────────────────────────────────────────────
|
|
cloud-server:
|
|
build:
|
|
context: ..
|
|
dockerfile: deployments/Dockerfile.go
|
|
args:
|
|
CMD: cmd/cloud-server
|
|
image: edge-aiot/cloud-server:dev
|
|
container_name: t1-cloud-server
|
|
networks: [t1-net, obs-net]
|
|
ports:
|
|
- "9443:9443" # gRPC over QUIC
|
|
environment:
|
|
- GRPC_ADDR=:9443
|
|
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
|
- SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock
|
|
volumes:
|
|
- /run/spire/t1/sockets:/run/spire/sockets:ro
|
|
depends_on:
|
|
- spire-server
|
|
- otel-collector
|
|
|
|
t1-orchestrator:
|
|
build:
|
|
context: ../python
|
|
dockerfile: ../deployments/Dockerfile.python
|
|
image: edge-aiot/orchestrator:dev
|
|
container_name: t1-orchestrator
|
|
networks: [t1-net, obs-net]
|
|
ports:
|
|
- "8080:8080" # FastAPI 관리 인터페이스
|
|
environment:
|
|
- GATEWAY_A2A_URL=https://t2-gateway:8443
|
|
- CLOUD_GRPC_ADDR=cloud-server:9443
|
|
- LLM_MODEL=gpt-4o
|
|
- OPENAI_API_KEY=${OPENAI_API_KEY}
|
|
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
|
depends_on:
|
|
- cloud-server
|
|
|
|
# ─────────────────────────────────────────────────────
|
|
# T2: 엣지 계층
|
|
# ─────────────────────────────────────────────────────
|
|
t2-gateway:
|
|
build:
|
|
context: ..
|
|
dockerfile: deployments/Dockerfile.go
|
|
args:
|
|
CMD: cmd/gateway
|
|
image: edge-aiot/gateway:dev
|
|
container_name: t2-gateway
|
|
networks: [t1-net, t2-net, t3-net, obs-net]
|
|
ports:
|
|
- "19443:9443" # gRPC over QUIC (T1 방향)
|
|
- "18443:8443" # A2A HTTP/3
|
|
environment:
|
|
- GRPC_ADDR=:9443
|
|
- A2A_ADDR=:8443
|
|
- MQTT_BROKER=mqtt-broker
|
|
- MQTT_PORT=1883
|
|
- T1_GRPC_ADDR=cloud-server:9443
|
|
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
|
- SPIFFE_ENDPOINT_SOCKET=/run/spire/sockets/agent.sock
|
|
volumes:
|
|
- /run/spire/t2/sockets:/run/spire/sockets:ro
|
|
depends_on:
|
|
- mqtt-broker
|
|
- spire-server
|
|
- otel-collector
|
|
|
|
# ─────────────────────────────────────────────────────
|
|
# T3: 현장 계층 (복수 인스턴스)
|
|
# ─────────────────────────────────────────────────────
|
|
t3-field-agent-1:
|
|
build:
|
|
context: ..
|
|
dockerfile: deployments/Dockerfile.go
|
|
args:
|
|
CMD: cmd/field-agent
|
|
image: edge-aiot/field-agent:dev
|
|
container_name: t3-field-agent-1
|
|
networks: [t3-net, obs-net]
|
|
environment:
|
|
- DEVICE_ID=SENSOR-001
|
|
- SITE_ID=factory-a
|
|
- MQTT_BROKER=mqtt-broker
|
|
- MQTT_PORT=1883
|
|
- OFFLINE_DB=/data/queue.db
|
|
- OTEL_EXPORTER_OTLP_ENDPOINT=http://otel-collector:4317
|
|
volumes:
|
|
- ./data/agent-1:/data
|
|
depends_on:
|
|
- mqtt-broker
|
|
|
|
t3-field-agent-2:
|
|
extends:
|
|
service: t3-field-agent-1
|
|
container_name: t3-field-agent-2
|
|
environment:
|
|
- DEVICE_ID=SENSOR-002
|
|
- SITE_ID=factory-a
|
|
- MQTT_BROKER=mqtt-broker
|
|
- MQTT_PORT=1883
|
|
- OFFLINE_DB=/data/queue.db
|
|
volumes:
|
|
- ./data/agent-2:/data
|
|
|
|
t3-field-agent-3:
|
|
extends:
|
|
service: t3-field-agent-1
|
|
container_name: t3-field-agent-3
|
|
environment:
|
|
- DEVICE_ID=AGV-001
|
|
- SITE_ID=factory-a
|
|
- MQTT_BROKER=mqtt-broker
|
|
- MQTT_PORT=1883
|
|
- OFFLINE_DB=/data/queue.db
|
|
volumes:
|
|
- ./data/agent-3:/data
|
|
|
|
# ─────────────────────────────────────────────────────
|
|
# 인프라: MQTT 브로커
|
|
# ─────────────────────────────────────────────────────
|
|
mqtt-broker:
|
|
image: eclipse-mosquitto:2.0
|
|
container_name: mqtt-broker
|
|
networks: [t2-net, t3-net]
|
|
ports:
|
|
- "1883:1883"
|
|
- "9001:9001" # WebSocket
|
|
volumes:
|
|
- ./mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
|
|
|
|
# ─────────────────────────────────────────────────────
|
|
# 보안: SPIRE
|
|
# ─────────────────────────────────────────────────────
|
|
spire-server:
|
|
image: ghcr.io/spiffe/spire-server:1.9.0
|
|
container_name: spire-server
|
|
networks: [t1-net]
|
|
ports:
|
|
- "8081:8081"
|
|
volumes:
|
|
- ./spire/server.conf:/etc/spire/server/server.conf:ro
|
|
- spire-data:/var/lib/spire
|
|
command: ["-config", "/etc/spire/server/server.conf"]
|
|
|
|
spire-agent-t1:
|
|
image: ghcr.io/spiffe/spire-agent:1.9.0
|
|
container_name: spire-agent-t1
|
|
networks: [t1-net]
|
|
volumes:
|
|
- ./spire/agent.conf:/etc/spire/agent/agent.conf:ro
|
|
- /run/spire/t1/sockets:/run/spire/sockets
|
|
depends_on: [spire-server]
|
|
command: ["-config", "/etc/spire/agent/agent.conf"]
|
|
|
|
spire-agent-t2:
|
|
image: ghcr.io/spiffe/spire-agent:1.9.0
|
|
container_name: spire-agent-t2
|
|
networks: [t2-net]
|
|
volumes:
|
|
- ./spire/agent.conf:/etc/spire/agent/agent.conf:ro
|
|
- /run/spire/t2/sockets:/run/spire/sockets
|
|
depends_on: [spire-server]
|
|
command: ["-config", "/etc/spire/agent/agent.conf"]
|
|
|
|
# ─────────────────────────────────────────────────────
|
|
# 관측성: OTel Collector + Prometheus + Grafana + Jaeger
|
|
# ─────────────────────────────────────────────────────
|
|
otel-collector:
|
|
image: otel/opentelemetry-collector-contrib:0.100.0
|
|
container_name: otel-collector
|
|
networks: [t1-net, t2-net, t3-net, obs-net]
|
|
ports:
|
|
- "4317:4317" # OTLP gRPC
|
|
- "4318:4318" # OTLP HTTP
|
|
- "8889:8889" # Prometheus metrics exporter
|
|
volumes:
|
|
- ./otel/otel-collector-config.yaml:/etc/otelcol/config.yaml:ro
|
|
command: ["--config", "/etc/otelcol/config.yaml"]
|
|
|
|
jaeger:
|
|
image: jaegertracing/all-in-one:1.57
|
|
container_name: jaeger
|
|
networks: [obs-net]
|
|
ports:
|
|
- "16686:16686" # Jaeger UI
|
|
- "14250:14250" # gRPC (OTel Collector → Jaeger)
|
|
|
|
prometheus:
|
|
image: prom/prometheus:v2.51.0
|
|
container_name: prometheus
|
|
networks: [obs-net]
|
|
ports:
|
|
- "9090:9090"
|
|
volumes:
|
|
- ./otel/prometheus.yml:/etc/prometheus/prometheus.yml:ro
|
|
- prometheus-data:/prometheus
|
|
|
|
grafana:
|
|
image: grafana/grafana:10.4.0
|
|
container_name: grafana
|
|
networks: [obs-net]
|
|
ports:
|
|
- "3000:3000"
|
|
environment:
|
|
- GF_SECURITY_ADMIN_PASSWORD=admin
|
|
- GF_DASHBOARDS_DEFAULT_HOME_DASHBOARD_PATH=/var/lib/grafana/dashboards/edge-aiot.json
|
|
volumes:
|
|
- grafana-data:/var/lib/grafana
|
|
- ./grafana/dashboards:/var/lib/grafana/dashboards:ro
|
|
depends_on:
|
|
- prometheus
|
|
- jaeger
|
|
```
|
|
|
|
### `deployments/Dockerfile.go`
|
|
|
|
```dockerfile
|
|
# 멀티 스테이지 빌드 — 최소 런타임 이미지
|
|
FROM golang:1.22-alpine AS builder
|
|
|
|
WORKDIR /app
|
|
COPY go.mod go.sum ./
|
|
RUN go mod download
|
|
|
|
COPY . .
|
|
|
|
ARG CMD=cmd/gateway
|
|
RUN CGO_ENABLED=0 GOOS=linux go build -o /bin/app ./${CMD}/...
|
|
|
|
FROM gcr.io/distroless/static-debian12
|
|
COPY --from=builder /bin/app /app
|
|
ENTRYPOINT ["/app"]
|
|
```
|
|
|
|
### `deployments/otel/otel-collector-config.yaml` (핵심 스니펫)
|
|
|
|
```yaml
|
|
receivers:
|
|
otlp:
|
|
protocols:
|
|
grpc:
|
|
endpoint: 0.0.0.0:4317
|
|
http:
|
|
endpoint: 0.0.0.0:4318
|
|
|
|
processors:
|
|
batch:
|
|
timeout: 1s
|
|
send_batch_size: 1024
|
|
memory_limiter:
|
|
check_interval: 1s
|
|
limit_mib: 512
|
|
|
|
exporters:
|
|
jaeger:
|
|
endpoint: jaeger:14250
|
|
tls:
|
|
insecure: true
|
|
prometheus:
|
|
endpoint: "0.0.0.0:8889"
|
|
|
|
service:
|
|
pipelines:
|
|
traces:
|
|
receivers: [otlp]
|
|
processors: [memory_limiter, batch]
|
|
exporters: [jaeger]
|
|
metrics:
|
|
receivers: [otlp]
|
|
processors: [memory_limiter, batch]
|
|
exporters: [prometheus]
|
|
```
|
|
|
|
---
|
|
|
|
## 10. 구현 단계별 계획 (Phase)
|
|
|
|
### Phase 1: Proto 정의 + 기본 gRPC over QUIC (2주)
|
|
|
|
**목표**: 컴파일 가능한 Proto + quic-go 위에서 gRPC 서버/클라이언트 통신 검증
|
|
|
|
| 주 | 작업 항목 | 산출물 |
|
|
|----|---------|--------|
|
|
| 1 | Proto 파일 설계 및 `buf generate` 설정 | `proto/`, `gen/` 디렉터리 |
|
|
| 1 | Go 모듈 초기화, 의존성 설치 | `go.mod`, `go.sum` |
|
|
| 2 | quic-go + gRPC-Go 통합 (QuicNetListener 구현) | `internal/gateway/server.go` |
|
|
| 2 | T1 gRPC-over-QUIC 클라이언트 구현 | `internal/cloud/grpc_quic_client.go` |
|
|
| 2 | 기본 AgentService Unary RPC 동작 검증 | `tests/integration/basic_grpc_test.go` |
|
|
|
|
**완료 기준**: `go test ./tests/integration/... -run TestBasicUnaryRPC` 통과
|
|
|
|
### Phase 2: MQTT 변환 게이트웨이 (2주)
|
|
|
|
**목표**: T3 MQTT 발행 → T2 변환 → T1 gRPC 전달 end-to-end 동작
|
|
|
|
| 주 | 작업 항목 | 산출물 |
|
|
|----|---------|--------|
|
|
| 3 | MQTT 클라이언트 구현 + Topic 라우팅 | `internal/gateway/mqtt_client.go` |
|
|
| 3 | MQTT→gRPC 변환 엔진 (`Translator`) | `internal/gateway/translator.go` |
|
|
| 4 | T3 Field Agent (MQTT 발행 + BoltDB 오프라인 큐) | `internal/field/` 패키지 |
|
|
| 4 | Docker Compose 기본 환경 (T1+T2+T3+Mosquitto) | `deployments/docker-compose.yml` |
|
|
| 4 | 오프라인 시나리오 테스트 (MQTT 브로커 중단 후 재연결) | `tests/integration/offline_test.go` |
|
|
|
|
**완료 기준**: T3 → MQTT → T2 Translator → gRPC → T1 텔레메트리 수신 확인
|
|
|
|
### Phase 3: A2A 통합 + Resume Token (2주)
|
|
|
|
**목표**: A2A HTTP/3 엔드포인트 + Resume Token 기반 스트리밍 재개 동작
|
|
|
|
| 주 | 작업 항목 | 산출물 |
|
|
|----|---------|--------|
|
|
| 5 | Resume Token Manager + gRPC Interceptor | `internal/gateway/resume_token.go`, `interceptors/resume.go` |
|
|
| 5 | A2A HTTP/3 핸들러 (quic-go http3 패키지) | `internal/gateway/a2a_handler.go` |
|
|
| 6 | Python A2A 클라이언트 + LangGraph 에이전트 | `python/orchestrator/` |
|
|
| 6 | 스트리밍 중단-재개 통합 테스트 | `tests/integration/resume_test.go` |
|
|
|
|
**완료 기준**: 스트리밍 중 네트워크 차단 → 재연결 → Resume Token으로 이어받기 검증
|
|
|
|
### Phase 4: SPIFFE/SPIRE + OTel (2주)
|
|
|
|
**목표**: mTLS(SPIFFE SVID) + 분산 트레이싱 end-to-end 동작
|
|
|
|
| 주 | 작업 항목 | 산출물 |
|
|
|----|---------|--------|
|
|
| 7 | SPIRE Server/Agent Docker Compose 통합 | `deployments/docker-compose.spire.yml` |
|
|
| 7 | go-spiffe SDK 래퍼 + gRPC credentials 교체 | `internal/security/spiffe.go` |
|
|
| 8 | OTel TracerProvider + otelgrpc 인터셉터 | `internal/observability/tracer.go` |
|
|
| 8 | MQTT→gRPC trace propagation (traceparent in payload Props) | `internal/gateway/interceptors/otel.go` |
|
|
| 8 | Jaeger에서 T3→T2→T1 trace 체인 시각화 확인 | - |
|
|
|
|
**완료 기준**: Jaeger에서 단일 MQTT 발행의 전체 트레이스(T3→T2→T1) 확인
|
|
|
|
### Phase 5: 실험 검증 (2주)
|
|
|
|
**목표**: 성능 지표 측정, 단절 내성 검증, 문서화
|
|
|
|
| 주 | 작업 항목 | 산출물 |
|
|
|----|---------|--------|
|
|
| 9 | 성능 벤치마크 (gRPC vs REST 비교, latency/throughput) | `tests/perf/benchmark_test.go` |
|
|
| 9 | 네트워크 파티션 시나리오 (tc netem으로 손실·지연 주입) | chaos 테스트 스크립트 |
|
|
| 10 | Grafana 대시보드 구성 (latency, throughput, error rate) | `deployments/grafana/dashboards/` |
|
|
| 10 | 실험 결과 분석 및 구현 설계서 업데이트 | 본 문서 v1.0 |
|
|
|
|
**완료 기준**: 모든 KPI 측정값 수집 완료, Grafana 대시보드 정상 동작
|
|
|
|
---
|
|
|
|
## 11. 테스트 전략
|
|
|
|
### 11.1 단위 테스트
|
|
|
|
```
|
|
대상 패키지 테스트 항목
|
|
───────────────────────────────── ────────────────────────────────────
|
|
internal/gateway/resume_token.go 토큰 발급/검증, 서명 위조 감지, 만료 처리
|
|
internal/gateway/translator.go MQTT 토픽 패턴 매칭, JSON→Protobuf 변환
|
|
internal/field/offline_queue.go Enqueue/Drain 순서 보장, 동시 접근
|
|
internal/security/spiffe.go SVID 갱신 모킹, 인증 실패 처리
|
|
```
|
|
|
|
```go
|
|
// 예시: Resume Token 단위 테스트
|
|
func TestResumeTokenRoundTrip(t *testing.T) {
|
|
mgr := NewResumeTokenManager()
|
|
state := ResumeState{
|
|
StreamID: "stream-abc",
|
|
SequenceNum: 42,
|
|
DeviceID: "SENSOR-001",
|
|
TraceParent: "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01",
|
|
}
|
|
|
|
token, err := mgr.Issue(state)
|
|
require.NoError(t, err)
|
|
require.NotEmpty(t, token)
|
|
|
|
restored, err := mgr.Verify(token)
|
|
require.NoError(t, err)
|
|
assert.Equal(t, state.StreamID, restored.StreamID)
|
|
assert.Equal(t, state.SequenceNum, restored.SequenceNum)
|
|
assert.Equal(t, state.TraceParent, restored.TraceParent)
|
|
}
|
|
|
|
func TestResumeTokenTampering(t *testing.T) {
|
|
mgr := NewResumeTokenManager()
|
|
state := ResumeState{StreamID: "s1", SequenceNum: 1, DeviceID: "d1"}
|
|
|
|
token, _ := mgr.Issue(state)
|
|
// 마지막 바이트 변조
|
|
tampered := token[:len(token)-2] + "ZZ"
|
|
|
|
_, err := mgr.Verify(tampered)
|
|
assert.Error(t, err)
|
|
}
|
|
```
|
|
|
|
### 11.2 통합 테스트
|
|
|
|
```go
|
|
// 예시: MQTT→gRPC 변환 통합 테스트 (testcontainers-go 사용)
|
|
func TestMQTTToGRPCTranslation(t *testing.T) {
|
|
ctx := context.Background()
|
|
|
|
// Mosquitto 컨테이너 기동
|
|
mosquitto, err := testcontainers.GenericContainer(ctx,
|
|
testcontainers.GenericContainerRequest{
|
|
ContainerRequest: testcontainers.ContainerRequest{
|
|
Image: "eclipse-mosquitto:2.0",
|
|
ExposedPorts: []string{"1883/tcp"},
|
|
WaitingFor: wait.ForListeningPort("1883/tcp"),
|
|
},
|
|
Started: true,
|
|
},
|
|
)
|
|
require.NoError(t, err)
|
|
defer mosquitto.Terminate(ctx)
|
|
|
|
host, _ := mosquitto.Host(ctx)
|
|
port, _ := mosquitto.MappedPort(ctx, "1883")
|
|
brokerAddr := fmt.Sprintf("%s:%s", host, port.Port())
|
|
|
|
// Gateway 초기화
|
|
gw, err := gateway.New(ctx, gateway.GatewayConfig{
|
|
GRPCAddr: ":0",
|
|
MQTTBroker: brokerAddr,
|
|
})
|
|
require.NoError(t, err)
|
|
|
|
// 텔레메트리 수신 채널
|
|
received := make(chan *telemetryv1.DataPoint, 1)
|
|
gw.SetTelemetryHandler(func(dp *telemetryv1.DataPoint) {
|
|
received <- dp
|
|
})
|
|
|
|
// T3 역할: MQTT 발행
|
|
publisher, _ := newTestMQTTClient(brokerAddr)
|
|
payload := `{"v":42.5,"u":"celsius","ts":1234567890000}`
|
|
publisher.Publish("telemetry/factory-a/SENSOR-001/temperature", 1, false, payload)
|
|
|
|
// 변환 결과 검증
|
|
select {
|
|
case dp := <-received:
|
|
assert.Equal(t, "SENSOR-001", dp.DeviceId)
|
|
assert.Equal(t, "temperature", dp.Metric)
|
|
assert.InDelta(t, 42.5, dp.Value, 0.001)
|
|
case <-time.After(5 * time.Second):
|
|
t.Fatal("timeout waiting for telemetry")
|
|
}
|
|
}
|
|
```
|
|
|
|
### 11.3 성능 테스트
|
|
|
|
```go
|
|
// tests/perf/benchmark_test.go
|
|
func BenchmarkUnaryRPCOverQUIC(b *testing.B) {
|
|
// 환경: Docker Compose로 T1+T2 기동 후 실행
|
|
client, err := cloud.NewT1Client(
|
|
context.Background(),
|
|
"localhost:9443",
|
|
testTLSConfig(),
|
|
)
|
|
require.NoError(b, err)
|
|
defer client.Close()
|
|
|
|
task := &agentv1.AgentTask{
|
|
TaskId: "bench",
|
|
AgentId: "t1",
|
|
TaskType: "query",
|
|
Payload: make([]byte, 256), // 256B 페이로드
|
|
}
|
|
|
|
b.ResetTimer()
|
|
b.RunParallel(func(pb *testing.PB) {
|
|
for pb.Next() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
|
_, err := client.ExecuteTask(ctx, task)
|
|
cancel()
|
|
if err != nil {
|
|
b.Errorf("RPC error: %v", err)
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
func BenchmarkTelemetryFanIn(b *testing.B) {
|
|
// 1000개 T3 에이전트에서 동시 텔레메트리 발행
|
|
b.SetParallelism(1000)
|
|
// 구현 생략 (위와 유사한 패턴)
|
|
}
|
|
```
|
|
|
|
---
|
|
|
|
## 12. 예상 성능 지표 및 측정 방법
|
|
|
|
### 12.1 목표 성능 지표 (KPI)
|
|
|
|
| 지표 | 목표값 | 측정 방법 |
|
|
|------|--------|---------|
|
|
| T1↔T2 gRPC Unary p50 latency | < 5ms (LAN) | `go test -bench=BenchmarkUnaryRPC` + `otelgrpc` 자동 계측 |
|
|
| T1↔T2 gRPC Unary p99 latency | < 20ms (LAN) | Jaeger → Grafana 히스토그램 |
|
|
| T2↔T3 MQTT→gRPC 변환 지연 | < 10ms | MQTT 발행 TS → gRPC 수신 TS 차분 |
|
|
| T3 텔레메트리 동시 처리 | 1,000 노드 @ 1Hz | `BenchmarkTelemetryFanIn` |
|
|
| Resume Token 재연결 후 재개 지연 | < 500ms | 네트워크 차단 → 재연결 시간 측정 |
|
|
| gRPC over QUIC vs TCP 핸드오버 | QUIC -50% RTT | `tc netem`으로 IP 변경 시뮬레이션 |
|
|
| SPIFFE SVID 갱신 오버헤드 | < 1ms/RPC | `pprof` + Prometheus `grpc_server_handling_seconds` |
|
|
|
|
### 12.2 측정 방법 상세
|
|
|
|
#### Prometheus 메트릭 수집
|
|
|
|
```yaml
|
|
# deployments/otel/prometheus.yml
|
|
scrape_configs:
|
|
- job_name: 'cloud-server'
|
|
static_configs:
|
|
- targets: ['cloud-server:2112']
|
|
scrape_interval: 5s
|
|
|
|
- job_name: 't2-gateway'
|
|
static_configs:
|
|
- targets: ['t2-gateway:2112']
|
|
|
|
- job_name: 'otel-collector'
|
|
static_configs:
|
|
- targets: ['otel-collector:8889']
|
|
```
|
|
|
|
#### 주요 모니터링 메트릭
|
|
|
|
```
|
|
# gRPC 레이턴시 히스토그램 (otelgrpc 자동 생성)
|
|
grpc_server_handling_seconds_bucket{grpc_method="Execute",grpc_service="agent.v1.AgentService"}
|
|
|
|
# MQTT 메시지 처리량
|
|
mqtt_messages_received_total{topic="telemetry"}
|
|
mqtt_translation_duration_seconds_bucket
|
|
|
|
# Resume Token 재연결
|
|
resume_token_reconnect_total
|
|
resume_token_reconnect_latency_seconds
|
|
|
|
# 오프라인 큐 크기
|
|
offline_queue_size{device_id="SENSOR-001"}
|
|
```
|
|
|
|
#### 네트워크 장애 주입 (tc netem)
|
|
|
|
```bash
|
|
# T2 게이트웨이에서 T1으로 가는 경로에 30% 패킷 손실 + 100ms 지연 주입
|
|
docker exec t2-gateway tc qdisc add dev eth0 root netem loss 30% delay 100ms
|
|
|
|
# 30초 후 정상 복구
|
|
sleep 30
|
|
docker exec t2-gateway tc qdisc del dev eth0 root
|
|
|
|
# IP 변경 시뮬레이션 (QUIC connection migration 테스트)
|
|
docker network disconnect t1-net t2-gateway
|
|
docker network connect t1-net t2-gateway
|
|
```
|
|
|
|
#### 핵심 그래프 (Grafana 대시보드)
|
|
|
|
1. **레이턴시 p50/p95/p99 시계열**: tier별 gRPC 호출 지연 추이
|
|
2. **처리량 시계열**: 초당 RPC 수, MQTT 메시지 수
|
|
3. **에러율**: gRPC 상태 코드별 에러율 (UNAVAILABLE, DEADLINE_EXCEEDED 등)
|
|
4. **Resume Token 이벤트**: 단절/재연결 빈도 및 재개 지연
|
|
5. **오프라인 큐 크기**: T3 디바이스별 버퍼 누적 현황
|
|
6. **Jaeger 트레이스 뷰**: T3→T2→T1 E2E 호출 그래프
|
|
|
|
---
|
|
|
|
## 부록 A. Go 모듈 의존성 (`go.mod` 핵심 부분)
|
|
|
|
```go
|
|
module github.com/your-org/edge-aiot-mas
|
|
|
|
go 1.22
|
|
|
|
require (
|
|
// gRPC
|
|
google.golang.org/grpc v1.64.0
|
|
google.golang.org/protobuf v1.34.1
|
|
|
|
// QUIC
|
|
github.com/quic-go/quic-go v0.44.0
|
|
|
|
// MQTT
|
|
github.com/eclipse/paho.mqtt.golang v1.4.3
|
|
|
|
// SPIFFE/SPIRE
|
|
github.com/spiffe/go-spiffe/v2 v2.3.0
|
|
|
|
// OpenTelemetry
|
|
go.opentelemetry.io/otel v1.27.0
|
|
go.opentelemetry.io/otel/sdk v1.27.0
|
|
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.52.0
|
|
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.27.0
|
|
|
|
// BoltDB (오프라인 큐)
|
|
go.etcd.io/bbolt v1.3.10
|
|
|
|
// 테스트
|
|
github.com/testcontainers/testcontainers-go v0.31.0
|
|
github.com/stretchr/testify v1.9.0
|
|
)
|
|
```
|
|
|
|
## 부록 B. Python 의존성 (`python/pyproject.toml` 핵심 부분)
|
|
|
|
```toml
|
|
[project]
|
|
name = "edge-aiot-orchestrator"
|
|
version = "0.1.0"
|
|
requires-python = ">=3.11"
|
|
|
|
dependencies = [
|
|
# LangGraph 에이전트
|
|
"langgraph>=0.1.0",
|
|
"langchain-openai>=0.1.0",
|
|
"langchain-core>=0.2.0",
|
|
|
|
# gRPC
|
|
"grpcio>=1.64.0",
|
|
"grpcio-tools>=1.64.0",
|
|
"protobuf>=5.27.0",
|
|
|
|
# HTTP/A2A 클라이언트
|
|
"httpx[http2]>=0.27.0",
|
|
|
|
# 관측성
|
|
"opentelemetry-sdk>=1.27.0",
|
|
"opentelemetry-exporter-otlp>=1.27.0",
|
|
"opentelemetry-instrumentation-grpc>=0.48b0",
|
|
]
|
|
|
|
[project.optional-dependencies]
|
|
dev = [
|
|
"pytest>=8.0",
|
|
"pytest-asyncio>=0.23",
|
|
"pytest-cov>=5.0",
|
|
]
|
|
```
|