Filter and Delete In-Flight Messages - SQS - AWS | Devops Junction

In this article, we are going to learn how to filter and delete specific messages in an SQS queue before they are read by the consumer.

Sometimes we accidentally push the wrong messages to SQS, or for some other reason we want to stop messages midway, before the consumer reads them.

We are going to discuss the steps we can perform to ensure that specific messages are removed before the consumer reads them.


Steps to filter and delete the messages in SQS

These are the steps I recommend to stop messages in transit before they can be consumed by the worker process:

  1. Stop the consumer, or point the consumer to a dummy SQS queue for the time being
  2. Increase the Delivery delay on the affected SQS queue
  3. Browse the messages using our SQSCLI product and check the format of the message body
  4. Use the InFlightSQS Python script described below to read and remove specific messages while leaving the other messages in the queue
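
Step 2 can also be done from code. The following is a minimal sketch, assuming a boto3 SQS client; the helper name `set_delivery_delay` is hypothetical and is not part of InFlightSQS. It uses the SQS `DelaySeconds` queue attribute, which accepts values from 0 to 900 seconds. Note that on standard queues a changed delivery delay applies only to messages sent after the change, not to messages already in the queue.

```python
def set_delivery_delay(client, queue_url, seconds):
    """Set the queue-level delivery delay (DelaySeconds) on an SQS queue.

    `client` is expected to behave like boto3.client("sqs").
    """
    if not 0 <= seconds <= 900:
        raise ValueError("DelaySeconds must be between 0 and 900")
    # SQS expects queue attribute values as strings.
    client.set_queue_attributes(
        QueueUrl=queue_url,
        Attributes={"DelaySeconds": str(seconds)},
    )
    return seconds


# Example usage against a real queue (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("sqs")
#   url = client.get_queue_url(QueueName="mysqsqueue")["QueueUrl"]
#   set_delivery_delay(client, url, 300)
```

Passing the client in as an argument keeps the helper easy to try against a stub before pointing it at a real queue.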

 

Boto Python Script to filter messages and Delete

The script takes four command line arguments at invocation:

  1. --threads [or] -t to define the number of threads, between 1 and 10
  2. --search [or] -s to pass the search string to look for in each message
  3. --queue [or] -q the name of the queue to browse and perform the tasks on
  4. --delete [or] -d boolean string value True (or) False to control whether the matching message should be deleted or just logged
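
One detail worth knowing when passing True or False on the command line: argparse's `type=bool` calls `bool()` on the raw string, and any non-empty string, including "False", is truthy. The sketch below shows one way to parse the four arguments listed above safely. The helper names `parse_bool` and `build_parser` are illustrative assumptions, not the repository's code.

```python
import argparse


def parse_bool(value):
    """Convert 'True'/'False' (any case) to a real bool, rejecting anything else."""
    lowered = value.strip().lower()
    if lowered == "true":
        return True
    if lowered == "false":
        return False
    raise argparse.ArgumentTypeError("expected True or False, got %r" % value)


def build_parser():
    """Build a parser for the four options described above."""
    parser = argparse.ArgumentParser(prog="InFlightSQS.py")
    parser.add_argument("-t", "--threads", type=int, choices=range(1, 11), required=True)
    parser.add_argument("-s", "--search", type=str, required=True)
    parser.add_argument("-q", "--queue", type=str, required=True)
    parser.add_argument("-d", "--delete", type=parse_bool, required=True)
    return parser


# The pitfall: bool() on a non-empty string is always True.
print(bool("False"))  # True

# With the converter, "False" really means False.
args = build_parser().parse_args(["-t", "2", "-s", "abc", "-q", "mysqsqueue", "-d", "False"])
print(args.delete)  # False
```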

 

The script starts the number of threads given by -t or --threads at invocation. The count can range from 1 to 10 (you can change this limit in the source code).

Each worker thread acts as an instance of InFlightSQS and tries to consume messages from SQS independently.

Each consumed message is checked against a condition. If it matches, it is written to a file named worker-<worker-no>.json and deleted only if you have set the delete flag to True.

If the message does not match the condition, it is simply ignored and becomes visible to the consumer again after the configured visibility timeout of the SQS queue.

Note*: We recommend that you check the visibility timeout configuration of the SQS queue before running this script.

The default value is 30 seconds. In delay queues and queues with a high visibility timeout, you might have to wait a little longer for the visibility timeout to elapse before your actual consumer can read the messages.
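
The matching step can be tried offline before touching a real queue. The script passes the search string to `re.search`, so characters such as `.` or `*` act as regular expression operators. Below is a small sketch that splits sample message bodies into matching and non-matching sets; the function `split_matches` and its `literal` switch are hypothetical and only illustrate the difference between literal and regex matching.

```python
import re


def split_matches(bodies, search, literal=True):
    """Split message bodies into (matched, skipped) lists.

    With literal=True the search string is escaped and matched as plain text;
    with literal=False it is used as a regular expression.
    """
    pattern = re.compile(re.escape(search) if literal else search)
    matched, skipped = [], []
    for body in bodies:
        if pattern.search(body):
            matched.append(body)
        else:
            skipped.append(body)
    return matched, skipped


sample = ['{"order": "A.1"}', '{"order": "AX1"}', '{"order": "B2"}']

# Literal match: only the body containing the exact text "A.1" matches.
literal_hits, _ = split_matches(sample, "A.1")

# Regex match: "." matches any character, so "AX1" matches as well.
regex_hits, _ = split_matches(sample, "A.1", literal=False)
```

Running the search with the delete flag set to False first, and reviewing the worker files, serves the same purpose against the real queue.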
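
To check the visibility timeout before running the script, something like the following sketch can be used. It assumes a boto3 SQS client and reads the `VisibilityTimeout` queue attribute; the helper name `get_visibility_timeout` is an assumption for illustration.

```python
def get_visibility_timeout(client, queue_url):
    """Return the queue's visibility timeout in seconds.

    `client` is expected to behave like boto3.client("sqs").
    """
    response = client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["VisibilityTimeout"],
    )
    # Attribute values come back as strings.
    return int(response["Attributes"]["VisibilityTimeout"])


# Example usage against a real queue (requires boto3 and AWS credentials):
#   import boto3
#   client = boto3.client("sqs")
#   url = client.get_queue_url(QueueName="mysqsqueue")["QueueUrl"]
#   print(get_visibility_timeout(client, url))
```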

 

Where to download and how to use

You can download the InFlightSQS source code from the following GitHub repository:

https://github.com/AKSarav/InFlightSQS

You can clone the project, make your own changes, and submit a pull request if you want to share a feature with the wider audience.

git clone https://github.com/AKSarav/InFlightSQS.git

 

How it handles the Authentication

We assume that AWS authentication comes from the local environment, either through the default IAM profile or an AWS CLI profile.

Make sure your AWS CLI environment is set up correctly if you are running the script locally, or that an IAM instance profile is attached if you are running it from an EC2 instance.

InFlightSQS (Boto) uses the default environment variables and the profiles in the local environment.
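
Because the script can delete messages, it may be worth confirming which identity the default credentials resolve to before running it. A minimal sketch, assuming a boto3 STS client and its `get_caller_identity` call; the helper name `describe_caller` is hypothetical.

```python
def describe_caller(sts_client):
    """Return a short description of the identity behind the current credentials.

    `sts_client` is expected to behave like boto3.client("sts").
    """
    identity = sts_client.get_caller_identity()
    return "{} (account {})".format(identity["Arn"], identity["Account"])


# Example usage (requires boto3 and AWS credentials):
#   import boto3
#   print(describe_caller(boto3.client("sts")))
```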

Usage

usage: inFlightSQS.py [-h] [-t THREADS] [-s SEARCH] [-q QUEUE] [-d DELETE]

Examples

Here are some examples of InFlightSQS with Deletion option set to True and False

# Search for string "stringTOsearch" in SQS Queue "mysqsqueue" and delete the messages
python InFlightSQS.py \
-t 10 \
-s stringTOsearch \
-q mysqsqueue \
-d True

# Search for string "stringTOsearch" in SQS Queue "mysqsqueue" and do not delete the messages
python InFlightSQS.py \
-t 10 \
-s stringTOsearch \
-q mysqsqueue \
-d False

Installation

Install the dependencies specified in the requirements.txt file

pip3 install -r requirements.txt

Run the script with your own values for the following

  • Thread Count
  • String to Search
  • Queue Name
  • Delete Flag (False | True)
python InFlightSQS.py -t 10 -s stringTOsearch -q mysqsqueue -d False

Source code for quick reference

Here is the source code for your quick reference, but please do check the latest code in the GitHub repository.

import argparse
import re
import threading
import time

import boto3


def receivemessage(workername):
    """Worker loop: receive batches, log matching messages and optionally delete them."""

    print("Starting thread: " + workername)

    sqs = boto3.resource('sqs')
    SQSQ = sqs.get_queue_by_name(QueueName=QNAME)

    # Queue attributes are returned as strings and are cached by boto3
    no_of_messages = SQSQ.attributes['ApproximateNumberOfMessages']

    while int(no_of_messages) > 0:

        try:
            messages = SQSQ.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=20)
        except Exception as e:
            print(e)
            print("Failed to receive message from primary queue")
            # End only this worker thread
            return

        print("Number of messages in Queue: " + no_of_messages)

        for message in messages:
            msgbody = message.body

            # The search string is treated as a regular expression
            if re.search(SEARCH_STRING, msgbody):
                # The with block closes the file, so no explicit close() is needed
                with open(workername + '.json', 'a') as outfile:
                    outfile.write(msgbody)
                    outfile.write("\n")
                # Delete the message only if the delete flag is set
                if DELETE:
                    message.delete()
            # Messages that do not match are left untouched; they become
            # visible again once the visibility timeout elapses.
            # Uncomment the following block if you want to keep records of them:
            # else:
            #     with open(workername + '-normal.json', 'a') as outfile:
            #         outfile.write(msgbody)
            #         outfile.write("\n")

        # Refresh the cached attributes; without reload() the count never changes
        SQSQ.reload()
        no_of_messages = SQSQ.attributes['ApproximateNumberOfMessages']


if __name__ == '__main__':

    # argparse - get number of threads, search string, queue name and delete flag
    parser = argparse.ArgumentParser()
    parser.add_argument("-t", "--threads", help="Number of threads to spawn (1 to 10)", type=int)
    parser.add_argument("-s", "--search", help="Search string to look for in the message body", type=str)
    parser.add_argument("-q", "--queue", help="Name of the queue to read from", type=str)
    # type=bool would turn any non-empty string, including "False", into True,
    # so accept the literal strings and convert them further down
    parser.add_argument("-d", "--delete", help="Delete the matching messages: True or False", type=str, choices=["True", "False"])
    args = parser.parse_args()

    # Check if the inputs are provided (and non-empty) or print usage
    if not args.threads or not args.search or not args.queue or args.delete is None:
        print("Usage: python InFlightSQS.py -t <number of threads> -s <search string> -q <queue name> -d <delete message from queue>")
        exit(1)

    # Print the inputs
    print("Number of threads: " + str(args.threads))
    print("Search string: " + args.search)
    print("Queue name: " + args.queue)
    print("Delete flag: " + args.delete)

    # Check the inputs for valid values
    if args.threads < 1 or args.threads > 10:
        print("Number of threads should be between 1 and 10")
        exit(1)

    SEARCH_STRING = args.search
    NUM_THREADS = args.threads
    QNAME = args.queue
    # Convert the validated string into a real boolean
    DELETE = args.delete == "True"

    
    # Create threads as per the number of threads specified
    threads = []
    for i in range(NUM_THREADS):
        t = threading.Thread(target=receivemessage, args=("worker-"+str(i),))
        t.start()
        threads.append(t)
        time.sleep(1)

    # Wait for all threads to finish
    for t in threads:
        t.join()

 

Conclusion

I hope this script helps you find and delete specific in-flight messages in an SQS queue that were sent accidentally, before they are consumed by the actual consumer.

If you have any feedback or ideas to make this better, please let me know in the comments section.

Cheers

Sarav AK
