Richard North’s Blog

Fun with Disque, Java and Spinach

This example continues similarly to a previous post, in which I used my Testcontainers tool to aid integration testing of Java code against a dependency running in a Docker container.

This time, following the recent 1.0 RC1 build of the promising-looking Disque, I wanted to play with using it from Java. Disque comes from antirez, author of the awesome Redis tool. Additionally, this was a chance to try out one of the Java Disque client libraries, Spinach, by the author of the Lettuce Redis client.

For this post Testcontainers is an enabler, but this is also an experiment in using Disque, both in a simple test and in a worked example: a queue for sending mail messages in an application.

Running Disque in Docker

Before starting, I created a Docker image for the 1.0-rc1 version of Disque, loosely based on the official Docker Redis image. You can find the image on Docker Hub here, or run it from the shell as follows:

docker run richnorth/disque:1.0-rc1

However, for my experiments I instantiated a fresh Disque instance within JUnit tests using Testcontainers, as follows:

@Rule
public GenericContainer container = new GenericContainer("richnorth/disque:1.0-rc1")
        .withExposedPorts(7711);

Trying out simple queue functionality

Spinach makes it pretty simple to obtain a connection:

DisqueClient client = new DisqueClient(DisqueURI.create(container.getIpAddress(), container.getMappedPort(7711)));
connection = client.connect().sync();

After which we can test putting a ‘job’ (a String value up to 4GB in size) onto a queue (main_queue in this case):

connection.addjob("main_queue", "body", 1, TimeUnit.MINUTES);

Note that Spinach requires us to specify a TTL for the job, even though this is an optional field in Disque’s API. TTL is used to control how long the job is allowed to remain on the queue before it expires. Here I’m using 1 minute.

Update: Thanks to Mark Paluch (@mp911de, the author of Spinach) for clarifying the Javadocs here: the timeout parameter for addjob is the command timeout, not the TTL. So the above example allows a generous 1 minute timeout for inserting the job into Disque.
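To make the update concrete: as I understand the Disque README, the raw command has the shape ADDJOB queue_name job <ms-timeout> [REPLICATE <count>] [DELAY <sec>] [RETRY <sec>] [TTL <sec>] [MAXLEN <count>] [ASYNC]. The mandatory positional value is the command timeout in milliseconds, while RETRY and TTL are optional per-job settings. The sketch below is a hypothetical helper (AddJobCommand is not part of Spinach, and this is not how Spinach builds its commands internally) that assembles those arguments, just to show where each timing value ends up.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only: builds the raw argument list
// for Disque's ADDJOB command (assumed syntax, see lead-in) to show which
// value is the command timeout and which are per-job settings.
final class AddJobCommand {

    private AddJobCommand() {}

    static List<String> build(String queue, String body,
                              long timeout, TimeUnit timeoutUnit,
                              Long retrySeconds, Long ttlSeconds) {
        List<String> args = new ArrayList<>();
        args.add("ADDJOB");
        args.add(queue);
        args.add(body);
        // Positional and mandatory: how long the *command* may block waiting
        // for the job to be replicated, always given in milliseconds.
        args.add(Long.toString(timeoutUnit.toMillis(timeout)));
        // Optional: how long to wait for an ACK before re-queueing the job.
        if (retrySeconds != null) {
            args.add("RETRY");
            args.add(Long.toString(retrySeconds));
        }
        // Optional: how long the job may live at all before it expires.
        if (ttlSeconds != null) {
            args.add("TTL");
            args.add(Long.toString(ttlSeconds));
        }
        return args;
    }
}
```

With the values from the addjob call above, the 1 minute lands in the timeout slot as 60000, and in this sketch no TTL is sent at all unless one is asked for explicitly.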

Later, we can obtain a single job from the queue:

Job<String, String> job = connection.getjob("main_queue");

assertEquals("The retrieved job is the same as the one that was added",
        "body",
        job.getBody());

Acknowledging jobs

Next, it’s important to acknowledge the job, i.e. mark it as successfully processed:

connection.ackjob(job.getId());

Once getjob has handed us the job, Disque temporarily stops delivering it to other clients. However, if the consumer does not ACK the job with ackjob, Disque assumes some kind of failure has occurred and puts the job back on the queue. Exactly how long it waits before doing this is controlled by the job’s RETRY property, which is set when the job is put on the queue.
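The acknowledge/retry cycle described above can be simulated without a server. The sketch below is a toy, single-process model written for illustration (ToyDisqueQueue and its methods are hypothetical, not Spinach or Disque APIs). It takes an explicit clock value in milliseconds so the RETRY behaviour is easy to follow, and it ignores replication, TTL and Disque’s special RETRY 0 (at-most-once) case.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Toy model of Disque's delivery rules (not the real server, not Spinach):
// getJob hides a job, ackJob deletes it, and an un-ACKed job is re-queued
// once its RETRY period has elapsed. RETRY 0 semantics are not modelled.
final class ToyDisqueQueue {

    private final Deque<String> queued = new ArrayDeque<>();
    private final Map<String, String> bodies = new HashMap<>();
    private final Map<String, Long> retryMillis = new HashMap<>();
    // job ID -> time (ms) at which an un-ACKed job becomes visible again
    private final Map<String, Long> inFlightUntil = new HashMap<>();
    private int nextId = 1;

    String addJob(String body, long retryMs) {
        String id = "job-" + nextId++;
        bodies.put(id, body);
        retryMillis.put(id, retryMs);
        queued.addLast(id);
        return id;
    }

    /** Returns the next visible job ID at time nowMs, or null if none. */
    String getJob(long nowMs) {
        requeueExpired(nowMs);
        String id = queued.pollFirst();
        if (id != null) {
            // Hidden from other consumers until ACKed or RETRY elapses
            inFlightUntil.put(id, nowMs + retryMillis.get(id));
        }
        return id;
    }

    String body(String id) {
        return bodies.get(id);
    }

    /** Once ACKed, the job is gone for good and will never be redelivered. */
    void ackJob(String id) {
        inFlightUntil.remove(id);
        queued.remove(id);
        bodies.remove(id);
        retryMillis.remove(id);
    }

    private void requeueExpired(long nowMs) {
        Iterator<Map.Entry<String, Long>> it = inFlightUntil.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMs >= entry.getValue()) {
                queued.addLast(entry.getKey()); // no ACK in time: deliver again
                it.remove();
            }
        }
    }
}
```

In this model a job fetched at time 0 with a 1000ms RETRY is invisible at 500ms, reappears at 1000ms, and is never seen again once ackJob has been called.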

Handling failure

For a subsequent test, we’ll see what happens in a failure scenario where we let the message hit its RETRY timeout without acknowledging it. First, we define a fairly quick RETRY setting (the default is five minutes - too long to wait in a test!):

private final AddJobArgs retryAfter1Second = AddJobArgs.builder()
        .retry(1, TimeUnit.SECONDS)
        .build();

And here is a test of what happens when we fail to acknowledge the job before the timeout:

@Test
public void testFailureToAckJobBeforeTimeout() throws InterruptedException {

    info("Adding a job to the queue");
    connection.addjob("main_queue", "body",
            1, TimeUnit.MINUTES,
            retryAfter1Second);

    info("Getting a job from the queue");
    Job<String, String> job = connection.getjob("main_queue");

    assertEquals("The retrieved job is the same as the one that was added",
            "body",
            job.getBody());

    info("Simulating a failure to ack the job before the timeout (1s)");
    TimeUnit.SECONDS.sleep(2);

    info("Attempting to get another job from the queue - the RETRY setting for the job means it should have reappeared");
    // The timeout specified here is how long the command will wait for a job to appear
    Job<String, String> job2 = connection.getjob(5, TimeUnit.SECONDS,
            "main_queue");

    assertNotNull("After the RETRY timeout, the original job is back on the queue", job2);
    assertEquals("The retrieved job is the same as the one that was added",
            "body",
            job2.getBody());
}

You can see all this put together in some simple tests, here.

Recently I needed to add an email-sending capability to a system, for which we wanted to use Mailgun’s HTTP API. To decouple our system from the mail API’s performance and availability characteristics, we implemented a simple, no-fuss, DB-backed queue and an asynchronous dispatch process.

For this example I wanted to try the same scenario with Disque providing the queue backing store.

The nice thing is that Disque takes care of the at-least-once delivery guarantee for us, while hiding a delivered job from other consumers until it is acknowledged or its RETRY period elapses, which gives us a pretty good likelihood of at-most-once delivery as well.

Note: While the tools I’m using seem pretty solid, this is just example code, with lots of important features and failure conditions not considered. Don’t take this and use it in a production system!

Let’s define a struct-like class to represent a mail message while it’s stored (Lombok annotations make this nice and concise!):

import lombok.AllArgsConstructor;
import lombok.Data;

@Data
@AllArgsConstructor
public class MailMessage {
    public final String subject;
    public final String recipient;
    public final String body;
}

Here’s the (really simple) interface for our mail sending service:

import java.util.concurrent.Callable;

import lombok.Data;
import lombok.RequiredArgsConstructor;

public interface MailSenderService extends Callable<MailSenderService.Result> {

    /**
     * Enqueue a message to be sent on a subsequent {@link #doScheduledSend()}.
     *
     * @param message mail message to be sent
     * @return an ID for the enqueued message
     */
    String enqueueMessage(MailMessage message);

    /**
     * Trigger a scheduled send of mail messages.
     *
     * TODO: In a real implementation this could be called by a
     * {@link java.util.concurrent.ScheduledExecutorService}.
     *
     * @return the result of sending queued messages.
     */
    MailSenderService.Result doScheduledSend();

    /**
     * call() implementation to allow MailSenderService to be used as a
     * Callable. Simply delegates to {@link #doScheduledSend()}.
     *
     * @return the result of sending queued messages.
     * @throws Exception if the scheduled send fails
     */
    @Override
    default MailSenderService.Result call() throws Exception {
        return doScheduledSend();
    }

    @Data
    @RequiredArgsConstructor
    class Result {
        public final int successfulCount;
        public final int failedCount;
    }
}
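The TODO in the interface mentions ScheduledExecutorService. Because MailSenderService extends Callable, an instance can be handed straight to a small scheduling helper like the one sketched here. ScheduledSendRunner is hypothetical, not part of the original code. Note that scheduleWithFixedDelay takes a Runnable, so the Callable is wrapped, and exceptions are caught because, per the ScheduledExecutorService contract, one escaping a periodic task suppresses all later runs.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: runs a Callable (such as a MailSenderService) on a
// fixed delay. A sketch of the TODO, not part of the original code.
final class ScheduledSendRunner {

    private ScheduledSendRunner() {}

    /** Adapts a Callable to the Runnable that scheduleWithFixedDelay expects. */
    static Runnable wrap(Callable<?> scheduledSend) {
        return () -> {
            try {
                scheduledSend.call();
            } catch (Exception e) {
                // An exception escaping a periodic task suppresses every later
                // run, so log it and carry on. In the Disque-backed service,
                // jobs left un-ACKed are re-queued once their RETRY elapses.
                System.err.println("Scheduled send failed: " + e);
            }
        };
    }

    /** Runs scheduledSend now, then again periodMs after each run finishes. */
    static ScheduledFuture<?> start(ScheduledExecutorService scheduler,
                                    Callable<?> scheduledSend,
                                    long periodMs) {
        return scheduler.scheduleWithFixedDelay(
                wrap(scheduledSend), 0, periodMs, TimeUnit.MILLISECONDS);
    }
}
```

Usage would be along the lines of ScheduledSendRunner.start(Executors.newSingleThreadScheduledExecutor(), mailSenderService, 1000). A fixed delay (rather than a fixed rate) means a slow batch never overlaps with the next one.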

In our implementation, enqueueMessage can be really simple. We encode the message as JSON to allow Disque to hold it as a String:

@Override
public String enqueueMessage(MailMessage message) {
    String messageAsJson = gson.toJson(message);

    // Enqueue the message as JSON; the 1 second here is the ADDJOB
    // command timeout, not the job's TTL
    String jobId = disque.addjob("mail", messageAsJson, 1, TimeUnit.SECONDS);

    return jobId;
}

Actually sending our messages requires a little more work to achieve reliability. We want to:

  1. Grab a batch of jobs from the queue
  2. Iterate over each of them, trying to send using our third-party mail API
  3. Note which messages were, and were not, sent
  4. Update the queue on Disque with an ACK for sent messages, or a NACK for messages that could not be sent. This ensures that sent messages are permanently removed from the queue and won’t be sent again, while failed messages are immediately available for this or another consumer to try sending again.

Here we go:

@Override
public Result doScheduledSend() {

    Set<String> succeededJobIds = new HashSet<>();
    Set<String> failedJobIds = new HashSet<>();

    // Wait up to 100ms for new messages to arrive on the queue
    // Retrieve up to 10 messages
    // (Ordinarily we'd make these configurable!)
    List<Job<String, String>> jobs =
            disque.getjobs(100, TimeUnit.MILLISECONDS, 10, "mail");

    for (Job<String, String> job : jobs) {
        String jsonBody = job.getBody();
        MailMessage message = gson.fromJson(jsonBody, MailMessage.class);

        try {
            mailApiClient.send(message);
            succeededJobIds.add(job.getId());
        } catch (MailApiClient.MailSendException e) {
            failedJobIds.add(job.getId());
        }
    }

    // Skip the NACK/ACKJOB round trips entirely when there are no job IDs
    // For any failed messages, proactively return to the queue
    if (!failedJobIds.isEmpty()) {
        disque.nack(failedJobIds.toArray(new String[0]));
    }
    // For any successful jobs, mark as completed
    if (!succeededJobIds.isEmpty()) {
        disque.ackjob(succeededJobIds.toArray(new String[0]));
    }

    return new Result(succeededJobIds.size(), failedJobIds.size());
}
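To check the ACK/NACK bookkeeping without Disque or a mail API, here is a stdlib-only sketch of the same loop. BatchSender, its Outcome type and the trySend predicate are hypothetical stand-ins: trySend plays the part of mailApiClient.send (returning false instead of throwing MailSendException), and the ack/nack callbacks stand in for disque.ackjob and disque.nack.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical, stdlib-only sketch of the bookkeeping in doScheduledSend():
// partition a batch into sent/failed job IDs, then ACK the former and NACK
// the latter, skipping either call when there is nothing to report.
final class BatchSender {

    // Like MailSenderService.Result, but keeps the job IDs themselves
    static final class Outcome {
        final Set<String> succeeded;
        final Set<String> failed;

        Outcome(Set<String> succeeded, Set<String> failed) {
            this.succeeded = Collections.unmodifiableSet(succeeded);
            this.failed = Collections.unmodifiableSet(failed);
        }
    }

    private BatchSender() {}

    /**
     * @param jobs    job ID -> message body, in delivery order
     * @param trySend stand-in for the mail API: true if the body was sent
     * @param ack     stand-in for disque.ackjob(ids...)
     * @param nack    stand-in for disque.nack(ids...)
     */
    static Outcome sendBatch(Map<String, String> jobs,
                             Predicate<String> trySend,
                             Consumer<String[]> ack,
                             Consumer<String[]> nack) {
        Set<String> succeeded = new LinkedHashSet<>();
        Set<String> failed = new LinkedHashSet<>();

        for (Map.Entry<String, String> job : jobs.entrySet()) {
            if (trySend.test(job.getValue())) {
                succeeded.add(job.getKey());
            } else {
                failed.add(job.getKey());
            }
        }

        // Failed jobs go straight back on the queue for another attempt
        if (!failed.isEmpty()) {
            nack.accept(failed.toArray(new String[0]));
        }
        // Sent jobs are acknowledged so they are never delivered again
        if (!succeeded.isEmpty()) {
            ack.accept(succeeded.toArray(new String[0]));
        }
        return new Outcome(succeeded, failed);
    }
}
```

Keeping the two ID sets until the end means each batch costs at most one NACK and one ACK round trip, rather than one command per job.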

If we fail dramatically (e.g. an unexpected exception is thrown, or our JVM or server dies), we can rely on Disque to put our messages back on the queue at their RETRY timeout, ready for another process to pick up.

And that’s it - you can use these tests as a starting point for seeing how it could be used, with Testcontainers, of course, making it pretty easy to test ;).

What’s missing?

What if this were a real mail queue implementation? What might we need to add? Off the top of my head:

Further reading
