Amazon Simple Queue Service (SQS) Use Cases

Introduction

In this blog section, Amazon Simple Queue Service (SQS) will be discussed with use cases to better understand and gain hands-on experience. In the big data learning, it is very important to dirty your hands, otherwise, the efforts that you endeavored will be like writing on the water.

Amazon Simple Queue Service (Amazon SQS) offers a secure, durable, and available hosted queue that lets you integrate and decouple distributed software systems and components.

Amazon SQS supports both standard and FIFO queues.

What are the main benefits of Amazon SQS?

  • Security: You control who can send messages to and receive messages from the Amazon SQS queue. Server-side encryption (SSE) lets you transmit sensitive data by protecting the contents of messages in queues using keys managed in AWS Key Management Service (AWS KMS)
  • Durability: To ensure the safety of your messages, Amazon SQS stores them on multiple servers. Standard queues support at-least-once message delivery, and FIFO queues support exactly-once message processing.
  • Availability: Amazon SQS uses redundant infrastructure to provide highly concurrent access to messages and high availability for producing and consuming messages.
  • Scalability: Amazon SQS can process each buffered request independently, scaling transparently to handle any load increases or spikes without any provisioning instructions.
  • Reliability: Amazon SQS locks your messages during processing so that multiple producers can send and multiple consumers can receive messages at the same time.
  • Customization: Your queues don’t have to be exactly alike – for example, you can set a default delay in a queue. You can store the contents of messages larger than 256 KB using Amazon Simple Storage Service (Amazon S3) or Amazon DynamoDB, with Amazon SQS holding a pointer to the Amazon S3 object, or you can split a large message into smaller messages.

Getting Started

I follow the AWS Tutorial topics for simulating the cases. Before the start of the Amazon SQS tutorials, you must complete the Setting up Amazon SQS steps.

Topics:

Creating Amazon SQS queues

Three ways cam be used for creating an Amazon SQS queue;

  • AWS Management Console
  • AWS SDK for Java
  • AWS CloudFormation

AWS Management Console

After signing to the Amazon SQS console, Create New Queue selection is selected and an appropriate name is given for Queue Name. If the queue type will be FIFO, then the name should be ended with .fifo suffix (Standard is selected by default, choose FIFO). In this use case, I will choose the FIFO Queue option, and then click Quick-Create Queue button on the right bottom.

Create Amazon SQS Queue
Figıre-1. Create Amazon SQS Queue

For a FIFO queue, the Content-Based Deduplication column displays whether you have enabled exactly-once processing.

AWS SDK for Java

There are many ways for connecting AWS services via the development environment. My way is to use AWS Toolkit plugin of Intellij. After reading here, you can connect your AWS services easily.

In order to create, modify, and delete queue, below sample code is used.

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;

import java.util.List;
import java.util.Map.Entry;

/**
 * This sample demonstrates how to make basic requests to Amazon SQS using the
 * AWS SDK for Java.
 * <p>
 * Prerequisites: You must have a valid Amazon Web Services developer account,
 * and be signed up to use Amazon SQS. For more information about Amazon SQS,
 * see https://aws.amazon.com/sqs
 * <p>
 * Make sure that your credentials are located in ~/.aws/credentials
 */
