Channel

Channel #

Overview #

Channel is an AsyncAPI entity that represents a communication channel using one or more Servers. Channel is protocol-agnostic, the concrete protocol is set by the Server object that this channel is bound to. However, the channel may contain protocol-specific properties, see Channel bindings.

Typically, a channel may be one-directional or bidirectional. It also may have no direction, which means it acts mostly like a stub. This depends on publish and subscribe sections in document definition.

The generated channel code contains some common methods and fields. Depending on the channel direction, the channel code contains proper publisher/subscriber fields and methods to send/receive Message envelopes.

The channel code also contains an interface for servers bound to this channel. All servers that are bound to this channel in the AsyncAPI document are complied to this interface (we automatically append the needed methods to the server code during generation).

And finally, the channel code contains a convenience method to open the channel to given servers.

By default, the channel code is generated in the channels package.

Minimal example
channels:
  myChannel:
    description: My channel
    publish:
      message:
        payload:
          type: object
          properties:
            value:
              type: string

servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka
package channels

import (
	"context"
	"errors"
	run "github.com/bdragon300/go-asyncapi/run"
	kafka "github.com/bdragon300/go-asyncapi/run/kafka"
)

func MyChannelName() run.ParamString {
	return run.ParamString{Expr: "myChannel"}
}

type myChannelKafkaServer interface {
	OpenMyChannelKafka(ctx context.Context) (*MyChannelKafka, error)
	Producer() kafka.Producer
}

func OpenMyChannelKafka(ctx context.Context, servers ...myChannelKafkaServer) (*MyChannelKafka, error) {
	if len(servers) == 0 {
		return nil, run.ErrEmptyServers
	}
	name := MyChannelName()
	var prod []kafka.Producer
	for _, srv := range servers {
		prod = append(prod, srv.Producer())
	}
	pubs, err := run.GatherPublishers[kafka.EnvelopeWriter, kafka.Publisher, kafka.ChannelBindings](ctx, name, nil, prod)

	if err != nil {
		return nil, err
	}
	pub := run.PublisherFanOut[kafka.EnvelopeWriter, kafka.Publisher]{Publishers: pubs}
	ch := NewMyChannelKafka(pub)
	return ch, nil
}
func NewMyChannelKafka(publisher kafka.Publisher) *MyChannelKafka {
	res := MyChannelKafka{
		name:      MyChannelName(),
		publisher: publisher,
	}
	res.topic = res.name.String()
	return &res
}

// MyChannelKafka -- my channel
type MyChannelKafka struct {
	name      run.ParamString
	publisher kafka.Publisher
	topic     string
}

func (m MyChannelKafka) Name() run.ParamString {
	return m.name
}
func (m MyChannelKafka) Close() (err error) {
	if m.publisher != nil {
		err = errors.Join(err, m.publisher.Close())
	}
	return
}
func (m MyChannelKafka) Topic() string {
	return m.topic
}
func (m MyChannelKafka) Publisher() kafka.Publisher {
	return m.publisher
}
func (m MyChannelKafka) Publish(ctx context.Context, envelopes ...kafka.EnvelopeWriter) error {
	return m.publisher.Send(ctx, envelopes...)
}
func (m MyChannelKafka) SealEnvelope(envelope kafka.EnvelopeWriter, message *MyChannelMessageOut) error {
	envelope.ResetPayload()

	if err := message.MarshalKafkaEnvelope(envelope); err != nil {
		return err
	}
	envelope.SetTopic(m.topic)
	return nil
}

Pay attention to OpenMyChannelKafka method, generated to comply the myChannelKafkaServer interface in channel code.

package servers

import (
	"context"
	channels "myproject/channels"
	run "github.com/bdragon300/go-asyncapi/run"
	kafka "github.com/bdragon300/go-asyncapi/run/kafka"
)

func MyServerURL() run.ParamString {
	return run.ParamString{Expr: "kafka://localhost:9092"}
}

type MyServerBindings struct{}

func (m MyServerBindings) Kafka() kafka.ServerBindings {
	b := kafka.ServerBindings{SchemaRegistryURL: "http://localhost:8081"}
	return b
}
func NewMyServer(producer kafka.Producer, consumer kafka.Consumer) *MyServer {
	return &MyServer{
		consumer: consumer,
		producer: producer,
	}
}

type MyServer struct {
	producer kafka.Producer
	consumer kafka.Consumer
}

func (m MyServer) Name() string {
	return "myServer"
}
func (m MyServer) OpenMyChannelKafka(ctx context.Context) (*channels.MyChannelKafka, error) {
	return channels.OpenMyChannelKafka(ctx, m)
}
func (m MyServer) Producer() kafka.Producer {
	return m.producer
}
func (m MyServer) Consumer() kafka.Consumer {
	return m.consumer
}

Document scope #

A channel can be defined in two places in the AsyncAPI document:

  • channels section
  • components.channels section

Channels in channels produce the code. Channels in components.channels are just reusable objects, that produce the code only being referred somewhere in the channels section. Therefore, if such a channel is not referred anywhere, it will not be generated at all.

So, in the example below, only the spam and eggs are considered, fooChannel will be ignored:

Example
channels:
  spam:  # <-- will be generated
    publish:
      message:
        payload:
        type: string
  eggs:  # <-- will be generated
    $ref: '#/components/channels/eggsChannel'
    
