How To Use RabbitMQ with Masonite Queues

#python #framework #masonite
Written By: Joe Mancuso

Introduction

This feature requires Masonite 2.0.30 or later.

Queues are a fantastic way to keep your application quick and snappy. Anything that does not require a return value and/or is time intensive can be placed in a queue and run at a later time. Using queues can decrease the time it takes to load pages and improve the user experience. There are many situations where using a queue is beneficial. These include, but are not limited to:

  • Updating arbitrary values in the database when a user signs up (like account login information)
  • Sending a welcome or promotion email
  • Firing API calls to kick off certain action chains like firing external web-hooks

There are many reasons to use message queues. Masonite comes with a default async queue driver, which takes a Job and runs it asynchronously. This works well for small things like firing an API call, but it is less suited to larger tasks that may take several seconds: those tasks just run on a separate thread and don't have the power and flexibility of a full message queue.
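To make that trade-off concrete, here is a minimal sketch of the thread-based idea. It is not Masonite's actual driver code; SendWelcomeEmail and push_async are hypothetical names used only for illustration.

```python
import threading
import time


class SendWelcomeEmail:
    """Hypothetical job: stands in for any slow, fire-and-forget task."""

    def handle(self, address):
        time.sleep(0.1)  # pretend to talk to a mail server
        self.sent_to = address


def push_async(job, args=()):
    """Run job.handle(*args) on a separate thread and return immediately."""
    thread = threading.Thread(target=job.handle, args=tuple(args))
    thread.start()
    return thread


job = SendWelcomeEmail()
worker_thread = push_async(job, args=["user@example.com"])
# The caller is free to carry on here while the job is still running.
worker_thread.join()  # only so this example finishes deterministically
```

Because the thread lives inside the same process as the web application, a job started this way is lost if that process exits before the job finishes. A separate message queue avoids that coupling.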

Alternatively, Masonite also supports an amqp driver. AMQP is a protocol used by several message queues, including the popular RabbitMQ.

We will go over how to set up RabbitMQ with Masonite and create some Jobs that we can push to our queue and process through RabbitMQ.

Installation

First we will need RabbitMQ running locally. Head over to the installation page and download RabbitMQ for your operating system. Windows requires the installer on that page, but if you are using macOS you can simply use brew:

$ brew install rabbitmq

Running RabbitMQ

Once installed you can then run it:

$ rabbitmq-server

If you get something like a "command not found" error, take a look at this StackOverflow answer. You may need to add RabbitMQ to your PATH.

Configuration

pip install

First, in order to use the amqp driver you will need to install pika. This is a lower-level AMQP library that Masonite's worker and publisher use to connect to RabbitMQ.

$ pip install pika

Config file

Masonite configuration is simple. In config/queue.py you will need to set the DRIVER to be amqp and set some default configuration options in the DRIVERS constant:

DRIVER = 'amqp'
...
DRIVERS = {
    'amqp': {
        'username': 'guest',
        'password': 'guest',
        'host': 'localhost',
        'port': '5672',
        'channel': 'default',
    }
}

This is the default setting you need to connect to RabbitMQ locally. RabbitMQ will use the username and password of guest by default.
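Before going further, it can help to confirm that something is actually listening on the configured host and port. The sketch below uses only the standard library and checks that a TCP connection can be opened. It does not speak AMQP or verify credentials, and broker_reachable is a hypothetical helper name.

```python
import socket


def broker_reachable(host="localhost", port=5672, timeout=1.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        # int() allows the port to be given as a string, as in the config above.
        with socket.create_connection((host, int(port)), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    print("RabbitMQ reachable:", broker_reachable())
```

If this reports False while rabbitmq-server is running, the host or port in config/queue.py is the first thing to double check.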

Advanced Configuration

Some RabbitMQ instances, for example those hosted by a third party service or running in production, may require a vhost and no port. In this case our configuration will be a little more advanced:

DRIVERS = {
    'amqp': {
        'username': 'guest',
        'vhost': '/',
        'password': 'guest',
        'host': 'localhost',
        'port': None,
        'channel': 'default',
    }
}

Notice that port accepts a value of None, and vhost can either be left out completely or kept in. / is the default vhost, but in production systems it may be the same as your username.
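One way to picture how these options fit together is to write them out as a standard amqp:// URL. The sketch below is only an illustration: amqp_url is a hypothetical helper, and Masonite's driver may assemble its connection differently.

```python
from urllib.parse import quote


def amqp_url(config):
    """Build an amqp:// URL from a dict shaped like DRIVERS['amqp']."""
    user = quote(config["username"], safe="")
    password = quote(config["password"], safe="")
    host = config["host"]
    port = config.get("port")
    vhost = config.get("vhost", "/")  # assume "/" when vhost is left out

    # A port of None simply drops the ":port" part.
    netloc = host if port is None else "{}:{}".format(host, port)
    # The vhost is percent-encoded, so the default "/" becomes "%2F".
    return "amqp://{}:{}@{}/{}".format(user, password, netloc, quote(vhost, safe=""))


local = amqp_url({
    "username": "guest", "password": "guest",
    "host": "localhost", "port": "5672", "channel": "default",
})
hosted = amqp_url({
    "username": "guest", "vhost": "/", "password": "guest",
    "host": "localhost", "port": None, "channel": "default",
})
```

Here local comes out as amqp://guest:guest@localhost:5672/%2F and hosted as amqp://guest:guest@localhost/%2F, which shows that leaving out vhost and setting it to / describe the same virtual host.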

Running the worker

The worker is what sits between your Masonite application and RabbitMQ. This worker will sit and listen for Masonite Jobs and will run them. You should do this in a new terminal:

$ craft queue:work

This worker always needs to be running for the duration of your Masonite application in order to process jobs. If this worker is not running then jobs will not be processed.

It's important to note that if the worker is not running when Masonite receives a job, that job will be kept and will run when the worker starts back up again. This is known as a "durable" queue.
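The waiting behaviour can be pictured with a toy model. This is not RabbitMQ: ToyBroker is a hypothetical in-memory class that only shows jobs piling up while no worker is connected and then being processed in order once one starts.

```python
from collections import deque


class ToyBroker:
    """Toy stand-in for a message broker; keeps its backlog in memory only."""

    def __init__(self):
        self.pending = deque()

    def publish(self, message):
        # Messages wait here even when no worker is connected.
        self.pending.append(message)

    def drain(self, handler):
        """Simulate a worker coming online and processing the backlog in order."""
        processed = 0
        while self.pending:
            handler(self.pending.popleft())
            processed += 1
        return processed


broker = ToyBroker()
broker.publish("job-1")  # worker is offline
broker.publish("job-2")  # still offline

seen = []
count = broker.drain(seen.append)  # worker starts up and catches up
```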

Once it is running successfully you should see something like:

Notice that we are listening on the default channel. We can specify which channel to listen on:

$ craft queue:work --channel rabbit

This will now listen on the rabbit channel. Whichever channel we use needs to be the same inside our config file. If we are listening to rabbit, our channel in our config file needs to be rabbit:

DRIVERS = {
    'amqp': {
        'username': 'guest',
        'vhost': '/',
        'password': 'guest',
        'host': 'localhost',
        'port': None,
        'channel': 'rabbit',
    }
}
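To see why the two names have to match, consider a toy model with one backlog per channel name. The publish and consume functions here are hypothetical and only mimic the routing idea, not RabbitMQ itself.

```python
from collections import defaultdict, deque

# Toy model: one FIFO backlog per channel name.
channels = defaultdict(deque)


def publish(channel, message):
    """Append a message to the backlog of the named channel."""
    channels[channel].append(message)


def consume(channel):
    """Return every message waiting on the given channel, oldest first."""
    messages = list(channels[channel])
    channels[channel].clear()
    return messages


publish("default", "HelloWorld")   # app configured with 'channel': 'default'
from_rabbit = consume("rabbit")    # worker started with --channel rabbit
from_default = consume("default")  # worker started without --channel
```

The worker on rabbit receives nothing, while the job sits on default until a worker listening there picks it up.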

Creating Jobs

Ok great! Now we have RabbitMQ set up and we are listening for incoming jobs. Let's go ahead and make a job. For this we can use craft to scaffold one for us:

$ craft job HelloWorld

This will create a new job in app/jobs/HelloWorld.py:

''' A HelloWorld Queue Job '''

from masonite.queues.Queueable import Queueable

class HelloWorld(Queueable):

    def __init__(self):
        pass

    def handle(self):
        pass

Now we have a handle method which we will put our logic into. We want to pass a message into our handle method and print it to the terminal, so we should set it up like so:

class HelloWorld(Queueable):

    ...

    def handle(self, message):
        print("hello " + message)
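Before involving the queue at all, the job's logic can be sanity-checked by calling handle directly. The sketch below defines a stand-in Queueable class (an assumption, so that it runs without Masonite installed) and captures what the job prints.

```python
import contextlib
import io


class Queueable:
    """Stand-in for masonite.queues.Queueable so the sketch runs without Masonite."""


class HelloWorld(Queueable):

    def handle(self, message):
        print("hello " + message)


# Call handle directly and capture what it prints.
buffer = io.StringIO()
with contextlib.redirect_stdout(buffer):
    HelloWorld().handle("world")

output = buffer.getvalue()
```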

Seems easy enough. Now let's send this job to the queue!

Sending Jobs

First make sure our worker is running. If it is not, start it again:

$ craft queue:work

In order to send a job to the queue from our controller, we can use the Queue from the IOC container:

from app.jobs.HelloWorld import HelloWorld
...
def show(self, Queue):
    Queue.push(HelloWorld, args=['world'])

Whatever iterable we pass as args will be passed in to the handle method. We should see the greeting printed in the terminal running the worker.

Congratulations! You just processed your first Masonite job inside RabbitMQ!
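As for how args reaches handle, a reasonable mental model is positional unpacking. The sketch below assumes that behaviour rather than quoting Masonite's worker code. The run_job function is hypothetical, and HelloWorld is simplified to return its message instead of printing it.

```python
def run_job(job, args=()):
    """Call job.handle with each item of args as a positional argument."""
    return job.handle(*args)


class HelloWorld:
    """Simplified version of the job above; returns instead of printing."""

    def handle(self, message):
        return "hello " + message


result = run_job(HelloWorld(), args=["world"])
```

Under this model, args should always be a list or tuple. A bare string such as args='world' would be unpacked character by character and would not match the handle signature.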

Advanced

This section will go a little more in depth into things you may need to know:

Constructors

Job constructors are resolved via the container. So we could have put anything we need to in the constructor and it will be resolved from the container:

from masonite.request import Request

class HelloWorld(Queueable):

    def __init__(self, request: Request):
        self.request = request

Now whenever we push this job it will be resolved:

from app.jobs.HelloWorld import HelloWorld
...
def show(self, Queue):
    Queue.push(HelloWorld)

Optionally, we can construct the job ourselves and pass in whatever the constructor needs. In this case it will NOT be resolved:

from app.jobs.HelloWorld import HelloWorld
from masonite.request import Request
...
def show(self, Queue, request: Request):
    Queue.push(HelloWorld(request))
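The difference between pushing a class and pushing an instance can be sketched with a toy container. Everything here is an assumption made for illustration: Request is a stand-in class, container is a plain dict, and resolve is a hypothetical function, not Masonite's real resolver.

```python
import inspect


class Request:
    """Stand-in for masonite.request.Request."""


class HelloWorld:
    def __init__(self, request: Request):
        self.request = request


# Toy container: maps a type annotation to an already-built object.
container = {Request: Request()}


def resolve(job):
    """Build job from the container if it is a class; otherwise return it unchanged."""
    if not inspect.isclass(job):
        return job  # already constructed by the caller, nothing to resolve
    params = inspect.signature(job).parameters
    kwargs = {name: container[param.annotation] for name, param in params.items()}
    return job(**kwargs)


from_class = resolve(HelloWorld)           # like Queue.push(HelloWorld)
mine = Request()
from_instance = resolve(HelloWorld(mine))  # like Queue.push(HelloWorld(request))
```

In the first call the job receives the container's Request. In the second it keeps exactly the object the caller supplied.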

Multiple Jobs

Queue.push() accepts multiple jobs:

from app.jobs import SomeJob, AnotherJob
...
def show(self, Queue):
    # do your normal logic
    Queue.push(SomeJob, AnotherJob(1,2))

Passing in args here will pass them into BOTH jobs' handle methods:

def show(self, Queue):
    # do your normal logic
    Queue.push(SomeJob, AnotherJob(1,2), args=['var1', 'var2'])

This is only useful if both jobs' handle methods take the same parameters. If one job requires a different number of parameters then you will need to send them in two separate pushes:

def show(self, Queue):
    # do your normal logic
    Queue.push(SomeJob, args=['var1', 'var2'])
    Queue.push(AnotherJob(1,2), args=['var1', 'var2', 'var3'])
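When several jobs share one args list, a mismatch only shows up once the worker runs the job. A small check like the one below can catch it earlier. The accepts helper is hypothetical and uses only the standard inspect module, and SomeJob and AnotherJob are minimal stand-ins for the jobs above.

```python
import inspect


class SomeJob:
    def handle(self, first, second):
        pass


class AnotherJob:
    def __init__(self, a, b):
        self.a, self.b = a, b

    def handle(self, first, second, third):
        pass


def accepts(job, args):
    """Return True if job.handle can be called with the given positional args."""
    signature = inspect.signature(job.handle)
    try:
        if inspect.isclass(job):
            signature.bind(None, *args)  # None is a placeholder for self
        else:
            signature.bind(*args)        # bound method, self already supplied
        return True
    except TypeError:
        return False


shared_args = ["var1", "var2"]
ok_for_some = accepts(SomeJob, shared_args)
ok_for_another = accepts(AnotherJob(1, 2), shared_args)
```

Here ok_for_some is True and ok_for_another is False, which is the situation that calls for two separate pushes.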
Copyright Masonite 2019