Building a custom node in Go
Write, test, and register a custom m9m node in Go — the interface, the lifecycle, the gotchas, and a complete example node.
m9m ships 32 built-in node types and supports inline JavaScript / Python for one-off logic. When you need something more — a vendor SDK that doesn’t expose clean HTTP, a performance-critical transform, or a protocol like Kafka or NATS — you write a Go node. This guide walks through the interface, a complete example, testing, and how to register the node with m9m.
The node interface
Every node implements the same three-method interface:
```go
type Node interface {
	// Describe returns the node's type name, input/output schema, and parameter schema.
	Describe() NodeDescriptor

	// Validate returns an error if the supplied params are malformed.
	// Called at workflow-load time, before any Execute call.
	Validate(params map[string]any) error

	// Execute runs the node against the provided context and inputs.
	// Returns one or more output items, or an error.
	Execute(ctx context.Context, input ExecInput) (ExecOutput, error)
}
```
Three methods, no hidden lifecycle. Nodes are stateless — all state lives in the context and inputs, and is passed forward via outputs.
A complete example: a Kafka producer node
Say you need to publish messages to Kafka from a workflow. The built-in nodes don’t cover it; the inline JS node is awkward for long-lived producer connections. A Go node is the right tool.
```go
package kafka

import (
	"context"
	"encoding/json"
	"fmt"
	"sync"

	"github.com/neul-labs/m9m/sdk/node"
	"github.com/segmentio/kafka-go"
)

type ProducerNode struct {
	mu      sync.Mutex
	writers map[string]*kafka.Writer // keyed by broker+topic
}

func New() *ProducerNode {
	return &ProducerNode{writers: map[string]*kafka.Writer{}}
}

func (n *ProducerNode) Describe() node.Descriptor {
	return node.Descriptor{
		Type:        "kafka.produce",
		DisplayName: "Kafka: produce",
		Description: "Publish a message to a Kafka topic.",
		Params: node.ParamsSchema{
			"brokers": node.Param{Type: "string[]", Required: true},
			"topic":   node.Param{Type: "string", Required: true},
			"key":     node.Param{Type: "string", Expression: true},
			"value":   node.Param{Type: "any", Expression: true, Required: true},
		},
	}
}

func (n *ProducerNode) Validate(params map[string]any) error {
	if _, ok := params["brokers"]; !ok {
		return fmt.Errorf("brokers is required")
	}
	if _, ok := params["topic"]; !ok {
		return fmt.Errorf("topic is required")
	}
	return nil
}

func (n *ProducerNode) Execute(ctx context.Context, in node.ExecInput) (node.ExecOutput, error) {
	brokers := in.Params.StringSlice("brokers")
	topic := in.Params.String("topic")
	key := in.Eval("key").String()
	val := in.Eval("value").Any()

	payload, err := json.Marshal(val)
	if err != nil {
		return node.ExecOutput{}, fmt.Errorf("marshal value: %w", err)
	}

	w := n.writerFor(brokers, topic)
	if err := w.WriteMessages(ctx, kafka.Message{Key: []byte(key), Value: payload}); err != nil {
		return node.ExecOutput{}, fmt.Errorf("kafka write: %w", err)
	}

	return node.ExecOutput{Items: []node.Item{{Data: map[string]any{
		"topic": topic, "key": key, "bytes": len(payload),
	}}}}, nil
}

// writerFor returns the cached writer for this broker+topic pair, creating it
// on first use. The mutex matters: m9m can call Execute concurrently on the
// same node instance.
func (n *ProducerNode) writerFor(brokers []string, topic string) *kafka.Writer {
	key := fmt.Sprintf("%v/%s", brokers, topic)

	n.mu.Lock()
	defer n.mu.Unlock()
	if w, ok := n.writers[key]; ok {
		return w
	}
	w := &kafka.Writer{
		Addr:     kafka.TCP(brokers...),
		Topic:    topic,
		Balancer: &kafka.Hash{},
	}
	n.writers[key] = w
	return w
}
```
Two things worth naming:

- `in.Eval("value")` evaluates an expression parameter against the workflow context. If the workflow declares `"value": "{{ $json.payload }}"`, this is the value that comes out.
- The producer is cached per brokers+topic. m9m reuses the same node instance across runs, so expensive resources (connections, clients) should live on the struct, not be re-created per Execute. Guard any shared map with a mutex, since Execute can be called concurrently.
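The caching pattern generalises beyond Kafka: because m9m reuses the node instance, a mutex-guarded lazy map is a safe home for any per-configuration resource. A self-contained sketch of the pattern — `conn` here is a placeholder for whatever expensive client your node holds:

```go
package main

import (
	"fmt"
	"sync"
)

// conn stands in for any expensive resource (Kafka writer, DB pool, SDK client).
type conn struct{ addr string }

type cache struct {
	mu    sync.Mutex
	conns map[string]*conn
}

// get returns the cached conn for addr, creating it on first use.
// Safe to call from concurrent Execute invocations on the same node.
func (c *cache) get(addr string) *conn {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cn, ok := c.conns[addr]; ok {
		return cn
	}
	cn := &conn{addr: addr}
	c.conns[addr] = cn
	return cn
}

func main() {
	c := &cache{conns: map[string]*conn{}}
	a := c.get("broker-1:9092")
	b := c.get("broker-1:9092")
	fmt.Println(a == b) // true: the second call reuses the first connection
}
```

For creation paths that are slow enough to serialise everything behind the lock, `sync.Map` or per-key `sync.Once` values are the usual refinements; for a handful of writers, a plain mutex is fine.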
Testing
Table-driven tests are the right shape:
```go
func TestProducer_Execute(t *testing.T) {
	cases := []struct {
		name    string
		params  map[string]any
		input   any
		wantErr bool
	}{
		{"simple", map[string]any{"brokers": []string{"localhost:9092"}, "topic": "test", "value": map[string]any{"hello": "world"}}, nil, false},
		{"missing topic", map[string]any{"brokers": []string{"localhost:9092"}}, nil, true},
	}
	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			n := New()
			err := n.Validate(tc.params)
			if (err != nil) != tc.wantErr {
				t.Fatalf("Validate: wantErr=%v got %v", tc.wantErr, err)
			}
		})
	}
}
```
For integration tests, m9m ships an in-process runner (node.Runner) that lets you execute a node against a fake workflow context without a Kafka cluster.
Registering the node with m9m
Two paths.
Compile-in registration — add your node to a private m9m build. This is the simplest option for internal tools.
```go
// in your custom main.go
package main

import (
	"github.com/neul-labs/m9m/cmd/m9m"

	"example.com/kafka"
)

func main() {
	m9m.RegisterNode(kafka.New())
	m9m.Run()
}
```
Plugin registration — compile your node as a Go plugin (`go build -buildmode=plugin`) and drop it into m9m’s plugins directory. m9m loads it at startup. This is the right path for nodes you want to ship independently or share across multiple m9m installs without recompiling.
See m9m’s plugin architecture docs for the details.
Gotchas
- Don’t block Execute on external locks. 500 workflows can run concurrently; a shared mutex around slow work becomes the bottleneck.
- Respect `ctx`. Workflows can be cancelled. Any long operation should honor `ctx.Done()`.
- Return structured data. Items should be `map[string]any` with JSON-serialisable values. Don’t return Go types that only make sense inside your node.
- Fail loudly at Validate, not Execute. Validate runs at workflow-load time, so users see errors immediately. Execute failures appear only when the workflow actually runs.
Need help shipping agents or migrating off n8n?
Neul Labs — the team behind m9m — takes on a limited number of consulting engagements each quarter. We help teams migrate n8n workflows, build custom Go nodes, sandbox AI agents in production, and design automation platforms that don't collapse under load.