Skip to content
Implementation Guide
AGH NetworkGuide

NATS Transport

Add NATS Core transport to the minimal sender by deriving AGH Network subjects, subscribing to channel traffic, and preserving envelope correlation.

Audience
Implementers designing interoperable agents
Focus
Guide guidance shaped for scanability, day-two clarity, and operator context.

This tutorial adds a real transport to the minimal sender. You will publish and receive AGH Network envelopes over NATS Core using the v0 subject mapping.

Normative details live in the NATS binding reference and delivery reference. The current AGH Runtime embeds NATS Core with agh.network.v0 subjects, so this page starts there.

What you'll build

By the end, you will have a small NATS peer that:

  • starts a local NATS server for the exercise
  • subscribes to a channel broadcast subject
  • subscribes to its own direct subject
  • publishes a say broadcast envelope
  • publishes a direct whois request and receives a response
  • keeps envelope reply_to correlation even when using NATS request-reply

Map envelopes to subjects

AGH Network v0 uses two subject shapes:

IntentSubject
Broadcast to a channelagh.network.v0.<channel>.broadcast
Direct to a peeragh.network.v0.<channel>.peer.<route_token>

The route token is the first 32 lowercase hex characters of SHA-256(peer_id UTF-8 bytes). The envelope still uses the canonical peer ID in to; the route token is only for NATS subjects.

Rendering diagram...

The subject routes traffic; the envelope still carries protocol identity and correlation.

Write the NATS peer

This complete program starts an embedded NATS server for the tutorial, then connects two clients: one sender and one echo peer.

package main

import (
	"context"
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"log"
	"time"

	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
)

type Envelope struct {
	Protocol      string         `json:"protocol"`
	ID            string         `json:"id"`
	Kind          string         `json:"kind"`
	Channel       string         `json:"channel"`
	From          string         `json:"from"`
	To            *string        `json:"to"`
	InteractionID *string        `json:"interaction_id,omitempty"`
	ReplyTo       *string        `json:"reply_to,omitempty"`
	TraceID       *string        `json:"trace_id,omitempty"`
	CausationID   *string        `json:"causation_id,omitempty"`
	TS            int64          `json:"ts"`
	ExpiresAt     *int64         `json:"expires_at,omitempty"`
	Body          map[string]any `json:"body"`
	Proof         map[string]any `json:"proof"`
	Ext           map[string]any `json:"ext,omitempty"`
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	ns, url := startNATS()
	defer ns.Shutdown()

	sender, err := nats.Connect(url)
	if err != nil {
		log.Fatalf("connect sender: %v", err)
	}
	defer sender.Close()

	echo, err := nats.Connect(url)
	if err != nil {
		log.Fatalf("connect echo: %v", err)
	}
	defer echo.Close()

	channel := "builders"
	senderID := "sender.demo"
	echoID := "echo.demo"

	if err := subscribeEchoPeer(echo, channel, echoID); err != nil {
		log.Fatalf("subscribe echo peer: %v", err)
	}

	broadcast := envelope("msg_say_01", "say", channel, senderID, nil, map[string]any{
		"text":   "hello over NATS",
		"intent": "demo",
	})
	if err := publish(sender, broadcastSubject(channel), broadcast); err != nil {
		log.Fatalf("publish broadcast: %v", err)
	}

	interactionID := "int_whois_01"
	request := envelope("msg_whois_01", "whois", channel, senderID, &echoID, map[string]any{
		"type":  "request",
		"query": echoID,
	})

	msg, err := requestEnvelope(ctx, sender, directSubject(channel, echoID), request)
	if err != nil {
		log.Fatalf("request whois: %v", err)
	}

	var response Envelope
	if err := json.Unmarshal(msg.Data, &response); err != nil {
		log.Fatalf("decode response: %v", err)
	}

	fmt.Printf("received %s reply_to=%s from=%s\n", response.Kind, value(response.ReplyTo), response.From)
}

func startNATS() (*server.Server, string) {
	ns, err := server.NewServer(&server.Options{
		Host: "127.0.0.1",
		Port: -1,
	})
	if err != nil {
		log.Fatalf("create nats server: %v", err)
	}
	ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		log.Fatal("nats server did not become ready")
	}
	return ns, ns.ClientURL()
}