public class SQSSimpleJavaClientExample {
    public static void main(String[] args) {
        /*
         * Create a new instance of the builder with all defaults (credentials
         * and region) set automatically. For more information, see
         * Creating Service Clients in the AWS SDK for Java Developer Guide.
         */

        final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

        System.out.println("===============================================");
        System.out.println("Getting Started with Amazon SQS Standard Queues");
        System.out.println("===============================================\n");

        try {
            // Create a queue.
            System.out.println("Creating a new SQS queue called MyQueue.\n");
            final CreateQueueRequest createQueueRequest =
                    new CreateQueueRequest("MyQueueSDK");
            final String myQueueUrl = sqs.createQueue(createQueueRequest)
                    .getQueueUrl();

            // List all queues.
            System.out.println("Listing all queues in your account.\n");
            for (final String queueUrl : sqs.listQueues().getQueueUrls()) {
                System.out.println("  QueueUrl: " + queueUrl);
            }
            System.out.println();

            // Send a message.
            System.out.println("Sending a message to MyQueue.\n");
            sqs.sendMessage(new SendMessageRequest(myQueueUrl,
                    "This is my message text."));

            // Receive messages.
            System.out.println("Receiving messages from MyQueue.\n");
            final ReceiveMessageRequest receiveMessageRequest =
                    new ReceiveMessageRequest(myQueueUrl);
            final List<Message> messages = sqs.receiveMessage(receiveMessageRequest)
                    .getMessages();
            for (final Message message : messages) {
                System.out.println("Message");
                System.out.println("  MessageId:     "
                        + message.getMessageId());
                System.out.println("  ReceiptHandle: "
                        + message.getReceiptHandle());
                System.out.println("  MD5OfBody:     "
                        + message.getMD5OfBody());
                System.out.println("  Body:          "
                        + message.getBody());
                for (final Entry<String, String> entry : message.getAttributes()
                        .entrySet()) {
                    System.out.println("Attribute");
                    System.out.println("  Name:  " + entry
                            .getKey());
                    System.out.println("  Value: " + entry
                            .getValue());
                }
            }
            System.out.println();

            // Delete the messages.
            System.out.println("Deleting the messages that we received.\n");

            for (final Message message : messages) {
                sqs.deleteMessage(new DeleteMessageRequest(myQueueUrl,
                        message.getReceiptHandle()));
            }

            // Delete the queue.
            System.out.println("Deleting the test queue.\n");
            sqs.deleteQueue(new DeleteQueueRequest(myQueueUrl));
        } catch (final AmazonServiceException ase) {
            System.out.println("Caught an AmazonServiceException, which means " +
                    "your request made it to Amazon SQS, but was " +
                    "rejected with an error response for some reason.");
            System.out.println("Error Message:    " + ase.getMessage());
            System.out.println("HTTP Status Code: " + ase.getStatusCode());
            System.out.println("AWS Error Code:   " + ase.getErrorCode());
            System.out.println("Error Type:       " + ase.getErrorType());
            System.out.println("Request ID:       " + ase.getRequestId());
        } catch (final AmazonClientException ace) {
            System.out.println("Caught an AmazonClientException, which means " +
                    "the client encountered a serious internal problem while " +
                    "trying to communicate with Amazon SQS, such as not " +
                    "being able to access the network.");
            System.out.println("Error Message: " + ace.getMessage());
        }
    }
}

The output of sample code will be alike below

===============================================
Getting Started with Amazon SQS Standard Queues
===============================================

Creating a new SQS queue called MyQueue.

Listing all queues in your account.

  QueueUrl: https://sqs.us-west-1.amazonaws.com/XXXXXXXXX/MyQueueSDK

Sending a message to MyQueue.

Receiving messages from MyQueue.

Message
  MessageId:     54980861-423d-44b6-b0e7-448f82283634
  ReceiptHandle: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
  MD5OfBody:     6a1559560f67c5e7a7d5d838bf0272ee
  Body:          This is my message text.

Deleting the messages that we received.

Deleting the test queue.


Process finished with exit code 0

To create a FIFO queue, FifoQueue attribute should be set as true.

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.*;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

