SQS Partial failures
This is the first part of a series of articles about partial failures, focusing on SQS. We will discuss why we need partial failures, how to use them, and when you should use them.
Prerequisites
First of all, let’s get some perspective on what we’re talking about here. Let’s say our business logic is that, on average, every third message fails. This gives us a simple rule that simulates frequent failures. Our business logic might look like this:
func processRecord(batchMsgIndex int, sqsEvent events.SQSMessage) error {
	if rand.Float32() <= 0.33 {
		return fmt.Errorf("xxx> %d failure of: %s", batchMsgIndex, sqsEvent.Body)
	}
	fmt.Printf("---> %d: %s\n", batchMsgIndex, sqsEvent.Body)
	return nil
}
Let’s say we have a very simple setup. The SQS consumer Lambda will consume batches of up to 10 messages at a time. Note that the maximum batch size for a standard queue is 10,000 — the bigger the batch, the more likely it is to contain a failure, and the more messages get replayed when it does.
Resources:
  TestQueue:
    Type: AWS::SQS::Queue
    Properties:
      VisibilityTimeout: 5
  SqsConsumerFunction:
    Type: AWS::Serverless::Function
    Metadata:
      BuildMethod: go1.x
    Properties:
      CodeUri: sqs-consumer/
      Handler: bootstrap
      Runtime: provided.al2
      Architectures:
        - arm64
      Events:
        SQSQueueEvent:
          Type: SQS
          Properties:
            Queue: !GetAtt TestQueue.Arn
            BatchSize: 10
            MaximumBatchingWindowInSeconds: 5
BatchSize: 1
Let’s just process one message at a time. Then everything is super easy for us.
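In the template above, this is just a matter of setting the batch size on the event source:
Events:
  SQSQueueEvent:
    Type: SQS
    Properties:
      Queue: !GetAtt TestQueue.Arn
      BatchSize: 1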
When won’t processing one message at a time work?
When we have a massive number of messages and, for example, a batch API at our disposal that could improve our cost and message processing performance.
Another case is when we want to limit the concurrency of our Lambdas and have an easily estimated processing capacity.
It’s not the worst option to choose, as it has a super simple cognitive and implementation model. Better to choose this option and reprocess only failed items than to reprocess hundreds of messages multiple times with a bad, “fingers crossed” batching implementation. Example message flow:
Why do I need partial failures?
Let’s say that we don’t want to process messages one by one with BatchSize: 1 and want to process them in batches instead. The default behaviour of the SQS consumer Lambda is to replay the whole batch on failure.
Given our test business logic, this will cause a lot of failures and replays of whole batches. Such an implementation makes no sense — it’s a “fingers crossed” implementation. Let’s run this experiment just to understand what we’re dealing with.
We could even end up with an almost infinite loop of processing these messages, only stopping when the number of retries is exhausted or the message retention period expires. At least in theory — in practice, there is a non-zero chance that every message will be processed successfully at some point.
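For context, that retry limit usually comes from a redrive policy on the queue — a minimal sketch, assuming a TestDLQ queue defined alongside it:
TestQueue:
  Type: AWS::SQS::Queue
  Properties:
    VisibilityTimeout: 5
    RedrivePolicy:
      deadLetterTargetArn: !GetAtt TestDLQ.Arn
      maxReceiveCount: 3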
The worst thing about such an implementation is that we process the same messages several times. If our processing is not idempotent, we are in big trouble. But even assuming that our processing is idempotent, it may increase the cost of the solution. Besides, processing the same items multiple times simply makes no sense, and we need to address this with partial failures. Example flow:
How to use partial failures?
We need to return a special response (in the format shown below). Based on this response, AWS will either delete your message from the queue or put it back into the queue. In some programming languages, the model may be missing in the SDK, so you will need to create this model yourself.
Partial failures are also supported for DynamoDB Streams and Kinesis, where the identifier is the SequenceNumber; in the case of SQS, the <id> we are looking for is the MessageId.
{
  "batchItemFailures": [
    {
      "itemIdentifier": "<id>"
    },
    {
      "itemIdentifier": "<id>"
    }
  ]
}
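In Go, aws-lambda-go already ships this model as events.SQSEventResponse. If your runtime lacks it, the hand-rolled equivalent is tiny — a minimal Go sketch of what the response must serialize to:
// Minimal model matching the response format above.
type BatchItemFailure struct {
	ItemIdentifier string `json:"itemIdentifier"`
}

type BatchResponse struct {
	BatchItemFailures []BatchItemFailure `json:"batchItemFailures"`
}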
Here is how the same run could look with partial failures:
Things to watch out for
There are a couple of things that you have to watch out for in your implementation.
Forget about returning errors from the handler
My demo is written in Go, but the equivalent of returning an error would be throwing an exception or returning a rejected promise in other runtimes. You have to change your habits: in case of any error, you have to map it to partial failure records. An error should be returned only when it relates to the entire batch — for example, a failed processing prerequisite.
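A minimal sketch of that rule — loadConfig here is a hypothetical batch-wide prerequisite, not part of the demo:
func handler(event events.SQSEvent) (events.SQSEventResponse, error) {
	// loadConfig is a hypothetical prerequisite shared by every record in the batch.
	if _, err := loadConfig(); err != nil {
		// The error affects every item, so returning it is correct:
		// the whole batch will be retried.
		return events.SQSEventResponse{}, err
	}
	// ...otherwise, map per-record errors to partial failure records
	// (full implementations follow later in the article).
	return events.SQSEventResponse{}, nil
}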
You can’t use the Lambda Errors metric anymore
Since we are no longer returning errors, these executions are not counted as Lambda errors anymore. This means that you have to react to alarms triggered by processing lag, measured by the ApproximateAgeOfOldestMessage SQS metric, or by messages arriving in the Dead-Letter Queue. If you still want to react to processing errors, you can also create a custom metric by reporting it yourself, or by using metric filters.
It’s a really important point to keep in mind for system observability after refactoring to partial failures.
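As a concrete example of the custom-metric route — a sketch using CloudWatch Embedded Metric Format, where a structured log line becomes a metric (the namespace and metric name are my own choices; needs encoding/json, fmt and time):
// Emit an EMF log line so a failure-count metric exists even though the
// handler no longer returns errors. CloudWatch extracts the metric from it.
func reportFailureCount(n int) {
	emf := map[string]interface{}{
		"_aws": map[string]interface{}{
			"Timestamp": time.Now().UnixMilli(),
			"CloudWatchMetrics": []interface{}{
				map[string]interface{}{
					"Namespace":  "SqsConsumer", // assumed namespace
					"Dimensions": [][]string{{"FunctionName"}},
					"Metrics": []interface{}{
						map[string]string{"Name": "BatchItemFailures", "Unit": "Count"},
					},
				},
			},
		},
		"FunctionName":      "SqsConsumerFunction",
		"BatchItemFailures": n,
	}
	b, _ := json.Marshal(emf)
	fmt.Println(string(b))
}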
Make sure that you report only failed items
Sounds obvious, right? So let’s say a developer implements a function that processes a single SQS record. If this is their first contact with partial failures, it could cause issues. Let’s assume the solution looks similar to the code below. In that case, any of the options returned on success will cause the whole batch to be reprocessed, as each triggers one of the rules quoted below: an invalid JSON response, or a null or empty string itemIdentifier.
async function (record) {
  try {
    await doSomething(record)
    // any of these "success" variants poisons the batch response:
    return null // or: return { itemIdentifier: "" } or: return { itemIdentifier: null }
  } catch (e) {
    return { itemIdentifier: record.messageId }
  }
}
According to the documentation:
Lambda treats a batch as a complete success if your function returns any of the following:
- An empty batchItemFailures list
- A null batchItemFailures list
- An empty EventResponse
- A null EventResponse
Lambda treats a batch as a complete failure if your function returns any of the following:
- An invalid JSON response
- An empty string itemIdentifier
- A null itemIdentifier
- An itemIdentifier with a bad key name
- An itemIdentifier value with a message ID that doesn't exist
Another case might be a function that processes SQS records in parallel. What one might do is return a partial failure record from each parallel worker. There is an issue with that — what will be returned in case of success? Most probably one of the invalid variants above, which has to be filtered out, as in the sketch below.
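A minimal sketch of that filtering, reusing the processRecord function from the beginning of the article (the goroutine-per-record layout and sync usage are my own choices):
func handleParallel(event events.SQSEvent) (events.SQSEventResponse, error) {
	results := make([]*events.SQSBatchItemFailure, len(event.Records))
	var wg sync.WaitGroup
	for i, record := range event.Records {
		wg.Add(1)
		go func(i int, record events.SQSMessage) {
			defer wg.Done()
			if err := processRecord(i, record); err != nil {
				results[i] = &events.SQSBatchItemFailure{ItemIdentifier: record.MessageId}
			}
			// on success results[i] stays nil — it must be filtered out below
		}(i, record)
	}
	wg.Wait()
	var failures []events.SQSBatchItemFailure
	for _, r := range results {
		if r != nil { // drop the nil "success" entries so they never reach the response
			failures = append(failures, *r)
		}
	}
	return events.SQSEventResponse{BatchItemFailures: failures}, nil
}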
Make sure you’ve turned it on
Remember to turn this feature on in your template. Without it, you won’t be able to reprocess those failed messages. Why? To AWS, your response is “just some response”, not a partial failure response — the whole batch counts as a success, so you can lose events.
The feature is enabled by adding an additional attribute to your Events section — please check the docs. I know it sounds obvious, but please make sure you have turned it on, and keep an eye on it when reviewing PRs. It’s super easy to miss.
Events:
  SQSQueueEvent:
    Type: SQS
    Properties:
      Queue: !GetAtt TestQueue.Arn
      BatchSize: 10
      MaximumBatchingWindowInSeconds: 5
      FunctionResponseTypes:
        - ReportBatchItemFailures
Processing items in batches
Processing items in batches depends on the API that you are using. For example:
Kinesis Firehose
The batch response contains all the items in the same order as they were sent in the request, so you can easily match each request item to its response entry — see the sketch below. This makes error reporting easy.
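A sketch using the aws-sdk-go PutRecordBatch call — the stream name and the 1:1, in-order mapping between SQS records and Firehose entries are assumptions:
func deliverBatch(event events.SQSEvent, fh *firehose.Firehose, fhRecords []*firehose.Record) (events.SQSEventResponse, error) {
	out, err := fh.PutRecordBatch(&firehose.PutRecordBatchInput{
		DeliveryStreamName: aws.String("test-stream"), // assumed stream name
		Records:            fhRecords,                 // assumed: fhRecords[i] was built from event.Records[i]
	})
	if err != nil {
		return events.SQSEventResponse{}, err // request-level failure affects the whole batch
	}
	var failures []events.SQSBatchItemFailure
	// RequestResponses comes back in request order, so index i links the
	// Firehose result to the SQS message it was built from.
	for i, resp := range out.RequestResponses {
		if resp.ErrorCode != nil {
			failures = append(failures, events.SQSBatchItemFailure{
				ItemIdentifier: event.Records[i].MessageId,
			})
		}
	}
	return events.SQSEventResponse{BatchItemFailures: failures}, nil
}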
SNS Batch
The batch response contains only the failed items, each marked with the batch entry Id used in the batch request. So you’d think you could use the SQS MessageId as the Id in the batch request? Not really. The SNS batch entry Id is limited to 80 characters, while an SQS MessageId can be up to 100 characters — so it won’t work, as it will result in a request failure. The downside here is that you have to somehow map the Id used in the request back to your SQS MessageId, as in the sketch below.
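A sketch with aws-sdk-go, using the batch index as the entry Id and a lookup table back to the MessageId (the topic ARN is an assumption; note PublishBatch accepts at most 10 entries, which fits BatchSize: 10):
func publishBatch(event events.SQSEvent, snsClient *sns.SNS, topicArn string) (events.SQSEventResponse, error) {
	entries := make([]*sns.PublishBatchRequestEntry, len(event.Records))
	idToMessageID := make(map[string]string, len(event.Records))
	for i, record := range event.Records {
		id := strconv.Itoa(i) // entry Id: the batch index, safely under the 80-char limit
		idToMessageID[id] = record.MessageId
		entries[i] = &sns.PublishBatchRequestEntry{
			Id:      aws.String(id),
			Message: aws.String(record.Body),
		}
	}
	out, err := snsClient.PublishBatch(&sns.PublishBatchInput{
		TopicArn:                   aws.String(topicArn), // assumed topic
		PublishBatchRequestEntries: entries,
	})
	if err != nil {
		return events.SQSEventResponse{}, err // request-level failure affects the whole batch
	}
	var failures []events.SQSBatchItemFailure
	for _, f := range out.Failed {
		// Map the failed entry Id back to the original SQS MessageId.
		failures = append(failures, events.SQSBatchItemFailure{
			ItemIdentifier: idToMessageID[*f.Id],
		})
	}
	return events.SQSEventResponse{BatchItemFailures: failures}, nil
}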
Dynamo Batch
The batch response contains UnprocessedItems that still need to be handled. If you want to retry processing these messages, you have to map the UnprocessedItems from the DynamoDB response back to the partial failure response — see the sketch below.
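A sketch with aws-sdk-go — it assumes each written item carries a pk attribute set to the originating SQS MessageId, so an unprocessed item can be traced back to its message:
func writeBatch(ddb *dynamodb.DynamoDB, requestItems map[string][]*dynamodb.WriteRequest) (events.SQSEventResponse, error) {
	out, err := ddb.BatchWriteItem(&dynamodb.BatchWriteItemInput{RequestItems: requestItems})
	if err != nil {
		return events.SQSEventResponse{}, err // request-level failure affects the whole batch
	}
	var failures []events.SQSBatchItemFailure
	for _, writeRequests := range out.UnprocessedItems {
		for _, wr := range writeRequests {
			if wr.PutRequest != nil {
				// Assumption: "pk" holds the SQS MessageId of the source message.
				failures = append(failures, events.SQSBatchItemFailure{
					ItemIdentifier: *wr.PutRequest.Item["pk"].S,
				})
			}
		}
	}
	return events.SQSEventResponse{BatchItemFailures: failures}, nil
}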
Simplest implementation
First, let’s start with a simple implementation that processes items synchronously:
func handler(event events.SQSEvent) (events.SQSEventResponse, error) {
	fmt.Printf("<--- Loaded %d messages\n", len(event.Records))
	var failures []events.SQSBatchItemFailure
	for i, record := range event.Records {
		err := processRecord(i, record)
		if err != nil {
			fmt.Println(err.Error())
			failures = append(failures, events.SQSBatchItemFailure{
				ItemIdentifier: record.MessageId,
			})
		}
	}
	return events.SQSEventResponse{BatchItemFailures: failures}, nil
}
It’s quite a simple implementation that would just work. There is one case where such an implementation would not work, though. When? In case you need…
Processing order
If you are using SQS FIFO, then the order in which items are processed is important. You should process the batch records in order and return the failed message AND ALL unprocessed messages in the batch. Let’s first have a look at what can go wrong.
Same implementation
With the implementation above, we will end up with items being processed out of order. Take a look at the flow in the animation below, where the items get processed in the order: 2, 5, 1, 3, 7, 9, 4, 6, 8, 10.
What if we stop after the first error?
This partial failures mechanism works just fine in stream processing, where reporting the first failed item makes Lambda retry from that point onwards. What about queues? Let’s take a look.
So what are the “lost” messages? They are messages that were sent to the Lambda for processing but were not reported as failures. Since we report failures, not successes, anything missing from the failure list counts as a success and is deleted — it won’t be returned to the queue. What can we do about it?
Failing the rest of the items after a failure
For SQS FIFO to work correctly with batches, you must return failures for the failed item and all consecutive items in the batch. This is what our example implementation might look like:
func handler(event events.SQSEvent) (events.SQSEventResponse, error) {
	fmt.Printf("<--- Loaded %d messages\n", len(event.Records))
	var failures []events.SQSBatchItemFailure
	for i, record := range event.Records {
		err := processRecord(i, record)
		if err != nil {
			fmt.Println(err.Error())
			failures = append(failures, failAllItems(event.Records[i:])...)
			break
		}
	}
	return events.SQSEventResponse{BatchItemFailures: failures}, nil
}
func failAllItems(records []events.SQSMessage) []events.SQSBatchItemFailure {
	var failures []events.SQSBatchItemFailure
	for _, record := range records {
		fmt.Printf("xxx> %s\n", record.Body)
		failures = append(failures, events.SQSBatchItemFailure{
			ItemIdentifier: record.MessageId,
		})
	}
	return failures
}
And then our test run will look like this:
When to use partial failures?
- Processing batches in a consumer? Must have.
- Using batch APIs? Must have, especially since only some of the items might fail — literally a partial failure.
- Costly processing of an item. For example, heavy processing that takes a long time, or 3rd party APIs where you have a quota and you don’t want to “retry on successes”.
- Works great with batch processing in parallel with batch APIs. One thought though — from my experience of implementing partial failures in blazingly fast Lambdas, I am not sure it was worth it. The complexity of the code increases, but the economic benefit is not that great. Of course, it depends on the nature of your processing (as in the bullet point above) and the number of messages your Lambdas handle.
When to use partial failures in FIFO?
Not sure what the best use case is here for FIFO, to be honest. Possibly limiting concurrent Lambda executions. Processing larger batches in the same Lambda execution for reasons like loading data that will be used for the whole message group… maybe. Usually, processing FIFO items in parallel makes no sense if we want to preserve the processing order of items in the same MessageGroup. And if the order of processing doesn’t matter, maybe we don’t need FIFO at all? If you have a good use case, please take a second to put it in a comment or PM me. I would really appreciate it.
Wrap up
There are not many resources on partial failures, so I wanted to write a series of articles about them, with some simple and comprehensive examples. I hope I got it right. Let me know what you think in the comments or PM me.
What might be more tricky is dealing with partial failures in streams. This will be part of the next article in the series.