Messaging Systems

The Queue is a powerful data structure which forms the foundation of many concurrent design patterns. Often, these design patterns center around passing messages between agents within the concurrent system. We will explore one of the simplest and most useful of these message-based patterns - the so-called “Task Queue”. Later, we may also look at the somewhat related “Publish-Subscribe” pattern (also sometimes referred to as “PubSub”).

By the end of this module, the student should be able to:

  • Describe the components of a task queue system, and explain how it will be utilized within our Flask-based API system architecture.

  • Create task queues in Redis using the hotqueue library, and work with the put() and consume() methods to queue and receive messages across two Python programs.

  • Use the q.worker decorator in hotqueue to create a simple Python consumer program.

  • Explain the general approach to organizing Python code into different modules and describe how to do this for the Flask-based API system we are building.

  • Implement good code organization practices including denoting objects as public or private.

Task Queue (or Work Queue)

In a task queue system,

  • Agents called “producers” write messages to a queue that describe work to be done.

  • A separate set of agents called “consumers” receive the messages and do the work. While work is being done, no new messages are received by the consumer.

  • Each message is delivered exactly once to a single consumer to ensure no work is “duplicated”.

  • Multiple consumers can be processing “work” messages at once, and similarly, 0 consumers can be processing messages at a given time (in which case, messages will simply queue up).

The Task Queue pattern is a good fit for our jobs service.

  • Our Flask API will play the role of producer.

  • One or more “worker” programs will play the role of consumer.

  • Workers will receive messages about new jobs to execute and perform the analysis steps.

Task Queues in Redis

The HotQueue class provides two ways to create a task queue consumer: the first is the .consume() method and the second is the q.worker decorator.

The Consume Method

With a q object defined like q = HotQueue("some_queue", host="<Redis_IP>", port=6379, db=1), the consume method works as follows:

  • The q.consume() method returns an iterator which can be looped over using a for loop (much like a list).

  • Each object returned by the iterator is a message received from the task queue.

  • The q.consume() method blocks (i.e., waits indefinitely) when there are no additional messages in the queue named some_queue.

The basic syntax of the consume method is this:

for item in q.consume():
    print(item)   # do something with item

In this case, the item object is the message that was retrieved from the task queue.
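The exercises below also require adding messages to the queue, which is done with the put() method. Here is a minimal sketch, using the same q object defined above:

# producer side: add some messages to the queue
q.put("message 1")
q.put("message 2")

# HotQueue objects also support len() for checking the queue length
print(len(q))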

Exercises. Complete the following, either in Kubernetes or directly on isp02 (see the k8s files below).

  1. Start/scale two python debug containers with redis and hotqueue installed (you can use the jstubbs/redis-client image if you prefer). In two separate shells, exec into each debug container and start ipython.

  2. In each terminal, create a HotQueue object pointing to the same Redis queue.

  3. In the first terminal, add three or four Python strings to the queue; check the length of the queue.

  4. In the second terminal, use a for loop and the .consume() method to print objects in the queue to the screen.

  5. Observe that the strings are printed out in the second terminal.

  6. Back in the first terminal, check the length of the queue; add some more objects to the queue.

  7. Confirm the newly added objects are “instantaneously” printed to the screen back in the second terminal.

If you want, you can use the following k8s files for the exercise above (but if you already have a Redis deployment, you don’t need to create a new one).

Content for the redis-client-debug-deployment.yml file:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: redis-client-debug-deployment
  labels:
    app: redis-client-debug
spec:
  replicas: 2
  selector:
    matchLabels:
      app: redis-client-debug
  template:
    metadata:
      labels:
        app: redis-client-debug
    spec:
      containers:
        - name: py39
          image: jstubbs/redis-client
          command: ['sleep', '999999999']

Content for the redis-ex-deployment.yml file:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: jstubbs-redis-ex-deployment
  labels:
    app: jstubbs-redis-ex
spec:
  replicas: 1
  selector:
    matchLabels:
      app: jstubbs-redis-ex
  template:
    metadata:
      labels:
        app: jstubbs-redis-ex
    spec:
      containers:
        - name: jstubbs-test-redis
          image: redis:6

The q.worker Decorator

Given a HotQueue queue object, q, the q.worker decorator is a convenience utility to turn a function into a consumer without having to write the for loop. The basic syntax is:

@q.worker
def do_work(item):
    print(item)   # do something with item

In the example above, item will be populated with the item dequeued.

Then, to start consuming messages, simply call the function:

>>> do_work()
# ... blocks until new messages arrive

Note

The @q.worker decorator replaces the for loop. Once you call a function decorated with @q.worker, the code never returns unless there is an unhandled exception.

Exercise. Write a function, echo(item), to print an item to the screen, and use the q.worker decorator to turn it into a consumer. Call your echo function in one terminal and in a separate terminal, send messages to the redis queue. Verify that the message items are printed to the screen in the first terminal.
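One possible solution (a sketch; adjust <Redis_IP> and the queue name for your setup):

from hotqueue import HotQueue

q = HotQueue("queue", host="<Redis_IP>", port=6379, db=1)

@q.worker
def echo(item):
    print(item)

echo()   # blocks, printing each message as it arrives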

In practice, we will use the @q.worker decorator in a Python source file like so:

# A simple example of a Python source file, worker.py
from hotqueue import HotQueue

q = HotQueue("some_queue", host="<Redis_IP>", port=6379, db=1)

@q.worker
def do_work(item):
    print(item)   # do something with item...

do_work()

Assuming the file above was saved as worker.py, calling python worker.py from the shell would result in a non-terminating program that “processed” the items in the "some_queue" queue using the do_work(item) function. The only thing that would cause our worker to stop is an unhandled exception.

Concurrency in the Jobs API

Recall that our big-picture goal is to add a Jobs endpoint to our Flask system that can process long-running tasks. We will implement our Jobs API with concurrency in mind. The goals will be:

  • Enable analysis jobs that take longer to run than the request/response cycle (typically, a few seconds or less).

  • Deploy multiple “worker” processes to enable more throughput of jobs.

The overall architecture will thus be:

  1. Save the request in a database and respond to the user that the analysis will eventually be run.

  2. Give the user a unique identifier with which they can check the status of their job and fetch the results when they are ready.

  3. Queue the job to run so that a worker can pick it up and run it.

  4. Build the worker to actually work the job.

Steps 1), 2) and 3) are the tasks of the Flask API, while step 4) will be handled by a worker, running as a separate pod/container, that is waiting for new items in the Redis queue.

Code Organization

As software systems get larger, it is very important to keep code organized so that finding the functions, classes, etc. responsible for different behaviors is as easy as possible. To some extent, this is technology-specific, as different languages, frameworks, etc., have different rules and conventions about code organization. We’ll focus on Python, since that is what we are using.

The basic unit of code organization in Python is called a “module”. This is just a Python source file (ends in a .py extension) with variables, functions, classes, etc., defined in it. We’ve already used a number of modules, including modules that are part of the Python standard library (e.g. json) and modules that are part of third-party libraries (e.g., redis).

The following should be kept in mind when designing the modules of a larger system:

  • Modules should be focused, with specific tasks or functionality in mind, and their names (preferably, short) should match their focus.

  • Modules are also the most typical entry-point for the Python interpreter itself, (e.g., python some_module.py).

  • Accessing code from external modules is accomplished through the import statement.

  • Circular imports will cause errors - if module A imports an object from module B, module B cannot import from module A (see the minimal illustration below).
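For example, the following pair of (hypothetical) modules would fail with an ImportError:

# module_a.py
from module_b import helper_b

def helper_a():
    return "a"

# module_b.py
from module_a import helper_a   # circular: module_a is only partially initialized here

def helper_b():
    return "b"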

Examples. The Python standard library is a good source of examples of module design. You can browse the standard library for Python 3.9 at https://docs.python.org/3.9/library/.

  • We see the Python standard library has modules focused on a variety of computing tasks; for example, for working with different data types, such as the datetime module and the array module. The descriptions are succinct:

    • The datetime module supplies classes for manipulating dates and times.

    • This module defines an object type which can compactly represent an array of basic values: characters, integers, floating point numbers.

  • For working with various file formats: e.g., csv, configparser

  • For working with concurrency: threading, multiprocessing, etc.

With this in mind, a first approach might be to break up our system into two modules:

  • api.py - this module contains the flask web server.

  • worker.py - this module contains the code to execute jobs.

However, both the API server and the workers will need to interact with the database and the queue:

  • The API will create new jobs in the database, put new jobs onto the queue, and retrieve the status of jobs (and probably the output products of the job).

  • The worker will pull jobs off the queue, retrieve jobs from the database, and update them.

This suggests a different structure:

  • api.py - this module contains the flask web server.

  • jobs.py - this module contains core functionality for working with jobs in Redis (and on the queue).

  • worker.py - this module contains the code to execute jobs.

Common code for working with redis/hotqueue can go in the jobs.py module and be imported in both api.py and worker.py.
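In terms of imports, the dependency then flows in one direction only, which avoids the circular import problem mentioned above. As we will see when we write the import statements:

# api.py:     import jobs            (uses jobs.add_job, etc.)
# worker.py:  from jobs import ...
# jobs.py:    imports nothing from api.py or worker.py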

Note

High-quality modular design is a crucial aspect of building good software. It requires significant thought and experience to do correctly, and when done poorly it can have dire consequences. In the best case, poor module design can make the software difficult to maintain/upgrade; in the worst case, it can prevent it from running correctly at all.

We can sketch out our module design by making a list of the functionality that will be available in each module. This is only an initial pass at listing the functionality needed – we will refine it over time – but making an initial list is important for thinking through the problem.

api.py: This file will contain all the functionality related to the Flask web server, and will include functions related to each of the API endpoints in our application (a sketch of one such endpoint follows the list).

  • POST /data – Load the data into the application. Will write to Redis.

  • GET /data?search=… – List all of the data in the system, optionally filtering with a search query parameter. Will read from Redis.

  • GET /data/<id> – Get a specific object from the dataset using its id. Will read from Redis.

  • POST /jobs – Create a new job. This function will save the job description to Redis and add a new task on the queue for the job. Will write to Redis and the queue.

  • GET /jobs – List all the jobs. Will read from Redis.

  • GET /jobs/<id> – Get the status of a specific job by id. Will read from Redis.

  • GET /jobs/<id>/results – Return the outputs (results) of a completed job. Will read from Redis.
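For illustration, here is a sketch of how one of these routes might look; it assumes a public helper, jobs.get_job_by_id, in the jobs module described below (a helper we have not written yet):

@app.route('/jobs/<jid>', methods=['GET'])
def get_job(jid):
    """Look up the job with id `jid` and return its status."""
    return json.dumps(jobs.get_job_by_id(jid))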

worker.py: This file will contain all of the functionality needed to get jobs from the task queue and execute the jobs.

  • Get a new job – Hotqueue consumer to get an item off the queue. Will get from the queue and write to Redis to update the status of the job.

  • Perform analysis – Start the analysis for the job and monitor it to completion.

  • Finalize job – Saves the results of the analysis and updates the job status to complete. Will write to Redis.

jobs.py: This file will contain all functionality needed for working with jobs in the Redis database and the Hotqueue queue.

  • Save a new job – Will need to write to Redis.

  • Retrieve an existing job - Will need to read from Redis.

  • Update an existing job – Will need to read and write to Redis.

Private vs Public Objects

As software projects grow, the notion of public and private access points (functions, variables, etc.) becomes an increasingly important part of code organization.

  • Private objects should only be used within the module they are defined. If a developer needs to change the implementation of a private object, she only needs to make sure the changes work within the existing module.

  • Public objects can be used by external modules. Changes to public objects need more careful analysis to understand the impact across the system.

Like the layout of code itself, this topic is technology-specific. In this class, we will take a simplified approach based on our use of Python. Remember, this is a simplification to illustrate the basic concepts - in practice, more advanced/robust approaches are used.

  • We will name private objects starting with a single underscore (_) character.

  • If an object does not start with an underscore, it should be considered public.

We can see public and private objects in use within the standard library as well. If we open up the source code for the datetime module, which can be found in the CPython repository on GitHub (Lib/datetime.py), we see a mix of public and private objects and methods.

  • Private objects are listed first.

  • Public objects start on line 473 with the timedelta class.

Exercise. Create three files, api.py, worker.py and jobs.py in your local repository, and update them by working through the following example.

Here are some function and variable definitions, some of which have incomplete implementations and/or have invalid syntax.

To begin, place them in the appropriate files. Also, determine if they should be public or private.

def generate_jid():
    """
    Generate a pseudo-random identifier for a job.
    """
    return str(uuid.uuid4())

app = Flask(__name__)

def generate_job_key(jid):
    """
    Generate the redis key from the job id to be used when storing, retrieving or updating
    a job in the database.
    """
    return 'job.{}'.format(jid)

q = HotQueue("queue", host='172.17.0.1', port=6379, db=1)

def instantiate_job(jid, status, start, end):
    """
    Create the job object description as a python dictionary. Requires the job id, status,
    start and end parameters.
    """
    if type(jid) == str:
        return {'id': jid,
                'status': status,
                'start': start,
                'end': end
        }
    return {'id': jid.decode('utf-8'),
            'status': status.decode('utf-8'),
            'start': start.decode('utf-8'),
            'end': end.decode('utf-8')
    }

@app.route('/jobs', methods=['POST'])
def jobs_api():
    """
    API route for creating a new job to do some analysis. This route accepts a JSON payload
    describing the job to be created.
    """
    try:
        job = request.get_json(force=True)
    except Exception as e:
        return True, json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)})
    return json.dumps(jobs.add_job(job['start'], job['end']))

def save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hset(.......)

def queue_job(jid):
    """Add a job to the redis queue."""
    ....

if __name__ == '__main__':
    """
    Main entrypoint of the API server
    """
    app.run(debug=True, host='0.0.0.0')

def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = generate_jid()
    job_dict = instantiate_job(jid, status, start, end)
    save_job(......)
    queue_job(......)
    return job_dict

@<...>   # fill in
def execute_job(jid):
    """
    Retrieve a job id from the task queue and execute the job.
    Monitors the job to completion and updates the database accordingly.
    """
    # fill in ...
    # the basic steps are:
    # 1) get job id from message and update job status to indicate that the job has started
    # 2) start the analysis job and monitor it to completion.
    # 3) update the job status to indicate that the job has finished.

rd = redis.StrictRedis(host='172.17.0.1', port=6379, db=0)

def update_job_status(jid, status):
    """Update the status of job with job id `jid` to status `status`."""
    job = get_job_by_id(jid)
    if job:
        job['status'] = status
        save_job(generate_job_key(jid), job)
    else:
        raise Exception()

Solution. We start by recognizing that app = Flask(__name__) is the instantiation of a Flask app, @app.route is a Flask decorator for defining an endpoint in the API, and the app.run line is used to launch the Flask server, so we add all of those to the api.py file:

# api.py

  app = Flask(__name__)

  @app.route('/jobs', methods=['POST'])
  def jobs_api():
      """
      API route for creating a new job to do some analysis. This route accepts a JSON payload
      describing the job to be created.
      """
      try:
          job = request.get_json(force=True)
      except Exception as e:
          return True, json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)})
      return json.dumps(jobs.add_job(job['start'], job['end']))

  if __name__ == '__main__':
      app.run(debug=True, host='0.0.0.0')

We also recognize that several functions appear to be jobs-related:

  • generate_jid

  • generate_job_key

  • instantiate_job

  • save_job

  • queue_job

  • add_job

  • execute_job

  • update_job_status

Note that the jobs_api() function, which we just put in api.py, actually references jobs.add_job, so we can put add_job in the jobs.py file as a public function, and anything that it calls can be added to jobs.py as a (potentially private) function. Note that add_job calls the following functions:

  • generate_jid

  • instantiate_job

  • save_job

  • queue_job

so we can put all of these in jobs.py:

# jobs.py
  def generate_jid():
      """
      Generate a pseudo-random identifier for a job.
      """
      return str(uuid.uuid4())

  def instantiate_job(jid, status, start, end):
      """
      Create the job object description as a python dictionary. Requires the job id, status,
      start and end parameters.
      """
      if type(jid) == str:
          return {'id': jid,
                  'status': status,
                  'start': start,
                  'end': end
          }
      return {'id': jid.decode('utf-8'),
              'status': status.decode('utf-8'),
              'start': start.decode('utf-8'),
              'end': end.decode('utf-8')
      }

  def save_job(job_key, job_dict):
      """Save a job object in the Redis database."""
      rd.hset(.......)

  def queue_job(jid):
      """Add a job to the redis queue."""
      ....

  def add_job(start, end, status="submitted"):
      """Add a job to the redis queue."""
      jid = generate_jid()
      job_dict = instantiate_job(jid, status, start, end)
      save_job(......)
      queue_job(......)
      return job_dict

That leaves the following:

  • q = HotQueue(..)

  • rd = StrictRedis(..)

  • update_job_status()

  • generate_job_key

  • execute_job()

Consider that:

  • We know worker.py is responsible for actually executing the job, so execute_job should go there.

  • The update_job_status() function is a jobs-related task, so it goes in the jobs.py file – it also makes calls to generate_job_key and save_job, which are already in jobs.py.

  • The jobs.py file definitely needs access to the rd object so that goes there.

  • Lastly, the q object will be needed by both jobs.py and worker.py, but worker.py is already importing from jobs, so we had better put it in jobs.py as well (otherwise we would create a circular import).

Therefore, the final placement of all the functions looks like the following:

# api.py

  app = Flask(__name__)

  @app.route('/jobs', methods=['POST'])
  def jobs_api():
      """
      API route for creating a new job to do some analysis. This route accepts a JSON payload
      describing the job to be created.
      """
      try:
          job = request.get_json(force=True)
      except Exception as e:
          return True, json.dumps({'status': "Error", 'message': 'Invalid JSON: {}.'.format(e)})
      return json.dumps(jobs.add_job(job['start'], job['end']))

  if __name__ == '__main__':
      app.run(debug=True, host='0.0.0.0')

# jobs.py
  q = HotQueue("queue", host='172.17.0.1', port=6379, db=1)
  rd = redis.StrictRedis(host='172.17.0.1', port=6379, db=0)

  def generate_jid():
      """
      Generate a pseudo-random identifier for a job.
      """
      return str(uuid.uuid4())

  def generate_job_key(jid):
      """
      Generate the redis key from the job id to be used when storing, retrieving or updating
      a job in the database.
      """
      return 'job.{}'.format(jid)

  def instantiate_job(jid, status, start, end):
      """
      Create the job object description as a python dictionary. Requires the job id, status,
      start and end parameters.
      """
      if type(jid) == str:
          return {'id': jid,
                  'status': status,
                  'start': start,
                  'end': end
          }
      return {'id': jid.decode('utf-8'),
              'status': status.decode('utf-8'),
              'start': start.decode('utf-8'),
              'end': end.decode('utf-8')
      }

  def save_job(job_key, job_dict):
      """Save a job object in the Redis database."""
      rd.hset(.......)

  def queue_job(jid):
      """Add a job to the redis queue."""
      ....

  def add_job(start, end, status="submitted"):
      """Add a job to the redis queue."""
      jid = generate_jid()
      job_dict = instantiate_job(jid, status, start, end)
      save_job(......)
      queue_job(......)
      return job_dict

  def update_job_status(jid, status):
      """Update the status of job with job id `jid` to status `status`."""
      job = get_job_by_id(jid)
      if job:
          job['status'] = status
          save_job(generate_job_key(jid), job)
      else:
          raise Exception()

# worker.py
  @<...>   # fill in
  def execute_job(jid):
      """
      Retrieve a job id from the task queue and execute the job.
      Monitors the job to completion and updates the database accordingly.
      """
      # fill in ...
      # the basic steps are:
      # 1) get job id from message and update job status to indicate that the job has started
      # 2) start the analysis job and monitor it to completion.
      # 3) update the job status to indicate that the job has finished.

Now that we have placed all of the functions, we can determine which ones should be public and which ones should be private. In general, we want to limit the number of public functions we have. Public functions represent an API for other modules, and the larger the API, the more difficult it will be to make changes in the future.

To this end, we need to determine which functions are called from external modules and which ones are only used locally. We see that add_job is called from the jobs_api function in api.py, so add_job should be public. Additionally, while the code isn’t explicitly provided, it is clear that the execute_job function will need to call update_job_status, so that should be public as well. All the other jobs functions are only used internally and can be made private.
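Applying the naming convention from above, a sketch of the resulting names in jobs.py (bodies elided):

# jobs.py -- names after applying the public/private convention
def _generate_jid(): ...                             # private
def _generate_job_key(jid): ...                      # private
def _instantiate_job(jid, status, start, end): ...   # private
def _save_job(job_key, job_dict): ...                # private
def _queue_job(jid): ...                             # private

def add_job(start, end, status="submitted"): ...     # public: called from api.py
def update_job_status(jid, status): ...              # public: called from worker.py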

Exercise. After placing the functions in the correct files, add the necessary import statements.

Solution. Let’s start with api.py. We know we need to import the Flask class to create the app object and to use the Flask request object. We also use the json package from the standard library. Finally, we are using our own jobs module.

# api.py
import json
from flask import Flask, request
import jobs

# rest of the code same as above...

For jobs.py, there is nothing from our own code to import (which is good, since the other modules will be importing from it), but we do need to import the redis package and the HotQueue class. Also, don’t forget the use of the uuid module from the standard lib! So, jobs.py becomes:

# jobs.py
import uuid

import redis
from hotqueue import HotQueue

# rest of the code same as above...

Finally, on the surface it doesn’t appear that the worker needs to import anything, but we know it needs the q object to get items. It’s hidden by the missing decorator. Let’s go ahead and import it:

# worker.py
from jobs import q

# rest of the code same as above...

Take-Home Exercise. Write code to finish the implementations for _save_job and _queue_job.

Solution. The _save_job function should save the job to the database, while the _queue_job function should put it on the queue. We know how to write those:

def _save_job(job_key, job_dict):
    """Save a job object in the Redis database."""
    rd.hset(job_key, mapping=job_dict)

def _queue_job(jid):
    """Add a job to the redis queue."""
    q.put(jid)
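Note that update_job_status also calls get_job_by_id, which is never implemented in this section. A plausible sketch, assuming jobs are stored as Redis hashes by _save_job (the bytes returned by Redis are handled by the decode branch of _instantiate_job):

def get_job_by_id(jid):
    """Return the job dictionary for job id `jid`, or None if no such job exists."""
    job = rd.hgetall(_generate_job_key(jid))
    if not job:
        return None
    return _instantiate_job(job[b'id'], job[b'status'], job[b'start'], job[b'end'])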

Take-Home Exercise. Fix the calls to _save_job and _queue_job within the add_job function.

Solution. The issue in each of these is the missing parameters. The _save_job function takes job_key and job_dict, so we just need to pass those in. Similarly, _queue_job takes jid, so we pass that in. The add_job function thus becomes:

def add_job(start, end, status="submitted"):
    """Add a job to the redis queue."""
    jid = _generate_jid()
    job_dict = _instantiate_job(jid, status, start, end)
    # updated call to _save_job:
    _save_job(_generate_job_key(jid), job_dict)
    # updated call to _queue_job:
    _queue_job(jid)
    return job_dict

Take-Home Exercise. Finish the execute_job function. This function needs a decorator (which one?) and it needs a function body.

The function body needs to:

  • update the status at the start (to something like “in progress”).

  • update the status when finished (to something like “complete”).

For the body, we will use the following (incomplete) simplification:

update_job_status(jid, .....)
# todo -- replace with real job.
time.sleep(15)
update_job_status(jid, .....)

Solution. As we saw in class, we can use the q.worker decorator to turn the worker into a consumer.

As for execute_job itself, we are given the body; we just need to fix the calls to the update_job_status() function. The first call puts the job “in progress” while the second sets it to “complete”. So the function becomes:

@q.worker
def execute_job(jid):
    update_job_status(jid, "in progress")
    time.sleep(15)
    update_job_status(jid, "complete")

Note that we are using the update_job_status function from jobs.py now, so we need to import it; we also need the time module from the standard library for the sleep call. The final worker.py is thus:

import time

from jobs import q, update_job_status

@q.worker
def execute_job(jid):
    update_job_status(jid, 'in progress')
    time.sleep(15)
    update_job_status(jid, 'complete')

execute_job()

Take-Home Exercise. Modify the definition of the q and rd objects to not use a hard-coded IP address but to instead read the IP address from an environment variable, REDIS_IP.

Solution. We can use os.environ.get("some_string") to get the value of an environment variable.

q = HotQueue("queue", host='172.17.0.1', port=6379, db=1)
rd = redis.StrictRedis(host='172.17.0.1', port=6379, db=0)

becomes

import os

# read the ip address from the variable REDIS_IP, and provide a default value in case it is not
# set
redis_ip = os.environ.get('REDIS_IP', '172.17.0.1')
# create the q and rd objects using the variable
q = HotQueue("queue", host=redis_ip, port=6379, db=1)
rd = redis.StrictRedis(host=redis_ip, port=6379, db=0)
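With this change, the same code runs unmodified in different environments: a Kubernetes deployment, for example, can set REDIS_IP on the worker and API containers, while the default of 172.17.0.1 is used whenever the variable is not set.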