public class SQSFIFOJavaClientExample {
    public static void main(String[] args) {
        /*
         * Create a new instance of the builder with all defaults (credentials
         * and region) set automatically. For more information, see
         * Creating Service Clients in the AWS SDK for Java Developer Guide.
         */
        
        final AmazonSQS sqs = AmazonSQSClientBuilder.defaultClient();

        System.out.println("===========================================");
        System.out.println("Getting Started with Amazon SQS FIFO Queues");
        System.out.println("===========================================\n");

        try {

            // Create a FIFO queue.
            System.out.println("Creating a new Amazon SQS FIFO queue called " +
                    "MyFifoQueue.fifo.\n");
            final Map<String, String> attributes = new HashMap<>();

            // A FIFO queue must have the FifoQueue attribute set to true.
            attributes.put("FifoQueue", "true");

            /*
             * If the user doesn't provide a MessageDeduplicationId, generate a
             * MessageDeduplicationId based on the content.
             */
            attributes.put("ContentBasedDeduplication", "true");

            // The FIFO queue name must end with the .fifo suffix.
            final CreateQueueRequest createQueueRequest =
                    new CreateQueueRequest("MyFifoQueue.fifo")
                            .withAttributes(attributes);
            final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

            // List all queues.
            System.out.println("Listing all queues in your account.\n");
            for (final String queueUrl : sqs.listQueues().getQueueUrls()) {
                System.out.println("  QueueUrl: " + queueUrl);
            }
            System.out.println();

            // Send a message.
            System.out.println("Sending a message to MyFifoQueue.fifo.\n");
            final SendMessageRequest sendMessageRequest =
                    new SendMessageRequest(myQueueUrl,
                            "This is my message text.");

            /*
             * When you send messages to a FIFO queue, you must provide a
             * non-empty MessageGroupId.
             */
            sendMessageRequest.setMessageGroupId("messageGroup1");

            // Uncomment the following to provide the MessageDeduplicationId
            //sendMessageRequest.setMessageDeduplicationId("1");
            final SendMessageResult sendMessageResult = sqs
                    .sendMessage(sendMessageRequest);
            final String sequenceNumber = sendMessageResult.getSequenceNumber();
            final String messageId = sendMessageResult.getMessageId();
            System.out.println("SendMessage succeed with messageId "
                    + messageId + ", sequence number " + sequenceNumber + "\n");

            // Receive messages.
            System.out.println("Receiving messages from MyFifoQueue.fifo.\n");
            final ReceiveMessageRequest receiveMessageRequest =
                    new ReceiveMessageRequest(myQueueUrl);

            // Uncomment the following to provide the ReceiveRequestDeduplicationId
            //receiveMessageRequest.setReceiveRequestAttemptId("1");
            final List<Message> messages = sqs.receiveMessage(receiveMessageRequest)
                    .getMessages();
            for (final Message message : messages) {
                System.out.println("Message");
                System.out.println("  MessageId:     "
                        + message.getMessageId());
                System.out.println("  ReceiptHandle: "
                        + message.getReceiptHandle());
                System.out.println("  MD5OfBody:     "
                        + message.getMD5OfBody());
                System.out.println("  Body:          "
                        + message.getBody());
                for (final Entry<String, String> entry : message.getAttributes()
                        .entrySet()) {
                    System.out.println("Attribute");
                    System.out.println("  Name:  " + entry.getKey());
                    System.out.println("  Value: " + entry.getValue());
                }
            }
            System.out.println();

            // Delete the messages.
            System.out.println("Deleting the messages that we received.\n");

            for (final Message message : messages) {
                sqs.deleteMessage(new DeleteMessageRequest(myQueueUrl,
                        message.getReceiptHandle()));
            }

            // Delete the queue.
            System.out.println("Deleting the queue.\n");
            sqs.deleteQueue(new DeleteQueueRequest(myQueueUrl));
        } catch (final AmazonServiceException ase) {
            System.out.println("Caught an AmazonServiceException, which means " +
                    "your request made it to Amazon SQS, but was " +
                    "rejected with an error response for some reason.");
            System.out.println("Error Message:    " + ase.getMessage());
            System.out.println("HTTP Status Code: " + ase.getStatusCode());
            System.out.println("AWS Error Code:   " + ase.getErrorCode());
            System.out.println("Error Type:       " + ase.getErrorType());
            System.out.println("Request ID:       " + ase.getRequestId());
        } catch (final AmazonClientException ace) {
            System.out.println("Caught an AmazonClientException, which means " +
                    "the client encountered a serious internal problem while " +
                    "trying to communicate with Amazon SQS, such as not " +
                    "being able to access the network.");
            System.out.println("Error Message: " + ace.getMessage());
        }
    }
}
===========================================
Getting Started with Amazon SQS FIFO Queues
===========================================

Creating a new Amazon SQS FIFO queue called MyFifoQueue.fifo.

Listing all queues in your account.

  QueueUrl: https://sqs.us-west-1.amazonaws.com/xxxxxxxxxx/MyFifoQueue.fifo

Sending a message to MyFifoQueue.fifo.

SendMessage succeed with messageId ab1a5ced-a350-46d6-94c5-a1b00b8c171b, sequence number 18854092782457583616

