Bandalore: a Clojure client library for Amazon's Simple Queue Service (SQS)

I recently found myself wanting to work with Amazon's Simple Queue Service (SQS), but I could find no reasonable Clojure library for accessing it.  Of course, AWS' own Java SDK is the canonical implementation of their APIs (at least in the JVM space), so putting together a Clojure wrapper that adds a few handy extras wasn't particularly difficult.

You can find Bandalore hosted on GitHub, licensed under the EPL. A proper release will find its way into Maven Central within the next couple of days.  The code isn't much more than 12 hours old, so consider yourself forewarned. ;-)

I hope people find the library useful.  If you have any questions, feel free to ping me on IRC or Twitter.

What follows is an excerpt from the README documentation for Bandalore that describes some of its more interesting functionality:

seqs being the lingua franca of Clojure collections, it would be helpful if we could treat an SQS queue as a seq of messages. While receive does return a seq of messages, each receive call is limited to receiving a maximum of 10 messages, and there is no streaming or push counterpart in the SQS API.

The solution to this is polling-receive, which returns a lazy seq that reaches out to SQS as necessary:

[sourcecode language="clojure"]
=> (map (sqs/deleting-consumer client :body)
        (sqs/polling-receive client q :limit 10))
("3" "5" "7" "8" … "81" "90" "91")
[/sourcecode]
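To make the idea concrete, here is a minimal sketch of how a lazy seq can be layered over a batch-limited receive call. It is not Bandalore's implementation: `polling-seq`, `fake-receiver`, the `:period` option, and the default values are all assumptions made up for this illustration, and no SQS calls are involved.

```clojure
(defn polling-seq
  "Returns a lazy seq of messages pulled from `receive-fn`, a hypothetical
  zero-arg function that returns one batch (a possibly empty collection).
  After an empty batch it sleeps `period` ms and retries; the seq ends once
  `max-wait` ms have been spent waiting without receiving anything."
  [receive-fn & {:keys [period max-wait] :or {period 50 max-wait 200}}]
  (letfn [(step [waited]
            (lazy-seq
              (let [batch (seq (receive-fn))]
                (cond
                  ;; got messages: emit them and reset the wait counter
                  batch (concat batch (step 0))
                  ;; waited long enough with nothing to show: end the seq
                  (>= waited max-wait) nil
                  ;; nothing yet: pause, then poll again
                  :else (do (Thread/sleep (long period))
                            (step (+ waited period)))))))]
    (step 0)))

(defn fake-receiver
  "Stand-in for an SQS receive call (hypothetical): hands out `batches`
  one per call, then nil forever."
  [batches]
  (let [remaining (atom (seq batches))]
    (fn []
      (let [[old _] (swap-vals! remaining next)]
        (first old)))))

(polling-seq (fake-receiver [[1 2] [] [3]]) :period 10 :max-wait 30)
;; => (1 2 3)
```

The consumer just sees an ordinary seq; the batching, the retry after an empty batch, and the eventual termination are all hidden behind it.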

polling-receive accepts all of the same optional kwargs as receive does, and adds two more to control its usage of receive API calls; one of them, :max-wait, comes up below.

Often queues are used to direct compute resources, so you’d like to be able to saturate those boxen with as much work as your queue can offer up. The obvious solution is to pmap across a seq of incoming messages, which you can do trivially with the seq provided by polling-receive. Just make sure you tweak the :max-wait time so that, assuming you want to continuously process incoming messages, the seq of messages doesn’t terminate because none have been available for a while.
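As a rough sketch of the pmap approach, the snippet below maps a handler in parallel over a seq of message maps. The `messages` seq is a stand-in for what polling-receive would return, and `process-message` is a hypothetical handler; only the `:body` key is assumed, since that is what the examples here use.

```clojure
(defn process-message
  "Hypothetical handler: parses the message body as a number and squares it."
  [{:keys [body]}]
  (let [n (Long/parseLong body)]
    (* n n)))

(def messages
  ;; Stand-in for the lazy seq from polling-receive; real messages
  ;; would carry more than just :body.
  (map (fn [n] {:body (str n)}) (range 10)))

;; pmap applies the handler on several threads but returns results in
;; input order; doall forces the otherwise semi-lazy result.
(def results (doall (pmap process-message messages)))
```

Because pmap only runs a little ahead of whatever is consuming its output, it pulls messages from the underlying seq as work completes rather than draining the queue up front.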

Here’s an example where one thread sends a message every 100 ms (1000 messages in the session shown), and another consumes those messages using a lazy seq provided by polling-receive:

[sourcecode language="clojure"]
=> (defn send-dummy-messages
     [client q count]
     (future (doseq [n (range count)]
               (Thread/sleep 100)
               (sqs/send client q (str n)))))
#'cemerick.bandalore-test/send-dummy-messages
=> (defn consume-dummy-messages
     [client q]
     (future (dorun (map (sqs/deleting-consumer client (comp println :body))
                         (sqs/polling-receive client q :max-wait Integer/MAX_VALUE :limit 10)))))
#'cemerick.bandalore-test/consume-dummy-messages
=> (consume-dummy-messages client q)               ;; start the consumer
#<core$future_call$reify__5500@a6f00bc: :pending>
=> (send-dummy-messages client q 1000)             ;; start the sender
#<core$future_call$reify__5500@18986032: :pending>
3
4
1
0
2
8
5
7
…
[/sourcecode]

You’d presumably want to set up some ways to control your consumer. Hopefully it's clear that it would be trivial to parallelize the processing function being wrapped by deleting-consumer using pmap, distribute processing among agents if that’s more appropriate, etc.
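One possible shape for that control is a shared flag that the consumer checks before taking each message. The sketch below is an assumption-laden illustration, not part of Bandalore: `deleting-wrapper` is a hypothetical analogue of deleting-consumer (call the function, then delete the message, return the function's result), and `consume-until-stopped` and the `running?` atom are names invented here.

```clojure
(defn deleting-wrapper
  "Hypothetical analogue of a deleting consumer: calls `f` on the message,
  then `delete-fn` on it, and returns f's result. If `f` throws, the
  message is never passed to `delete-fn`."
  [delete-fn f]
  (fn [message]
    (let [ret (f message)]
      (delete-fn message)
      ret)))

(defn consume-until-stopped
  "Runs `handler` over `messages` on a background thread until the seq ends
  or the `running?` atom is set to false. Returns a future that yields the
  number of messages handled."
  [running? handler messages]
  (future
    (reduce (fn [handled message]
              (handler message)
              (inc handled))
            0
            ;; the flag is consulted each time a message becomes available
            (take-while (fn [_] @running?) messages))))

;; Usage with stand-ins: "deleting" just records the body in an atom.
(def deleted (atom []))
(def running? (atom true))
(def handler (deleting-wrapper #(swap! deleted conj (:body %)) :body))

(def handled @(consume-until-stopped running? handler [{:body "a"} {:body "b"}]))
```

Note that the flag is only checked when a message arrives, so a consumer blocked waiting on an empty queue will not notice `(reset! running? false)` until the next message shows up or the seq terminates.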