Message #
Overview #
Message is the protocol-agnostic structure sent from one service to another. It is generated from message definition in the AsyncAPI document. Message can have a payload, headers, and other properties.
To be able to send or receive a message over a channel of specific protocol and specific library, it’s wrapped in the implementation-specific structure – Envelope.
The generated message code contains only payload and headers fields and methods to work with them, and also marshaling/unmarshalling methods for particular protocols. Every message has two structures: one for inbound messages and one for outbound messages.
Minimal example
servers:
myServer:
url: 'kafka://localhost:9092'
protocol: kafka
channels:
mychannel:
description: My channel
publish:
message:
$ref: '#/components/messages/myMessage'
components:
messages:
myMessage:
payload:
type: object
properties:
value:
type: string
// Code generated by go-asyncapi tool. DO NOT EDIT.
package messages
import (
encoding "myproject/encoding"
run "github.com/bdragon300/go-asyncapi/run"
kafka "github.com/bdragon300/go-asyncapi/run/kafka"
)
type MyMessagePayload string
func NewMyMessageOut() *MyMessageOut {
return &MyMessageOut{}
}
// MyMessageOut -- (Outbound Message)
type MyMessageOut struct {
Payload string
Headers map[string]any
}
func (m *MyMessageOut) MarshalKafkaEnvelope(envelope kafka.EnvelopeWriter) error {
enc := encoding.NewEncoder("application/json", envelope)
if err := enc.Encode(m.Payload); err != nil {
return err
}
envelope.SetContentType("application/json")
envelope.SetHeaders(run.Headers(m.Headers))
return nil
}
func (m *MyMessageOut) WithPayload(payload string) *MyMessageOut {
m.Payload = payload
return m
}
func (m *MyMessageOut) WithHeaders(headers map[string]any) *MyMessageOut {
m.Headers = headers
return m
}
func NewMyMessageIn() *MyMessageIn {
return &MyMessageIn{}
}
// MyMessageIn -- (Inbound Message)
type MyMessageIn struct {
Payload string
Headers map[string]any
}
func (m *MyMessageIn) UnmarshalKafkaEnvelope(envelope kafka.EnvelopeReader) error {
dec := encoding.NewDecoder("application/json", envelope)
if err := dec.Decode(&m.Payload); err != nil {
return err
}
m.Headers = map[string]any(envelope.Headers())
return nil
}
func (m *MyMessageIn) MessagePayload() string {
return m.Payload
}
func (m *MyMessageIn) MessageHeaders() map[string]any {
return m.Headers
}
Envelope #
Envelope is the protocol-specific “wrapper” for the incoming or outgoing message along with its metadata. In other words, the envelope is a message suitable for sending/receiving the data by a particular library.
See the Implementation page for more details.
Usage example
servers:
myServer:
url: 'kafka://localhost:9092'
protocol: kafka
channels:
mychannel:
description: My channel
publish:
message:
$ref: '#/components/messages/myMessage'
subscribe:
message:
$ref: '#/components/messages/myMessage'
components:
messages:
myMessage:
payload:
type: object
properties:
value:
type: string
import (
implKafka "myproject/impl/kafka"
messages "myproject/messages"
)
//...
message := messages.NewMyMessageOut().WithPayload("Hello, world!")
envelope := implKafka.NewEnvelopeOut()
if err := channel.SealEnvelope(envelope, message); err != nil {
log.Fatalf("failed to make envelope: %v", err)
}
if err := channel.Publish(cancelCtx, envelope); err != nil {
log.Fatalf("failed to publish message: %v", err)
}
import (
implKafka "myproject/impl/kafka"
messages "myproject/messages"
)
//...
err := channel.Subscribe(cancelCtx, func(envelope runKafka.EnvelopeReader) {
message := messages.NewMyMessageIn()
if err := channel.ExtractEnvelope(envelope, message); err != nil {
log.Fatalf("failed to extract a message from envelope: %v", err)
}
log.Printf("received message: %s", message.MessagePayload())
})
if err != nil {
log.Fatalf("failed to subscribe: %v", err)
}
Content type #
The encoder and decoder are used to marshal/unmarshal the message payload depends on the message content type.
By default, the content type is application/json
, but it can be changed in the message definition or globally in the
AsyncAPI document in defaultContentType
field.
Encoders and decoders #
go-asyncapi
generates the encoder and decoder code for all message types of all messages in document and
puts it to the encoding
package by default.
They are used automatically during marshaling/unmarshalling the message. You can use them manually or provide your own implementation if you like. You may also choose not to generate the encoder/decoder code at all.
Example
asyncapi: '2.6.0'
info:
title: My API
version: '1.0.0'
defaultContentType: 'application/yaml'
channels:
mychannel:
description: My channel
publish:
message:
$ref: '#/components/messages/myMessage'
components:
messages:
myMessage:
payload:
type: string
contentType: 'application/json'
myMessage2:
payload:
type: string
encode.go
package encoding
import (
"encoding/json"
yamlv3 "gopkg.in/yaml.v3"
"io"
)
type Encoder interface {
Encode(v any) error
}
var Encoders = map[string]func(w io.Writer) Encoder{
"application/json": func(w io.Writer) Encoder {
return json.NewEncoder(w)
},
"application/yaml": func(w io.Writer) Encoder {
return yamlv3.NewEncoder(w)
},
}
func NewEncoder(contentType string, w io.Writer) Encoder {
if v, ok := Encoders[contentType]; ok {
return v(w)
}
panic("Unknown content type " + contentType)
}
decode.go
package encoding
import (
"encoding/json"
yamlv3 "gopkg.in/yaml.v3"
"io"
)
type Decoder interface {
Decode(v any) error
}
var Decoders = map[string]func(r io.Reader) Decoder{
"application/json": func(r io.Reader) Decoder {
return json.NewDecoder(r)
},
"application/yaml": func(r io.Reader) Decoder {
return yamlv3.NewDecoder(r)
},
}
func NewDecoder(contentType string, r io.Reader) Decoder {
if v, ok := Decoders[contentType]; ok {
return v(r)
}
panic("Unknown content type " + contentType)
}
// ...
func (m *MyMessageOut) MarshalKafkaEnvelope(envelope kafka.EnvelopeWriter) error {
enc := encoding.NewEncoder("application/json", envelope)
if err := enc.Encode(m.Payload); err != nil {
return err
}
envelope.SetContentType("application/json")
envelope.SetHeaders(run.Headers(m.Headers))
return nil
}
// ...
func (m *MyMessageIn) UnmarshalKafkaEnvelope(envelope kafka.EnvelopeReader) error {
dec := encoding.NewDecoder("application/json", envelope)
if err := dec.Decode(&m.Payload); err != nil {
return err
}
m.Headers = map[string]any(envelope.Headers())
return nil
}
// ...
Message bindings #
Message bindings are the protocol-specific properties that are used to describe how the message is sent or received over
the channel. They are defined in the bindings
section of the message definition.
Message bindings example
servers:
myServer:
url: 'kafka://localhost:9092'
protocol: kafka
channels:
mychannel:
description: My channel
publish:
message:
$ref: '#/components/messages/myMessage'
components:
messages:
myMessage:
payload:
type: string
bindings:
kafka:
schemaIdPayloadEncoding: 'confluent'
package messages
//...
type MyMessageBindings struct{}
func (m MyMessageBindings) Kafka() kafka.MessageBindings {
b := kafka.MessageBindings{SchemaIDPayloadEncoding: "confluent"}
return b
}
//...
Message bindings are automatically used in SealEnvelope
call:
import (
implKafka "myproject/impl/kafka"
messages "myproject/messages"
)
//...
message := messages.NewMyMessageOut().WithPayload("Hello, world!")
envelope := implKafka.NewEnvelopeOut()
if err := channel.SealEnvelope(envelope, message); err != nil {
log.Fatalf("failed to make envelope: %v", err)
}
You can also use bindings manually:
import (
implKafka "myproject/impl/kafka"
)
//...
envelope := implKafka.NewEnvelopeOut()
envelope.SetBindings(messages.MyMessageBindings().Kafka())
Correlation ID #
Correlation ID is a user-defined unique identifier used to correlate the request and response messages.
Its location
field determines where the value is stored in the message—in its payload or headers.
Briefly, the location has the format:
$message.<payload|header>#/a/json/pointer
where $message
is a special keyword that means the message itself, payload
or header
is the target field, and
#/a/json/pointer
is the path to the field in JSON Pointer (RFC 6901) format.
E.g. $message.payload#/field1/10/field2
.
The go-asyncapi
is able to generate the code that sets and retrieves a correlation ID value from a message.
Despite the name, the JSONPointer path works with all formats, not only JSON—it is interpreted by the
tool as just a path through generated nested Go’s types.
Correlation ID example
components:
messages:
myMessage:
payload:
type: object
properties:
name:
type: string
age:
type: integer
correlationId:
type: string
correlationId:
location: '$message.payload#/correlationId'
package messages
//...
// MyMessageOut -- (Outbound Message)
type MyMessageOut struct {
Payload struct {
Name string
Age int
CorrelationID string
}
Headers map[string]any
}
//...
func (m MyMessageOut) SetCorrelationID(value string) {
v0 := m.Payload
v0.CorrelationID = value
m.Payload = v0
}
//...
// MyMessageIn -- (Inbound Message)
type MyMessageIn struct {
Payload struct {
Name string
Age int
CorrelationID string
}
Headers map[string]any
}
//...
func (m MyMessageIn) CorrelationID() (value string, err error) {
v0 := m.Payload
v1 := v0.CorrelationID
value = v1
return
}
message := messages.NewMyMessageOut().WithPayload("Hello, world!")
message.SetCorrelationID("123")
Symbols encoding #
AsyncAPI specification states that Correlation id location contains the
JSON Pointer after the #
symbol. According to the specification,
the JSON Pointer is a string that contains a sequence of encoded symbols separated by /
.
Encoding rules are:
- Alphanumeric characters and symbols
-
,.
,_
are written as is. - Tilda symbol
~
must be written as~0
. - Forward slash
/
must be written as~1
. - Quotes at the beginning and at the end of path item (single or double) have special meaning, see below.
- Other symbols must be percent-encoded as described in RFC 3986
using the
%
character followed by two hexadecimal digits (encoding table)
Path items wrapped in quotes (single or double) are always treated as strings. Quotes are stripped before path evaluation. The only use-case for this is to force the path item to be treated as a string, not as an integer.
E.g., $message.payload#/~0field%20_1/'10'/%22field2%22~1foo
contains three fields: ~field _1
, 10
(a string, not an integer) and "field2"/foo
.
x-go-ignore #
If set to true, the correlation id code will be ignored.
Example
components:
messages:
myMessage:
payload:
type: object
properties:
name:
type: string
age:
type: integer
correlationId:
type: string
correlationId:
location: '$message.payload#/correlationId'
x-go-ignore: true
x-go-name #
This extra field is used to explicitly set the name of the message in generated code. By default, the Go name is generated from the AsyncAPI message name by converting it to CamelCase.
Example
components:
messages:
myMessage:
x-go-name: FooBar
description: MyMessage
payload:
type: string
//...
/*
FooBarOut -- (Outbound Message)
MyMessage
*/
type FooBarOut struct {
Payload string
Headers map[string]any
}
//...
/*
FooBarIn -- (Inbound Message)
MyMessage
*/
type FooBarIn struct {
Payload string
Headers map[string]any
}
//...
x-go-ignore #
If this extra field is set to true, the message will not be generated.
All references to this message in the generated code (if any) are replaced by Go any
type.
Example
components:
messages:
myMessage:
x-go-ignore: true
description: MyMessage
payload:
type: string