AWS Serverless Event Driven Data Ingestion from Multiple and Diverse Sources

April 15, 2024 by varunshrivastava

At times there is a requirement to centralize data from different sources. This could be driven by various needs:

  • Maybe you have to aggregate different data sources to perform better analytics
  • Or you are building a system from the ground up that consumes the old data, transforms it, and stores it in the new system in near real time

There could be any number of reasons behind data ingestion. In this article I will explore a very cost-effective, scalable, and completely serverless architecture that does just that. I have used the AWS cloud, but you can find similar services with your preferred cloud provider; the architecture and principles remain the same.

Table of Contents

  • Prerequisite
  • Limitations
  • Architecture
    • Data Flow in the System
      • 1. Data Sources and Initial Trigger
      • 2. API Gateway and Lambda Function
      • 3. AWS Lambda Functions as APIs
      • 4. Data Routing with SNS to Queues
      • 5. Individual Queue Per Data Source
      • 6. Data Processing
      • 7. Data Storage
      • Error Handling
      • Key Architectural Benefits
  • Code – Infrastructure with Terraform
    • lambda.tf
    • apis.tf
    • roles.tf
    • s3.tf
    • sqs.tf
    • sns.tf
    • dynamodb.tf
  • Code – Ingestion API Lambda
  • Code – SQS Worker Lambda
  • Conclusion

Prerequisite

  • An active AWS account (or an account with another cloud provider)
  • A user with sufficient permissions to create the assets
  • Python 3.9 or higher
  • Terraform

I have chosen Terraform to keep things generic. Although I will be using the AWS provider, you can use any other provider and its respective resources with a similar setup.

Limitations

  • This architecture covers only basic security considerations and is not meant for production as-is. However, it can easily be hardened into a production system.
  • There is no frontend to control the ingestion configuration.

Architecture

This diagram outlines a high-level architecture for a serverless, event-driven data ingestion system implemented within the AWS Cloud environment, designed to accommodate data from multiple sources. It follows a common pattern called “Content-Based Filtering”, where individual messages are routed to workers based on the message type (in this case the ‘source’ attribute) rather than broadcasting all messages to all workers. That way the workers don’t have the responsibility of filtering and can focus purely on processing.
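
Conceptually, content-based filtering boils down to routing on a message attribute instead of broadcasting. The following toy Python sketch is purely illustrative and not part of the actual system (in the real architecture, SNS filter policies do this routing for us):

# Toy illustration of content-based filtering. In the real architecture the
# routing is done by SNS filter policies, not by application code.
from typing import Optional

QUEUE_FOR_SOURCE = {
    "sourceA": "sourceA-queue",
    "sourceB": "sourceB-queue",
    "sourceC": "sourceC-queue",
}

def route(message: dict) -> Optional[str]:
    # Route a message to a queue based on its 'source' attribute; drop it otherwise.
    return QUEUE_FOR_SOURCE.get(message.get("source"))

print(route({"source": "sourceB", "payload": {}}))  # -> "sourceB-queue"
print(route({"source": "unknown", "payload": {}}))  # -> None (message is dropped)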

This architecture might come naturally to someone who has worked with microservices. If you are interested in microservices, these are good starting points:

  • Principles of Microservices – Deploy Independently
  • Principles of Microservices – Decentralize Everything
  • Principles of Microservices – Hide Implementation Details

[Figure: AWS Event-Driven Data Ingestion Architecture]

Data Flow in the System

1. Data Sources and Initial Trigger

At the outset, we have various data sources, namely Sources A, B, C, and D. These sources represent different systems or applications that generate data which needs to be ingested into the cloud infrastructure.

2. API Gateway and Lambda Function

The entry point for data is through the Amazon API Gateway, which provides a unified interface to trigger the ingestion process. The API Gateway is configured to receive data in a specific format, ensuring that the data from multiple sources conforms to a standard protocol.
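
For illustration, judging from the Lambda handlers shown later in the post, a request to one of the source endpoints might carry a JSON body shaped like the following (the field values here are hypothetical):

# Hypothetical example payload POSTed to the sourceA endpoint.
# 'source' drives the SNS routing; 'entity_name' and 'body' are read by the worker.
sample_request = {
    "source": "sourceA",
    "entity_name": "EQUIPMENT",
    "body": {
        "cat_id": "CAT-123",
        "contract_id": "CTR-001",
        "parent_id": "",
        "current_milestone_id": "MS-10",
    },
}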

3. AWS Lambda Functions as APIs

Upon receiving data, the API Gateway triggers AWS Lambda functions. These serverless compute services run code in response to triggers such as the API Gateway and automatically manage the underlying compute resources.

4. Data Routing with SNS to Queues

The Lambda functions then publish the incoming events to Amazon Simple Notification Service (SNS), a managed service that provides message delivery from publishers to subscribers (in this case, the message queues). SNS acts as a dispatcher, efficiently handling the throughput of messages.

5. Individual Queue Per Data Source

Following SNS, there are individual queues for each data source created using Amazon SQS (Simple Queue Service). There are separate queues for different data sources such as A, B, C and D, allowing for organized and isolated handling of messages from each source. This setup also includes a dead-letter queue to handle failed messages, ensuring that message processing can be retried or inspected without losing the information.

6. Data Processing

Additional Lambda functions are linked to each SQS queue as workers. These functions are invoked to process messages from the respective queues asynchronously. The processing might involve data transformation, validation, and preparation for storage.

7. Data Storage

The processed data is then stored in an AWS data storage service which could be Amazon S3 for object storage, Amazon RDS or DynamoDB for database storage, or any other suitable AWS data storage solution, depending on the requirements.

Error Handling

  • The architecture includes a strategy for error handling, where failed messages that cannot be processed after several attempts are routed to a dead-letter queue. This allows for isolating and troubleshooting erroneous data without interrupting the normal flow of data ingestion.

Key Architectural Benefits

  • The use of AWS services provides a scalable, reliable, and secure infrastructure that can handle large volumes of data and variable loads with ease.
  • Serverless components like Lambda ensure that you only pay for the compute time you consume, contributing to a cost-effective solution.
  • Event-driven architecture facilitates real-time data processing, making the system reactive and dynamic in response to incoming data.

Overall, this architecture provides a robust framework for ingesting data from multiple sources using serverless technologies, ensuring scalability, reliability, and maintainability.

Code – Infrastructure with Terraform

The way I like to structure my Terraform code is in modules. Leveraging modules keeps the code maintainable and isolated. Another thing I like to leverage is variables, for creating repeatable infrastructure units.

Here is the code structure that I followed for this project.

The root directory contains the following directories:

.
├── config
└── modules
    ├── common-services
    └── ingestion

From the root down to level two (including files), it is structured as below.

.
├── backend.tf
├── config
│   ├── backend.conf
│   ├── backend_local.conf
│   ├── terraform.tfvars
│   └── terraform_local.tfvars
├── main.tf
├── modules
│   ├── common-services
│   └── ingestion
├── output.tf
├── providers.tf
├── terraform.tf
└── variables.tf

Now, stepping inside the ingestion module, you will see different files for different services and stacks. I like to create separate .tf files for different components and services; that way I know where to go for what. As you can see, the names are pretty self-explanatory.

modules/ingestion
├── apis.tf
├── dynamodb.tf
├── glue.tf
├── lambda.tf
├── roles.tf
├── s3.tf
├── sns.tf
├── sqs.tf
└── variables.tf

I would like to take you through each file and its contents one by one and explain the bits along the way if needed.

lambda.tf

This file contains only the code for creating the Lambdas and their related resources in AWS. Here, you will find the creation of two types of Lambda functions:

  • Ingestion API Lambda
  • SQS Worker Lambdas

This is a good time to take a quick look at the architecture to understand where these lambdas are positioned.

locals {
  common_ingestion_api_lambda = {
    s3_bucket         = aws_s3_bucket.ingestion_lambda_bucket.bucket
    s3_key            = aws_s3_object.lambda_zip.key
    function_name     = "ingestion_api_lambda",
    handler           = "ingestion_api_function.lambda_handler",
    runtime           = "python3.9"
    source_code_hash  = data.archive_file.ingestion_lambda_zip.output_base64sha256
  }

  sqs_worker_lambdas   = {
    sourceA = {
      s3_bucket         = aws_s3_bucket.ingestion_lambda_bucket.bucket
      s3_key            = aws_s3_object.lambda_zip.key
      function_name     = "sqs_sourceA_worker_${var.suffix}"
      handler           = "sqs_worker_function.handle_sourceA_message"
      runtime           = "python3.9"
      source_code_hash  = data.archive_file.ingestion_lambda_zip.output_base64sha256
      timeout           = 60
    },
    sourceB = {
      s3_bucket        = aws_s3_bucket.ingestion_lambda_bucket.bucket
      s3_key           = aws_s3_object.lambda_zip.key
      function_name    = "sqs_sourceB_worker_${var.suffix}"
      handler          = "sqs_worker_function.handle_sourceB_message"
      runtime          = "python3.9"
      source_code_hash = data.archive_file.ingestion_lambda_zip.output_base64sha256
      timeout          = 60
    },
    sourceC = {
      s3_bucket         = aws_s3_bucket.ingestion_lambda_bucket.bucket
      s3_key            = aws_s3_object.lambda_zip.key
      function_name     = "sqs_sourceC_worker_${var.suffix}"
      handler           = "sqs_worker_function.handle_sourceC_message"
      runtime           = "python3.9"
      source_code_hash  = data.archive_file.ingestion_lambda_zip.output_base64sha256
      timeout           = 60
    }
  }
}



# Create Lambda functions Ingestion API
resource "aws_lambda_function" "ingestion_api" {
  s3_bucket        = local.common_ingestion_api_lambda.s3_bucket
  s3_key           = local.common_ingestion_api_lambda.s3_key
  function_name    = local.common_ingestion_api_lambda.function_name
  handler          = local.common_ingestion_api_lambda.handler
  runtime          = local.common_ingestion_api_lambda.runtime
  role             = aws_iam_role.lambda_execution_role.arn

  layers = [
    "arn:aws:lambda:${var.region}:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:32",
    var.lambda_dependency_layer_arn
  ]
  source_code_hash = local.common_ingestion_api_lambda.source_code_hash
  depends_on = [data.archive_file.ingestion_lambda_zip, aws_s3_bucket.ingestion_lambda_bucket, aws_s3_object.lambda_zip]
}


resource "aws_cloudwatch_log_group" "ingestion_api" {
  #checkov:skip=CKV_AWS_338:Ensure CloudWatch log groups retains logs for at least 1 year
  #checkov:skip=CKV_AWS_158:Ensure that CloudWatch Log Group is encrypted by KMS
  name              = "/aws/lambda/${aws_lambda_function.ingestion_api.function_name}"
  retention_in_days = 30
}





# Create Lambda functions SQS_WORKER
resource "aws_lambda_function" "sqs_worker" {
  for_each = local.sqs_worker_lambdas
  s3_bucket        = each.value.s3_bucket
  s3_key           = each.value.s3_key
  function_name    = each.value.function_name
  handler          = each.value.handler
  runtime          = each.value.runtime
  role             = aws_iam_role.sqs_lambda.arn
  timeout          = each.value.timeout

  layers = [
    "arn:aws:lambda:${var.region}:017000801446:layer:AWSLambdaPowertoolsPythonV2-Arm64:32",
    var.lambda_dependency_layer_arn
  ]
  source_code_hash = each.value.source_code_hash
  depends_on = [data.archive_file.ingestion_lambda_zip, aws_s3_bucket.ingestion_lambda_bucket, aws_s3_object.lambda_zip]
}


resource "aws_cloudwatch_log_group" "sqs_worker" {
  for_each = local.sqs_worker_lambdas
  #checkov:skip=CKV_AWS_338:Ensure CloudWatch log groups retains logs for at least 1 year
  #checkov:skip=CKV_AWS_158:Ensure that CloudWatch Log Group is encrypted by KMS
  name              = "/aws/lambda/${aws_lambda_function.sqs_worker[each.key].function_name}"
  retention_in_days = 30
}

apis.tf

An API Gateway in AWS acts as the front door to your serverless applications. It directs incoming requests to the appropriate functions based on pre-defined paths and methods. Let’s break down the key elements that make up an API Gateway:

  1. Base API: This is the foundation, the starting point for your API. Think of it as the root directory on your computer.
  2. Resources: These represent specific functionalities within your API, similar to folders within a directory structure. You define paths to access these resources, allowing users to interact with different parts of your application.
  3. Methods: These define how users can interact with a resource. In a REST API, these map directly to POST, PUT, GET, DELETE, PATCH, etc.
  4. Integration: Here’s where the magic happens! You connect your API Gateway resources with the actual logic that handles requests. In this case, we’re integrating with a Lambda function, but API Gateway can connect to various backend services.
  5. Deployment: Once everything is configured, it’s time to make your API accessible! Deployment publishes your API Gateway, making it live and ready to receive requests from users.

With the above information in mind, take a look at the following resources. Ignore the locals block for now; it is the variable that holds the information for creating the different APIs in the system: essentially, one API endpoint per data source.

locals {
  endpoints = {
    "sourceA" = {
      path = "a"
      lambda_config = {
        s3_bucket         = aws_s3_bucket.ingestion_lambda_bucket.bucket
        s3_key            = aws_s3_object.lambda_zip.key
        function_name     = "ingestion_api_lambda",
        handler           = "ingestion_api_function.lambda_handler",
        runtime           = "python3.9"
        source_code_hash  = data.archive_file.ingestion_lambda_zip.output_base64sha256
      }
    },
    "sourceB" = {
      path = "b"
      lambda_config = {
        s3_bucket         = aws_s3_bucket.ingestion_lambda_bucket.bucket
        s3_key            = aws_s3_object.lambda_zip.key
        function_name     = "ingestion_api_lambda",
        handler           = "ingestion_api_function.lambda_handler",
        runtime           = "python3.9"
        source_code_hash  = data.archive_file.ingestion_lambda_zip.output_base64sha256
      }
    },
    "sourceC" = {
      path = "c"
      lambda_config = {
        s3_bucket         = aws_s3_bucket.ingestion_lambda_bucket.bucket
        s3_key            = aws_s3_object.lambda_zip.key
        function_name     = "ingestion_api_lambda",
        handler           = "ingestion_api_function.lambda_handler",
        runtime           = "python3.9"
        source_code_hash  = data.archive_file.ingestion_lambda_zip.output_base64sha256
      }
    }
  }

  endpoint_methods = {
    "sourceA-POST": {
      method       = "POST",
      endpoint_key = "sourceA"
    },
    "sourceB-POST": {
      method        = "POST",
      endpoint_key  = "sourceB"
    },
    "sourceC-POST": {
      method        = "POST",
      endpoint_key  = "sourceC"
    }
  }
}


# Define the API Gateway
resource "aws_api_gateway_rest_api" "ingestion_api" {
  name        = "ingestion_api"
  description = "API for data ingestion"
}
# Create API Gateway resources for each endpoint
resource "aws_api_gateway_resource" "ingestion_api" {
  for_each = local.endpoints

  rest_api_id = aws_api_gateway_rest_api.ingestion_api.id
  parent_id   = aws_api_gateway_rest_api.ingestion_api.root_resource_id
  path_part   = each.value.path
}

resource "aws_api_gateway_method" "ingestion_api" {
  for_each = local.endpoint_methods

  rest_api_id   = aws_api_gateway_rest_api.ingestion_api.id
  resource_id   = aws_api_gateway_resource.ingestion_api[each.value.endpoint_key].id
  http_method   = each.value.method
  authorization = "NONE"
}

# Integrate API methods with Lambda functions
resource "aws_api_gateway_integration" "ingestion_api" {
  for_each = aws_api_gateway_method.ingestion_api

  rest_api_id             = aws_api_gateway_rest_api.ingestion_api.id
  resource_id             = each.value.resource_id
  http_method             = each.value.http_method
  type                    = "AWS_PROXY"
  uri                     = aws_lambda_function.ingestion_api.invoke_arn
  integration_http_method = "POST"
}

# Deploy the API Gateway
resource "aws_api_gateway_deployment" "api_ingestion_deployment" {
  depends_on = [aws_api_gateway_integration.ingestion_api]

  rest_api_id = aws_api_gateway_rest_api.ingestion_api.id
  stage_name  = "v1"

  triggers = {
    redeployment = sha1(jsonencode(aws_api_gateway_integration.ingestion_api))
  }
}

# Output the base URL of the API Gateway
output "base_url" {
  value = aws_api_gateway_deployment.api_ingestion_deployment.invoke_url
}

roles.tf

This is where I manage all the roles and permissions for the different resources in the ingestion module. These should be self-explanatory if you know how permissions and policies work in AWS (or in general), so for now you can simply skip this section and come back later. That said, this is a very important bit that you should take extra care with; it could become a big security vulnerability if not done properly. Always follow the least-privilege approach while defining permissions.

resource "aws_iam_role" "lambda_execution_role" {
  name = "lambda_execution_role"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      },
    ]
  })
}

resource "aws_iam_policy" "lambda_policy" {
  name        = "lambda_policy"
  description = "IAM policy for Lambda to access S3 and DB"

  policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "s3:GetObject",
          "s3:PutObject",
          "s3:DeleteObject"
          // Add DB permissions here
        ]
        Effect = "Allow"
        Resource = "*"
      },
      {
        "Effect": "Allow",
        "Action": [
          "sns:Publish"
        ],
        "Resource": aws_sns_topic.sns_router.arn
      }
    ]
  })
}

resource "aws_iam_role_policy_attachment" "lambda_policy_attachment" {
  role       = aws_iam_role.lambda_execution_role.name
  policy_arn = aws_iam_policy.lambda_policy.arn
}

resource "aws_lambda_permission" "api_gateway_invoke" {
  statement_id  = "AllowExecutionFromAPIGateway"
  action        = "lambda:InvokeFunction"
  function_name = aws_lambda_function.ingestion_api.function_name
  principal     = "apigateway.amazonaws.com"

  # This source_arn restricts access to a specific API Gateway stage and method
  source_arn = "${aws_api_gateway_rest_api.ingestion_api.execution_arn}/*/*/*"
}






resource "aws_iam_role" "sqs_lambda" {
  name = "sqs_lambda_${var.suffix}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17"
    Statement = [
      {
        Action = "sts:AssumeRole"
        Effect = "Allow"
        Principal = {
          Service = "lambda.amazonaws.com"
        }
      },
    ]
  })
}


resource "aws_iam_policy" "lambda_sqs_policy" {
  name        = "lambda_sqs_policy"
  description = "Allows Lambda function to consume messages from SQS queues"

  policy = jsonencode({
    Version   = "2012-10-17",
    Statement = [
      {
        Action    = [
          "sqs:ReceiveMessage",
          "sqs:DeleteMessage",
          "sqs:GetQueueAttributes",
        ],
        Resource  = values(aws_sqs_queue.queues)[*].arn,
        Effect    = "Allow",
      },
      {
        Action = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
        ],
        Resource = "arn:aws:logs:*:*:*",
        Effect   = "Allow",
      },
      {
        "Effect": "Allow",
        "Action": [
          "dynamodb:PutItem",
          "dynamodb:GetItem",
          "dynamodb:DeleteItem",
        ],
        "Resource": "arn:aws:dynamodb:${var.region}:${var.account_id}:table/datastore"
      }
    ],
  })

}

resource "aws_iam_role_policy_attachment" "lambda_sqs_attach" {
  role       = aws_iam_role.sqs_lambda.name
  policy_arn = aws_iam_policy.lambda_sqs_policy.arn
}





resource "aws_sqs_queue_policy" "queue_policy" {
  for_each = local.queues

  queue_url = aws_sqs_queue.queues[each.key].url

  policy = jsonencode({
    Version: "2012-10-17",
    Statement: [
      {
        Effect: "Allow",
        Principal: {
          AWS: "*"
        },
        Action: "sqs:SendMessage",
        Resource: aws_sqs_queue.queues[each.key].arn,
        Condition: {
          ArnEquals: {
            "aws:SourceArn": aws_sns_topic.sns_router.arn
          }
        }
      }
    ]
  })
}





resource "aws_iam_role" "sns" {
  name = "sns_role_${var.suffix}"

  assume_role_policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Action = "sts:AssumeRole",
        Principal = {
          Service = "sns.amazonaws.com"
        },
        Effect = "Allow",
        Sid    = "",
      },
    ],
  })
}

resource "aws_iam_policy" "sns" {
  name   = "sns_policy_${var.suffix}"
  policy = jsonencode({
    Version = "2012-10-17",
    Statement = [
      {
        Effect   = "Allow",
        Action   = [
          "logs:CreateLogGroup",
          "logs:CreateLogStream",
          "logs:PutLogEvents",
          "logs:DescribeLogStreams"
        ],
        Resource = "arn:aws:logs:*:*:*"
      },
    ],
  })
}

resource "aws_iam_role_policy_attachment" "sns_logging_policy_attachment" {
  role       = aws_iam_role.sns.name
  policy_arn = aws_iam_policy.sns.arn
}

s3.tf

This file holds the creation of the Simple Storage Service (S3) bucket used for uploading the Lambda code. This is not strictly needed, but it is good practice to keep versions of your deployed code in S3 so that it is easy to roll back to a previous version if needed. It also helps with auditing and debugging.

resource "random_string" "bucket_suffix" {
  length  = 8
  special = false
  upper   = false
  keepers = {
    # Generate a new suffix only if one of these attributes changes
    attr = "value"
  }
}

resource "aws_s3_bucket" "ingestion_lambda_bucket" {
  bucket = "ingestion-lambda-bucket-${random_string.bucket_suffix.result}"
}

data "archive_file" "ingestion_lambda_zip" {
  type        = "zip"
  source_dir  = "${path.root}/../ingestion"
  output_path = "${path.module}/../ingestion_function.zip"
  excludes    = ["venv", "test", ".idea"]
}

resource "aws_s3_object" "lambda_zip" {
  bucket = aws_s3_bucket.ingestion_lambda_bucket.bucket
  key    = "${data.archive_file.ingestion_lambda_zip.output_base64sha256}.zip"
  source = data.archive_file.ingestion_lambda_zip.output_path

  depends_on = [aws_s3_bucket.ingestion_lambda_bucket,data.archive_file.ingestion_lambda_zip]
}

sqs.tf

This creates the FIFO queues described in the architecture, one per source.

Note: you need to suffix the queue name with .fifo for a FIFO queue, or it will throw an error.

locals {
  queues = {
    "sourceA" = {
      name          = "sourceA-${var.suffix}.fifo",
      fifo          = true,
      deduplication = true,
    },
    "sourceB" = {
      name          = "sourceB-${var.suffix}.fifo",
      fifo          = true,
      deduplication = true,
    },
    "sourceC" = {
      name          = "sourceC-${var.suffix}.fifo",
      fifo          = true,
      deduplication = true,
    }
  }
}

resource "aws_sqs_queue" "queues" {
  for_each = local.queues

  name                        = each.value.name
  fifo_queue                  = each.value.fifo
  content_based_deduplication = each.value.deduplication
  visibility_timeout_seconds  = 120
  # Enable if you want automatic deduplication based on the content
  depends_on                  = [aws_lambda_function.sqs_worker]
}

resource "aws_lambda_event_source_mapping" "sqs_queue_mapping" {
  for_each = local.queues

  event_source_arn = aws_sqs_queue.queues[each.key].arn
  function_name    = aws_lambda_function.sqs_worker[each.key].arn
}

sns.tf

This is the Simple Notification Service (SNS). Its role is to provide a single topic for the API Lambda to publish messages to; based on the source message attribute, it decides where to route each message.

It points to the locals defined in the SQS file. Since the queues used are the same, it was straightforward to reuse the same variable, but if you want to keep that isolated, you are free to do so.

The filter policy under the subscription is the part that defines which messages get routed. In this case, the message attributes must contain a source field with the expected value, or the message will be dropped. The source must be either sourceA, sourceB, or sourceC.

resource "aws_sns_topic" "sns_router" {
  name        = "data-ingestion.fifo"
  fifo_topic  = true
}

resource "aws_sns_topic_subscription" "sns_subscription" {
  for_each = local.queues

  topic_arn = aws_sns_topic.sns_router.arn
  protocol  = "sqs"
  endpoint  = aws_sqs_queue.queues[each.key].arn

  filter_policy = jsonencode({
    source = [each.key]
  })
}

dynamodb.tf

Finally, I needed a database to store the ingested data. For the purpose of this architecture, I will set up a DynamoDB table with a very generic structure to store the ingested data.

resource "aws_dynamodb_table" "dynamodb" {
  name           = "datastore"
  billing_mode   = "PAY_PER_REQUEST" # Or "PROVISIONED" based on your usage pattern
  hash_key       = "ID"

  attribute {
    name = "ID"
    type = "S" # S = String, N = Number, B = Binary
  }

  # attributes used in Global Secondary Indexes as 'attribute' blocks
  attribute {
    name = "ENTITY_NAME"
    type = "S"
  }

  # Global Secondary Index for ENTITY_NAME
  global_secondary_index {
    name               = "EntityNameIndex"
    hash_key           = "ENTITY_NAME"
    projection_type    = "ALL" # Adjust based on needs, ALL includes all attributes in the index
  }

  tags = {
    Environment = "dev"
  }
}

Once you have set up the Terraform code correctly (along with the Lambda code) and applied this module, you should see the infrastructure built and deployed in your AWS account.
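
Once it is deployed, you can smoke-test one of the endpoints with a small script like the one below. This is only a sketch: the URL is a placeholder (use the base_url output from Terraform plus the stage and path), and the payload fields are hypothetical.

# Hypothetical smoke test against the deployed ingestion API.
import json
import urllib.request

# Path "a" corresponds to sourceA as defined in apis.tf; the stage is "v1".
url = "https://<API_ID>.execute-api.<REGION>.amazonaws.com/v1/a"
payload = {
    "source": "sourceA",
    "entity_name": "EQUIPMENT",
    "body": {"cat_id": "CAT-123", "contract_id": "CTR-001"},
}

req = urllib.request.Request(
    url,
    data=json.dumps(payload).encode("utf-8"),
    headers={"Content-Type": "application/json"},
    method="POST",
)
with urllib.request.urlopen(req) as resp:
    print(resp.status, resp.read().decode())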

Code – Ingestion API Lambda

Let’s look at the ingestion API Lambda code that takes the data in from the different sources and publishes it to SNS. This is the Lambda sitting behind the API Gateway.

In the code below, all methods have similar code, as I’m not doing much with the data. But in the real world, you would perform the required validation on the events received in the request to decide upfront whether to process or reject them; a minimal validation sketch follows this paragraph, before the full handler code.
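
As a purely illustrative sketch (the required fields are assumptions based on what the downstream worker expects), such a validation could look like this:

# Minimal validation sketch (illustrative only): reject requests that are
# missing the fields the downstream pipeline relies on.
REQUIRED_FIELDS = {"source", "entity_name", "body"}

def validate_event(body: dict):
    missing = REQUIRED_FIELDS - body.keys()
    if missing:
        # With APIGatewayRestResolver you could instead raise BadRequestError
        # from aws_lambda_powertools.event_handler.exceptions here.
        return {"statusCode": 400, "body": f"Missing fields: {sorted(missing)}"}
    return None  # the event is valid; continue with publish_to_sns()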

import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.event_handler.api_gateway import APIGatewayRestResolver
from aws_lambda_powertools.logging import correlation_paths
from aws_lambda_powertools.utilities.typing import LambdaContext
import boto3
import hashlib
from datetime import datetime

logger = Logger()
router = APIGatewayRestResolver()

# Initialize the Boto3 SNS client
sns_client = boto3.client('sns')

# SNS Topic ARN - replace with your actual SNS topic ARN
SNS_TOPIC_ARN = 'arn:aws:sns:<REGION>:<ACCOUNT_ID>:data-ingestion.fifo'

def get_aws_context_info():
    logger.info("Getting aws context info")
    session = boto3.session.Session()
    region = session.region_name
    client = boto3.client('sts')
    account_id = client.get_caller_identity()["Account"]

    logger.info(f"Region: {region}, Account: {account_id}")
    return region, account_id

def publish_to_sns(message, topic_arn, deduplication_id, group_id):
    """
    Publishes a message to an SNS topic with message attributes, including custom attributes
    for deduplication ID and group ID for informational purposes or downstream processing.

    :param message: Message to be sent.
    :param topic_arn: ARN of the SNS topic.
    :param deduplication_id: Custom deduplication ID for the message.
    :param group_id: Custom group ID for the message.
    """
    response = sns_client.publish(
        TopicArn=topic_arn,
        Message=json.dumps(message),  # Assuming you want to send the 'body' part as the main message
        MessageGroupId=group_id,
        MessageDeduplicationId=deduplication_id,
        MessageAttributes={
            'source': {
                'DataType': 'String',
                'StringValue': message['source']
            },
        }
    )
    return response


@router.post("/sourceA")
def put_sourceA():
    logger.info("POST /sourceA was called")
    body = router.current_event.json_body
    body['timestamp'] = datetime.utcnow().isoformat()
    logger.info(f"Body: {body}")

    # Publish the message to SNS
    response = publish_to_sns(message=body, topic_arn=SNS_TOPIC_ARN,
                              deduplication_id=hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest(), group_id=body['source'])
    logger.info(f"Message published to SNS: {response}")

    return {"body": "POST /sourceA success", "statusCode": 201}


@router.post("/sourceB")
def put_sourceB():
    logger.info("POST /sourceB was called")
    body = router.current_event.json_body
    body['timestamp'] = datetime.utcnow().isoformat()

    # Publish the message to SNS
    response = publish_to_sns(message=body, topic_arn=SNS_TOPIC_ARN,
                              deduplication_id=hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest(), group_id=body['source'])
    logger.info(f"Message published to SNS: {response}")

    return {"body": "POST /sourceB success", "statusCode": 201}


@router.post("/sourceC")
def put_sourceC():
    logger.info("POST /sourceC was called")
    body = router.current_event.json_body
    body['timestamp'] = datetime.utcnow().isoformat()

    # Publish the message to SNS
    response = publish_to_sns(message=body, topic_arn=SNS_TOPIC_ARN,
                              deduplication_id=hashlib.md5(json.dumps(body).encode('utf-8')).hexdigest(),
                              group_id=body['source'])
    logger.info(f"Message published to SNS: {response}")

    return {"body": "POST /sourceC success", "statusCode": 201}


@logger.inject_lambda_context(correlation_id_path=correlation_paths.API_GATEWAY_REST, log_event=True)
def lambda_handler(event: dict, context: LambdaContext):
    logger.info(f"Request Received: {event}")
    return router.resolve(event, context)

The most important method in the above code is publish_to_sns(), as it sets up the key structure of the message.

As we are using FIFO topics and queues, it is mandatory to provide the MessageGroupId and MessageDeduplicationId. Also, since we are using a filter policy based on the source field of the message, we must provide that field in the MessageAttributes. Here, the message source is taken from the source key of the request body.

This code publishes the message to SNS, and SNS then routes it to the downstream SQS queue based on the filter policy on the source attribute described in the SNS subscription.

def publish_to_sns(message, topic_arn, deduplication_id, group_id):
    """
    Publishes a message to an SNS topic with message attributes, including custom attributes
    for deduplication ID and group ID for informational purposes or downstream processing.

    :param message: Message to be sent.
    :param topic_arn: ARN of the SNS topic.
    :param deduplication_id: Custom deduplication ID for the message.
    :param group_id: Custom group ID for the message.
    """
    response = sns_client.publish(
        TopicArn=topic_arn,
        Message=json.dumps(message),  # Assuming you want to send the 'body' part as the main message
        MessageGroupId=group_id,
        MessageDeduplicationId=deduplication_id,
        MessageAttributes={
            'source': {
                'DataType': 'String',
                'StringValue': message['source']
            },
        }
    )
    return response

Code – SQS Worker Lambda

Lastly, we will look at the SQS worker Lambda code that receives messages from SQS in batches and processes them.

The worker code (shown after the example record below) parses the data based on the defined data structure and persists it into the database.
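
To follow the parsing logic, it helps to see the shape of an SQS record when the message arrives via SNS (with raw message delivery disabled, which is the default). Below is a trimmed, hypothetical example: the SQS body is the SNS envelope, and the original payload sits JSON-encoded inside its Message field.

# Trimmed, hypothetical SQS record as seen by the worker.
import json

example_record = {
    "messageId": "<MESSAGE_ID>",
    "eventSource": "aws:sqs",
    "body": json.dumps({
        "Type": "Notification",
        "TopicArn": "arn:aws:sns:<REGION>:<ACCOUNT_ID>:data-ingestion.fifo",
        "Message": json.dumps({
            "source": "sourceA",
            "entity_name": "EQUIPMENT",
            "body": {"cat_id": "CAT-123", "contract_id": "CTR-001"},
            "timestamp": "2024-04-15T10:00:00",
        }),
        "MessageAttributes": {
            "source": {"Type": "String", "Value": "sourceA"},
        },
    }),
}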

import json
import uuid
from datetime import datetime

import boto3
from aws_lambda_powertools import Logger

from entities.entities import Milestone, Contract

logger = Logger()
# Initialize DynamoDB and SQS clients
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('datastore')


def handle_sourceA_message(event, context):
    for record in event['Records']:
        try:
            logger.info(record)

            # First, parse the SQS message body
            message_body = json.loads(record['body'])
            logger.info(message_body)

            # Then, parse the nested 'Message' field within the SQS message body
            sns_message = json.loads(message_body['Message'])
            logger.info(sns_message)

            # Check the 'entity_name' and proceed if it matches 'EQUIPMENT'
            if sns_message['entity_name'] == 'EQUIPMENT':
                persist_equipment(sns_message['body'])

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            return {"statusCode": 500}

    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed SQS messages.')
    }


def handle_sourceB_message(event, context):
    for record in event['Records']:
        try:
            logger.info(record)

            # First, parse the SQS message body
            message_body = json.loads(record['body'])
            logger.info(message_body)

            # Then, parse the nested 'Message' field within the SQS message body
            sns_message = json.loads(message_body['Message'])
            logger.info(sns_message)

            # Check the 'entity_name' and proceed if it matches 'CONTRACT'
            if sns_message['entity_name'] == 'CONTRACT':
                persist_contract(sns_message['body'])

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            return {"statusCode": 500}

    return {
        'statusCode': 200,
        'body': json.dumps('Successfully processed SQS messages.')
    }


def handle_sourceC_message(event, context):
    for record in event['Records']:
        try:
            logger.info(record)
            # First, parse the SQS message body
            message_body = json.loads(record['body'])
            logger.info(message_body)

            # Then, parse the nested 'Message' field within the SQS message body
            sns_message = json.loads(message_body['Message'])
            logger.info(sns_message)

            if sns_message['entity_name'] == 'MILESTONE':
                persist_milestone(sns_message['body'])

        except Exception as e:
            logger.error(f"Error processing message: {str(e)}")
            return {"statusCode": 500}


def persist_contract(contract_body):
    contract_item = Contract.from_event(contract_body)

    logger.info(f"Writing contract to DynamoDB: {contract_item}")

    try:
        response = table.put_item(Item=contract_item.to_dict())
        logger.info(f"PutItem succeeded: {response}")
    except Exception as e:
        logger.error(f"Error writing contract to DynamoDB: {str(e)}")


def persist_milestone(milestone):

    milestone_item = Milestone.from_event(milestone)

    logger.info(f"Writing milestone to DynamoDB: {milestone_item}")

    try:
        response = table.put_item(Item=milestone_item.to_dict())
        logger.info(f"PutItem succeeded: {response}")
    except Exception as e:
        logger.error(f"Error writing contract to DynamoDB: {str(e)}")


def persist_equipment(equipment):
    unique_id = str(uuid.uuid4())
    equipment_item = {
        'ID': f"EQUIPMENT#{unique_id}",
        'ENTITY_NAME': 'EQUIPMENT',
        'cat_id': equipment.get('cat_id', ''),
        'contract_id': equipment.get('contract_id', ''),
        'parent_equipment_id': equipment.get('parent_id', ''),
        'current_milestone_id': equipment.get('current_milestone_id', '')
    }

    logger.info(f"Writing equipment to DynamoDB: {equipment_item}")

    try:
        response = table.put_item(Item=equipment_item)
        logger.info(f"PutItem succeeded: {response}")
    except Exception as e:
        logger.error(f"Error writing contract to DynamoDB: {str(e)}")

In order to understand the code, you must understand the structure of the data. For this tutorial, I curated a sample data model for an industrial supply chain system with three main entities: Contract, Milestone, and Equipment.

You can see the structure of the data below:

CONTRACTS
  CONTRACT_ID        PK
  CONTRACT_NAME      VARCHAR
  CONTRACT_TYPE      ENUM
  CONTRACT_STATUS    ENUM

MILESTONES
  MILESTONE_ID       PK
  CONTRACT_ID        FK (CONTRACTS > CONTRACT_ID)
  DESCRIPTION        VARCHAR
  START_DATE         DATETIME
  END_DATE           DATETIME

EQUIPMENTS
  EQUIPMENT_ID         PK
  PARENT_EQUIPMENT_ID  FK (EQUIPMENTS > EQUIPMENT_ID)
  CONTRACT_ID          FK (CONTRACTS > CONTRACT_ID)
  CURRENT_MILESTONE    FK (MILESTONES > MILESTONE_ID)
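
The entities.entities module imported by the worker (Contract and Milestone) is not shown in the post. Below is a minimal sketch of what it might look like, assuming simple dataclasses with from_event() and to_dict() helpers; the exact field mapping is an assumption based on the data model above and the DynamoDB table definition.

# Hypothetical entities/entities.py used by the SQS worker Lambda.
import uuid
from dataclasses import dataclass


@dataclass
class Contract:
    contract_id: str
    contract_name: str
    contract_type: str
    contract_status: str

    @classmethod
    def from_event(cls, body: dict) -> "Contract":
        return cls(
            contract_id=body.get("contract_id", str(uuid.uuid4())),
            contract_name=body.get("contract_name", ""),
            contract_type=body.get("contract_type", ""),
            contract_status=body.get("contract_status", ""),
        )

    def to_dict(self) -> dict:
        # 'ID' matches the DynamoDB hash key; 'ENTITY_NAME' feeds the EntityNameIndex GSI.
        return {
            "ID": f"CONTRACT#{self.contract_id}",
            "ENTITY_NAME": "CONTRACT",
            "contract_name": self.contract_name,
            "contract_type": self.contract_type,
            "contract_status": self.contract_status,
        }


@dataclass
class Milestone:
    milestone_id: str
    contract_id: str
    description: str
    start_date: str
    end_date: str

    @classmethod
    def from_event(cls, body: dict) -> "Milestone":
        return cls(
            milestone_id=body.get("milestone_id", str(uuid.uuid4())),
            contract_id=body.get("contract_id", ""),
            description=body.get("description", ""),
            start_date=body.get("start_date", ""),
            end_date=body.get("end_date", ""),
        )

    def to_dict(self) -> dict:
        return {
            "ID": f"MILESTONE#{self.milestone_id}",
            "ENTITY_NAME": "MILESTONE",
            "contract_id": self.contract_id,
            "description": self.description,
            "start_date": self.start_date,
            "end_date": self.end_date,
        }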

Conclusion

In the ever-evolving landscape of cloud computing, AWS’s serverless event-driven architecture offers a promising pathway to enhance data ingestion from multiple and diverse sources. By leveraging services like AWS Lambda, SNS, and SQS, businesses can achieve more scalable, efficient, and cost-effective data handling processes. However, the transition to a serverless model is not without its challenges. It requires a thoughtful approach to design, execution, and continuous improvement.

What challenges have you faced while integrating serverless architectures into your data operations?

Have you found innovative ways to overcome these challenges, or are there aspects of serverless technology that still seem daunting?

Share your experiences and thoughts in the comments below. Your insights not only contribute to a richer discussion but also help in shaping more robust serverless solutions for everyone.
