ITM 300 - Cloud Foundations

Module 08: Product 8 Lab - SNS & SQS Workflow

list

Quick Oil Change and Repair

Product Objective

In this lab, we will leverage AWS Simple Notification Service (SNS) and Simple Queue Service (SQS) to set up a system capable of sending multiple push messages to various services, such as billing, customer notifications, and analytics. By sending a single SNS message upon a service status update, we can trigger multiple downstream services, streamlining communication and ensuring efficient, real-time updates across our application.

Create the SNS Topic

Create a SQS Queue

Create a second Queue

Subscribe to the topic for both Queues

Follow the same steps to subscribe to the VehicleStatus SNS Topic for the AllStatusUpdatesQueue as well.

Add the filter

{
  "service_status": [
    "Completed"
  ]
}

Send a message

{
    "Type": "Notification",
    "Message": "Testing a message"
}

View the message

Do the same for the VehicleCompletedQueue. You should see the message in both places.

Send another message

Send another message, but this time put Approved for the service_status.

{
    "Type": "Notification",
    "Message": "Testing a message that should only be in all status"
}

Go back and check the queues. You should see the message in the AllStatusUpdatesQueue, but not the VehicleCompletedQueue because of the filter we added to the subscription.

Update our lambda to send a SNS message on updates.

import { SNSClient, PublishCommand } from "@aws-sdk/client-sns"; // ES Modules import
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
  DynamoDBDocumentClient,
  ScanCommand,
  PutCommand,
  GetCommand,
  DeleteCommand,
  UpdateCommand,
} from "@aws-sdk/lib-dynamodb";

const client = new DynamoDBClient({});

const mydynamodb = DynamoDBDocumentClient.from(client);

const tableName = "VehicleServices";    

export const getDynamoServiceRequests = async () => {
    const statusToExclude = "Completed";
    const statusToExcludeRejected = "Service Rejected";

    try {
        const params = {
            TableName: tableName,
            FilterExpression: "attribute_not_exists(service_status) OR (#service_status <> :status AND #service_status <> :statusRejected)",
            ExpressionAttributeNames: {
                "#service_status": "service_status"
            },
            ExpressionAttributeValues: {
                ":status": statusToExclude,
                ":statusRejected":statusToExcludeRejected
            }
        };      
        const body = await mydynamodb.send(new ScanCommand(params));
        return body.Items; // Return JSON string of items
    } catch (error) {
        console.error("Error fetching DynamoDB service requests:", error);
        throw error; // Re-throw the error to handle it further up the call stack
    }
}

export const addDynamoServiceRequest = async (requestBody) => {
  try {
    const serviceId = generateServiceId(); // Generate unique service_id based on current date and time      
    const params = {
      TableName: tableName,
      Item: {
        service_id: serviceId, // Assuming service_id is provided in requestBody
        service_status: "New Request",
        service_description: requestBody.service_description,
        phone_number: requestBody.phone_number,
        license_plate: requestBody.license_plate ?? "Unknown", // Use requestBody.license_number or default to "Unknown"
      },
    };

    await mydynamodb.send(new PutCommand(params));

    return `Successfully added new service request`;
  } catch (error) {
    console.error("Error adding DynamoDB service request:", error);
    throw error; // Re-throw the error to handle it further up the call stack
  }
};

export const updateDynamoServiceRequest = async (requestBody, serviceId) => {
  try {

    const licensePlate = requestBody.license_plate ?? "Unknown";

    // Check if the item exists
    const getParams = {
      TableName: tableName,
      Key: {
        service_id: serviceId,
        license_plate: licensePlate,
      },
    };

    const { Item } = await mydynamodb.send(new GetCommand(getParams));

    if (!Item) {
      throw new Error("No record found");
    }    

    // Initialize the UpdateExpression components
    let updateExpression = "set";
    const expressionAttributeNames = {};
    const expressionAttributeValues = {};

    // Dynamically build the UpdateExpression, ExpressionAttributeNames, and ExpressionAttributeValues
    if (requestBody.service_description) {
      updateExpression += " #sd = :sd,";
      expressionAttributeNames["#sd"] = "service_description";
      expressionAttributeValues[":sd"] = requestBody.service_description;
    }
    if (requestBody.phone_number) {
      updateExpression += " #pn = :pn,";
      expressionAttributeNames["#pn"] = "phone_number";
      expressionAttributeValues[":pn"] = requestBody.phone_number;
    }
    if (requestBody.service_status) {
      updateExpression += " #ss = :ss,";
      expressionAttributeNames["#ss"] = "service_status";
      expressionAttributeValues[":ss"] = requestBody.service_status;
      if (requestBody.service_status == "Completed"){
        updateExpression += " #cd = :cd,";
        expressionAttributeNames["#cd"] = "completion_date";
        expressionAttributeValues[":cd"] = new Date().toString();     
      }       
    }


    // Remove any trailing comma from the update expression
    updateExpression = updateExpression.replace(/,$/, "");

    const params = {
      TableName: tableName,
      Key: {
        service_id: serviceId,
        license_plate: requestBody.license_plate ?? "Unknown", // Assuming license_plate is part of the key
      },
      UpdateExpression: updateExpression,
      ExpressionAttributeNames: expressionAttributeNames,
      ExpressionAttributeValues: expressionAttributeValues,
    };

    await mydynamodb.send(new UpdateCommand(params));

    await sendSNSMessage(requestBody,serviceId);

    return `Successfully updated service request`;
  } catch (error) {
    if (error.message === "No record found") {
      return error.message;
    } else {
      console.error("Error updating DynamoDB service request:", error);
      throw error; // Re-throw the error to handle it further up the call stack
    }
  }
};