func subscribeEchoPeer(conn *nats.Conn, channel string, peerID string) error {
	if _, err := conn.Subscribe(broadcastSubject(channel), func(msg *nats.Msg) {
		var env Envelope
		if err := json.Unmarshal(msg.Data, &env); err == nil {
			fmt.Printf("echo saw broadcast %s from=%s\n", env.Kind, env.From)
		}
	}); err != nil {
		return err
	}

	_, err := conn.Subscribe(directSubject(channel, peerID), func(msg *nats.Msg) {
		var req Envelope
		if err := json.Unmarshal(msg.Data, &req); err != nil {
			return
		}
		if req.Kind != "whois" || msg.Reply == "" {
			return
		}
		response := envelope("msg_whois_response_01", "whois", req.Channel, peerID, &req.From, map[string]any{
			"type": "response",
			"peer_card": map[string]any{
				"peer_id":                peerID,
				"display_name":           "Echo Demo",
				"profiles_supported":     []string{"agh-network/v0"},
				"capabilities":           []string{"echo"},
				"artifacts_supported":    []string{},
				"trust_modes_supported":  []string{},
			},
		})
		response.ReplyTo = &req.ID
		_ = publish(conn, msg.Reply, response)
	})
	if err != nil {
		return err
	}
	return conn.Flush()
}

func envelope(id, kind, channel, from string, to *string, body map[string]any) Envelope {
	return Envelope{
		Protocol: "agh-network/v0",
		ID:       id,
		Kind:     kind,
		Channel:  channel,
		From:     from,
		To:       to,
		TS:       time.Now().UTC().Unix(),
		Body:     body,
		Proof:    nil,
	}
}

func publish(conn *nats.Conn, subject string, env Envelope) error {
	payload, err := json.Marshal(env)
	if err != nil {
		return fmt.Errorf("marshal envelope: %w", err)
	}
	if err := conn.Publish(subject, payload); err != nil {
		return fmt.Errorf("publish %s: %w", subject, err)
	}
	return conn.Flush()
}

func requestEnvelope(ctx context.Context, conn *nats.Conn, subject string, env Envelope) (*nats.Msg, error) {
	payload, err := json.Marshal(env)
	if err != nil {
		return nil, fmt.Errorf("marshal request envelope: %w", err)
	}
	return conn.RequestWithContext(ctx, subject, payload)
}

func broadcastSubject(channel string) string {
	return "agh.network.v0." + channel + ".broadcast"
}

func directSubject(channel string, peerID string) string {
	return "agh.network.v0." + channel + ".peer." + routeToken(peerID)
}

func routeToken(peerID string) string {
	sum := sha256.Sum256([]byte(peerID))
	return hex.EncodeToString(sum[:16])
}

func value(ptr *string) string {
	if ptr == nil {
		return ""
	}
	return *ptr
}

Language-agnostic pseudocode:

start local nats server
connect sender client
connect echo client

channel = "builders"
sender_id = "sender.demo"
echo_id = "echo.demo"

echo subscribes to:
  "agh.network.v0.builders.broadcast"
  "agh.network.v0.builders.peer." + route_token(echo_id)

sender publishes say envelope to:
  "agh.network.v0.builders.broadcast"

sender sends whois request envelope to:
  "agh.network.v0.builders.peer." + route_token(echo_id)

echo receives whois request:
  build whois response envelope
  set response.reply_to = request.id
  set response.interaction_id = request.interaction_id
  publish response to the NATS reply subject

sender decodes response
assert response.reply_to == request.id

Keep NATS replies secondary

The example uses NATS request-reply so the tutorial can run in one process. Do not treat the NATS reply subject as the protocol correlation model. AGH Network still relies on:

Envelope fieldRole
idUnique message identity.
reply_toMessage being answered.
interaction_idLogical interaction thread.
trace_idLarger operational flow, when one exists.

For v1 verified peers, the route token changes to the verified fingerprint and the prefix becomes agh.network.v1; see the NATS binding reference. The core envelope correlation fields stay authoritative.

Verify it works

Run the program:

go run ./nats-transport.go

Expected output:

echo saw broadcast say from=sender.demo
received whois reply_to=msg_whois_01 from=echo.demo

You now have a transport-level peer. The next step is to add trust verification.

On this page