Flask + RQ: Async polling a Cloud subscription

December 2018 · 16 minute read
The following assumes familiarity with the Google Cloud Platform and an existing GCP project with the Pub/Sub API enabled, as well as with Heroku for the deployment. Ubuntu 16.04 was used for local development.

Pub/Sub flask monitor app user view

Publish/subscribe messaging, or pub/sub messaging, is an asynchronous messaging pattern. It is often used for service-to-service communication in serverless and microservices architectures. Cloud Pub/Sub is such a service, built on a core Google infrastructure component that many Google products (Ads, Search, Gmail) have relied upon. You can learn more about key Pub/Sub concepts here and here.

Pub/Sub may be used in a GCP project alongside other APIs to communicate status, data and other API-specific information, for scheduled or unscheduled operations. Messages can be accessed using the Console, the gcloud command-line tool, or client libraries. We will be using the Python Google Cloud client libraries to build a Flask application that can asynchronously listen for new messages on a Cloud Pub/Sub subscription, store them in a database and serve views to a client-side browser. The application is a proof of concept, intended to demonstrate how these technologies can be combined to improve the usability and monitoring of Cloud APIs and processes in your Cloud projects.

Depending on one’s needs and resources, the application could be scaled to handle numerous publish/subscribe asynchronous tasks, advanced UI monitoring tools and metrics, user event-based notifications, triggering of other tasks, etc. For the purposes of this demo app we are deploying on Heroku’s free tier, which offers one web dyno and one worker dyno, and the application has been adjusted to work within these free resources. Adding more workers allows for task queues with more asynchronous tasks, increasing the ability to process event messages from multiple subscriptions, or even to publish messages to topics simultaneously.

The sources for this demo-app are available on the GitHub repo pubsub-monitor.

What the app does

The application can listen for new messages on a subscription created in a GCP project. When a new message is received it is stored in the database as a JSON string and also displayed in the current user’s view (client side). Bootstrap (Flask-Bootstrap), JavaScript, jQuery and plugins (Infinite-Scroll) have been used on the client side to make the user experience more interesting. On the server side, SQLite is used for local development, while PostgreSQL and Gunicorn are used for the Heroku deployment. RQ was chosen over Celery as a lighter solution, but one could similarly use Celery instead. There is also another task queue, psq, a Cloud Pub/Sub task queue for Python developed by Google for this very purpose; it is not an official product, however, and is still in development, and since it does not work on the Heroku platform, RQ was chosen instead.

How to use the source files

Requirements:

  1. pipenv
  2. SQLite
  3. A working GCP project, set up with the Pub/Sub API

To run the app locally you will need a Python development environment set up with all the libraries required for the app to work, and you must also have SQLite installed. The Python virtualenv for this project is created and managed using pipenv, which also adds/removes packages from your Pipfile as you install/uninstall packages and generates the Pipfile.lock (used to produce deterministic builds). You will need pipenv to be able to quickly and reliably reproduce a working environment to run the app locally. If you are unfamiliar with pipenv have a look at a few quick tips to get you started. You also need a working GCP project, set up with the Pub/Sub API, and a service account to get your GOOGLE_APPLICATION_CREDENTIALS key file. You then need to create a topic and a subscription.
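If you have the gcloud CLI configured for your project, the topic and subscription can be created from the command line. For example (the names below are placeholders, matching the environment variables used later):

```shell
# Create a topic, then a pull subscription attached to it (placeholder names).
gcloud pubsub topics create your_pubsub_topic_name
gcloud pubsub subscriptions create your_subscription_name \
    --topic your_pubsub_topic_name
```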

Once you are set up with the requirements, clone the repo, cd into the project’s root directory (e.g. gcpubsub), then create the virtualenv and install dependencies:

git clone https://github.com/chrisliatas/pubsub-monitor.git gcpubsub
cd gcpubsub
pipenv install

This will install all project dependencies. Please note that the project is built using Python 3.7. Once all requirements have been installed you will have a new virtualenv for the application, which you can work in by running:

pipenv shell

With your new virtualenv activated, you then have to set the following environment variables:

export FLASK_APP=gcpubsub.py
export GOOGLE_APPLICATION_CREDENTIALS=path_to_key_file.json
export GCP_PROJECT_ID=your_GCP_project_name
export PUBSUB_TOPIC=your_pubsub_topic_name
export SUBSCRIPTION_NAME=your_subscription_name

Start a separate terminal in the same directory, activate your virtualenv (pipenv shell) and run:

rq worker gcpubsub-tasks

# You will get an output similar to:
09:16:13 RQ worker 'rq:worker:host-01.30454' started, version 0.12.0
09:16:13 *** Listening on gcpubsub-tasks...
09:16:13 Cleaning registries for queue: gcpubsub-tasks

On your first terminal run:

flask deploy
flask run

You should now be able to view the application at http://localhost:5000. The application has a default user admin with password cannotguess, created by the flask deploy command, that you can use for your initial login. You only need to run flask deploy on your first run. The default password can be changed after you log in, from within the web interface.

Clicking on Start polling will initiate the background task that listens for messages on your subscription. If there are any messages, they will be loaded and displayed as soon as they are received. You can use publish.py under the helpers directory if you need to create and publish sample messages to test the app.
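As a rough idea of what such a helper might look like, here is a hypothetical sketch of a sample-message publisher using the same client libraries (this is not the actual helpers/publish.py; make_payload and publish_samples are illustrative names, and the Cloud Build-style fields are just an example shape):

```python
import json
import os
import time


def make_payload(build_id, status):
    """Encode a sample Cloud Build-style message as UTF-8 JSON bytes."""
    return json.dumps({
        "id": build_id,
        "status": status,
        "createTime": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    }, sort_keys=True).encode("utf-8")


def publish_samples(project_id, topic_name, count=3):
    """Publish a few sample messages to the given topic."""
    # Imported here so make_payload stays usable without the GCP library.
    from google.cloud import pubsub_v1
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_name)
    for i in range(count):
        future = publisher.publish(topic_path, data=make_payload(str(i), "SUCCESS"))
        # result() blocks until the message is accepted and returns its id.
        print("Published message id:", future.result())


if __name__ == "__main__":
    publish_samples(os.environ["GCP_PROJECT_ID"], os.environ["PUBSUB_TOPIC"])
```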

Heroku deployment

Requirements:

  1. Heroku account
  2. Heroku CLI

Deploying on Heroku requires an account. You will also need to install the Heroku CLI and log in. You can then follow one of two approaches: either fork my repo on GitHub and use Heroku’s integration with GitHub, or, assuming you have cloned the sources locally, create a new Heroku app and push the code from your local machine. I will expand on the second approach; however, most steps (apart from adding the sources to your Heroku app) apply to the first approach as well. To create a new Heroku app, from the local project’s root directory (gcpubsub if you followed my git clone example):

heroku create [unique-name-of-your-app]
# You will get an output similar to:
Creating ⬢ unique-name-of-your-app... !
Creating ⬢ unique-name-of-your-app... done
https://unique-name-of-your-app.herokuapp.com/ | https://git.heroku.com/unique-name-of-your-app.git

Your local repo should now have an extra remote, heroku, for fetch and push:

git remote -v
# Output similar to:
heroku	https://git.heroku.com/unique-name-of-your-app.git (fetch)
heroku	https://git.heroku.com/unique-name-of-your-app.git (push)
# You may have other remotes as well

We also need to create a PostgreSQL database on Heroku, as we cannot use SQLite there. To enable the service:

heroku addons:add heroku-postgresql:hobby-dev
# Output similar to:
Creating heroku-postgresql:hobby-dev on ⬢ unique-name-of-your-app... free
Database has been created and is available
 ! This database is empty. If upgrading, you can transfer
 ! data from another database with pg:copy
Created postgresql-flexible-XXXXX as DATABASE_URL
Use heroku addons:docs heroku-postgresql to view documentation

To have RQ working on Heroku we must also enable the Redis service for our application:

heroku addons:create heroku-redis:hobby-dev
# Output similar to:
Creating heroku-redis:hobby-dev on ⬢ unique-name-of-your-app... free
Your add-on should be available in a few minutes.
! WARNING: Data stored in hobby plans on Heroku Redis are not persisted.
redis-convex-XXXXX is being created in the background. The app will restart when complete...
Use heroku addons:info redis-convex-XXXXX to check creation progress
Use heroku addons:docs heroku-redis to view documentation

Note: When running the above command you might get output prompting you to enter your credit card, similar to the following:

Creating heroku-redis:hobby-dev on ⬢ unique-name-of-your-app... !
 ▸    Please verify your account to install this add-on plan (please enter a credit card) For more
 ▸    information, see https://devcenter.heroku.com/categories/billing Verify now at
 ▸    https://heroku.com/verify

This means that you have to verify your account before the add-on can be enabled. Heroku’s verification process involves entering your credit card, so billing may be enabled if you use services other than the free ones. However, if you do proceed, no charges will be incurred for the Redis service enabled for this app, as long as you stay on the free tier.

Similarly to local development, some environment variables must also be added to the Heroku app’s environment:

heroku config:set LOG_TO_STDOUT=1
# Output similar to:
Setting LOG_TO_STDOUT and restarting ⬢ unique-name-of-your-app... done, v6
LOG_TO_STDOUT: 1

heroku config:set FLASK_APP=gcpubsub.py
# Output similar to:
Setting FLASK_APP and restarting ⬢ unique-name-of-your-app... done, v7
FLASK_APP: gcpubsub.py

heroku config:set GCP_PROJECT_ID=[your_GCP_project_name]
# Output similar to:
Setting GCP_PROJECT_ID and restarting ⬢ unique-name-of-your-app... done, v11
GCP_PROJECT_ID: your_GCP_project_name

heroku config:set PUBSUB_TOPIC=your_pubsub_topic_name
# Output similar to:
Setting PUBSUB_TOPIC and restarting ⬢ unique-name-of-your-app... done, v12
PUBSUB_TOPIC: your_pubsub_topic_name

heroku config:set SUBSCRIPTION_NAME=your_subscription_name
# Output similar to:
Setting SUBSCRIPTION_NAME and restarting ⬢ unique-name-of-your-app... done, v13
SUBSCRIPTION_NAME: your_subscription_name

To further explain the above, LOG_TO_STDOUT=1 redirects our application’s logs to stdout, as expected by the Heroku platform. FLASK_APP tells Flask which application to run, while the rest of the variables are specific to your GCP project.

Next we have to add our GOOGLE_APPLICATION_CREDENTIALS key file from our authorized service account. If you have not done so already, you need to obtain service account credentials to authorize your app on your GCP project. When creating the service account for your app, the role required to authorize the app’s tasks is roles/pubsub.subscriber, which must be chosen from the Role drop-down list (select Project > pubsub.subscriber). Having obtained your keyfile.json, we will create another environment variable to pass the credentials to our Heroku app:

g_cred_path=/path/to/keyfile.json
heroku config:set GOOGLE_APPLICATION_CREDENTIALS="$(< $g_cred_path)"
# Output similar to:
Setting GOOGLE_APPLICATION_CREDENTIALS and restarting ⬢ unique-name-of-your-app... done, v10
GOOGLE_APPLICATION_CREDENTIALS: {
  "type": "service_account",
  "project_id": "awesome-project",
  "private_key_id": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
  "private_key": "-----BEGIN PRIVATE KEY-----\nXXXXXXXXXXXXXXXXXXXXXXX\n-----END PRIVATE KEY-----\n",
  "client_email": "pub-sub-subscriber@awesome-project.iam.gserviceaccount.com",
  "client_id": "XXXXXXXXXXXXXXXXXXX",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/pub-sub-subscriber%40awesome-project.iam.gserviceaccount.com"
}

We can now deploy the app. Provided that you do not have any uncommitted changes:

git push heroku master
# Output similar to:
Counting objects: X, done.
Delta compression using up to 4 threads.
Compressing objects: 100% (X/X), done.
Writing objects: 100% (X/X), XXX bytes | 0 bytes/s, done.
Total X (delta X), reused 0 (delta 0)
remote: Compressing source files... done.
remote: Building source:
remote: 
remote: -----> Python app detected
remote: -----> Installing dependencies with Pipenv 2018.5.18…
remote:        Installing dependencies from Pipfile.lock (3416cd)…
remote: -----> Discovering process types
remote:        Procfile declares types -> web, worker
remote: 
remote: -----> Compressing...
remote:        Done: 79.5M
remote: -----> Launching...
remote:        Released v17
remote:        https://unique-name-of-your-app.herokuapp.com/ deployed to Heroku
remote: 
remote: Verifying deploy... done.
To https://git.heroku.com/unique-name-of-your-app.git
   xxx06c8..xxx0207  master -> master

The last step is to start the worker dyno:

heroku ps:scale worker=1
# Output similar to:
Scaling dynos... done, now running worker at 1:Free

You can also check that your two dynos are running and view your remaining free quota:

heroku ps
# Output similar to:
Free dyno hours quota remaining this month: 315h 39m (33%)
Free dyno usage for this app: 2h 20m (0%)
For more information on dyno sleeping and how to upgrade, see:
https://devcenter.heroku.com/articles/dyno-sleeping

=== web (Free): flask deploy; gunicorn gcpubsub:app (1)
web.1: up 2018/12/06 17:28:20 +0200 (~ 3m ago)

=== worker (Free): rq worker -u $REDIS_URL gcpubsub-tasks (1)
worker.1: up 2018/12/06 17:28:11 +0200 (~ 3m ago)

Assuming everything has gone well, your app will be available at https://unique-name-of-your-app.herokuapp.com/. Client interface and usage are the same as for the local deployment.

Asynchronous tasks

Asynchronous tasks run independently of the main application, on a worker process. Communication between the application and the workers happens through a task queue. The task queue we are using for this project is RQ (Redis Queue), but others exist for Python, including the popular Celery. As we saw in the local deployment, the worker is started separately (rq worker gcpubsub-tasks) and waits for tasks from the application. RQ is initialized together with Redis (we need a message broker for the task queue) in the application factory create_app, so the task queue can easily be accessed throughout the Flask app (snippet from app/__init__.py):

# ...
def create_app(config_class=Config):
    app = Flask(__name__)
    app.config.from_object(config_class)

    db.init_app(app)
    migrate.init_app(app, db)
    login.init_app(app)
    bootstrap.init_app(app)
    moment.init_app(app)
    app.redis = Redis.from_url(app.config['REDIS_URL'])
    app.task_queue = rq.Queue('gcpubsub-tasks', connection=app.redis)
    app.jsonlexer = lexers.JsonLexer()
    app.htmlformater = formatters.HtmlFormatter(style=app.config['PYGMENTS_STYLE'])
    # ...
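The jsonlexer and htmlformater attached to the app above are presumably what turn stored JSON messages into highlighted HTML for the views. A stand-alone sketch of that kind of rendering (render_message is a hypothetical helper, not taken from the app):

```python
import json

from pygments import highlight
from pygments import formatters, lexers

# Same lexer/formatter pair as in the app factory above.
lexer = lexers.JsonLexer()
formatter = formatters.HtmlFormatter(style="default")


def render_message(raw_json):
    """Pretty-print a JSON string and wrap it in Pygments-highlighted HTML."""
    pretty = json.dumps(json.loads(raw_json), indent=2, sort_keys=True)
    return highlight(pretty, lexer, formatter)
```

The CSS needed by the generated markup is what the /pygments.css endpoint (shown further below) serves via get_style_defs().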

When the user clicks the Start polling button in the client-side interface, a new listening task is started, unless one is already running. The click sends a GET request to the server-side /polling_pubsub endpoint. The same endpoint terminates a running task when it receives a POST request. The response from the endpoint is a JSON object (app/admin/routes.py):

@admin.route('/polling_pubsub', methods=['GET', 'POST'])
@login_required
def polling_pubsub():
    current_task = current_user.get_task_in_progress('polling_subscription')
    if request.method == 'GET':
        # Start polling
        if current_task:
            current_app.logger.info('A polling task is currently in progress')
            resp_o = {
                'status': 'polling',
                'task_id': current_task.id
            }
            status = 200
        else:
            task = current_user.launch_task('polling_subscription', 'Listening for pub/sub notifications', timeout=3600)
            db.session.commit()
            resp_o = {
                'status': 'success',
                'task_id': task.id
            }
            status = 202
    else:
        # Stop polling
        if current_task:
            current_app.logger.info('Terminating task polling for builds')
            current_task.complete = True
            db.session.commit()
            resp_o = {
                'status': 'terminated',
                'task_id': current_task.id
            }
            status = 202
        else:
            resp_o = {
                'status': 'error',
                'task_id': 0
            }
            status = 200
    return jsonify(resp_o), status

The task polling_subscription (in app/tasks.py) continuously polls a Pull subscription of a Google Cloud Pub/Sub topic. I have used a Cloud Build subscription, but the same principles apply to other GCP Pub/Sub subscriptions. The function can use two flavors of GOOGLE_APPLICATION_CREDENTIALS input: a path to a keyfile.json, or an environment variable containing the contents of a keyfile.json as a string. A subscriber client is then created for the subscription we want to listen on.

def polling_subscription(user_id):
    job = get_current_job()
    if job:
        fut_timeout = 30
        project_id = app.config['GCP_PROJECT_ID']
        subscription_name = app.config['SUBSCRIPTION_NAME']
        credentials_env = app.config['GOOGLE_APPLICATION_CREDENTIALS']
        if not credentials_env:
            app.logger.error('Environment variable GOOGLE_APPLICATION_CREDENTIALS is empty')
            return

        try:
            # if you already have the service account file loaded
            service_account_info = json.loads(credentials_env)
            credentials = service_account.Credentials.from_service_account_info(service_account_info)
        except json.JSONDecodeError:
            # OR using service account private key JSON file
            credentials = service_account.Credentials.from_service_account_file(credentials_env)

        subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
        subscription_path = subscriber.subscription_path(project_id, subscription_name)
        # ...

The task will keep renewing itself as long as the user has not clicked the Stop polling button. A notification is created when the task runs, to be used by the client side for the task-status display (app/tasks.py). In each cycle the code checks whether the user has requested termination of the task and proceeds accordingly. When a new message is received, a callback processes it and updates the database.

        # ...
        future = None
        terminate = False
        _set_task_progress(job, 'Polling', terminate)
        while True:
            try:
                future = subscriber.subscribe(subscription_path,
                                              callback=partial(_process_msg, user_id=user_id, job_id=job.id))
                app.logger.info('Listening for messages on {}'.format(subscription_path))
                future.result(timeout=fut_timeout)
            except TimeoutError:
                with app.app_context():
                    task = Task.query.get(job.id)
                terminate = task.complete
                if terminate:
                    app.logger.info('Stopped listening for messages on {}'.format(subscription_name))
                else:
                    app.logger.info('Renewing listener for messages on {}'.format(subscription_name))
            except Exception as e:
                terminate = True
                app.logger.error('Stopped listening for messages on {} - Exception thrown: {}'
                                 .format(subscription_name, e))
            finally:
                if future is not None:
                    future.cancel()
                if terminate:
                    break

        _set_task_progress(job, 'Finished', terminate)
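The renew-or-terminate cycle can be exercised in isolation with a stub standing in for the streaming-pull future (StubFuture and poll_until_complete are illustrative names, not part of the app):

```python
class StubFuture:
    """Stand-in for the Pub/Sub streaming-pull future."""

    def result(self, timeout=None):
        # Simulate "no messages arrived within the timeout".
        raise TimeoutError

    def cancel(self):
        pass


def poll_until_complete(is_complete, max_cycles=100):
    """Renew the listener on each timeout until is_complete() says stop."""
    cycles = 0
    while True:
        future = StubFuture()
        terminate = False
        try:
            future.result(timeout=30)
        except TimeoutError:
            cycles += 1
            # Mirrors checking Task.complete in the database each cycle.
            terminate = is_complete()
        finally:
            future.cancel()
            if terminate or cycles >= max_cycles:
                break
    return cycles
```

The real task follows the same shape, except that the future is the subscriber's streaming pull and the completion flag lives in the Task row that the /polling_pubsub endpoint flips on a POST.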

Server Side

Apart from the endpoint handling asynchronous tasks, the server side provides a route, /useritems, that returns new user messages as rendered HTML in a JSON object (app/admin/routes.py):

@admin.route('/useritems')
@login_required
def builds():
    items = current_user.new_items()
    current_user.last_item_loaded_ts = datetime.utcnow()
    db.session.commit()
    if items:
        resp_o = {
            'status': 'success',
            'items': [render_template('_useritem.html', item=i,) for i in items]
        }
    else:
        resp_o = {'status': 'unchanged', 'items': []}
    return jsonify(resp_o)

The route /notifications provides data for client-side notifications concerning the status of current tasks (app/admin/routes.py).

@admin.route('/notifications')
@login_required
def notifications():
    since = request.args.get('since', 0.0, type=float)
    notifications = current_user.notifications.filter(
        Notification.timestamp > since).order_by(Notification.timestamp.asc())
    return jsonify([{
        'name': n.name,
        'data': n.get_data(),
        'timestamp': n.timestamp
    } for n in notifications])

The endpoint /pygments.css returns the styling for the JSON on the client-side when it receives a GET request (app/admin/routes.py).

@admin.route('/pygments.css')
def pygments_css():
    return current_app.htmlformater.get_style_defs(), 200, {'Content-Type': 'text/css'}

The rest of the routes provide a landing page with an authentication form (/index), an authenticated user page (/user/<username>), and password-change and logout endpoints. Most of the endpoints are accessed with AJAX requests from the client side.

Client Side

The heart of the client-side functionality is the JavaScript file app/static/main.js. As soon as the user is logged in, we start checking for notifications about the status of asynchronous tasks and for new user items (messages) at 5-second intervals. Two AJAX requests are made to the relevant server endpoints: the first handles the behavior of user-alert messages, while the second appends the incoming messages to the HTML <table>:

// ...
$(function() {
    let since = 0;
    setInterval(function() {
        $.ajax("/notifications?since=" + since).done(
            function(notifications) {
                for (let i = 0; i < notifications.length; i++) {
                    if (notifications[i].name === "task_progress") {
                        console.log("notif.data.progress: " + notifications[i].data.progress);
                        set_task_progress(notifications[i].data.description,
                            notifications[i].data.progress, (notifications[i].data.progress !== "Finished"));
                    }
                    since = notifications[i].timestamp;
                }
            }
        );
        $.getJSON("/useritems").done(function (jsondata) {
            const table = $('#useritems');
            $.each(jsondata.items, function (i, item) {
                table.prepend(item);
            });
        });
    }, 5000);
});

User alerts are displayed without a page refresh, using JavaScript and jQuery. There are two alerts, informing the user of the current task status and of the progress of the requested action (button click). The behavior is handled by the set_task_progress function, which adjusts the page’s HTML depending on the notifications (green box):

function set_task_progress(description, progress, showalert) {
    const task = $("#current-task");
    if (showalert && task.is(":hidden")) {
        $("#current-task span:first").text(description + ": ");
        $("#task-progress strong").text(progress + "...");
        task.fadeIn(1000);
    }
    if (!showalert && task.is(":visible")) {
        $("#task-progress strong").text(progress + "!");
        setTimeout(function() {
            task.fadeOut(1000);
        }, 5000);
    }
}

The second alert (blue box) informs the user of the start and termination of the asynchronous task, depending on the user’s actions (button clicks).

var poll = true;
function start_polling() {
    $.get("/polling_pubsub", function (data, status) {
        resp_o_log(data, status);
        info_message((data.status === "success") ?
            "Started background task: polling Pull-subscription" : "A polling task is currently in progress!");
    });
}
function stop_polling() {
    $.post("/polling_pubsub", function (data, status) {
        resp_o_log(data, status);
        info_message((data.status === "terminated") ?
            "Terminating background task: polling Pull-subscription" : "There was an error in background tasks!");
    });
}
function resp_o_log(data, status) {
    console.log("Status: " + status);
    console.log("Data: status: " + data.status + ", task_id: " + data.task_id);
}
function info_message(message) {
    $("#clnt-alert").text(message).fadeIn(1000);
    setTimeout(function() {
        $("#clnt-alert").fadeOut(1000);
    }, 7000);
}
$("#pollnav").click(function (e) {
    e.preventDefault();
    $(this).text(($(this).text() === "Start polling") ? "Stop polling" : "Start polling");
    poll ? start_polling() : stop_polling();
    poll = !poll;
});

Fade-in and fade-out transitions are used to make the alerts’ on/off transitions easier on the eye. The blue-box alert is displayed for 7 seconds, set with a setTimeout. The click handler also toggles the text of the button between the default Start polling and Stop polling.

The client view after the user clicks the Start polling button:

User alerts after click on 'Start polling'

A simple change-password form is also included in the user view, allowing the user to change their password while logged in. When the Change button is clicked, the two password fields are compared to verify the password was entered correctly, and an AJAX POST request is made to the server-side /change_password endpoint. An alert shows the user the request’s result, and the form is reset after submission.

function change_pwd(npwd) {
    let posting = $.post("/change_password", { newpwd: npwd });
    posting.done(function (data, status) {
        alert("Password: " + data.status);
    });
}
$("#changepwd").submit(function (e) {
    e.preventDefault();
    const newpass = $('#newpass').val();
    const cnfrmpass = $('#cnfrmpass').val();
    (newpass !== cnfrmpass) ? alert("Passwords do not match!") : change_pwd(newpass);
    $(this).trigger("reset");
});

TL;DR

A simple Flask application that can asynchronously listen for new messages on a Cloud Pub/Sub subscription using the Python Google Cloud client libraries. A task queue (RQ) is used to handle the asynchronous polling tasks and the processing of received messages. The server side provides a number of endpoints to feed data to the client side, start/stop polling tasks and authenticate users. The client side provides an interactive user interface, with asynchronous AJAX requests promoting usability and enabling monitoring of Cloud APIs and processes in your Cloud projects.

Enjoy coding!

References

  1. Example flask application from Miguel Grinberg.
The above opinions and any mistakes are my own. I am not affiliated in any way with the companies or organizations mentioned above. The code samples provided are licensed under the Apache 2.0 License and the rest of the content of this page is licensed under the Creative Commons Attribution 3.0 License, except where noted otherwise.