// Helper function to generate a unique service_id based on current date and time
const generateServiceId = () => {
  const now = new Date();
  const formattedDate = now.toISOString().replace(/[-T:.Z]/g, ""); // Format date string
  const milliseconds = now.getMilliseconds().toString().padStart(3, "0"); // Get milliseconds and pad with leading zeros if necessary
  return `${formattedDate}${milliseconds}`; // Concatenate date and milliseconds
};

async function sendSNSMessage(requestBody, serviceId){
    const client = new SNSClient();
    const input = { // PublishInput
      TopicArn: "REPLACE-WITH-YOUR-TOPIC-ARN",
      Message: `{"service_status":"${requestBody.service_status}"}`,
      Subject: serviceId,
      MessageAttributes: { // MessageAttributeMap
        "service_status": { // MessageAttributeValue
          DataType: "String", // required
          StringValue: `${requestBody.service_status}`,
        },
      },
      // MessageDeduplicationId: `${requestBody.service_id}`,
      // MessageGroupId: `${requestBody.service_id}`,
    };
    const command = new PublishCommand(input);
    const response = await client.send(command);  
}

You'll need to go get the Topic ARN from the SNS Topic VehicleStatus. You'll update the code where it says "REPLACE-WITH-YOUR-TOPIC-ARN" with the ARN of your Topic.

Once you've made the update, go to your website and submit a service request. Type in a phone number, license plate number, and the message requesting an oil change for a 2020 Corvette.

Then log into the admin area and then change the status for that service to Approved.

Go look at the queues. You should see the message under one but not the other.

Update the status to Completed from the admin area.

Go look at the queues. You should see the new message under both.

Add a lambda to run when a message is received

Create a new lambda function named VehicleAnalytics.

Add this code to the index.mjs file:

export const handler = async (event) => {
  // TODO implement
  const response = {
    statusCode: 200,
    body: JSON.stringify('Status Updated!' + event),
  };
      console.log("status updated!"+event);
    for (const record of event.Records) {
        // Log the message details
        console.log('Message ID:', record.messageId);
        console.log('Receipt Handle:', record.receiptHandle);
        console.log('Body:', record.body);

        // Process the message (e.g., parse JSON, interact with other AWS services, etc.)
        try {
            const messageBody = JSON.parse(record.body);
            // Example: Log the parsed message body
            console.log('Parsed Message Body:', messageBody);
            // You can add your custom processing logic here
            const fullMessage = JSON.parse(messageBody.Message);
            console.log("Full Message:",fullMessage);
        } catch (error) {
            console.error('Error processing message:', error);
        }

    }    
  console.log(event['Records']);
  return response;
};

Go to the SQS queue for the AllStatusUpdatesQueue

Go look at the queue. You'll notice that the messages are now gone from that queue. Once a message is processed by a lambda with a 200 status, it will automatically be deleted from the queue.

You can now go to the CloudWatch service and look at the log groups for the VehicleAnalytics.

CloudWatch Log Groups

There you should see logs that have information about the two updates you did previously. These logs could then be used to create dashboards and visualizations of the services performed over time.

CloudWatch SNS Log

Outside of the lab environment, SNS can be utilized to send emails and text messages, which can be very useful for notifying customers about the status of their vehicles or informing them that their car is ready for pickup.

By integrating SNS and SQS, we can effortlessly add new workflows to various activities without requiring the original service to be aware of the additional pipelines that have been established.

Lab Summary

This lab focuses on utilizing AWS Simple Notification Service (SNS) and Simple Queue Service (SQS) to create a messaging system that can notify various services about changes in vehicle service statuses. The key objectives include setting up SNS topics and SQS queues, configuring subscriptions, filtering messages, sending and receiving messages, and integrating these functionalities into a Lambda function for further processing.

Key Concepts Explained:

Key AWS Services:

Reflection Questions: