In my prior post, we worked through getting Amazon Simple Queue Service (SQS) up and running as a Celery message queue (a “broker” in Celery terms). Now we want to see if we can add a message to SQS outside of Celery. If we can, it opens up some pretty cool possibilities: we can process work in AWS Lambda, and then push results into SQS for further processing in Celery. It's not limited to AWS Lambda, either. We could have lots of individual clients push jobs to an SQS queue, and then have that work done by our Celery worker nodes.
When we left off in the last post, we had:
- SQS established as the Celery broker (with a SQS Queue named dev-celery)
- Celery set up to use JSON as the serializer
- A Django Celery shared task add(a, b) that adds two numbers together
I’m going to walk through the steps to “reverse engineer” the protocol Celery uses, so please bear with me; hopefully this is instructive as to what Celery is doing behind the scenes.
First, stop any Celery worker processes you might have running. We want a message to get “stuck” in the message queue and be able to inspect it manually.
Next, fire up the Django shell, and kick off a new task into the queue.
(celerydemo)$ ./manage.py shell
Python 3.4.3 (default, Mar 23 2015, 04:19:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
(InteractiveConsole)
>>> from queuedemo.tasks import add
>>> result = add.delay(3,4)
>>> result.status
'PENDING'
>>>
OK, it looks like we have a message in flight. Now let’s go to the AWS SQS console and inspect our queue. (My queue is named dev-pl-celery; yours is probably still named dev-celery.)
For our one message, the body looks something like this:
eyJib2R5IjogImV5SmxlSEJw ... (lots snipped) .. vanNvbiJ9
Hmm, well, that’s not too helpful. On a hunch, let’s try base64 decoding that string.
(celerydemo)$ python
Python 3.4.3 (default, Mar 23 2015, 04:19:36)
[GCC 4.2.1 Compatible Apple LLVM 6.0 (clang-600.0.57)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
>>> import base64
>>> encoded = "eyJib2R5IjogImV5SmxlSEJwY21Weklqb2diblZzYkN3Z0luUmhjMnR6WlhRaU9pQnVkV3hzTENBaWNtVjBjbWxsY3lJNklEQXNJQ0owYVcxbGJHbHRhWFFpT2lCYmJuVnNiQ3dnYm5Wc2JGMHNJQ0oxZEdNaU9pQjBjblZsTENBaVkyRnNiR0poWTJ0eklqb2diblZzYkN3Z0ltRnlaM01pT2lCYk15d2dORjBzSUNKcmQyRnlaM01pT2lCN2ZTd2dJbVYwWVNJNklHNTFiR3dzSUNKMFlYTnJJam9nSW5GMVpYVmxaR1Z0Ynk1MFlYTnJjeTVoWkdRaUxDQWlZMmh2Y21RaU9pQnVkV3hzTENBaWFXUWlPaUFpWTJGaU5HWTRNek10TjJNNVlTMDBOamc1TFRsaE9HTXRPR0l5TWpkalpHUmxObUk1SWl3Z0ltVnljbUpoWTJ0eklqb2diblZzYkgwPSIsICJoZWFkZXJzIjoge30sICJjb250ZW50LWVuY29kaW5nIjogInV0Zi04IiwgInByb3BlcnRpZXMiOiB7ImRlbGl2ZXJ5X3RhZyI6ICIzYTdhOWMwOS05NjFiLTQ0ZjQtOGQyYy0xMzA2ZmNkM2VlZmEiLCAicmVwbHlfdG8iOiAiOWU1NDUwOGYtYTM0Zi0zNzRiLWEwM2EtOTdjMjg1NmU1NjAzIiwgImNvcnJlbGF0aW9uX2lkIjogImNhYjRmODMzLTdjOWEtNDY4OS05YThjLThiMjI3Y2RkZTZiOSIsICJkZWxpdmVyeV9tb2RlIjogMiwgImJvZHlfZW5jb2RpbmciOiAiYmFzZTY0IiwgImRlbGl2ZXJ5X2luZm8iOiB7InJvdXRpbmdfa2V5IjogImNlbGVyeSIsICJleGNoYW5nZSI6ICJjZWxlcnkiLCAicHJpb3JpdHkiOiAwfX0sICJjb250ZW50LXR5cGUiOiAiYXBwbGljYXRpb24vanNvbiJ9"
>>> decoded = base64.standard_b64decode(encoded)
>>> decoded
b'{"body": "eyJleHBpcmVzIjogbnVsbCwgInRhc2tzZXQiOiBudWxsLCAicmV0cmllcyI6IDAsICJ0aW1lbGltaXQiOiBbbnVsbCwgbnVsbF0sICJ1dGMiOiB0cnVlLCAiY2FsbGJhY2tzIjogbnVsbCwgImFyZ3MiOiBbMywgNF0sICJrd2FyZ3MiOiB7fSwgImV0YSI6IG51bGwsICJ0YXNrIjogInF1ZXVlZGVtby50YXNrcy5hZGQiLCAiY2hvcmQiOiBudWxsLCAiaWQiOiAiY2FiNGY4MzMtN2M5YS00Njg5LTlhOGMtOGIyMjdjZGRlNmI5IiwgImVycmJhY2tzIjogbnVsbH0=", "headers": {}, "content-encoding": "utf-8", "properties": {"delivery_tag": "3a7a9c09-961b-44f4-8d2c-1306fcd3eefa", "reply_to": "9e54508f-a34f-374b-a03a-97c2856e5603", "correlation_id": "cab4f833-7c9a-4689-9a8c-8b227cdde6b9", "delivery_mode": 2, "body_encoding": "base64", "delivery_info": {"routing_key": "celery", "exchange": "celery", "priority": 0}}, "content-type": "application/json"}'
Excellent, that looks like JSON. Just one quirk: it looks like the “body” is still base64 encoded. Let’s try decoding that piece and see what we get.
>>> base64.standard_b64decode("eyJleHBpcmVzIjogbnVsbCwgInRhc2tzZXQiOiBudWxsLCAicmV0cmllcyI6IDAsICJ0aW1lbGltaXQiOiBbbnVsbCwgbnVsbF0sICJ1dGMiOiB0cnVlLCAiY2FsbGJhY2tzIjogbnVsbCwgImFyZ3MiOiBbMywgNF0sICJrd2FyZ3MiOiB7fSwgImV0YSI6IG51bGwsICJ0YXNrIjogInF1ZXVlZGVtby50YXNrcy5hZGQiLCAiY2hvcmQiOiBudWxsLCAiaWQiOiAiY2FiNGY4MzMtN2M5YS00Njg5LTlhOGMtOGIyMjdjZGRlNmI5IiwgImVycmJhY2tzIjogbnVsbH0=")
b'{"expires": null, "taskset": null, "retries": 0, "timelimit": [null, null], "utc": true, "callbacks": null, "args": [3, 4], "kwargs": {}, "eta": null, "task": "queuedemo.tasks.add", "chord": null, "id": "cab4f833-7c9a-4689-9a8c-8b227cdde6b9", "errbacks": null}'
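The two manual decode steps above can be collapsed into a small helper. This is just a sketch of what we did interactively (the function name is mine, not part of Celery or kombu):

```python
import base64
import json


def decode_celery_sqs_message(raw):
    """Decode a raw SQS message body produced by Celery.

    The outer layer is base64-encoded JSON (the "envelope"); the
    "body" field inside it is base64-encoded JSON again (the task).
    Returns (envelope, task) as two dicts.
    """
    envelope = json.loads(base64.standard_b64decode(raw).decode("utf-8"))
    task = json.loads(base64.standard_b64decode(envelope["body"]).decode("utf-8"))
    return envelope, task
```

Feeding it the raw string from the SQS console gives back the envelope dict and the task dict (with "task", "args", "kwargs", "id", and so on) in one call.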
Well, that’s nice. Now that we have that, it looks like we have two pieces:
- A body section that contains information about the task
- For lack of a better descriptor, the “envelope” around the body that has some meta information about the task
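Going the other direction, here is a minimal Python sketch of building such a message by hand. The field names and values are taken from the decoded output above; the IDs are freshly generated UUIDs, and I'm assuming the fields shown are sufficient for the Celery worker (a sketch, not a definitive spec of the protocol):

```python
import base64
import json
import uuid


def encode_celery_message(task_name, args):
    """Hand-build a Celery-over-SQS message: an inner "body" (the task)
    wrapped in an outer "envelope", each JSON-serialized then base64-encoded."""
    task_id = str(uuid.uuid4())

    # The inner "body": information about the task itself.
    body = {
        "expires": None, "taskset": None, "retries": 0,
        "timelimit": [None, None], "utc": True, "callbacks": None,
        "args": args, "kwargs": {}, "eta": None,
        "task": task_name, "chord": None, "id": task_id,
        "errbacks": None,
    }

    # The outer "envelope": meta information about delivery and encoding.
    envelope = {
        "body": base64.standard_b64encode(
            json.dumps(body).encode("utf-8")).decode("ascii"),
        "headers": {},
        "content-encoding": "utf-8",
        "content-type": "application/json",
        "properties": {
            "delivery_tag": str(uuid.uuid4()),
            "reply_to": str(uuid.uuid4()),
            "correlation_id": task_id,
            "delivery_mode": 2,
            "body_encoding": "base64",
            "delivery_info": {"routing_key": "celery",
                              "exchange": "celery", "priority": 0},
        },
    }
    return base64.standard_b64encode(
        json.dumps(envelope).encode("utf-8")).decode("ascii")


message = encode_celery_message("queuedemo.tasks.add", [7, 3])
print(message)
```

The printed string has the same shape as the one we pulled out of the SQS console, and can be pasted in as a new message body.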
To demonstrate how one might set up some code to generate these messages, I created a demo node.js function that encodes a message. You can see the demo here, although you will need to create an account to view it (sorry, I’m not aware of anywhere you can run node.js code in a way similar to jsfiddle; a static Gist is here.) When you run the “encode.js” tab, it outputs a string that is a fully encoded message for SQS. After encoding the message, copy the output, and then go to the Amazon SQS console. Click on “Queue Actions > Send a Message”, and paste the encoded string in as a new message.
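Instead of pasting into the console by hand, the same pre-encoded string could be pushed programmatically. A sketch using boto3, the AWS SDK for Python (the queue URL below is a made-up placeholder, and I'm assuming AWS credentials are already configured in the environment):

```python
def send_raw_sqs_message(queue_url, message_body):
    """Push an already-encoded Celery message straight into an SQS queue."""
    # Imported inside the function so the rest of the sketch can be read
    # (and the encoding helpers run) without boto3 installed.
    import boto3

    sqs = boto3.client("sqs")
    return sqs.send_message(QueueUrl=queue_url, MessageBody=message_body)


# Hypothetical usage (queue URL is a placeholder, not a real queue):
# send_raw_sqs_message(
#     "https://sqs.us-east-1.amazonaws.com/123456789012/dev-celery",
#     message)
```

This is exactly what the console's "Send a Message" dialog does for us; any client with SQS credentials could do the same.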
You should be able to see that we have two messages sitting in the queue (you may have to hit refresh in the AWS console).
Before we spin our workers back up, let’s modify the task so we can see the work being done.
from celery import shared_task
import logging

logger = logging.getLogger(__name__)


@shared_task
def add(x, y):
    "Test async function."
    result = x + y
    logger.info('Got args x=%s y=%s, returning %s', x, y, result)
    return result
Now, we can spin up workers. We would like to see our two messages sitting in the queue be processed.
(celerydemo)$ ./manage.py celery worker --loglevel=INFO

 -------------- celery@plewis-mac.local v3.1.18 (Cipater)
---- **** -----
--- * ***  * -- Darwin-14.4.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         celerysqs:0x10aad6208
- ** ---------- .> transport:   sqs://redacted:**@localhost//
- ** ---------- .> results:     djcelery.backends.database:DatabaseBackend
- *** --- * --- .> concurrency: 8 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

[tasks]
  . celerysqs.celery.debug_task
  . queuedemo.tasks.add

[2015-07-28 00:55:54,753: WARNING/MainProcess] celery@plewis-mac.local ready.
[2015-07-28 00:55:54,883: INFO/MainProcess] Received task: queuedemo.tasks.add[6de70824-94b1-47c9-908f-2b638f5cde3b]
[2015-07-28 00:55:54,885: INFO/MainProcess] Received task: queuedemo.tasks.add[7d951360-996f-4283-ba85-b2c5b2da4c87]
[2015-07-28 00:55:54,887: INFO/Worker-1] Got args x=3 y=4, returning 7
[2015-07-28 00:55:54,887: INFO/Worker-2] Got args x=7 y=3, returning 10
[2015-07-28 00:55:55,273: INFO/MainProcess] Task queuedemo.tasks.add[6de70824-94b1-47c9-908f-2b638f5cde3b] succeeded in 0.38632831000722945s: 7
[2015-07-28 00:55:55,273: INFO/MainProcess] Task queuedemo.tasks.add[7d951360-996f-4283-ba85-b2c5b2da4c87] succeeded in 0.3866659009363502s: 10
And that’s it! We can see our results in the log output, and we’ve successfully pushed a message from outside of Python into SQS and back to our Celery worker for processing.