<dynamoDB-table-name>
{
  "MovieTitle": {
    "S": "The Dark Knight"
  },
  "ReleaseYear": {
    "N": "2008"
  },
  "Genre": {
    "S": "Action"
  },
  "Rating": {
    "N": "9.0"
  },
  "Director": {
    "S": "Christopher Nolan"
  }
}
{
  "MovieTitle": {
    "S": "Inception"
  },
  "ReleaseYear": {
    "N": "2010"
  },
  "Genre": {
    "S": "Sci-Fi"
  },
  "Rating": {
    "N": "8.8"
  },
  "Director": {
    "S": "Christopher Nolan"
  }
}
Movies
{
  "Year": {
    "N": "2020"
  },
  "Title": {
    "S": "Movie A"
  },
  "Rating": {
    "N": "8.1"
  }
}
{
  "Year": {
    "N": "2020"
  },
  "Title": {
    "S": "Movie B"
  },
  "Rating": {
    "N": "7.5"
  }
}
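These items use DynamoDB's typed attribute-value JSON. If you prefer to insert them from a script instead of the console, a minimal boto3 sketch (the table name and values are taken from the snippets above):

import boto3

dynamodb = boto3.client('dynamodb')

# Insert one of the sample items using the same typed JSON format shown above
dynamodb.put_item(
    TableName='Movies',
    Item={
        'Year': {'N': '2020'},
        'Title': {'S': 'Movie A'},
        'Rating': {'N': '8.1'}
    }
)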
s3://<name-of-s3-bucket>/file/
s3://<name-of-s3-bucket>/query/
SELECT * FROM file;
WHERE, FROM, and ORDER BY
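To run the same query outside the Athena console, a hedged boto3 sketch follows; the database name ('default') is an assumption, and results are written to the query/ prefix above.

import boto3

athena = boto3.client('athena')

# Start the query and print its execution ID; results land under the query/ prefix
response = athena.start_query_execution(
    QueryString="SELECT * FROM file;",
    QueryExecutionContext={'Database': 'default'},
    ResultConfiguration={'OutputLocation': 's3://<name-of-s3-bucket>/query/'}
)
print(response['QueryExecutionId'])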
<unique-dynamodb-tb-name>
<unique-lambda-function-name>
import boto3
import random
from datetime import datetime, timedelta

# Initialize DynamoDB
dynamodb = boto3.resource('dynamodb')
table_name = '<your_table_name_for_DynamoDB_Data>'  # Replace with your table name
table = dynamodb.Table(table_name)

# Sample data lists for generating realistic names, cities, and emails
first_names = ["John", "Jane", "Alice", "Bob", "Charlie", "Diana"]
last_names = ["Smith", "Johnson", "Williams", "Brown", "Jones", "Garcia"]
cities = ["New York", "Los Angeles", "Chicago", "Houston", "Phoenix", "Philadelphia"]

def generate_random_email(first_name, last_name):
    domains = ["example.com", "email.com", "testmail.com", "fakemail.com"]
    domain = random.choice(domains)
    return f"{first_name.lower()}.{last_name.lower()}@{domain}"

def generate_random_date():
    start_date = datetime.now() - timedelta(days=365)  # up to one year ago
    random_days = random.randint(0, 365)
    return (start_date + timedelta(days=random_days)).isoformat()

def lambda_handler(event, context):
    # Specify the number of records to generate
    num_records = 100  # Adjust this as needed

    for _ in range(num_records):
        first_name = random.choice(first_names)
        last_name = random.choice(last_names)
        city = random.choice(cities)

        item = {
            'id': str(random.randint(1, 1000000)),
            'name': f"{first_name} {last_name}",
            'email': generate_random_email(first_name, last_name),
            'city': city,
            'created_at': generate_random_date()
        }

        # Insert the item into DynamoDB
        table.put_item(Item=item)

    return {
        'statusCode': 200,
        'body': f'{num_records} records successfully inserted into {table_name}.'
    }
Replace <your_table_name_for_DynamoDB_Data> with the name of the table you created.
<unique-dynamo-db-s3-name>
<unique-athena-query-s3-name>
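To generate the sample records without using the console test feature, you can invoke the function from a script; a minimal sketch, assuming the function name placeholder above and an empty payload:

import boto3

lambda_client = boto3.client('lambda')

# Invoke the data-generation function with an empty event and print its response
response = lambda_client.invoke(
    FunctionName='<unique-lambda-function-name>',
    Payload=b'{}'
)
print(response['Payload'].read().decode())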
In AWS Glue, navigate to ETL jobs > Author and edit ETL jobs > Script editor. Under Engine, select Python shell, select Start fresh under Options, and click Create script.
Configure ETL job
<DynamoDBToS3Export>
import boto3
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from io import BytesIO

# Initialize boto3 resources and clients
dynamodb = boto3.resource('dynamodb')
s3 = boto3.client('s3')

# Define DynamoDB and S3 details
table_name = '<your-DynamoDB-table-name>'  # Replace with your DynamoDB table name
output_bucket = '<your-S3-bucket-name>'  # Replace with your S3 bucket name
output_key = 'parquet-output/'

# Initialize DynamoDB Table
table = dynamodb.Table(table_name)

# Step 1: Scan DynamoDB Table and Retrieve Data
def scan_dynamodb_table(table):
    response = table.scan()
    data = response['Items']
    # Handle pagination
    while 'LastEvaluatedKey' in response:
        response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'])
        data.extend(response['Items'])
    return data

# Retrieve data from DynamoDB
data = scan_dynamodb_table(table)

# Step 2: Convert Data to Pandas DataFrame
df = pd.DataFrame(data)

# Step 3: Convert DataFrame to Apache Parquet Format
arrow_table = pa.Table.from_pandas(df)
parquet_buffer = BytesIO()
pq.write_table(arrow_table, parquet_buffer)

# Step 4: Upload Parquet File to S3
parquet_buffer.seek(0)
s3.put_object(Bucket=output_bucket, Key=f"{output_key}dynamodb_data.parquet", Body=parquet_buffer)

print("Export from DynamoDB to Parquet format in S3 completed.")
CREATE DATABASE customer_order_db;
CREATE EXTERNAL TABLE IF NOT EXISTS customer_order_db.dynamodb_data (
    id STRING,
    name STRING,
    email STRING,
    city STRING,
    created_at STRING
)
STORED AS PARQUET
LOCATION 's3://<your-S3-bucket-name>/parquet-output/';
SELECT *
FROM customer_order_db.dynamodb_data
LIMIT 10;
SELECT
id,
name,
email,
city,
created_at
FROM customer_order_db.dynamodb_data
WHERE city = 'New York';
SELECT
city,
COUNT(*) AS total_customers
FROM customer_order_db.dynamodb_data
GROUP BY city
ORDER BY total_customers DESC;
import sys
import pandas as pd
from awsglue.utils import getResolvedOptions

# Fetch job parameters
args = getResolvedOptions(sys.argv, [])

input_path = "s3://{bucket_name}/input/transactions.csv"
output_path = "s3://{bucket_name}/output/"

# Load data from S3
transactions = pd.read_csv(input_path)

# Function to redact card numbers
def redact_card_number(card_number):
    # Convert the card number to a string, just in case it's in numeric format
    card_number = str(card_number)
    # Replace all but the last 4 digits with asterisks
    return '*' * (len(card_number) - 4) + card_number[-4:]

# Apply the redaction function to the CardNumber column
transactions['CardNumber'] = transactions['CardNumber'].apply(redact_card_number)

# Save processed data
transactions.to_csv(output_path + "transactions_redacted.csv", index=False)

print("Data processing complete. Card numbers redacted.")
Replace {bucket_name} with the name of your bucket.
<s3-bucket-name-retail>
<s3-bucket-name-athena-results>
<unique-function-name>
import boto3
import csv
from io import StringIO
from datetime import datetime
from dateutil.relativedelta import relativedelta
import random

s3_client = boto3.client('s3')
bucket_name = 'retail-transaction-data-lab-<unique-number>'

# Sample product names for generating data
product_names = [
    "Widget A", "Widget B", "Gadget C", "Gadget D",
    "Thingamajig E", "Contraption F", "Device G", "Apparatus H"
]

def generate_sample_data(transaction_date, num_records):
    """Generate sample transaction data for a given date."""
    data = []
    for _ in range(num_records):
        transaction_id = f'TRAN{random.randint(1000, 9999)}'
        store_id = f'ST{str(random.randint(1, 10)).zfill(3)}'
        product_id = f'P{str(random.randint(1, 50)).zfill(3)}'
        product_name = random.choice(product_names)
        quantity = random.randint(1, 10)
        price = f'{random.uniform(5.0, 100.0):.2f}'
        # Appending generated data as a row
        data.append([transaction_id, store_id, product_id, product_name, quantity, price, transaction_date])
    return data

def create_partition_path(year, month):
    """Return the S3 path for the given year and month."""
    return f'year={year}/month={month}/'

def upload_csv_to_s3(data, year, month, write_header=True):
    """Create and upload the CSV to S3 for a given partition."""
    # Create the CSV in memory
    csv_buffer = StringIO()
    writer = csv.writer(csv_buffer)
    # Write the header only if specified
    if write_header:
        writer.writerow(['transaction_id', 'store_id', 'product_id', 'product_name', 'quantity', 'price', 'transaction_date'])
    # Write the data rows
    writer.writerows(data)
    # Generate the S3 partition path
    partition_path = create_partition_path(year, str(month).zfill(2))
    # Upload CSV to S3
    s3_client.put_object(Bucket=bucket_name, Key=partition_path + 'transactions.csv', Body=csv_buffer.getvalue())
    print(f'Uploaded CSV to s3://{bucket_name}/{partition_path}transactions.csv')

def lambda_handler(event, context):
    # Optional: Seed the random number generator for reproducibility
    random.seed(42)

    # Set the number of months and records to generate
    months_to_generate = 12  # Last 12 months
    records_per_month = random.randint(50, 150)  # Generate between 50 and 150 records

    current_date = datetime.now()

    for month_offset in range(months_to_generate):
        # Calculate the date and year/month once for this month
        transaction_date = (current_date - relativedelta(months=month_offset)).strftime('%Y-%m-%d')
        year = (current_date - relativedelta(months=month_offset)).year
        month = (current_date - relativedelta(months=month_offset)).month

        # Generate dynamic transaction data
        data = generate_sample_data(transaction_date, records_per_month)

        # Upload the generated CSV data to S3
        upload_csv_to_s3(data, year, month)

    return {
        'statusCode': 200,
        'body': 'Uploaded transaction data for multiple months.'
    }
CREATE DATABASE retail_data_db;
CREATE EXTERNAL TABLE IF NOT EXISTS retail_data_db.retail_transactions (
    transaction_id STRING,
    store_id STRING,
    product_id STRING,
    product_name STRING,
    quantity BIGINT,
    price DOUBLE,
    transaction_date STRING
)
PARTITIONED BY (year STRING, month STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION 's3://retail-transaction-data-lab-<unique-number>/'
TBLPROPERTIES ('has_encrypted_data'='false', 'skip.header.line.count'='1');
MSCK REPAIR TABLE retail_data_db.retail_transactions;
SELECT *
FROM retail_data_db.retail_transactions
WHERE year = '2024' AND month = '01';
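Note that MSCK REPAIR TABLE only registers the partitions that exist in S3 at the time it runs; if the Lambda function later writes data for a new month, run it again before querying the new partition.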
Prepare Environment
s3://{your-bucket-name}/input/
s3://{your-bucket-name}/output/
import sys
import pandas as pd
from awsglue.utils import getResolvedOptions
# Fetch job parameters
args = getResolvedOptions(sys.argv, [])
input_path = "s3://{bucket_name_INPUT_bucket}/Sales_Data.csv"
output_path = "s3://{bucket_name_OUTPUT_bucket}/"
# Load the data from S3
data = pd.read_csv(input_path)
# Data cleaning process
cleaned_data = data.drop_duplicates()
# Save the cleaned data back to S3
cleaned_data.to_csv(output_path + "sales_data_removed-duplicates.csv", index=False)
import sys
import pandas as pd
from awsglue.utils import getResolvedOptions
# Fetch job parameters
args = getResolvedOptions(sys.argv, [])
input_path = "s3://{bucket_name_INPUT_bucket}/sales_data_removed-duplicates.csv"
output_path = "s3://{bucket_name_OUTPUT_bucket}/"
data = pd.read_csv(input_path)
summary = data.groupby(['ProductID', 'ProductName']).agg(
    Total_Quantity=('Quantity', 'sum'),
    Total_Sales=('Price', lambda x: (x * data.loc[x.index, 'Quantity']).sum())
).reset_index()
# Save the summarized data back to S3
summary.to_csv(output_path + "sales_data_removed-duplicates_summation-done.csv", index=False)
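The Total_Sales aggregation multiplies each row's Price by the Quantity on the same row before summing. An equivalent approach, shown here as a sketch with the same column names, is to precompute a per-row line total first:

# Equivalent aggregation: precompute a per-row line total, then sum it per product
data['LineTotal'] = data['Price'] * data['Quantity']
summary_alt = data.groupby(['ProductID', 'ProductName'], as_index=False).agg(
    Total_Quantity=('Quantity', 'sum'),
    Total_Sales=('LineTotal', 'sum')
)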
Foods
Desserts
Foods
ProcessDynamoDBStream
import json

def lambda_handler(event, context):
    for record in event['Records']:
        if record['eventName'] == 'INSERT':
            print(f"New item added: {record['dynamodb']['NewImage']}")
        elif record['eventName'] == 'MODIFY':
            print(f"Item modified: {record['dynamodb']['NewImage']}")
        elif record['eventName'] == 'REMOVE':
            print(f"Item removed: {record['dynamodb']['OldImage']}")
    return {
        'statusCode': 200,
        'body': json.dumps('Processed event successfully')
    }
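For a quick local test of the handler, you can pass it a hand-built event that mimics the DynamoDB Streams record structure; the attribute names and values below are illustrative only:

# Minimal sample event mimicking a DynamoDB Streams INSERT record
sample_event = {
    'Records': [
        {
            'eventName': 'INSERT',
            'dynamodb': {
                'NewImage': {
                    'Name': {'S': 'Tiramisu'},
                    'Category': {'S': 'Desserts'}
                }
            }
        }
    ]
}
print(lambda_handler(sample_event, None))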