Receiving messages from MyFifoQueue.fifo.

Message
  MessageId:     ab1a5ced-a350-46d6-94c5-a1b00b8c171b
  ReceiptHandle: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
  MD5OfBody:     6a1559560f67c5e7a7d5d838bf0272ee
  Body:          This is my message text.

Deleting the messages that we received.

Deleting the queue.


Process finished with exit code 0

AWS CloudFormation

You can use the AWS CloudFormation console and a JSON (or YAML) template to create an Amazon SQS queue.

MyQueue.json file is created and its content is filled with below JSON code.

{
   "AWSTemplateFormatVersion": "2010-09-09",
   "Resources": {
      "MyQueue": {
         "Properties": {
            "QueueName": "MyQueue.fifo",
            "FifoQueue": true,
            "ContentBasedDeduplication": true
             },
         "Type": "AWS::SQS::Queue"
         }
      },
   "Outputs": {
      "QueueName": {
         "Description": "The name of the queue",
         "Value": {
            "Fn::GetAtt": [
               "MyQueue",
               "QueueName"
            ]
         }
      },
      "QueueURL": {
         "Description": "The URL of the queue",
         "Value": {
            "Ref": "MyQueue"
         }
      },
      "QueueARN": {
         "Description": "The ARN of the queue",
         "Value": {
            "Fn::GetAtt": [
               "MyQueue",
               "Arn"
            ]
         }
      }
   }
}
  1. Sign in to the AWS CloudFormation console, and then choose Create Stack.
  2. On the Specify Template panel, choose Upload a template file, choose your MyQueue.json file, and then choose Next.
  3. On the Specify Details page, type MyQueue for Stack Name, and then choose Next.
  4. On the Options page, choose Next.
  5. On the Review page, choose Create.

AWS CloudFormation begins to create the MyQueue stack and displays the CREATE_IN_PROGRESS status. When the process is complete, AWS CloudFormation displays the CREATE_COMPLETE status.

Create Amazon SQS queue via AWS CloudFormation service
Figure-2. Create Amazon SQS queue via AWS CloudFormation service
Amazon SQS queue via AWS CloudFormation service
Figure-3. Amazon SQS queue via AWS CloudFormation service

Sending messages to Amazon SQS queues

After creating an Amazon SQS queue, you can send a message in two ways

  1. AWS Management Console
  2. AWS SDK for Java

AWS Management Console

Figure-4. Send a message using AWS Management Console
Figure-5. Sending Message Dialog box

Message Group ID and Message Deduplication ID should be set for FIFO Queue.

Figure-6. Result dialog box

AWS SDK for Java

In order to send a message to a standard queue, the above java code snippet (SQSSimpleJavaClientExample) is used.

sqs.sendMessage(new SendMessageRequest(myQueueUrl, "This is my message text."));

This line sends message to queue that we created before, so the result was like below

Message
  MessageId:     ab1a5ced-a350-46d6-94c5-a1b00b8c171b
  ReceiptHandle: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
  MD5OfBody:     6a1559560f67c5e7a7d5d838bf0272ee
  Body:          This is my message text.

But, when we send it to a FIFO queue, you should define MessageGroupId as a parameter.

// Send a message
System.out.println("Sending a message to MyFifoQueue.fifo.\n");
final SendMessageRequest sendMessageRequest = new SendMessageRequest(myQueueUrl, "This is my message text.");

// When you send messages to a FIFO queue, you must provide a non-empty MessageGroupId.
sendMessageRequest.setMessageGroupId("messageGroup1");
one message on FIFO queue
Figure-7. one message on FIFO queue

Receiving and deleting a message from an Amazon SQS queue

The message which send before for FIFO queue can be deleted using SQS Management Console.

Deleting message from FIFO Queue
Figure-8. Deleting message from FIFO Queue

In order to see message, it should be polled before.

Figure-9. Polling for Messages
Visibility progress bar when polling
Figure-10. Visibility progress bar when polling

After completing the polling, message can be deleted within visibility timeout.

Message deleting from Queue
Figure-11. Visible for Consumer

When message is visible for consumer (progress bar completed as 100%), message cannot be deleted. Before visibility progress, it can be deleted.