Richard North's Blog

Fun with Disque, Java and Spinach

This example continues similarly to a previous post, in which I used my Testcontainers tool to aid integration testing of Java code against a dependency running in a Docker container.

This time, following the recent 1.0 RC1 build of the promising-looking Disque, I wanted to play with using it from Java. Disque comes from antirez, author of the awesome Redis tool. Additionally this was a chance to try out one of the Java Disque client libraries, Spinach, by the author of the Lettuce Redis client.

For this post Testcontainers is an enabler, but this is also an experiment in using Disque both in a simple test and for a worked example: as a queue for sending mail messages in an application.

Running Disque in Docker #

Before starting, I created a Docker image for the 1.0-rc1 version of Disque, loosely based on the official docker Redis image. You can find the image on Docker hub here, or run it from the shell as follows:

docker run richnorth/disque:1.0-rc1

However for my experiments I instantiated a fresh Disque instance within JUnit tests as follows, using Testcontainers:

public GenericContainer container = new GenericContainer("richnorth/disque:1.0-rc1")

Trying out simple queue functionality #

Spinach makes it pretty simple to obtain a connection:

DisqueClient client = new DisqueClient(DisqueURI.create(container.getIpAddress(), container.getMappedPort(7711)));
connection = client.connect().sync();

After which we can test putting a 'job' (which is a String value up to 4GB in size) onto a queue (main_queue in this case):

connection.addjob("main_queue", "body", 1, TimeUnit.MINUTES);

Note that Spinach requires us to specify a TTL for the job, even though this is an optional field in Disque's API. TTL is used to control how long the job is allowed to remain on the queue before it expires. Here I'm using 1 minute.

Update: Thanks to Mark Paluch (@mp911de, the author of Spinach), for clarifying the Javadocs here: the timeout parameter for addjob is the command timeout, not TTL. So, the above example will allow a generous 1 minute timeout for inserting the job into Disque.

Later, we can obtain a single job from the queue:

Job<String, String> job = connection.getjob("main_queue");

assertEquals("The retrieved job is the same as the one that was added",

Acknowledging jobs #

Next it's important to acknowledge the job - i.e. mark it as successfully processed:


After we called getjob above, Disque temporarily stops showing it to other clients. However, if the consumer does not ACK the job with ackjob, Disque assumes some kind of failure has occurred and puts it back on the queue. Exactly how long it waits before doing this is controlled with the RETRY property, which is set for the job when it's put on the queue.

Handling failure #

For a subsequent test, we'll see what happens in a failure scenario where we let the message hit its RETRY timeout without acknowledging it. First, we define a fairly quick RETRY setting (the default is five minutes - too long to wait in a test!):

retryAfter1Second = AddJobArgs.builder()
.retry(1, TimeUnit.SECONDS)

And here is a test of what happens when we fail to acknowledge the job before the timeout:

public void testFailureToAckJobBeforeTimeout() throws InterruptedException {

info("Adding a job to the queue");
connection.addjob("main_queue", "body",
1, TimeUnit.MINUTES,

info("Getting a job from the queue");
Job<String, String> job = connection.getjob("main_queue");

assertEquals("The retrieved job is the same as the one that was added",

info("Simulating a failure to ack the job before the timeout (1s)");

info("Attempting to get another job from the queue - the RETRY setting for the job means it should have reappeared");
// The timeout specified here is how long the command will wait for a job to appear
Job<String, String> job2 = connection.getjob(5, TimeUnit.SECONDS,

assertNotNull("After re-getting the original job is back on the queue", job2);
assertEquals("The retrieved job is the same as the one that was added",

You can see all this put together in some simple tests, here.

Recently I needed to add an email sending capability to a system, for which we wanted to use Mailgun's HTTP API. To decouple our system from the mail API performance and availability characteristics, we implemented a simple no-fuss DB-backed queue and asynchronous dispatch process.

For this example I wanted to try the same scenario with Disque providing the queue backing store.

The nice thing is that Disque takes care of all at-least-once guarantees for us while providing enough multi-consumer good practices to give us a pretty good likelihood of at-most-once as well.

Note: While the tools I'm using seem pretty solid, this is just example code, with lots of important features/failure conditions not considered. Don't take this and use it in a production system!

Let's define a struct-like class to represent a mail message while it's stored (Lombok annotations make this nice and concise!):

@Data @AllArgsConstructor
public class MailMessage {
public final String subject;
public final String recipient;
public final String body;

Here's the (really simple) interface for our mail sending service:

public interface MailSenderService extends Callable<MailSenderService.Result> {

* Enqueue a message to be sent on a subsequent
* {@link DisqueBackedMailSenderService#doScheduledSend()}
* @param message mail message to be sent
* @return an ID for the enqueued message

String enqueueMessage(MailMessage message);

* Trigger a scheduled send of mail messages.
* TODO: In a real implementation this could be called by a
* {@link java.util.concurrent.ScheduledExecutorService},
* @return the result of sending queued messages.

MailSenderService.Result doScheduledSend();

* call() implementation to allow MailSenderService to be used as a
* Callable. Simply delegates to the
* {@link DisqueBackedMailSenderService#doScheduledSend()} method.
* @return the result of sending queued messages.
* @throws Exception

default MailSenderService.Result call() throws Exception {
return doScheduledSend();

class Result {
public final int successfulCount;
public final int failedCount;

In our implementation, enqueueMessage can be really simple. We encode the message as JSON data to allow Disque to hold it as a String:

public String enqueueMessage(MailMessage message) {
String messageAsJson = gson.toJson(message);

// Enqueue the message as JSON
String jobId = disque.addjob("mail", messageAsJson, 1, TimeUnit.SECONDS);

return jobId;

Actually sending our messages requires a little more work to achieve reliability. We want to:

  1. Grab a batch of jobs from the queue
  2. Iterate over each of them, trying to send using our third-party mail API
  3. Note which messages were, and were not, sent
  4. Update the queue on Disque with an ACK for sent messages, or a NACK for messages that could not be sent. This will ensure that sent messages are permanently removed from the queue and won't be sent again, but failed messages are immediately available for this or another consumer to try sending again.

Here we go:

public Result doScheduledSend() {

Set<String> succeededJobIds = new HashSet<>();
Set<String> failedJobIds = new HashSet<>();

// Wait up to 100ms for new messages to arrive on the queue
// Retrieve up to 10 messages
// (Ordinarily we'd make these configurable!)
List<Job<String, String>> jobs =
disque.getjobs(100, TimeUnit.MILLISECONDS, 10, "mail");

for (Job<String, String> job : jobs) {
String jsonBody = job.getBody();
MailMessage message = gson.fromJson(jsonBody, MailMessage.class);

try {
} catch (MailApiClient.MailSendException e) {

// For any failed messages, proactively return to the queue
failedJobIds.toArray(new String[failedJobIds.size()]));
// For any successful jobs, mark as completed
succeededJobIds.toArray(new String[succeededJobIds.size()]));

return new Result(succeededJobIds.size(), failedJobIds.size());

If we fail dramatically (e.g. an unexpected exception is thrown, or our JVM or server dies), we can rely on Disque to put our messages back on the queue at their RETRY timeout, ready for another process to pick up.

And that's it - you can see these tests as a starting point to see how it could be used - of course, with Testcontainers making it pretty easy to test ;).

What's missing? #

What if this were a real mail queue implementation? What might we need to add? Off the top of my head:

Further reading #

← Home