Integrating SQS, Lambda and Celery (Part 2)

In my prior post, we worked through getting Amazon Simple Queue Service (SQS) up and running as a Celery message queue (a “broker” in Celery terms). Now, we want to see if we can add a message to SQS outside of Celery. If we can do this, it opens up some pretty cool possibilities. We can process work in AWS Lambda, and then push results into SQS for further processing in Celery. It’s not limited to AWS Lambda, either. We could have lots of individual clients push jobs to an SQS queue, and then have that work done by our Celery worker nodes.

When we left off in the last post, we had:

  • SQS established as the Celery broker (with an SQS queue named dev-celery)
  • Celery set up to use JSON as the serializer
  • A Django Celery shared task add(a, b) that adds two numbers together

I’m going to walk through the steps to “reverse engineer” the protocol Celery uses, so please bear with me; hopefully this is instructive as to what Celery is doing behind the scenes.

First, stop any Celery worker processes you might have running. We want a message to get “stuck” in the message queue and be able to inspect it manually.

Next, fire up the Django shell, and kick off a new task into the queue.
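Something like the following, run from `python manage.py shell` (the import path is assumed from the setup above; this needs your live SQS broker configured, so it won’t run standalone):

```python
# In the Django shell: queue a task with no worker running to consume it.
from myapp.tasks import add   # module path assumed

result = add.delay(2, 3)      # returns immediately; the message lands in SQS
print(result.id)              # the task UUID; state stays PENDING until a worker picks it up
```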

OK, it looks like we have a message in flight. Now let’s go to the AWS SQS console and inspect our queue. (My queue is named dev-pl-celery; yours is probably still named dev-celery.)

AWS SQS queue (selecting View/Delete Messages)

For our one message, the body looks something like this:

eyJib2R5IjogImV5SmxlSEJw ... (lots snipped) .. vanNvbiJ9

Hmm, well, that’s not too helpful. On a hunch, let’s try base64 decoding that string.
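In Python, that’s a one-liner. Since the real message was snipped above, `raw` below is an illustrative stand-in built from a minimal envelope (field names match what I saw on my queue):

```python
import base64
import json

# "raw" stands in for the string copied out of the SQS console; the real
# message was snipped above, so we build an illustrative one to decode.
sample = {
    "body": base64.b64encode(b'{"task": "myapp.tasks.add"}').decode(),
    "content-type": "application/json",
    "content-encoding": "utf-8",
}
raw = base64.b64encode(json.dumps(sample).encode()).decode()

# The interesting part: base64-decode the console string and parse the JSON.
envelope = json.loads(base64.b64decode(raw))
print(envelope["content-type"])   # application/json
```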

Excellent, that looks like JSON. Just one quirk, it looks like the “body” is still base64 encoded. Let’s try decoding that piece and see what we get.
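Same trick again, one layer down. The envelope below is illustrative (matching the shape of the message I pulled off my queue), but the decode step is exactly what you’d run on the real thing:

```python
import base64
import json

# The outer envelope we just decoded (fields illustrative, shaped like the
# message pulled off my queue).
envelope = {
    "body": base64.b64encode(
        b'{"task": "myapp.tasks.add", "args": [2, 3], "kwargs": {}}'
    ).decode(),
    "content-type": "application/json",
    "content-encoding": "utf-8",
}

# The "body" field is itself base64-encoded JSON -- decode it again:
task = json.loads(base64.b64decode(envelope["body"]))
print(task["task"], task["args"])   # myapp.tasks.add [2, 3]
```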

Well, that’s nice. Now that we have that, it looks like we have two pieces:

  1. A body section that contains information about the task
  2. For lack of a better descriptor, the “envelope” around the body that has some meta information about the task

To demonstrate how one might set up some code to generate these messages, I created a demo node.js function that encodes a message. You can see the demo here, although you will need to create an account to view it (sorry, I’m not aware of anywhere you can run node.js code in a way similar to jsfiddle; a static Gist is here). When you run the “encode.js” tab, it outputs a string that is a fully encoded message for SQS. After encoding the message, copy the output, then go to the Amazon SQS console, click “Queue Actions > Send a Message”, and paste the output in as a new message.
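For reference, here is a rough Python equivalent of that node.js encoder. It builds the two pieces we reverse-engineered above and base64-encodes them. The field names and values are my best reading of the message we inspected, not an official spec, so verify against a message from your own queue before relying on them:

```python
import base64
import json
import uuid


def encode_celery_message(task_name, args):
    """Build an SQS-ready Celery message, mirroring the layout we
    reverse-engineered above. Field values are a best guess from
    inspecting a real message -- check them against your own queue."""
    # Inner piece: the task body.
    body = {
        "task": task_name,
        "id": str(uuid.uuid4()),
        "args": list(args),
        "kwargs": {},
        "retries": 0,
        "eta": None,
        "expires": None,
        "utc": True,
    }
    # Outer piece: the "envelope" with meta information.
    envelope = {
        "body": base64.b64encode(json.dumps(body).encode()).decode(),
        "content-type": "application/json",
        "content-encoding": "utf-8",
        "headers": {},
        "properties": {
            "body_encoding": "base64",
            "delivery_info": {"exchange": "celery", "routing_key": "celery"},
            "delivery_mode": 2,
            "delivery_tag": str(uuid.uuid4()),
        },
    }
    # The whole envelope is base64-encoded one more time for SQS.
    return base64.b64encode(json.dumps(envelope).encode()).decode()


message = encode_celery_message("myapp.tasks.add", [5, 7])
print(message)  # paste this into "Queue Actions > Send a Message"
```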


You should be able to see that we now have two messages sitting in the queue (you may have to hit refresh in the AWS console).

Before we spin our workers back up, let’s modify them so we can see the work being done.

Now, we can spin up workers. We would like to see our two messages sitting in the queue be processed.

And that’s it! We can see our results in the log output, and we’ve successfully pushed a message from outside of Python into SQS and back to our Celery worker for processing.

Image Courtesy National Archives
