Fun with Disque, Java and Spinach
This example follows on from a previous post, in which I used my Testcontainers tool to aid integration testing of Java code against a dependency running in a Docker container.
This time, following the recent 1.0 RC1 build of the promising-looking Disque, I wanted to try using it from Java. Disque comes from antirez, author of the awesome Redis. Additionally, this was a chance to try out one of the Java Disque client libraries, Spinach, by the author of the Lettuce Redis client.
For this post Testcontainers is an enabler, but this is also an experiment in using Disque both in a simple test and for a worked example: as a queue for sending mail messages in an application.
Running Disque in Docker
Before starting, I created a Docker image for the 1.0-rc1 version of Disque, loosely based on the official Docker Redis image. You can find the image on Docker Hub here, or run it from the shell as follows:
docker run richnorth/disque:1.0-rc1
However, for my experiments I instantiated a fresh Disque instance within JUnit tests as follows, using Testcontainers:
@Rule
public GenericContainer container = new GenericContainer("richnorth/disque:1.0-rc1")
        .withExposedPorts(7711);
Trying out simple queue functionality
Spinach makes it pretty simple to obtain a connection:
DisqueClient client = new DisqueClient(DisqueURI.create(container.getIpAddress(), container.getMappedPort(7711)));
connection = client.connect().sync();
After that, we can test putting a ‘job’ (which is a String value up to 4GB in size) onto a queue (main_queue in this case):
connection.addjob("main_queue", "body", 1, TimeUnit.MINUTES);
Note that Spinach requires us to specify a TTL for the job, even though this is an optional field in Disque’s API. TTL is used to control how long the job is allowed to remain on the queue before it expires. Here I’m using 1 minute.
Update: Thanks to Mark Paluch (@mp911de, the author of Spinach), for clarifying the Javadocs here: the timeout parameter for addjob is the command timeout, not TTL. So, the above example will allow a generous 1 minute timeout for inserting the job into Disque.
Later, we can obtain a single job from the queue:
Job<String, String> job = connection.getjob("main_queue");
assertEquals("The retrieved job is the same as the one that was added",
        "body",
        job.getBody());
Acknowledging jobs
Next it’s important to acknowledge the job - i.e. mark it as successfully processed:
connection.ackjob(job.getId());
After we called getjob above, Disque temporarily stops showing the job to other clients. However, if the consumer does not ACK the job with ackjob, Disque assumes some kind of failure has occurred and puts it back on the queue. Exactly how long it waits before doing this is controlled with the RETRY property, which is set for the job when it’s put on the queue.
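To make the RETRY behaviour concrete, here is a toy in-memory model of it. This is not how Disque is implemented and it is not part of Spinach: the class name RetryQueueSketch, its methods and the explicit nowMillis clock argument are all made up for illustration, and job bodies stand in for job IDs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/** Toy, single-threaded model of RETRY semantics. Hypothetical; not Disque's implementation. */
class RetryQueueSketch {
    private final long retryMillis;
    // Jobs currently visible to consumers
    private final Deque<String> ready = new ArrayDeque<>();
    // Delivered but unacknowledged jobs, mapped to the time they become visible again.
    // Bodies are assumed unique here, standing in for job IDs.
    private final Map<String, Long> inFlight = new HashMap<>();

    RetryQueueSketch(long retryMillis) {
        this.retryMillis = retryMillis;
    }

    void addJob(String body) {
        ready.addLast(body);
    }

    /** Returns the next visible job, or null if none is visible at nowMillis. */
    String getJob(long nowMillis) {
        requeueExpired(nowMillis);
        String job = ready.pollFirst();
        if (job != null) {
            // Hide the job from other consumers until its retry deadline
            inFlight.put(job, nowMillis + retryMillis);
        }
        return job;
    }

    /** Acknowledging a job removes it for good. */
    void ackJob(String body) {
        inFlight.remove(body);
    }

    /** Puts every unacknowledged job whose retry deadline has passed back on the queue. */
    private void requeueExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = inFlight.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() <= nowMillis) {
                ready.addLast(entry.getKey());
                it.remove();
            }
        }
    }
}
```

In this model, with a 1000 ms retry, a job fetched at t=0 stays hidden at t=500 and is handed out again at t=1000, unless ackJob was called in between.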
Handling failure
For a subsequent test, we’ll see what happens in a failure scenario where we let the message hit its RETRY timeout without acknowledging it. First, we define a fairly quick RETRY setting (the default is five minutes - too long to wait in a test!):
AddJobArgs retryAfter1Second = AddJobArgs.builder()
        .retry(1, TimeUnit.SECONDS)
        .build();
And here is a test of what happens when we fail to acknowledge the job before the timeout:
@Test
public void testFailureToAckJobBeforeTimeout() throws InterruptedException {
    info("Adding a job to the queue");
    connection.addjob("main_queue", "body",
            1, TimeUnit.MINUTES,
            retryAfter1Second);
    info("Getting a job from the queue");
    Job<String, String> job = connection.getjob("main_queue");
    assertEquals("The retrieved job is the same as the one that was added",
            "body",
            job.getBody());
    info("Simulating a failure to ack the job before the timeout (1s)");
    TimeUnit.SECONDS.sleep(2);
    info("Attempting to get another job from the queue - the RETRY setting for the job means it should have reappeared");
    // The timeout specified here is how long the command will wait for a job to appear
    Job<String, String> job2 = connection.getjob(5, TimeUnit.SECONDS,
            "main_queue");
    assertNotNull("After re-getting the original job is back on the queue", job2);
    assertEquals("The retrieved job is the same as the one that was added",
            "body",
            job2.getBody());
}
You can see all this put together in some simple tests, here.
A more fully featured example - queuing mail messages for dispatch
Recently I needed to add an email sending capability to a system, for which we wanted to use Mailgun’s HTTP API. To decouple our system from the mail API performance and availability characteristics, we implemented a simple no-fuss DB-backed queue and asynchronous dispatch process.
For this example I wanted to try the same scenario with Disque providing the queue backing store.
The nice thing is that Disque takes care of all at-least-once guarantees for us while providing enough multi-consumer good practices to give us a pretty good likelihood of at-most-once as well.
Note: While the tools I’m using seem pretty solid, this is just example code, with lots of important features/failure conditions not considered. Don’t take this and use it in a production system!
Let’s define a struct-like class to represent a mail message while it’s stored (Lombok annotations make this nice and concise!):
@Data @AllArgsConstructor
public class MailMessage {
    public final String subject;
    public final String recipient;
    public final String body;
}
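For readers who haven't used Lombok, the class above is roughly shorthand for the following hand-written Java. This is an approximation rather than Lombok's exact output (Lombok also generates a canEqual method, for example), and it is named PlainMailMessage here only to keep it distinct from the real class.

```java
import java.util.Objects;

/** Hand-written approximation of what the Lombok annotations generate. Hypothetical name. */
final class PlainMailMessage {
    public final String subject;
    public final String recipient;
    public final String body;

    // Equivalent of the generated all-args constructor
    PlainMailMessage(String subject, String recipient, String body) {
        this.subject = subject;
        this.recipient = recipient;
        this.body = body;
    }

    // Equivalent of the generated getters
    public String getSubject() { return subject; }
    public String getRecipient() { return recipient; }
    public String getBody() { return body; }

    // Value-based equality over all three fields
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof PlainMailMessage)) return false;
        PlainMailMessage other = (PlainMailMessage) o;
        return Objects.equals(subject, other.subject)
                && Objects.equals(recipient, other.recipient)
                && Objects.equals(body, other.body);
    }

    @Override
    public int hashCode() {
        return Objects.hash(subject, recipient, body);
    }

    @Override
    public String toString() {
        return "PlainMailMessage(subject=" + subject
                + ", recipient=" + recipient
                + ", body=" + body + ")";
    }
}
```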
Here’s the (really simple) interface for our mail sending service:
public interface MailSenderService extends Callable<MailSenderService.Result> {
    /**
     * Enqueue a message to be sent on a subsequent
     * {@link DisqueBackedMailSenderService#doScheduledSend()}
     *
     * @param message mail message to be sent
     * @return an ID for the enqueued message
     */
    String enqueueMessage(MailMessage message);
    /**
     * Trigger a scheduled send of mail messages.
     *
     * TODO: In a real implementation this could be called by a
     * {@link java.util.concurrent.ScheduledExecutorService}.
     *
     * @return the result of sending queued messages.
     */
    MailSenderService.Result doScheduledSend();
    /**
     * call() implementation to allow MailSenderService to be used as a
     * Callable. Simply delegates to the
     * {@link DisqueBackedMailSenderService#doScheduledSend()} method.
     *
     * @return the result of sending queued messages.
     * @throws Exception
     */
    default MailSenderService.Result call() throws Exception {
        return doScheduledSend();
    }
    @Data
    @RequiredArgsConstructor
    class Result {
        public final int successfulCount;
        public final int failedCount;
    }
}
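The TODO in the Javadoc hints at driving doScheduledSend() from a ScheduledExecutorService. A minimal sketch of that wiring, using only java.util.concurrent, might look like the following. ScheduledSendDriver is a hypothetical helper name, not part of the example project; it accepts any Callable, which MailSenderService is.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: runs a Callable (such as a MailSenderService) on a fixed delay. */
class ScheduledSendDriver {
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();

    /**
     * Invokes the task repeatedly, waiting delayMillis between the end of
     * one run and the start of the next.
     */
    ScheduledFuture<?> start(Callable<?> task, long delayMillis) {
        return executor.scheduleWithFixedDelay(() -> {
            try {
                task.call();
            } catch (Exception e) {
                // Catch and report: an exception escaping this Runnable
                // would suppress all subsequent scheduled runs
                System.err.println("Scheduled send failed: " + e);
            }
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops scheduling further runs and interrupts any run in progress. */
    void stop() {
        executor.shutdownNow();
    }
}
```

A fixed delay (rather than a fixed rate) is used here so that a slow send never causes runs to pile up; whether that suits a real system depends on its throughput needs.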
In our implementation, enqueueMessage can be really simple. We encode the message as JSON data to allow Disque to hold it as a String:
public String enqueueMessage(MailMessage message) {
    String messageAsJson = gson.toJson(message);
    // Enqueue the message as JSON
    String jobId = disque.addjob("mail", messageAsJson, 1, TimeUnit.SECONDS);
    return jobId;
}
Actually sending our messages requires a little more work to achieve reliability. We want to:
- Grab a batch of jobs from the queue
- Iterate over each of them, trying to send using our third-party mail API
- Note which messages were, and were not, sent
- Update the queue on Disque with an ACK for sent messages, or a NACK for messages that could not be sent. This will ensure that sent messages are permanently removed from the queue and won’t be sent again, but failed messages are immediately available for this or another consumer to try sending again.
Here we go:
public Result doScheduledSend() {
    Set<String> succeededJobIds = new HashSet<>();
    Set<String> failedJobIds = new HashSet<>();
    // Wait up to 100ms for new messages to arrive on the queue
    // Retrieve up to 10 messages
    // (Ordinarily we'd make these configurable!)
    List<Job<String, String>> jobs =
            disque.getjobs(100, TimeUnit.MILLISECONDS, 10, "mail");
    for (Job<String, String> job : jobs) {
        String jsonBody = job.getBody();
        MailMessage message = gson.fromJson(jsonBody, MailMessage.class);
        try {
            mailApiClient.send(message);
            succeededJobIds.add(job.getId());
        } catch (MailApiClient.MailSendException e) {
            failedJobIds.add(job.getId());
        }
    }
    // For any failed messages, proactively return to the queue
    disque.nack(
            failedJobIds.toArray(new String[failedJobIds.size()]));
    // For any successful jobs, mark as completed
    disque.ackjob(
            succeededJobIds.toArray(new String[succeededJobIds.size()]));
    return new Result(succeededJobIds.size(), failedJobIds.size());
}
If we fail dramatically (e.g. an unexpected exception is thrown, or our JVM or server dies), we can rely on Disque to put our messages back on the queue at their RETRY timeout, ready for another process to pick up.
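Relying on RETRY cuts both ways, though: doScheduledSend() only acknowledges jobs after the whole batch has been attempted, so a healthy but slow consumer that takes longer than RETRY to work through a batch would see its jobs handed out again. A rough sizing check is sketched below. BatchSizing and maxSafeBatchSize are hypothetical names, and the calculation assumes messages are sent one after another with a known worst-case send time.

```java
/** Hypothetical helper for picking a batch size that fits inside the RETRY window. */
class BatchSizing {
    /**
     * Largest number of messages that can be sent sequentially, and then
     * acknowledged, before RETRY elapses.
     *
     * @param retryMillis         the RETRY period of the jobs
     * @param worstCaseSendMillis assumed worst-case time to send one message
     * @param safetyFactor        fraction of the RETRY window to use, in (0, 1]
     */
    static int maxSafeBatchSize(long retryMillis, long worstCaseSendMillis, double safetyFactor) {
        if (retryMillis <= 0 || worstCaseSendMillis <= 0
                || safetyFactor <= 0 || safetyFactor > 1) {
            throw new IllegalArgumentException("arguments out of range");
        }
        // Only spend part of the window, leaving headroom for the ACK round trip
        long budgetMillis = (long) (retryMillis * safetyFactor);
        return (int) Math.min(Integer.MAX_VALUE, budgetMillis / worstCaseSendMillis);
    }
}
```

For instance, with the default five-minute RETRY, a 2 second worst-case send and half the window held in reserve, maxSafeBatchSize(300_000, 2_000, 0.5) gives 75.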
And that’s it - these tests are a starting point for seeing how it could be used, with Testcontainers making it pretty easy to test ;).
What’s missing?
What if this were a real mail queue implementation? What might we need to add? Off the top of my head:
- I’ve not written JUnit tests for multiple consumers from the same Disque instance - yet. I suspect that this example should actually work in this mode without modification, but I don’t want to believe it until I see it. That could be an exercise for the reader ;)
- This example only considers failure of individual messages, but what if the whole third-party mail API is down? We don’t want to repeatedly try re-sending every message when we can predict that they’ll probably fail. The circuit breaker pattern, using Netflix’s Hystrix or my (admittedly less battle-tested) Duct Tape library, could be useful here.
- It would probably be worth looking at what happens when the mail API is working, but performing slowly. There would certainly need to be some tuning of GETJOB batch size versus RETRY timeout, to ensure that our consumers don’t each pick up massive batches. Some effort would probably be needed to ensure a constant throughput of mail dispatch.
- Speaking from experience, it can be really handy to hold onto sent messages in a data store somewhere. It’s often necessary to have a record of what was sent, and sometimes it’s necessary to re-send a batch of emails which were successfully sent but didn’t reach their recipient due to downstream issues. For a real-world implementation I’d probably add a persistence element to this.
- Probably a few other things…
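As a rough illustration of the circuit breaker idea mentioned in the list above, here is a minimal single-threaded sketch. It is not the Hystrix or Duct Tape API: the class CircuitBreakerSketch, its method names, and the injected clock are all assumptions made for this example.

```java
import java.util.function.LongSupplier;

/** Minimal, single-threaded circuit breaker sketch. Hypothetical; not a library API. */
class CircuitBreakerSketch {
    private final int failureThreshold;
    private final long openMillis;
    private final LongSupplier clock;
    private int consecutiveFailures = 0;
    private long openedAt = -1; // -1 means the breaker is closed

    /**
     * @param failureThreshold consecutive failures after which the breaker opens
     * @param openMillis       how long to stay open before allowing a trial call
     * @param clock            source of the current time in milliseconds
     */
    CircuitBreakerSketch(int failureThreshold, long openMillis, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.openMillis = openMillis;
        this.clock = clock;
    }

    /** True if a call to the mail API should be attempted now. */
    boolean allowRequest() {
        if (openedAt < 0) {
            return true;
        }
        // Once the cool-down has passed, let trial calls through ("half-open")
        return clock.getAsLong() - openedAt >= openMillis;
    }

    /** A successful call closes the breaker and resets the failure count. */
    void recordSuccess() {
        consecutiveFailures = 0;
        openedAt = -1;
    }

    /** A failed call opens (or re-opens) the breaker once the threshold is reached. */
    void recordFailure() {
        consecutiveFailures++;
        if (consecutiveFailures >= failureThreshold) {
            openedAt = clock.getAsLong();
        }
    }
}
```

In a consumer like doScheduledSend(), the idea would be to check allowRequest() before fetching a batch at all, so that jobs stay on the queue while the mail API is believed to be down, and to call recordSuccess() or recordFailure() after each send attempt.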
Further reading
- Disque RC announcement and README
- Adventures with Disque
- Spinach blog post and README
- Testcontainers
- Project Lombok
- visible-assertions (used for annotating test output in these examples)