Streams partial failures
I will try not only to tell you how to do partial failure processing on streams, but also to explain other mechanisms in lambda stream processing and show how they affect it.
This is second part of articles about partial failures focused on streams processing. First part about SQS processing is here.
If you’re not interested in reading about SQS, no worries — I have duplicated some of my thoughts here to keep it an independent piece. Although I recommend you read the first part about SQS.
Pre-requisites
We will use a table with super simple structure that has only primary key named PK and has DynamoDB stream enabled as in the template below. Changes can be triggered e.g. by updating the document in DynamoDB.
TestTable:
Type: AWS::DynamoDB::Table
Properties:
TableName: partial-failures-table
BillingMode: PAY_PER_REQUEST
AttributeDefinitions:
- AttributeName: PK
AttributeType: S
KeySchema:
- AttributeName: PK
KeyType: HASH
StreamSpecification:
StreamViewType: NEW_AND_OLD_IMAGES
Batch Size: 1
Let’s just process 1 message at once. Then it will be super easy for us.
When processing of 1 message at once won’t work?
In case that we have massive amount of messages and e.g. batch api at our disposal that can improve our cost and message processing performance.
It’s not the worst option to choose this solution as it has a super simple cognitive and implementation model. You have to be careful about picking this option as it might slow down your processing and you might end up with suboptimal solution. Example message flow:
In a real world scenario we could even end up with an infinite loop of processing these messages — e.g. in the case of processing a malformed item. In such case processing will only stop when the number of retries is exhausted or the item retention period has expired. It’s worth noting that in the case of streams, this can even be up to 1 year, and the processing failure of a single item will block the entire shard.
Even with a shorter retention period, a terrible thing will happen. Most likely, after the bad item that blocked the whole stream has expired, other items that were waiting to be processed will also expire. Yes, unfortunately you will lose them and you will be in big trouble.
This is why it’s critical to ALWAYS define dead letter policy on any event processing.
Why do I need partial failure?
Let’s say that we don’t want to process messages one-by-one with BatchSize: 1
and want to process them in batches. The normal behaviour of the stream consumer lambda is to replay the whole batch on failure.
Having random failures we will successfully proces whole batch at some point if we will have small enough batch size. We can solve that issue using BisectOnFunctionError
property, but first things first. Let’s understand how that processing works based on examples.
Consistent failure
Let’s consider a scenario with a malformed item as in the examples above and with batches. Our setup for event source mapping will have batch size 8 and test will send 10 item updates with failure on item with value 5. We want to retry up to 3 times (just to quickly achieve dropping this event) and have defined DLQ.
Events:
DynamoStreamEvent:
Type: DynamoDB
Properties:
Stream: !GetAtt TestTable.StreamArn
BatchSize: 8
StartingPosition: LATEST
MaximumRetryAttempts: 3
MaximumBatchingWindowInSeconds: 5
DestinationConfig:
OnFailure:
Destination: !GetAtt DynamoDeadLetterQueue.Arn
With such a setup and a defined DLQ strategy, you will still end up processing the same items multiple times. If our processing is not idempotent, we are in big trouble. Even if we assume that our processing is idempotent, it can increase the cost of the solution and cause a lot of problems with stream processing.
Once the retries have been exhausted, the processing goes to the DLQ, but not only failed item. ALL ITEMS FROM THAT BATCH. Even items that were not processed even once. Message that you will receive on your DLQ may look like the one below:
{
"requestContext": {
"requestId": "3b6e466a-25d6-46ca-8902-463ec41c73f6",
"functionArn": "arn:aws:lambda:eu-west-1:123456789012:function:partial-failure-sqs-DynamoStreamConsumerFunction-ep3sUCZDIhcz:live",
"condition": "RetryAttemptsExhausted",
"approximateInvokeCount": 4
},
"responseContext": {
"statusCode": 200,
"executedVersion": "22",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2023-03-20T11:14:13.396Z",
"DDBStreamBatchInfo": {
"shardId": "shardId-00000001679310199978-66d38aa0",
"startSequenceNumber": "57621500000000018690092446",
"endSequenceNumber": "57622200000000018690092729",
"approximateArrivalOfFirstRecord": "2023-03-20T11:14:11Z",
"approximateArrivalOfLastRecord": "2023-03-20T11:14:11Z",
"batchSize": 8,
"streamArn": "arn:aws:dynamodb:eu-west-1:123456789012:table/partial-failures-table/stream/2023-03-18T20:15:01.013"
}
}
As you can see you have to process items from the range between startSequenceNumber
and endSequenceNumber
on your own. In case that order matters for your processing you might end up with corrupted state. In addition, processing of the same items multiple times simply makes no sense, and we need to address this with partial failures. Example flow:
Bisect on failure without partial failure
What we can do in this case is to bisect batches on lambda failure. This means that the processed batch is split in half and sent back for processing in Lambda. Bisection in the case of a consistent failure is repeated until the batch has a single item.
We can activate it by using BisectBatchOnFunctionError: true
in our lambda event source mapping.
Thanks to this, we can limit the number of items that we will have to re-process and we can put into DLQ only malformed items. It’s worth noting that an item may be processed more than the number of times set in the MaximumRetryAttempts
field.
Now failed item that you will receive on DLQ might look like on snippet below. It’s worth to note that now startSequenceNumber
is equal to endSequenceNumber
. Again, the DLQ message can only be processed prior to the retention of a stream item.
{
"requestContext": {
"requestId": "252fb486-02e7-40f5-be9f-642b5b2d4e87",
"functionArn": "arn:aws:lambda:eu-west-1:123456789012:function:partial-failure-sqs-DynamoStreamConsumerFunction-ep3sUCZDIhcz:live",
"condition": "RetryAttemptsExhausted",
"approximateInvokeCount": 4
},
"responseContext": {
"statusCode": 200,
"executedVersion": "22",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2023-03-20T15:47:42.548Z",
"DDBStreamBatchInfo": {
"shardId": "shardId-00000001679325392464-13074b2e",
"startSequenceNumber": "58459900000000037126079068",
"endSequenceNumber": "58459900000000037126079068",
"approximateArrivalOfFirstRecord": "2023-03-20T15:47:40Z",
"approximateArrivalOfLastRecord": "2023-03-20T15:47:40Z",
"batchSize": 1,
"streamArn": "arn:aws:dynamodb:eu-west-1:123456789012:table/partial-failures-table/stream/2023-03-18T20:15:01.013"
}
}
Partial failures
Partial failures can help you minimise the use of your resources and make your processing more efficient. It can obviously be used together with bisections, and that’s great! In this demo case it is not so spectacular because we are saving single calls, but in case of batch processing of hundreds of items this solution shines. Let’s have a look at this processing with partial failures.
How to use partial failures?
We need to return a special response (in the format shown below). Based on this response, AWS will either move stream cursor for your stream consumer or not. In some programming languages, the model may be missing in the SDK, so you will need to create this model yourself.
Partial failures are supported for DynamoDB Streams and Kinesis where we are using SequenceNumber
as <id>, but in case of SQS we were using MessageId
.
{
"batchItemFailures": [
{
"itemIdentifier": "<id>"
},
{
"itemIdentifier": "<id>"
}
]
}
Difference to SQS FIFO Processing
With SQS FIFO, we focused on one part of the process where we had to drop all unprocessed messages in the batch. Streams processing is handled differently. We can only return the id of the first failed item as the lowest value in the array will be used as the new value for our stream cursor.
Things to watch out
There are couple of things that you have to watch out for in your implementation.
Forget about returning errors from handler
My demo is written in go, but equivalent of returning error will be throwing exception in other types of runtimes / returning rejected promise. You have to change your habits and in case of any error you have to map it to partial failure records. Error can be returned only in case of error that is related to all batch items — for example due to failed processing pre-requisiste.
You can’t use lambda errors metric anymore
Since we are no longer returning errors, these executions are not counted as lambda errors anymore. This means that you have to react to alarms that are triggered based on processing lag measured by IteratorAge
Lambda Metric or messages that are coming to Dead-Letter. If you want to react upon lambda errors you can also create custom metric by reporting it or even using metric filters.
It’s a really important point to keep in mind for system observability after refactoring to partial failures.
Make sure that you report only failed items
Sounds obvious, right? So let’s say a developer implements a function that processes the SQS record. If this is the first contact with partial failures, it could cause issues. Let’s assume that solution will be similar to code below. In that case any of the options to be returned on success will cause whole batch to reprocess as it will trigger one of rules from below: invalid JSON response, null or empty string itemIdentifier
.
async function (record) {
try {
await doSomething(record)
return null / return { itemIdentifier: "" } / return { itemIdentifier: null }
} catch(e) {
return { itemIdentifier: record.change.sequenceNumber }
}
}
According to documentation:
Lambda treats a batch as a complete success if your function returns any of the following:
- An emptybatchItemFailures
list
- A nullbatchItemFailures
list
- An emptyEventResponse
- A nullEventResponse
Lambda treats a batch as a complete failure if your function returns any of the following:
- An invalid JSON response
- An empty stringitemIdentifier
- A nullitemIdentifier
- AnitemIdentifier
with a bad key name
- AnitemIdentifier
value with a message ID that doesn't exist
Another case might be function that will process stream records in parallel. What one might do is to return partial record from it. There is an issue about it — what will be returned in case of success? Most probably one of invalid variants that has to be filtered out.
Make sure you’ve turned it on
Remember to turn that feature on in your template. Without it, you won’t be able to reprocess those failed messages. Why? For AWS your response is “just some response”, not a partial failure response, so you can lose events.
This feature needs to be enabled by adding additional attribute to your Events
section — please check the docs. I know it sounds like an obvious thing, but please make sure you turned it on and keep an eye on it when reviewing PRs. It’s super easy to miss.
Events:
DynamoStreamEvent:
Type: DynamoDB
Properties:
Stream: !GetAtt SomeTable.StreamArn
BatchSize: 100
BisectBatchOnFunctionError: true
StartingPosition: LATEST
MaximumBatchingWindowInSeconds: 5
DestinationConfig:
OnFailure:
Destination: !GetAtt DynamoDeadLetterQueue.Arn
FunctionResponseTypes:
- ReportBatchItemFailures
Processing items in batches
Processing items in batches depends on the API that you are using. For example:
Kinesis Firehose
The batch response has all the items in the response in the same order as they were sent in the request. So you can easily match the batch request item to the response, making error reporting easy.
SNS Batch
The batch response has failed items in the response. All the failures are marked by the batch item id that we used in the batch request. So you think you could use the SQS MessageId as the Id in the batch request? Not really. SNS batch item id has a limit of up to 80 characters and SQS MessageId has a limit of 100 characters — so it won’t work as it will result in a request failure. The downside here is that you have to somehow match the id used in the request to your SQS MessageId.
Dynamo Batch
The batch response has UnprocessedItems in the response that needs to be processed. In case you want to retry processing these messages, you have to map UnprocessedItems from Dynamo response to partial failure response.
Simplest implementation
Simplest possible implementation in go might look like that:
func handler(ctx context.Context, event events.DynamoDBEvent) (events.DynamoDBEventResponse, error) {
var failures []events.DynamoDBBatchItemFailure
for _, record := range event.Records {
err := processRecord(record)
if err != nil {
reportFailure(ctx, record, err)
failures = append(failures, events.DynamoDBBatchItemFailure{
ItemIdentifier: record.Change.SequenceNumber,
})
break
}
}
return events.DynamoDBEventResponse{BatchItemFailures: failures}, nil
}
When to use partial failures?
- Processing batches in consumer? Must have
- Using
batch
APIs? Must have. Especially that only some of the items might fail. Literally partial failure. - Costly processing of an item. For example, heavy processing that takes a long time, or using 3rd party APIs where you have a quota and you don’t want to “retry on successes”.
Wrap up
This was the last part of small series about partial failures in events processing. I hope you have learned something new reading this article.
Let me know what you think about it in comments or just PM me.