AWS & OpenTelemetry: SQS context propagation

Marcin Sodkiewicz
6 min readMar 17, 2024

--

How to instrument your AWS async processing based on SQS with OpenTelemetry.

Motivation

The majority of resources on context propagation in distributed tracing relate to synchronous communication. The situation there is fairly straightforward, as we are dealing with highly coupled services communicating directly over HTTP.

In the case of HTTP communication, after instrumenting your http client of choice, it is as simple as injecting headers at the sender end and injecting them into the context at the receiver end.

Which headers? It depends on the propagation standard, but typically it’s W3C standard. In other words: traceparent and tracestate .

Such header contains information about:

  • version
  • trace-id
  • parent-id
  • trace-flags
source: https://doordash.engineering/wp-content/uploads/2021/06/image5-1.png

Async communication instrumentation

In the case of async communication, we have to do exactly the same. Send over information about our traceparent and tracestate on the producer end and apply it on the context of the consumer.

How can we send it?

Asynchronous communication is not as standardized as HTTP communication. We have 2 options in here. Either we are sending it over a payload or through context (when it’s available).

Sending over payload

There is a lack of industry-wide standards for having a unified message “envelope”. In my opinion, it is best practice to stick to some organization-wide format that could contain event metadata such as:

  • event version
  • producer
  • event type
  • tracing context

If that’s interesting to you, take a look at https://cloudevents.io/ and https://www.asyncapi.com/blog/asyncapi-cloud-events

Why do I think it’s good to send over that information? Because sending tracing information in the event attributes works inconsistently. Therefore it’s good practice to add that metadata.

Sending over context

What in case you already have an existing event-based system and there is no room for refactoring to add metadata? You can try to send your tracing information alongside your payload. This article focuses on SQS. So, given this service, we can send this information in message attributes. This is exactly what X-Ray integration does.

The big difference here is that our communication parties are not coupled together and communications are not standardized. It means that it will vary depending on whether we are sending events over SNS, SQS, EventBridge, or Kinesis. We are going to focus on SQS now.

In OpenTelemetry terms, we can perform two operations:

  • Inject — put info about the current tracing context into some carrier
  • Extract — apply info from some carrier into the current tracing context

How to serialize the current tracing context?

Usually, you can just register propagators and they will inject context on your behalf. In case you would like to get W3C headers programmatically and with prepared headers you can simply use such a call that will inject those headers into a map.

import (
"go.opentelemetry.io/otel/propagation"
)

...

carrier := make(map[string]string)
propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(carrier))

// NOW carrier contains 'traceparent' and 'tracestate' that can be accessed with:
// carrier["traceparent"]
// carrier["tracestate"]

Since we are in the context of SQS. How we can send that information?

We can do it like in the example below and simply build message attributes that can be injected into the SQS request payload.

import (
"context"
"github.com/aws/aws-lambda-go/events"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"go.opentelemetry.io/otel/propagation"
)

const (
traceparentHeader = "traceparent"
tracestateHeader = "tracestate"
)

type TracingContext struct {
TraceParent string `json:"traceparent"`
TraceState string `json:"tracestate"`
}

func RetrieveContext(ctx context.Context) TracingContext {
carrier := make(map[string]string)
propagation.TraceContext{}.Inject(ctx, propagation.MapCarrier(carrier))

return TracingContext{
TraceParent: carrier[traceparentHeader],
TraceState: carrier[tracestateHeader],
}
}

func (receiver TracingContext) BuildMessageAttributes() map[string]types.MessageAttributeValue {
attributes := make(map[string]types.MessageAttributeValue)
if receiver.TraceParent != "" {
attributes[traceparentHeader] = types.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: &receiver.TraceParent,
}
}
if receiver.TraceState != "" {
attributes[tracestateHeader] = types.MessageAttributeValue{
DataType: aws.String("String"),
StringValue: &receiver.TraceState,
}
}
return attributes
}

Example of sending in a single message:

sqsClient.SendMessage(ctx, &sqs.SendMessageInput{
QueueUrl: &queueurl,
MessageBody: aws.String(body),
MessageAttributes: RetrieveContext(ctx).BuildMessageAttributes(),
})

Example of applying into a batch message entry:

return types.SendMessageBatchRequestEntry{
Id: aws.String(&messageId),
MessageBody: aws.String(body),
MessageAttributes: RetrieveContext(ctx).BuildMessageAttributes(),
}

How to deserialize and apply it to the current tracing context?

We know already how to pass that information alongside our request. How to then apply it to processing context? Every message loaded from SQS has a payload, as well as, message attributes. In our case, tracing information that was added on the producer end. Now we have to use Extract method of our OTEL propagator. Which might look like that:

import (
"context"
"github.com/aws/aws-lambda-go/events"
"go.opentelemetry.io/otel/propagation"
)

const (
traceparentHeader = "traceparent"
tracestateHeader = "tracestate"
)

func ExtractContextFromMessageAttributes(ctx context.Context, event events.SQSMessage) context.Context {
attributes := make(map[string]string)
if val, ok := event.MessageAttributes[traceparentHeader]; ok {
attributes[traceparentHeader] = *val.StringValue
}
if val, ok := event.MessageAttributes[tracestateHeader]; ok {
attributes[tracestateHeader] = *val.StringValue
}

if len(attributes) == 0 {
return ctx
}

return propagation.TraceContext{}.Extract(ctx, propagation.MapCarrier(attributes))
}

Then we can use returned context in processing. All instrumentations and operations done based on that new context are going to be assigned to the span that was used during sending an event.

Test scenario

So right now let’s talk a bit about root spans in async processing. This is super important as it defines whether you are going to see the whole trace and whether it will contain meaningful information.

Let’s consider such a scenario:

Test scenario

We are going to schedule x tasks amount of tasks that will be then just proxied over and at the end, we are going to have a consumer that will fail as many times as it was planned during task scheduling — for simplicity let's say it will be x amount of times.

This is how it’s going to look like in our waterfall.

Span names are set like that only for ease of reading this trace. Do not name spans like that. Use attributes for context instrumentation.

Imagine now that there were 10 tasks. It will be barely readable. What in this case we would create a new root span for each “logical” processing?

Logical root spans

After creating a new root span for each of the messages generated it might look like on the screens below. We have 3 separate traces with full history for each of them. We should instrument our trace with proper attributes specific to our domain. For example: orderId, flightId, shoppingCartId, customerId, whatever is relevant.

Message x failing x times with trace per message

With the previous approach if we are going to look for traces annotated with our domain value we are going to find huge traces with many irrelevant processing flows, so with the new approach we are improving readability.

Another important thing here is sampling , especially in the case of head-based sampling. In that case, if our trace is not tracked we are going to lose visibility from all of the child spans. In case of splitting them into multiple logical traces, you will have a sample of your processing.

Single trace vs trace per logical processing

You can start a new root span in Go using trace.WithNewRoot()

tracer.Start(ctx, spanName, trace.WithNewRoot())

Summary

This was 1st part of a series of articles about the instrumentation of async communication using AWS Services. The next articles are going to cover services like SNS, EventBridge, Kinesis, and others.

I hope you liked it and you have learned something new.

Happy instrumenting!

--

--

No responses yet