RDS streams — infrastructure setup
You’ve probably heard about DynamoDB Streams, in this article we will check how we can achieve similar behaviour using cloud optimised database Amazon Aurora. (You don’t care how to set it up? Check out analysis of that approach in our other article about it.)
If this is a first time you’ve heard about DynamoDB Streams here is quick recap. DynamoDB Streams is mechanism from Amazon DynamoDB which captures all document level changes and expose them in the stream.
Motivation
We’ve heard a lot of times that teams are missing event sourcing feature in SQL databases. Especially after switching from DynamoDB. It would be cool to make that easier for them. Right? Let’s see what we can do about it.
Plan
It would be great to stream all records changes to the stream exactly like in DynamoDB. How to do it? It would be awesome to put those events into some streaming solution like Kinesis or messaging like SQS or even SNS. Yet, it’s not possible directly.
Amazon Aurora has integration with other AWS services (like S3, Lambda, ML with SageMaker and Comprehend). In this article we will focus on the integration with AWS Lambda as it looks like best solution for this particular case. One of the exclusive features in Aurora is SQL function responsible for Lambda invocation. This function is called lambda_async
which has signature presented below:
lambda_async(<Your lambda function arn>, <Your JSON payload>)
Sounds promising. Let’s see what we can achieve with it.
Environment setup
First things first. To start playing with it we have to setup our infrastructure. We won’t use AWS Console for achieving that goal to give you opportunity to play with exactly same setup as we have. This is why we have prepared IaaC using Cloudformation that can be found here.
What we are going to setup? Something like this.
Lambda function
RDS Events consumer that will log incoming events from Aurora. It’s basic function that only logs input in NodeJS. Nothing fancy here.
Network setup
We didn’t want to put any mess in your AWS account. This is why we will setup my own VPC for this scenario. Network setup which can be found here. Network layer consist of:
- VPC
- Public subnet for bastion host with Internet gateway to enable your public subnet to communicate with the Internet
- Private subnets for Aurora with Single NAT Gateway with Elastic IP attached to enable communication from private subnets to the Internet
- Routing for private and public subnets
If you already have some VPC which has colliding CIDR blocks you will have to adjust them.
Aurora
RDS with MySQL compatible aurora cluster along with all necessary roles and bastion host setup.
Triggering lambda functions from SQL level doesn’t work with Aurora Serverless.
Networking
There are couple of things worth to note regarding setting up Aurora with AWS services. Aurora calls your lambda — and to make it possible, it has to communicate with AWS Lambda through internet. For that particular reason it is important where exactly you are going to deploy your Aurora cluster. Below there is small set of rules.
Public subnet
If you are going to setup public database you are already good to go.
On the other hand if you are going to use private database despite that DB is in public subnet it still is private. Yet it has to communicate with the internet. You have to route your traffic through NAT or VPC endpoints for integration with other AWS Services. More info can be found in AWS documentation.
Private subnet
You have to setup NAT in your public subnet and route your private subnet traffic through NAT to enable communication between Aurora instances and the Internet. (This is done in the example Cloudformation)
RDS IAM Setup
Whenever Aurora invokes lambda function it must have privileges to do it. It’s done through IAM roles. You’ve got to create a role with lambda invocation policy and assign it to the RDS as well as to put it into cluster custom parameter group.
Such role can look like this:
RDSAWSServicesRole:
Type: AWS::IAM::Role
Properties:
RoleName: RDSToAwsLambdaAccess
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- rds.amazonaws.com
Action:
- sts:AssumeRole
Path: /service-role/
Policies:
- PolicyName: LambdasAccessPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Action:
- lambda:InvokeFunction
Resource:
- !Ref RDSEventsLambdaArn
Effect: Allow
and it has to be attached to the RDS cluster through associated roles:
DbCluster:
Type: AWS::RDS::DBCluster
Properties:
...
AssociatedRoles:
- RoleArn: !GetAtt RDSAWSServicesRole.Arn
also it has to be part of parameter group
ClusterParameterGroup:
Type: AWS::RDS::DBClusterParameterGroup
Properties:
Family: "aurora-mysql5.7"
Parameters:
...
aws_default_lambda_role: !GetAtt RDSAWSServicesRole.Arn
How to deploy it?
- Log in the CLI to your AWS account
- Clone repo from here
- Update parameters in
bin/deploy.sh
- password
- bastion key - Run
./bin/deploy.sh
If you don’t have ssh key create it using this guide
How to access it?
You’ve got to connect to your RDS cluster through bastion tunnel as it is in the private subnet.
If you like to use terminal
I have occupied local 3306 port so have replaced it with 33306.
In bin/openTunnel.sh
replace ~/.ssh/bastion-key.pem
with path to your ssh key and run it. Now you can connect to mysql through localhost:33306 with the root
user and password that you have set up.
If you don’t like to use terminal
This setup will be based on usage of IntelliJ.
In IntelliJ add your data source. You can get RDS endpoint by running:
aws rds describe-db-clusters --db-cluster-identifier rds-events-cluster | jq -r '.DBClusters[].Endpoint'
Then setup your SSH tunnel
Put there bastion IP as well as path to your ssh key. You can get bastion IP by running:
aws ec2 describe-instances --filters Name=tag:Name,Values='RDS Events Bastion' | jq -r '.Reservations[].Instances[].PublicIpAddress'
Setting up DB schema
For simplicity of the example we have created simple database names flights
. It’s just example table which doesn’t matter that much.
We would like to setup scenario in which there will be notification sent whenever new route will be registered in the SQL database.
CREATE DATABASE IF NOT EXISTS `flights` DEFAULT CHARACTER SET latin1;
USE `flights`;
DROP TABLE IF EXISTS routes;
CREATE TABLE routes
(
`airportFrom` varchar(10) NOT NULL,
`airportTo` varchar(20) NOT NULL,
`connectingAirport` varchar(10),
`newRoute` bit DEFAULT 0,
`seasonalRoute` bit DEFAULT 0,
`operator` varchar(10) NOT NULL,
`group` varchar(10) NOT NULL,
`tags` TEXT,
`similarArrivalAirportCodes` TEXT,
`carrierCode` varchar(10) NOT NULL,
PRIMARY KEY (airportFrom, airportTo),
INDEX (airportFrom),
INDEX (airportTo)
) ENGINE = InnoDB
DEFAULT CHARSET = latin1;
We didn’t want to create custom resource to make Cloudformation scripts easy to read. If you are interested in creating such custom resource write a comment. We can extend Cloudformation with such custom resource if you are curious how to create one.
Worth to note that if you have specific user on Aurora which is used for connection. You’ve got to grant him permissions to invoke lambda functions with:
GRANT INVOKE LAMBDA ON *.* TO user@domain-or-ip-address
Setting up triggers
The whole purpose of this article was to create SQL mechanism to track all changes in the data. We can use native SQL mechanism for that and setup triggers. DynamoDB emits events for every data change with old and new values. How can we achieve such mechanism?
Unfortunately there is no out-of-the-box solution like in DynamoDB. We’ve got to set it up on our own. We are going to define three separate triggers for this action. It could be cumbersome for really big entities. Yet, not sure if there is any possible solution to make it work dynamically for the whole object. Fortunately we are working with structured data.
We can setup 3 types of triggers here:
- Insert trigger — to capture new items
- Delete trigger — to capture deleted items
- Update items — to capture item before and after modification. We have to add additional condition to not trigger events in case of UPDATE action which is not modifying the data itself.
Invoking lambda
Before setting up our triggers, let’s first check if we are able to invoke our lambda function. Let’s try to invoke it from SQL level:
CALL mysql.lambda_async('RDS-EVENTS-CONSUMER', '{"foo": "bar"}')
You should be able to see log in your Cloudwatch logs here
INSERT
First we have to create our trigger in the DB.
Then run test insert. You should see log in Cloudwatch Logs event payload.
DELETE
First we have to create our trigger in the DB.
Then run test delete. You should see log in Cloudwatch Logs event payload.
UPDATE
First we have to create our trigger in the DB. You could use MD5 for checking if value of any field have changed to limit number of events triggered.
Then run test update. You should see log in Cloudwatch Logs event payload.
Wrap up
In this article we have set up Aurora with AWS lambda triggers for data changes. At a first glance this looks like great and pretty useful solution. You might wonder if this is as good as it looks like.
What about usability and guarantees of this setup? Please check in my article about it here.
Many thanks to my fellow CloudDude Michal Woronowicz!