components:
  channels:
    eggsChannel:  # <-- will be generated as `eggs` (it's referred by the `eggs` channel)
      subscribe:
        message:
          payload:
            type: string
    fooChannel:  # <-- will NOT be generated (does not appear in the `channels` section)
      publish:
        message:
          payload:
            type: integer

In a similar way, only the servers from the servers section are considered. See the Servers for more details.

Operation #

x-ignore #

If this extra field is set to true, the operation will not be generated. All references to objects in this operation in the generated code (if any) are replaced by Go any type.

Example
servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka

channels:
  myChannel:
    publish:
      x-ignore: true
      message:
        payload:
          type: string

Channel parameters #

Channel parameters are variables that are substituted in the channel name during channel opening. This is useful when the channel name is not known at the time of the code generation. For example, when channel name sets the topic or queue to use, and you want to determine it at runtime.

The channel parameters are defined in the parameters section and are substituted to the appropriate placeholders, enclosed in curly braces.

Channel parameters example
channels:
  mychannel_{variant}:
    description: My channel
    parameters:
      variant:
        schema:
          type: string
    publish:
      message:
        payload:
          type: object
          properties:
            value:
              type: string

variant.go:

package channels

import "fmt"

type Variant struct {
	Value string
}

func (v Variant) Name() string {
	return "variant"
}
func (v Variant) String() string {
	return fmt.Sprint(v.Value)
}

mychannel_variant.go:

package channels

//...

type MychannelVariantParameters struct {
	Variant Variant
}

func MychannelVariantName(params MychannelVariantParameters) run.ParamString {
	paramMap := map[string]string{params.Variant.Name(): params.Variant.String()}
	return run.ParamString{
		Expr:       "mychannel_{variant}",
		Parameters: paramMap,
	}
}

type mychannelVariantKafkaServer interface {
	OpenMychannelVariantKafka(ctx context.Context, params MychannelVariantParameters) (*MychannelVariantKafka, error)
	Producer() kafka.Producer
}

func OpenMychannelVariantKafka(ctx context.Context, params MychannelVariantParameters, servers ...mychannelVariantKafkaServer) (*MychannelVariantKafka, error) {
//...
}
func NewMychannelVariantKafka(params MychannelVariantParameters, publisher kafka.Publisher) *MychannelVariantKafka {
	res := MychannelVariantKafka{
		name:      MychannelVariantName(params),
		publisher: publisher,
	}
	res.topic = res.name.String()
	return &res
}

//...
channelParams := MychannelVariantParameters{
	Variant: Variant{Value: "foo"}
}
channelName := MychannelVariantName(channelParams)
fmt.Println(channelName) 
// Output: mychannel_foo
channel, err := OpenMychannelVariantKafka(context.Background(), channelParams, myServer)
if err != nil {
    log.Fatalf("Failed to open channel: %v", err)
}
defer channel.Close()

x-go-name #

Explicitly set the name of the parameter in generated code. By default, the Go name is taken from a parameter name.

Example
channels:
  mychannel_{variant}:
    description: My channel
    parameters:
      variant:
        schema:
          type: string
        x-go-name: VariantName
    publish:
      message:
        payload:
          type: object
          properties:
            value:
              type: string

Channel bindings, operation bindings #

Channel bindings are protocol-specific properties that are used to define the channel behavior. They are defined in the bindings section of the channel definition.

Channel bindings example
channels:
  mychannel:
    description: My channel
    publish:
      message:
        payload:
          type: object
          properties:
            value:
              type: string
      bindings:
        kafka:
          clientId:  # This should contain jsonschema definition according to AsyncAPI spec 
            type: string
            default: "my-client"
    bindings:
      kafka:
        partitions: 64
package channels

//...

type MychannelBindings struct{}

func (m MychannelBindings) Kafka() kafka.ChannelBindings {
	b := kafka.ChannelBindings{Partitions: 64}
	clientID := "{\"default\":\"my-client\",\"type\":\"string\"}"
	_ = json.Unmarshal([]byte(clientID), &b.PublisherBindings.ClientID)
	return b
}

//...

The “Open channel” function automatically uses the bindings, if any:

channel, err := OpenMychannelKafka(context.Background(), myServer)
if err != nil {
    log.Fatalf("Failed to open channel: %v", err)
}
defer channel.Close()

At a lower level, the channel bindings are used to make a Publisher/Subscriber object:

publisher, err := producer.NewPublisher(ctx, "mychannel", MychannelBindings.Kafka())
if err != nil {
    log.Fatalf("Failed to create Kafka publisher: %v", err)
}
defer publisher.Close()

x-go-name #

This extra field is used to explicitly set the name of the channel in generated code. By default, the Go name is generated from the AsyncAPI channel name.

Example
servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka

channels:
  myChannel:
    x-go-name: FooBar
    description: My channel
    publish:
      message:
        payload:
          type: string

//...

// FooBarKafka -- my channel
type FooBarKafka struct {
    name      run.ParamString
    publisher kafka.Publisher
    topic     string
}

//...

x-ignore #

If this extra field is set to true, the channel will not be generated. All references to this channel in the generated code (if any) are replaced by Go any type.

Example
servers:
  myServer:
    url: 'kafka://localhost:9092'
    protocol: kafka

channels:
  myChannel:
    x-ignore: true
    publish:
      message:
        payload:
          type: string