Message #

Overview #

A message is the protocol-agnostic structure sent from one service to another. It is generated from the message definition in the AsyncAPI document. A message can have a payload, headers, and other properties.

To send or receive a message over a channel using a specific protocol and a specific library, the message is wrapped in an implementation-specific structure called an Envelope.

The generated message code contains only the payload and headers fields, methods to work with them, and marshaling/unmarshaling methods for particular protocols. Every message has two structures: one for inbound messages and one for outbound messages.

Minimal example
servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka

channels:
  mychannel:
    description: My channel
    publish:
      message:
        $ref: '#/components/messages/myMessage'

components:
  messages:
    myMessage:
      payload:
        type: string
// Code generated by go-asyncapi tool. DO NOT EDIT.

package messages

import (
	encoding "myproject/encoding"
	run "github.com/bdragon300/go-asyncapi/run"
	kafka "github.com/bdragon300/go-asyncapi/run/kafka"
)

type MyMessagePayload string

func NewMyMessageOut() *MyMessageOut {
	return &MyMessageOut{}
}

// MyMessageOut --  (Outbound Message)
type MyMessageOut struct {
	Payload string
	Headers map[string]any
}

func (m *MyMessageOut) MarshalKafkaEnvelope(envelope kafka.EnvelopeWriter) error {
	enc := encoding.NewEncoder("application/json", envelope)

	if err := enc.Encode(m.Payload); err != nil {
		return err
	}
	envelope.SetContentType("application/json")
	envelope.SetHeaders(run.Headers(m.Headers))
	return nil
}
func (m *MyMessageOut) WithPayload(payload string) *MyMessageOut {

	m.Payload = payload
	return m
}
func (m *MyMessageOut) WithHeaders(headers map[string]any) *MyMessageOut {

	m.Headers = headers
	return m
}
func NewMyMessageIn() *MyMessageIn {
	return &MyMessageIn{}
}

// MyMessageIn --  (Inbound Message)
type MyMessageIn struct {
	Payload string
	Headers map[string]any
}

func (m *MyMessageIn) UnmarshalKafkaEnvelope(envelope kafka.EnvelopeReader) error {
	dec := encoding.NewDecoder("application/json", envelope)

	if err := dec.Decode(&m.Payload); err != nil {
		return err
	}
	m.Headers = map[string]any(envelope.Headers())
	return nil
}
func (m *MyMessageIn) MessagePayload() string {
	return m.Payload
}
func (m *MyMessageIn) MessageHeaders() map[string]any {
	return m.Headers
}

Envelope #

Envelope is the protocol-specific “wrapper” for the incoming or outgoing message along with its metadata. In other words, the envelope is a message suitable for sending/receiving the data by a particular library.

See the Implementation page for more details.

Usage example
servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka

channels:
  mychannel:
    description: My channel
    publish:
      message:
        $ref: '#/components/messages/myMessage'
    subscribe:
      message:
        $ref: '#/components/messages/myMessage'

components:
  messages:
    myMessage:
      payload:
        type: string
import (
	"log"

	implKafka "myproject/impl/kafka"
	messages "myproject/messages"
)

//...

message := messages.NewMyMessageOut().WithPayload("Hello, world!")
envelope := implKafka.NewEnvelopeOut()
if err := channel.SealEnvelope(envelope, message); err != nil {
    log.Fatalf("failed to make envelope: %v", err)
}
if err := channel.Publish(cancelCtx, envelope); err != nil {
    log.Fatalf("failed to publish message: %v", err)
}
import (
	"log"

	runKafka "github.com/bdragon300/go-asyncapi/run/kafka"
	messages "myproject/messages"
)

//...

// The callback is invoked for every envelope received from the channel.
err := channel.Subscribe(cancelCtx, func(envelope runKafka.EnvelopeReader) {
	message := messages.NewMyMessageIn()
	if err := channel.ExtractEnvelope(envelope, message); err != nil {
		log.Fatalf("failed to extract a message from envelope: %v", err)
	}
	log.Printf("received message: %s", message.MessagePayload())
})
if err != nil {
	log.Fatalf("failed to subscribe: %v", err)
}

Content type #

The encoder and decoder used to marshal and unmarshal the message payload depend on the message content type. By default, the content type is application/json, but it can be changed in the message definition or globally in the defaultContentType field of the AsyncAPI document.
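
The precedence described above can be sketched as a small function. This is only an illustration: resolveContentType and fallbackContentType are hypothetical names, not part of go-asyncapi, and the sketch assumes that an empty string means "not set in the document".

```go
package main

import "fmt"

// fallbackContentType is the default content type named in the text above.
const fallbackContentType = "application/json"

// resolveContentType is a hypothetical helper, not part of go-asyncapi.
// It prefers the message-level contentType, then the document-level
// defaultContentType, and finally falls back to application/json.
// An empty string is assumed to mean "not set".
func resolveContentType(messageContentType, defaultContentType string) string {
	if messageContentType != "" {
		return messageContentType
	}
	if defaultContentType != "" {
		return defaultContentType
	}
	return fallbackContentType
}

func main() {
	// The message sets its own content type.
	fmt.Println(resolveContentType("application/json", "application/yaml"))
	// The message inherits the document-level default.
	fmt.Println(resolveContentType("", "application/yaml"))
	// Nothing is set anywhere.
	fmt.Println(resolveContentType("", ""))
}
```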

Encoders and decoders #

go-asyncapi generates encoder and decoder code for every content type used by the messages in the document and puts it in the encoding package by default.

They are used automatically during marshaling/unmarshalling the message. You can use them manually or provide your own implementation if you like. You may also choose not to generate the encoder/decoder code at all.

Example
asyncapi: '2.6.0'
info:
  title: My API
  version: '1.0.0'
defaultContentType: 'application/yaml'

channels:
  mychannel:
    description: My channel
    publish:
      message:
        $ref: '#/components/messages/myMessage'

components:
  messages:
    myMessage:
      payload:
        type: string
      contentType: 'application/json'
    myMessage2:
      payload:
        type: string

encode.go

package encoding

import (
	"encoding/json"
	yamlv3 "gopkg.in/yaml.v3"
	"io"
)

type Encoder interface {
	Encode(v any) error
}

var Encoders = map[string]func(w io.Writer) Encoder{
	"application/json": func(w io.Writer) Encoder {
		return json.NewEncoder(w)
	},
	"application/yaml": func(w io.Writer) Encoder {
		return yamlv3.NewEncoder(w)
	},
}

func NewEncoder(contentType string, w io.Writer) Encoder {
	if v, ok := Encoders[contentType]; ok {
		return v(w)
	}
	panic("Unknown content type " + contentType)
}

decode.go

package encoding

import (
	"encoding/json"
	yamlv3 "gopkg.in/yaml.v3"
	"io"
)

type Decoder interface {
	Decode(v any) error
}

var Decoders = map[string]func(r io.Reader) Decoder{
	"application/json": func(r io.Reader) Decoder {
		return json.NewDecoder(r)
	},
	"application/yaml": func(r io.Reader) Decoder {
		return yamlv3.NewDecoder(r)
	},
}

func NewDecoder(contentType string, r io.Reader) Decoder {
	if v, ok := Decoders[contentType]; ok {
		return v(r)
	}
	panic("Unknown content type " + contentType)
}
// ...
func (m *MyMessageOut) MarshalKafkaEnvelope(envelope kafka.EnvelopeWriter) error {
	enc := encoding.NewEncoder("application/json", envelope)

	if err := enc.Encode(m.Payload); err != nil {
		return err
	}
	envelope.SetContentType("application/json")
	envelope.SetHeaders(run.Headers(m.Headers))
	return nil
}

// ...

func (m *MyMessageIn) UnmarshalKafkaEnvelope(envelope kafka.EnvelopeReader) error {
	dec := encoding.NewDecoder("application/json", envelope)

	if err := dec.Decode(&m.Payload); err != nil {
		return err
	}
	m.Headers = map[string]any(envelope.Headers())
	return nil
}

// ...
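
As a rough illustration of providing your own implementation, the sketch below adds an encoder for one more content type to a map shaped like the generated Encoders variable. The Encoder interface and the Encoders map are redeclared locally only to keep the example self-contained. The names plainTextEncoder, registerPlainText and encodeToString, and the text/plain content type, are assumptions made for this example. Whether registering entries in the generated map is the intended extension point is also an assumption.

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
)

// Encoder and Encoders mirror the shape of the generated encoding package.
type Encoder interface {
	Encode(v any) error
}

var Encoders = map[string]func(w io.Writer) Encoder{
	"application/json": func(w io.Writer) Encoder {
		return json.NewEncoder(w)
	},
}

// plainTextEncoder is a hypothetical encoder that writes the default
// text form of a value, as produced by fmt.Fprint.
type plainTextEncoder struct {
	w io.Writer
}

func (e plainTextEncoder) Encode(v any) error {
	_, err := fmt.Fprint(e.w, v)
	return err
}

// registerPlainText adds the custom encoder under the text/plain content type.
func registerPlainText() {
	Encoders["text/plain"] = func(w io.Writer) Encoder {
		return plainTextEncoder{w: w}
	}
}

// encodeToString looks up an encoder by content type and encodes v into a string.
// Unlike the generated NewEncoder, it returns an error instead of panicking.
func encodeToString(contentType string, v any) (string, error) {
	newEncoder, ok := Encoders[contentType]
	if !ok {
		return "", fmt.Errorf("unknown content type %q", contentType)
	}
	var buf bytes.Buffer
	if err := newEncoder(&buf).Encode(v); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	registerPlainText()
	out, err := encodeToString("text/plain", "Hello, world!")
	if err != nil {
		panic(err)
	}
	fmt.Println(out)
}
```

The generated NewEncoder shown earlier panics on an unknown content type, so a custom entry has to be registered before the first message that uses it is marshaled.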

Message bindings #

Message bindings are the protocol-specific properties that are used to describe how the message is sent or received over the channel. They are defined in the bindings section of the message definition.

Message bindings example
servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka

channels:
  mychannel:
    description: My channel
    publish:
      message:
        $ref: '#/components/messages/myMessage'

components:
  messages:
    myMessage:
      payload:
        type: string
      bindings:
        kafka:
          schemaIdPayloadEncoding: 'confluent'
package messages

//...

type MyMessageBindings struct{}

func (m MyMessageBindings) Kafka() kafka.MessageBindings {
	b := kafka.MessageBindings{SchemaIDPayloadEncoding: "confluent"}
	return b
}

//...

Message bindings are automatically used in SealEnvelope call:


import (
	"log"

	implKafka "myproject/impl/kafka"
	messages "myproject/messages"
)

//...

message := messages.NewMyMessageOut().WithPayload("Hello, world!")
envelope := implKafka.NewEnvelopeOut()
if err := channel.SealEnvelope(envelope, message); err != nil {
    log.Fatalf("failed to make envelope: %v", err)
}

You can also use bindings manually:


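import (
	implKafka "myproject/impl/kafka"
	messages "myproject/messages"
)

//...

envelope := implKafka.NewEnvelopeOut()
// MyMessageBindings is a struct type, so it is instantiated with a composite literal.
envelope.SetBindings(messages.MyMessageBindings{}.Kafka())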

Correlation ID #

Correlation ID is a user-defined unique identifier used to correlate request and response messages. Its location field determines where the value is stored in the message: in the payload or in the headers. Briefly, the location has the format:

$message.<payload|header>#/a/json/pointer

where $message is a special keyword that means the message itself, payload or header is the target field, and #/a/json/pointer is the path to the field in JSON Pointer (RFC 6901) format. E.g. $message.payload#/field1/10/field2.
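
To make the format concrete, here is a minimal sketch that splits a location string into its target and its raw JSON Pointer. splitLocation is a hypothetical helper written for this page, not the parser used by go-asyncapi, and it deliberately leaves the pointer undecoded.

```go
package main

import (
	"fmt"
	"strings"
)

// splitLocation is a hypothetical helper. It splits a correlation ID
// location into the target ("payload" or "header") and the raw JSON
// Pointer that follows the '#' symbol.
func splitLocation(location string) (target, pointer string, err error) {
	const prefix = "$message."
	if !strings.HasPrefix(location, prefix) {
		return "", "", fmt.Errorf("location %q must start with %s", location, prefix)
	}
	target, pointer, found := strings.Cut(strings.TrimPrefix(location, prefix), "#")
	if !found {
		return "", "", fmt.Errorf("location %q has no '#' separator", location)
	}
	if target != "payload" && target != "header" {
		return "", "", fmt.Errorf("unknown target %q, want payload or header", target)
	}
	return target, pointer, nil
}

func main() {
	target, pointer, err := splitLocation("$message.payload#/field1/10/field2")
	if err != nil {
		panic(err)
	}
	fmt.Println(target)  // payload
	fmt.Println(pointer) // /field1/10/field2
}
```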

go-asyncapi can generate code that sets and retrieves a correlation ID value in a message. Despite the name, the JSON Pointer path works with all formats, not only JSON: the tool interprets it simply as a path through the generated nested Go types.

Correlation ID example
components:
  messages:
    myMessage:
      payload:
        type: object
        properties:
          name:
            type: string
          age:
            type: integer
          correlationId:
            type: string
      correlationId:
        location: '$message.payload#/correlationId'
package messages

//...

// MyMessageOut --  (Outbound Message)
type MyMessageOut struct {
	Payload struct {
		Name          string
		Age           int
		CorrelationID string
	}
	Headers map[string]any
}

//...

func (m *MyMessageOut) SetCorrelationID(value string) {
	v0 := m.Payload
	v0.CorrelationID = value
	m.Payload = v0
}

//...

// MyMessageIn --  (Inbound Message)
type MyMessageIn struct {
	Payload struct {
		Name          string
		Age           int
		CorrelationID string
	}
	Headers map[string]any
}

//...

func (m MyMessageIn) CorrelationID() (value string, err error) {
	v0 := m.Payload
	v1 := v0.CorrelationID
	value = v1
	return
}
message := messages.NewMyMessageOut()
message.SetCorrelationID("123")

Symbols encoding #

The AsyncAPI specification states that the correlation ID location contains a JSON Pointer after the # symbol. According to the specification, a JSON Pointer is a string that contains a sequence of encoded path items separated by /.

Encoding rules are:

  • Alphanumeric characters and symbols -, ., _ are written as is.
  • Tilde symbol ~ must be written as ~0.
  • Forward slash / must be written as ~1.
  • Quotes at the beginning and at the end of path item (single or double) have special meaning, see below.
  • Other symbols must be percent-encoded as described in RFC 3986 using the % character followed by two hexadecimal digits (encoding table)

Path items wrapped in quotes (single or double) are always treated as strings. Quotes are stripped before path evaluation. The only use-case for this is to force the path item to be treated as a string, not as an integer.

E.g., $message.payload#/~0field%20_1/'10'/%22field2%22~1foo contains three fields: ~field _1, 10 (a string, not an integer) and "field2"/foo.
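
The rules above can be sketched as a small decoder for the pointer part of the location. This is an illustration under stated assumptions, not the parser used by go-asyncapi: decodePointer and pathItem are hypothetical names, quotes are checked before percent-decoding so that %22 stays a literal quote, and an unquoted item made only of digits is flagged as an integer index.

```go
package main

import (
	"fmt"
	"net/url"
	"strconv"
	"strings"
)

// pathItem is one decoded step of the pointer. IsIndex is true when the
// item was an unquoted non-negative integer.
type pathItem struct {
	Value   string
	IsIndex bool
}

// decodePointer is a hypothetical helper that decodes the part of a
// correlation ID location that follows the '#' symbol.
func decodePointer(pointer string) ([]pathItem, error) {
	if pointer == "" {
		return nil, nil
	}
	if !strings.HasPrefix(pointer, "/") {
		return nil, fmt.Errorf("pointer %q must start with /", pointer)
	}
	var items []pathItem
	for _, raw := range strings.Split(pointer[1:], "/") {
		// Matching quotes around the raw item force string treatment.
		quoted := false
		if n := len(raw); n >= 2 && (raw[0] == '\'' || raw[0] == '"') && raw[n-1] == raw[0] {
			raw, quoted = raw[1:n-1], true
		}
		// Percent-decode first, then apply the JSON Pointer escapes.
		s, err := url.PathUnescape(raw)
		if err != nil {
			return nil, fmt.Errorf("bad percent-encoding in %q: %w", raw, err)
		}
		// RFC 6901 order: ~1 becomes / before ~0 becomes ~.
		s = strings.ReplaceAll(s, "~1", "/")
		s = strings.ReplaceAll(s, "~0", "~")
		_, convErr := strconv.ParseUint(s, 10, 64)
		items = append(items, pathItem{Value: s, IsIndex: !quoted && convErr == nil})
	}
	return items, nil
}

func main() {
	items, err := decodePointer(`/~0field%20_1/'10'/%22field2%22~1foo`)
	if err != nil {
		panic(err)
	}
	for _, it := range items {
		fmt.Printf("%q index=%v\n", it.Value, it.IsIndex)
	}
}
```

Run on the pointer from the example above, the sketch yields the same three items: ~field _1, the string 10, and "field2"/foo.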

x-go-ignore #

If set to true, the correlation ID code will not be generated.

Example
components:
  messages:
    myMessage:
      payload:
        type: object
        properties:
          name:
            type: string
          age:
            type: integer
          correlationId:
            type: string
      correlationId:
        location: '$message.payload#/correlationId'
        x-go-ignore: true

x-go-name #

This extra field is used to explicitly set the name of the message in generated code. By default, the Go name is generated from the AsyncAPI message name by converting it to CamelCase.

Example
components:
  messages:
    myMessage:
      x-go-name: FooBar
      description: MyMessage
      payload:
        type: string
//...

/*
FooBarOut --  (Outbound Message)
MyMessage
*/
type FooBarOut struct {
	Payload string
	Headers map[string]any
}

//...

/*
FooBarIn --  (Inbound Message)
MyMessage
*/
type FooBarIn struct {
	Payload string
	Headers map[string]any
}

//...
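
For comparison, the default conversion to CamelCase mentioned above can be approximated as follows. This is only a sketch: toGoName is a hypothetical function, and the exact rules go-asyncapi applies (for example, to names that start with a digit or contain acronyms) may differ.

```go
package main

import (
	"fmt"
	"strings"
	"unicode"
)

// toGoName is a hypothetical approximation of the default naming rule.
// It splits the name on every character that is not a letter or a digit
// and upper-cases the first letter of each part.
func toGoName(name string) string {
	parts := strings.FieldsFunc(name, func(r rune) bool {
		return !unicode.IsLetter(r) && !unicode.IsDigit(r)
	})
	var b strings.Builder
	for _, p := range parts {
		r := []rune(p)
		r[0] = unicode.ToUpper(r[0])
		b.WriteString(string(r))
	}
	return b.String()
}

func main() {
	fmt.Println(toGoName("myMessage"))     // MyMessage
	fmt.Println(toGoName("my-message_v2")) // MyMessageV2
}
```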

x-go-ignore #

If this extra field is set to true, the message will not be generated. All references to this message in the generated code (if any) are replaced with the Go any type.

Example
components:
  messages:
    myMessage:
      x-go-ignore: true
      description: MyMessage
      payload:
        type: string