At a Glance of a Key

Crafting Dreams into Ventures, Code into Excellence: Your Journey to Success

Building a Delayed Message System with Redis and FastAPI

2025-01-10 | Debugging, Development | 6 minute read

Recently, I had a chance to help a fellow software engineer with an interesting project – building a system that allows printing delayed messages using Redis. The requirements were simple:

1. Implement a POST API endpoint that accepts a string message and a relative time in seconds (since now).
2. Implement an independent system that shows the messages on the screen at the right time, within a second granularity.
3. Use Redis as your primary data store.

We can take different approaches, each with pros and cons, so I want to start by building a simple working prototype and then keep improving it as needed. Our programming language will be Python combined with the lightweight FastAPI framework.

Reading the requirements, we can see that an efficient solution will probably use a priority queue sorted by the timestamp at which each message should be shown. Redis's closest equivalent to a priority queue is its ZSET data structure, a sorted set (implemented internally with a hash table plus a skip list).

When we want to push a message, we will use the ZADD command and set the score (the attribute the set is sorted by) to the timestamp at which the message should be printed. This operation has O(log n) time complexity. Then, on the consumer side, we will use the ZRANGEBYSCORE command with a time filter to get the messages we need to print, with O(log n + m) complexity, where m is the number of messages returned, and remove them from the set using the ZREM command, at O(log n) per removed member.

The API implementation looks like this:

from datetime import datetime, timedelta, timezone

import redis
from fastapi import FastAPI, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel, Field

# Connection settings and the key name below are illustrative
app = FastAPI()
redis_client = redis.Redis(decode_responses=True)
REDIS_KEY_NAME = "messages"

class Message(BaseModel):
    message: str
    delay_sec: int = Field(..., ge=0)

@app.post("/messages")
async def post_message(msg: Message):
    try:
        # Calculate the absolute delivery time as a UNIX timestamp
        absolute_time = (datetime.now(timezone.utc) + timedelta(seconds=msg.delay_sec)).timestamp()

        # Insert the message into the Redis ZSET, scored by delivery time
        redis_client.zadd(REDIS_KEY_NAME, {msg.message: absolute_time})

        return JSONResponse(
            status_code=201,
            content={"scheduled_for": absolute_time}
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to add message: {e}")

In this example, we introduce a POST /messages endpoint which calculates the absolute timestamp in UTC of when the message should be printed, converts it to UNIX epoch time in seconds, and pushes it to the sorted set.

Note that zadd() has other optional arguments that define the set's behaviour for duplicate members (e.g., insert regardless, only update the score of an existing member, only add members that don't exist yet, and so on).

API calls can happen in parallel, meaning there will be concurrent calls to ZADD. Luckily, we don't need to do anything extra here, because Redis handles the concurrency for us and ensures the integrity of the data structure: internally, Redis queues incoming commands and processes them sequentially in a single-threaded event loop.

Now, as discussed, the consumer side looks like this:

import time
from datetime import datetime, timezone

import redis

# decode_responses=True so members come back as str, not bytes
redis_client = redis.Redis(decode_responses=True)
REDIS_KEY_NAME = "messages"

def poll_messages():
    while True:
        current_time = datetime.now(timezone.utc).timestamp()

        # Everything with a score <= now is due
        messages = redis_client.zrangebyscore(REDIS_KEY_NAME, "-inf", current_time, withscores=True)

        for message, timestamp in messages:
            print(f"{datetime.fromtimestamp(timestamp, timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}: {message}")
            redis_client.zrem(REDIS_KEY_NAME, message)

        time.sleep(1)

In a loop that runs once a second, we poll Redis for messages with a timestamp earlier than the current time, then print them and delete them from the set.

At this stage, we have a good prototype with decent performance, as the producer and consumer operate in a logarithmic time complexity (1M messages ~ 20 operations, 100M messages ~ 26 operations).

If we want to optimize further, we can “shard” our set into buckets (let’s say 10 minutes each) to reduce the overall data size of each bucket, resulting in even lower latency on all sides. However, it does add a bit of complexity, as now you need to ensure you have consumed a bucket before moving to the next one.
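A hedged sketch of the bucketing idea: derive the bucket key from the delivery timestamp, so producer and consumer agree on which sorted set a message lives in. The 10-minute width and the key format are arbitrary choices, not part of the original design:

```python
BUCKET_SECONDS = 600  # 10-minute buckets; the width is a tuning knob


def bucket_key(ts: float) -> str:
    """Return the sorted-set key of the bucket containing timestamp ts."""
    bucket_start = int(ts) - int(ts) % BUCKET_SECONDS
    return f"messages:{bucket_start}"

# The producer would ZADD into bucket_key(absolute_time); the consumer
# drains the current bucket (and any older, not-yet-empty ones) before
# moving on to the next window.
```

Each ZADD and ZRANGEBYSCORE now works on a set holding at most a few minutes of traffic instead of the full backlog.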

Now, what will happen if we run multiple pollers? Here's where the fun begins. The poller calls ZRANGEBYSCORE and then ZREM without any atomicity or locking around them, meaning the same message may be printed by more than one poller. We also risk one poller trying to remove a message already removed by another.

We don't need to worry about removing a non-existent message, as Redis handles that for us and won't return a failure. But how can we change our code so that the same message is not printed more than once?

As an option, we may write a simple Lua script that runs ZRANGEBYSCORE, then ZREM, and returns the messages it read, and execute it directly from Python (using redis_client.eval). Thanks to the single-threaded nature of Redis, the script runs atomically. That solves the problem of printing the same message twice but introduces a new one: losing messages. Imagine one of your pollers gets a batch of messages to print and crashes before it can finish. Those messages were already deleted from Redis, so they won't be retried and are lost forever.

Another approach would be to use Redis Streams. Redis Streams is a robust data structure designed to handle real-time event-driven data. It allows producers to append messages to a stream, which acts as a log of events and enables consumers to read these messages in a reliable and ordered manner. Streams support features like message IDs, consumer groups for distributed processing, and the ability to track pending messages for acknowledgment.

So basically, we can take the messages we read from the sorted set, push them to a stream, and then have a bunch of consumers listening to the stream, reading messages and acknowledging them. Within a consumer group, a message delivered to one consumer becomes "pending" and won't be delivered to the other consumers. Once a message is acknowledged, it is removed from the group's PEL (Pending Entries List). If we crash before acknowledging a message, it remains in the PEL associated with the consumer group. Then, using the XPENDING command, we can see the messages that weren't acknowledged and decide what to do: either reassign them to another consumer using XCLAIM, or print and XACK them within the same consumer group.

Note: this won't fully guarantee that a message is never printed twice (a consumer may crash after printing a message but before acknowledging it), but it significantly reduces the chances of it happening.

So, to recap, we use a Redis sorted set as a priority queue, which gets us the due messages at the right time, and Redis Streams to ensure the same message isn't printed more than once.

The producer code (the API implementation) won't change and will continue pushing received messages to the sorted set. The consumer, on the other hand, will now have two threads instead of one. The first polls the sorted set for due messages and pushes them to the stream. The second reads messages from the stream, prints them, and acknowledges them. I won't implement reclaiming messages from the PEL here; I will leave it as a good exercise for you. The new consumer code looks like this:

import time
from datetime import datetime, timezone

import redis

# decode_responses=True so stream fields come back as str, not bytes;
# the key and group names are placeholders
redis_client = redis.Redis(decode_responses=True)
SORTED_SET_KEY = "messages"
STREAM_KEY = "messages_stream"
CONSUMER_GROUP = "printers"

# Thread 1: atomically move due messages from the sorted set to the stream
def poll_messages():
    lua_script = """
    local sorted_set_key = KEYS[1]
    local stream_key = KEYS[2]
    local current_time = ARGV[1]

    -- Get messages ready for processing
    local messages = redis.call("ZRANGEBYSCORE", sorted_set_key, "-inf", current_time, "WITHSCORES")
    for i = 1, #messages, 2 do
        local message = messages[i]
        redis.call("XADD", stream_key, "*", "message", message)
        redis.call("ZREM", sorted_set_key, message)
    end
    return #messages / 2
    """

    while True:
        current_time = datetime.now(timezone.utc).timestamp()
        redis_client.eval(lua_script, 2, SORTED_SET_KEY, STREAM_KEY, current_time)
        time.sleep(1)

# Thread 2: read new messages from the stream, print, then acknowledge
def process_stream():
    while True:
        messages = redis_client.xreadgroup(CONSUMER_GROUP, "consumer", {STREAM_KEY: ">"}, count=10)
        for _, entries in messages:
            for message_id, fields in entries:
                message = fields.get("message")
                print(f"{datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S')}: {message}")
                redis_client.xack(STREAM_KEY, CONSUMER_GROUP, message_id)
        time.sleep(1)

The complete code can be found here, including a small README file with instructions on running it and a small producer script that can generate messages to test the whole system.

While playing around with it, you may notice that while a message exists in the sorted set, you cannot schedule another message with the exact same text. The API will return success, but the sorted set will deduplicate it: since we use the message text as the member key, the second ZADD simply updates the existing member's score. I am leaving this one as a challenge for you to fix. A slight hint: you can make the members unique by adding a random UUID prefix to each message when you push it, then stripping it on the consumer side before printing. Other optimizations can also be made.

– Alexander.

Tags: DevOps, Performance, Python, Redis, Streams

About Me

Principal Software Engineer and an industry leader with startup and FAANG experience. I specialize in distributed systems, storage, data protection services and payment processors.

Beyond technical expertise, I am passionate about supporting fellow engineers in their careers. Through approachable blogs and hands-on guidance, I help navigate the ever-evolving landscape of technology, empowering individuals to thrive in their professional journeys.
