
AWS: Data Processing and Analytics

Querying a Global Secondary Index

  • An index with a partition key and a sort key
  • Considered "global" because queries on the index can span all of the data in the base table, across all partitions
  • Has no size limit and has its own provisioned throughput settings for read and write activity

Create a DynamoDB Table

  • Name: <dynamoDB-table-name>
  • Partition Key: MovieTitle (string)
  • Sort Key: ReleaseYear (number)
  • Click Create table

Create and Query a Global Secondary Index

  • Go to the table created above and select the Indexes tab
  • Click Create index
    • Partition Key: Genre (string)
    • Sort Key: Rating (number)
    • Index name: Genre-Rating-Index
  • Click Create index > Explore table items
  • Click on Create item
  • Add the following sample data (create each JSON object as a separate item):
{
  "MovieTitle": {
    "S": "The Dark Knight"
  },
  "ReleaseYear": {
    "N": "2008"
  },
  "Genre": {
    "S": "Action"
  },
  "Rating": {
    "N": "9.0"
  },
  "Director": {
    "S": "Christopher Nolan"
  }
}
{
  "MovieTitle": {
    "S": "Inception"
  },
  "ReleaseYear": {
    "N": "2010"
  },
  "Genre": {
    "S": "Sci-Fi"
  },
  "Rating": {
    "N": "8.8"
  },
  "Director": {
    "S": "Christopher Nolan"
  }
}
  • Query the GSI
    • Select Query
    • Click on Select a table or index and select Genre-Rating-Index
    • Enter Action into the Partition key field and click Run
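  • The same query can be run programmatically; a minimal boto3 sketch, assuming the table above is named Movies and uses the Genre-Rating-Index created earlier:
import boto3
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Movies')  # assumed table name

# Query the GSI for all Action movies
response = table.query(
    IndexName='Genre-Rating-Index',
    KeyConditionExpression=Key('Genre').eq('Action')
)

for item in response['Items']:
    print(item['MovieTitle'], item['Rating'])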

Querying a Local Secondary Index

  • An index that has the same partition key as the base table but a different sort key
  • "Local" in that every partition of the index is scoped to a base table partition with the same partition key value
  • Must be specified at the time of table creation

Create a DynamoDB Table

  • Name: Movies
  • Partition key: Year (number)
  • Sort key: Title (string)
  • Create an LSI (under Table settings, choose Customise settings)
  • Look for Secondary indexes and click Create local index
  • Enter RatingIndex as the index name and Rating as the sort key
  • Create index
  • Create table

Create and Query the LSI

  • Ensure table has Active status
  • Click Explore table items > Create item
  • Sample data:
{
  "Year": {
    "N": "2020"
  },
  "Title": {
    "S": "Movie A"
  },
  "Rating": {
    "N": "8.1"
  }
}
{
  "Year": {
    "N": "2020"
  },
  "Title": {
    "S": "Movie B"
  },
  "Rating": {
    "N": "7.5"
  }
}
  • Query the LSI: select a table or index > RatingIndex
    • Partition key value: 2020
    • Condition for sort key: rating greater than 7.0
    • Select Query
    • Click Run
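  • The same LSI query as a minimal boto3 sketch (table and index names assumed from the steps above; DynamoDB numbers must be passed as Decimal):
import boto3
from decimal import Decimal
from boto3.dynamodb.conditions import Key

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('Movies')  # assumed table name

# Partition key Year = 2020, sort key Rating > 7.0 on the LSI
response = table.query(
    IndexName='RatingIndex',
    KeyConditionExpression=Key('Year').eq(2020) & Key('Rating').gt(Decimal('7.0'))
)

for item in response['Items']:
    print(item['Title'], item['Rating'])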

Querying Data with Amazon Athena and AWS Glue Crawler Integration

Amazon S3 Bucket

  • Click on Create a new bucket and give it a unique name (leave default settings as is)
  • Download or create a CSV file of data
  • Create two folders and name them file and query
  • Open the file folder and click Upload to upload your CSV

AWS Glue Data Catalog

  • Go to AWS Glue and click create within Databases (on left hand side)
  • Give the name and description unique values, click Create database

AWS Glue Data Crawler

  • In AWS Glue, on the left under Data Catalog > Databases, click on Crawlers
  • Click on Create crawler
  • Give it a name and description
  • Click Next
  • Click on Add a data source
    • Data source: s3
    • S3 path: s3://<name-of-s3-bucket>/file/
    • Click on Add an S3 data source
    • Click Next
    • Under IAM role, select an existing one or create one
    • Click Next
    • Under target database, select the database created earlier
    • Click Next
    • Review all details under Review and create
    • Click on Create crawler
    • Once created, click Run crawler
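  • The crawler can also be created and started with boto3; a minimal sketch (the crawler name, IAM role and database name are placeholders):
import boto3

glue = boto3.client('glue')

# Placeholder names - replace with your crawler, role, database and bucket
glue.create_crawler(
    Name='csv-file-crawler',
    Role='<glue-crawler-iam-role>',
    DatabaseName='<glue-database-name>',
    Targets={'S3Targets': [{'Path': 's3://<name-of-s3-bucket>/file/'}]}
)

glue.start_crawler(Name='csv-file-crawler')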

Querying Data with Amazon Athena

  • In Amazon Athena, set up a query location
  • Click on Edit settings
  • In query result location and encryption: s3://<name-of-s3-bucket>/query/
  • Click Save
  • Select the database created in the AWS Glue Data Catalog with the following config
    • Data source: AwsDataCatalog
    • Database: select the previously created database
  • Write and run SQL queries to analyse data
    • To view all records, run the SQL query SELECT * FROM file; (the crawler names the table after the file folder)
    • Query the data with other SQL clauses, such as WHERE, GROUP BY and ORDER BY
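  • Queries can also be submitted outside the console; a minimal boto3 sketch using the Athena API (database name and bucket are placeholders):
import boto3

athena = boto3.client('athena')

# Submit the query and send results to the query/ folder set up earlier
response = athena.start_query_execution(
    QueryString='SELECT * FROM file;',
    QueryExecutionContext={'Database': '<glue-database-name>'},
    ResultConfiguration={'OutputLocation': 's3://<name-of-s3-bucket>/query/'}
)

print(response['QueryExecutionId'])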

Performing SQL Queries on DynamoDB with Amazon Athena

Create a DynamoDB Table

  • Name: <unique-dynamodb-tb-name>
  • Primary key
    • Partition key: id (string)
  • Leave other settings as default
  • Click Create Table

Create an AWS Lambda Function for Data Generation

  • Choose Author from scratch
  • Name: <unique-lambda-function-name>
  • Runtime: select Python 3.x
  • Execution role: choose an existing role or create a new one
  • Click Create function
  • Replace default code within function code section:
import boto3
import random
from datetime import datetime, timedelta

# Initialize DynamoDB
dynamodb = boto3.resource('dynamodb')
table_name = '<your_table_name_for_DynamoDB_Data>'  # Replace with your table name
table = dynamodb.Table(table_name)

# Sample data lists for generating realistic names, cities, and emails
first_names = ["John", "Jane", "Alice", "Bob", "Charlie", "Diana"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]
cities = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix", "Philadelphia"]

def generate_random_email(first_name, last_name):
    domains = ["example.com", "email.com", "testmail.com", "fakemail.com"]
    domain = random.choice(domains)
    return f"{first_name.lower()}.{last_name.lower()}@{domain}"

def generate_random_date():
    start_date = datetime.now() - timedelta(days=365)  # up to one year ago
    random_days = random.randint(0, 365)
    return (start_date + timedelta(days=random_days)).isoformat()

def lambda_handler(event, context):
    # Specify the number of records to generate
    num_records = 100  # Adjust this as needed

    for _ in range(num_records):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        city = random.choice(cities)

        item = {
            'id': str(random.randint(1, 1000000)),
            'name': f"{first_name} {last_name}",
            'email': generate_random_email(first_name, last_name),
            'city': city,
            'created_at': generate_random_date()
        }

        # Insert the item into DynamoDB
        table.put_item(Item=item)

    return {
        'statusCode': 200,
        'body': f'{num_records} records successfully inserted into {table_name}.'
    }
  • Replace <your_table_name_for_DynamoDB_Data> with the name of the table you created
  • Click Deploy to save
  • Click Test and create a new test event with default settings
  • Click Test again
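  • To confirm the function populated the table, a quick boto3 scan can be run (a sketch, assuming the table name above):
import boto3

dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('<unique-dynamodb-tb-name>')  # the table created above

# Fetch a handful of the generated items
response = table.scan(Limit=5)
for item in response['Items']:
    print(item['id'], item['name'], item['city'])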

Set Up Two S3 Buckets

  • DynamoDB bucket
    • Name: <unique-dynamo-db-s3-name>
    • Leave all other settings as default
  • Athena query results
    • Name: <unique-athena-query-s3-name>
    • Leave all other settings as default

Create an AWS Glue Job for Data Export

  • In AWS Glue: navigate to ETL jobs > Author and edit ETL jobs > Script editor > under Engine select Python shell and select Start fresh as the option > click Create script

  • Configure ETL job

    • Name: DynamoDBToS3Export
    • IAM role: choose an existing role or create a new one
    • Ensure data processing units are 1/16 DPU
    • Click Save
    • In script editor:
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from io import BytesIO

# Initialize boto3 resources and clients

dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')

# Define DynamoDB and S3 details

table_name = '<your-DynamoDB-table-name>' # Replace with your DynamoDB table name
output_bucket = '<your-S3-bucket-name>' # Replace with your S3 bucket name
output_key = 'parquet-output/'

# Initialize DynamoDB Table

table = dynamodb.Table(table_name)

# Step 1: Scan DynamoDB Table and Retrieve Data

def scan_dynamodb_table(table):
    response = table.scan()
    data = response['Items']

    # Handle pagination
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        data.extend(response['Items'])

    return data

# Retrieve data from DynamoDB

data = scan_dynamodb_table(table)

# Step 2: Convert Data to Pandas DataFrame

df = pd.DataFrame(data)

# Step 3: Convert DataFrame to Apache Parquet Format

table = pa.Table.from_pandas(df)
parquet_buffer = BytesIO()
pq.write_table(table, parquet_buffer)

# Step 4: Upload Parquet File to S3

parquet_buffer.seek(0)
s3.put_object(Bucket=output_bucket, Key=f"{output_key}dynamodb_data.parquet", Body=parquet_buffer)

print("Export from DynamoDB to Parquet format in S3 completed.")
  • Click Save and run

Create an Athena Table

  • In Amazon Athena click on Edit settings
  • Set the query result location to the Athena query results bucket created earlier and click Save
  • In the query editor: CREATE DATABASE customer_order_db;
  • Define table schema:
CREATE EXTERNAL TABLE IF NOT EXISTS customer_order_db.dynamodb_data (
    id STRING,
    name STRING,
    email STRING,
    city STRING,
    created_at STRING
)
STORED AS PARQUET
LOCATION 's3://<your-S3-bucket-name>/parquet-output/';
  • Run the query to create the table

Query the Data in Athena

  • Select all records:
SELECT *
FROM customer_order_db.dynamodb_data
LIMIT 10;
  • Filter records by city:
SELECT
  id,
  name,
  email,
  city,
  created_at
FROM customer_order_db.dynamodb_data
WHERE city = 'New York';
  • Count customers by city:
SELECT
  city,
  COUNT(*) AS total_customers
FROM customer_order_db.dynamodb_data
GROUP BY city
ORDER BY total_customers DESC;

Intro to AWS Glue using Python Shell

Prepare Environment

  • Create an S3 bucket and create an input/ folder
  • Add a CSV file (e.g. transactions.csv, as used in the script below) into the input folder

Create a Python Shell Job in AWS Glue

  • In AWS Glue click on Author and edit ETL jobs
  • Click Script editor
  • Under Engine, select Python shell and select Start fresh
  • Click Create script
  • Provide a name for the job
  • In the script section:
import sys
import pandas as pd
from awsglue.utils import getResolvedOptions

# Fetch job parameters
args = getResolvedOptions(sys.argv, [])

input_path = "s3://{bucket_name}/input/transactions.csv"
output_path = "s3://{bucket_name}/output/"

# Load data from S3
transactions = pd.read_csv(input_path)

# Function to redact card numbers
def redact_card_number(card_number):
    # Convert the card number to a string, just in case it's in numeric format
    card_number = str(card_number)
    # Replace all but the last 4 digits with asterisks
    return '*' * (len(card_number) - 4) + card_number[-4:]

# Apply the redaction function to the CardNumber column
transactions['CardNumber'] = transactions['CardNumber'].apply(redact_card_number)

# Save processed data
transactions.to_csv(output_path + "transactions_redacted.csv", index=False)

print("Data processing complete. Card numbers redacted.")
  • Remember to change {bucket_name} to your bucket
  • Click on Job details and set the IAM role created earlier
  • Click Save
  • Click on the Runs Tab and click Run

Creating Amazon Athena Partitioned Tables

  • Amazon Athena allows querying of data stored in S3 using SQL

Create S3 Buckets to Store Data

  • Retail transaction data
    • Name: <s3-bucket-name-retail>
    • Leave all other options as default
    • Click Create bucket
  • Athena query results
    • Name: <s3-bucket-name-athena-results>
    • Leave all other options as default
    • Click Create bucket

Set Up a Lambda Function to Generate and Upload CSV Files

  • Choose Author from scratch
  • Function name: <unique-function-name>
  • Runtime: Python 3.12
  • Execution role: choose an existing one or create a new IAM role
  • Click Create function
  • Use the following script:
import boto3
import csv
from io import StringIO
from datetime import datetime
from dateutil.relativedelta import relativedelta
import random

s3_client = boto3.client('s3')
bucket_name = 'retail-transaction-data-lab-<unique-number>'

# Sample product names for generating data
product_names = [
    "Widget A", "Widget B", "Gadget C", "Gadget D",
    "Thingamajig E", "Contraption F", "Device G", "Apparatus H"
]

def generate_sample_data(transaction_date, num_records):
    """Generate sample transaction data for a given date."""
    data = []
    for _ in range(num_records):
        transaction_id = f'TRAN{random.randint(1000, 9999)}'
        store_id = f'ST{str(random.randint(1, 10)).zfill(3)}'
        product_id = f'P{str(random.randint(1, 50)).zfill(3)}'
        product_name = random.choice(product_names)
        quantity = random.randint(1, 10)
        price = f'{random.uniform(5.0, 100.0):.2f}'

        # Appending generated data as a row
        data.append([transaction_id, store_id, product_id, product_name, quantity, price, transaction_date])
    return data

def create_partition_path(year, month):
    """Return the S3 path for the given year and month."""
    return f'year={year}/month={month}/'

def upload_csv_to_s3(data, year, month, write_header=True):
    """Create and upload the CSV to S3 for a given partition."""
    # Create the CSV in memory
    csv_buffer = StringIO()
    writer = csv.writer(csv_buffer)

    # Write the header only if specified
    if write_header:
        writer.writerow(['transaction_id', 'store_id', 'product_id', 'product_name', 'quantity', 'price', 'transaction_date'])

    # Write the data rows
    writer.writerows(data)

    # Generate the S3 partition path
    partition_path = create_partition_path(year, str(month).zfill(2))

    # Upload CSV to S3
    s3_client.put_object(Bucket=bucket_name, Key=partition_path + 'transactions.csv', Body=csv_buffer.getvalue())

    print(f'Uploaded CSV to s3://{bucket_name}/{partition_path}transactions.csv')

def lambda_handler(event, context):
    # Optional: Seed the random number generator for reproducibility
    random.seed(42)

    # Set the number of months and records to generate
    months_to_generate = 12  # Last 12 months
    records_per_month = random.randint(50, 150)  # Generate between 50 and 150 records

    current_date = datetime.now()

    for month_offset in range(months_to_generate):
        # Calculate the date and year/month once for this month
        transaction_date = (current_date - relativedelta(months=month_offset)).strftime('%Y-%m-%d')
        year = (current_date - relativedelta(months=month_offset)).year
        month = (current_date - relativedelta(months=month_offset)).month

        # Generate dynamic transaction data
        data = generate_sample_data(transaction_date, records_per_month)

        # Upload the generated CSV data to S3
        upload_csv_to_s3(data, year, month)

    return {
        'statusCode': 200,
        'body': 'Uploaded transaction data for multiple months.'
    }

Test Lambda Function

  • Click on Test and configure a test event
  • Click Save and Test again

Create a Partitioned Table in Athena

  • In Athena, click on Edit settings
  • Browse and select the S3 bucket folder you created earlier for Athena results
  • Run the following query: CREATE DATABASE retail_data_db;
  • Run the following to create a table that reflects the partitioned structure:
CREATE EXTERNAL TABLE IF NOT EXISTS retail_data_db.retail_transactions (
    transaction_id STRING,
    store_id STRING,
    product_id STRING,
    product_name STRING,
    quantity BIGINT,
    price DOUBLE,
    transaction_date STRING
)
PARTITIONED BY (year STRING, month STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://<s3-bucket-name-retail>/'
TBLPROPERTIES ('has_encrypted_data'='false', 'skip.header.line.count'='1');
  • Load the partitions: MSCK REPAIR TABLE retail_data_db.retail_transactions;
  • Query the partition:
SELECT *
FROM retail_data_db.retail_transactions
WHERE year = '2024' AND month = '01';

Simplifying ETL Job with AWS Glue Visual ETL

  • ETL stands for extract, transform and load

Prepare Environment

  • Create an S3 bucket with two folders named input and output
  • Download or create a transactions CSV file and upload it to the input folder

Creating an ETL Job Using Visual ETL

  • In AWS Glue, click on Author and edit ETL jobs
  • Click on Visual ETL
  • Rename the job (click into Untitled job)
  • Select an existing IAM role or create one
  • Scroll down to Requested number of workers and type 2 in the input box
  • Return to the previous tab (visual) and click on the blue circle with a plus sign
  • Click on Amazon S3 on the left
  • Click on Amazon S3 node
  • Paste the following directory: s3://{your-bucket-name}/input
  • Data format: CSV
  • Click on the plus sign again (to add a node) and navigate to Transforms tab
  • Select Detect Sensitive Data
  • Click on Detect Sensitive Data Transforms node
  • Detect sensitive data: Finds columns that contain sensitive data
  • Type of sensitive information to detect: select specific patterns
  • Select patterns: search for credit card and select it
  • Select global action: REDACT (redact detected text)
  • Add a node and navigate to Transforms tab, select Change Schema
  • Click on Change Schema Transforms node
    • Node parents: detect sensitive data
    • Change schema (apply mapping)
      • Source key: Amount
      • Target key: TransactionAmount
  • Add another node and navigate to Targets tab, select S3
  • Click on Amazon S3 Target node
  • Node parents: change schema
    • Format: Parquet
    • Compression type: GZIP
    • S3 target location: s3://{your-bucket-name}/output/
  • Click Save
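  • The saved job can also be started outside the console; a minimal boto3 sketch (the job name is a placeholder for the name given above):
import boto3

glue = boto3.client('glue')

# Start a run of the visual ETL job
run = glue.start_job_run(JobName='<your-visual-etl-job-name>')
print(run['JobRunId'])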

Getting Started with AWS Glue Workflows

Set Up S3 Buckets

  • Create two S3 buckets (or folders), one for input and one for output, and upload a Sales_Data.csv file to the input location

Creating AWS Glue Jobs

  • Go to AWS Glue and select ETL jobs
  • Click Script editor > under Engine select Python shell and select Start fresh as the option > click Create script
  • Name the job: Remove duplicates
  • Use the following script:
import sys
import pandas as pd
from awsglue.utils import getResolvedOptions

# Fetch job parameters
args = getResolvedOptions(sys.argv, [])

input_path = "s3://{bucket_name_INPUT_bucket}/Sales_Data.csv"
output_path = "s3://{bucket_name_OUTPUT_bucket}/"

# Load the data from S3
data = pd.read_csv(input_path)

# Data cleaning process
cleaned_data = data.drop_duplicates()

# Save the cleaned data back to S3
cleaned_data.to_csv(output_path + "sales_data_removed-duplicates.csv", index=False)
  • Select an existing IAM role or create a new one
  • Click Save
  • Create a data summarisation job
    • Create a job and name it Summarise Data, following the same steps as above
    • Choose an existing IAM or create a new one
    • Add the following script:
import sys
import pandas as pd
from awsglue.utils import getResolvedOptions


# Fetch job parameters

args = getResolvedOptions(sys.argv, [])

input_path = "s3://{bucket_name_OUTPUT_bucket}/sales_data_removed-duplicates.csv"  # the file written by the Remove duplicates job
output_path = "s3://{bucket_name_OUTPUT_bucket}/"

data = pd.read_csv(input_path)

summary = data.groupby(['ProductID', 'ProductName']).agg(
    Total_Quantity=('Quantity', 'sum'),
    Total_Sales=('Price', lambda x: (x * data.loc[x.index, 'Quantity']).sum())
).reset_index()

# Save the summarized data back to S3

summary.to_csv(output_path + "sales_data_removed-duplicates_summation-done.csv", index=False)
  • Click Save

Creating and Running the Workflow

  • Create a workflow in Glue
  • Click Add trigger
    • Add a name and description
    • Trigger type: on demand
    • Click Add
  • Add the jobs so that Summarise Data starts after the Remove duplicates job
    • Click on Add node
    • Tick Remove duplicates and click Add
    • Click on the Remove duplicates node
    • Click on Add trigger, give it a name and click Add
    • Tick Summarise Data and click Add
  • Run the workflow
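  • The workflow can also be started with boto3; a minimal sketch (the workflow name is a placeholder):
import boto3

glue = boto3.client('glue')

# Kick off an on-demand run of the workflow
run_id = glue.start_workflow_run(Name='<your-workflow-name>')['RunId']
print(run_id)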

Processing Amazon DynamoDB Streams with AWS Lambda

Enabling an Amazon DynamoDB Stream

  • Create a new table in DynamoDB
  • Name: Foods
  • Partition Key: Desserts
  • Leave other configurations as default
  • Select the newly created table, Foods
  • Go to Exports and streams and find DynamoDB stream details
    • Click on Turn on

Creating an AWS Lambda Function to Process DynamoDB Streams

  • Choose Create new function
  • Author from scratch
  • Function Name: ProcessDynamoDBStream
  • Runtime: Python 3.13
  • Execution role: choose an existing IAM role or create a new one
  • Click Create function
  • In the code editor, paste the following code in and click Deploy:
import json
def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            print(f"New item added: {record['dynamodb']['NewImage']}")
        elif record['eventName'] == 'MODIFY':
            print(f"Item modified: {record['dynamodb']['NewImage']}")
        elif record['eventName'] == 'REMOVE':
            print(f"Item removed: {record['dynamodb']['OldImage']}")
    return {
        'statusCode': 200,
        'body': json.dumps('Processed event successfully')
    }
  • Scroll up to Function Overview and click Add Trigger
    • Trigger configuration: DynamoDB
    • DynamoDB table: Foods
    • Batch size: 1
    • Leave all other configurations as default and click Add
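  • The trigger can also be wired up with boto3; a minimal sketch, assuming the table and function names above:
import boto3

dynamodb = boto3.client('dynamodb')
lambda_client = boto3.client('lambda')

# Look up the stream ARN for the Foods table
stream_arn = dynamodb.describe_table(TableName='Foods')['Table']['LatestStreamArn']

# Equivalent to adding the DynamoDB trigger in the console
lambda_client.create_event_source_mapping(
    EventSourceArn=stream_arn,
    FunctionName='ProcessDynamoDBStream',
    StartingPosition='LATEST',
    BatchSize=1
)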

Testing the Set Up for Processing the DynamoDB Stream

  • Return to the DynamoDB table and click Actions > Create item
  • Enter any random dessert and click Create item
  • Go back to the Lambda function and click on Monitor and then View CloudWatch logs