Contents

  1. Introduction
  2. Protocol

    1. Authentication
    2. Active room selection
    3. A new message
  3. Realization

    1. Django application for chat
    2. Write a simple websocket-server
    3. Subscribe to worker_response topic
    4. Processing of connection and client's messages
    5. Write a handler
    6. Implementation of messages processing
      1. Message processing with action=authenticate
      2. Processing of message with action=new_message
      3. Processing of message with action=select_room
    7. Send response to connections
    8. Summary
    9. Useful links

In this article we consider one of the methods to implement a chat written in WebSockets into existing django project using aiohttp and aioredis.

Introduction

A server written in aiohttp is single-threaded, which means that any blocking operation will block performance of the entire flow. From this it follows, that we cannot use Django ORM directly in this flow.

One of the common solutions to this problem is to use the message queue. On client’s request we will publish a message in one of the worker process topics. A handler that is subscribed to this topic processes and returns the response to worker response topic. WebSocket -server receives a response, processes it and sends to necessary connections.

For simplicity in the example we will use Redis PUB/SUB.

Protocol

Before writing a web-server we need to develop communication protocol between client and server. In our example we will have chat with rooms on the same page, so we will need the following messages:

  • authentication;
  • active room selection;
  • a new message.

Authentication

For simplicity, we will use authentication via token. Each user has unique token, and when they connect to the WebSocket server we check the presence of this token and its authenticity.

The client will send the following message:

{
    "uuid": uuid,  // unique message identifier
    "action": "authenticate",
    "token": str
}

In response you will receive:

{
    "uuid": uuid,
    "action": "authenticate",
    "status": "success/error",  // message processing status
    "error_message": str  // error message in case of status=error
}

Active room selection

The message about room selection is needed to receive the latest posts from this chat and then display them for the user. When the user changes the room, the client sends the following message:

{
    "uuid": uuid,  // unique message identifier
    "action": "new_message",
    "room": int,  // id of the room selected by user
    "text": str  // message text
}

And receives as the response:

{
    "uuid": uuid,  // unique identifier of message received
    "action": "select_room",
    "room": int,  // id of the room selected by user
    "status": "success/error",  // message processing status
    "error_message": str  // error message in case of status=error
    "room_messages": list of room messages  // last N messages in case of status=success
}

A new message

After room selection the user is able to send messages. Client will send the following message:

{
    "uuid": uuid,  // unique message identifier
    "action": "new_message",
    "room": int,  // id of the room selected by user
    "text": str  // message text
}

Since it is assumed that the list of chats is on a single page, with a successful processing we will send the answer to all users of this chat. This will allow us to note, that chat rooms have new messages and even enable us to give short previews.

{
    "uuid": uuid,  // unique message identifier
    "action": "new_message",
    "status": "success",  // message processing status
    "room": int,  // id of the room selected by user
    "message": dict (serialized message)
}

When processing fails, the user receives an error response:

{
    "uuid": uuid,  // unique identifier of message received
    "action": "new_message",
    "room": int,  // id of the room selected by user
    "status": "error",
    "error_message": str  // error message in case of status=error
}

Now when a simple protocol is designed, let’s proceed with realization.

Realization

Django application for chat

First of all we create a django-application in which we will add chat and messages models. Now let’s create a migration and migrate. That’s what we should get:

# models.py
import uuid

from django.db import models
from django.conf import settings
from django.utils.translation import ugettext_lazy as _


class ChatRoom(models.Model):
    id = models.UUIDField(verbose_name=_('Room ID'), primary_key=True, default=uuid.uuid4, editable=False)
    date_created = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
    users = models.ManyToManyField(settings.AUTH_USER_MODEL, verbose_name=_('User'))

    def __str__(self):
        return 'Room {}'.format(self.id)


class ChatMessage(models.Model):
    user = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name=_('User'))
    room = models.ForeignKey('chat.ChatRoom', verbose_name=_('Chat room ID'), related_name='messages')
    date_created = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
    text = models.TextField(verbose_name=_('Message'))

    def __str__(self):
        return 'Message from {}'.format(self.user)

Link to changes

Write a simple websocket-server

Now let’s create a new Django application called websockets with core module inside, as well as empty server.py, settings.py, views.py files. You should end up with the following structure of files and folders:

websockets/
|-- admin.py
|-- core
|   |-- __init__.py
|   |-- server.py
|   |-- settings.py
|   `-- views.py
|-- apps.py
|-- __init__.py
|-- models.py
|-- tests.py
`-- views.py

Then we will write a simple websocket-server in aiohttp_core/server.py file and connection handler in aiohttp_core/views.py file. And try to run it.

# websockets/core/server.py
from aiohttp import web
from django_aiohttp_websockets.websockets.core import views


app = web.Application()
app.router.add_get('/ws', views.WebSocketView)
# websockets/core/views.py
from aiohttp import web, WSMsgType


class WebSocketView(web.View):

    def __init__(self, *args, **kwargs):
        super(WebSocketView, self).__init__(*args, **kwargs)
        self.app = self.request.app

    async def get(self):

        ws = web.WebSocketResponse()
        await ws.prepare(self.request)

        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                else:
                    ws.send_str(msg.data + '/answer')
            elif msg.type == WSMsgType.ERROR:
                print('ws connection closed with exception %s' %
                      ws.exception())

        print('websocket connection closed')
        return ws

We will run websocket-server with the help of gunicorn. To do this, go to the root folder of the project and run the command:

gunicorn django_aiohttp_websockets.websockets.core.server:app --bind localhost:8080 --worker-class aiohttp.worker.GunicornWebWorker

Verify if everything is working by clicking to address http://localhost:8080/ws

No WebSocket UPGRADE hdr: None
 Can "Upgrade" only to «WebSocket".

Link to changes

Subscribe to worker_response topic

Before you subscribe to a topic, create a WSApplication class inherited from aiohttp.web.Application and specify the methods to be executed in it at setups and shutdowns. Add settings to connect to redis and topic names to websockets/core/settings.py file and submit the creation of Application object in app.py file.

At setup we need to create a connection to redis and add a task, which will process incoming messages from worker response topic. To do this, we define the method _setup and subscribe_to_channel.

async def _setup(self):
    self.router.add_get('/ws', views.WebSocketView)
    self.redis_subscriber = await aioredis.create_redis((settings.REDIS_HOST, settings.REDIS_PORT), loop=self.loop)
    self.tasks.append(self.loop.create_task(self.subscribe_to_channel(settings.WORKER_RESPONSE_TOPIC)))

async def subscribe_to_channel(self, topic):
    self.logger.info('Subscribe to channel: %s', topic)
    try:
        channel, *_ = await self.redis_subscriber.subscribe(topic)

        while await channel.wait_message():
            try:
                raw_msg = await channel.get()
                msg = json.loads(raw_msg.decode('utf-8'))
                # process message here

            except (json.JSONDecodeError, ValueError, Exception) as e:
                self.logger.error('Exception while processing redis msg: %s', e)

    except asyncio.CancelledError:
        self.logger.error('CancelledError exception received. Unsubscribe from: %s', topic)
        await self.redis_subscriber.unsubscribe(topic)

At shutdown we need to close all connections and complete the fulfillment of all tasks. To do this, we define a method _on_shutdown_handler and add it to the list of methods, which will be called at shutdown signal self.on_shutdown.append(self._on_shutdown_handler). In _on_shutdown_handler method we go through the whole list of connections and shut them down, as well as cancel all added tasks.

async def _on_shutdown_handler(self, app):
    for task in self.tasks:
        task.cancel()
        await task

    for ws in self.websockets:
        await ws.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')

    if self.redis_subscriber and not self.redis_subscriber.closed:
        self.redis_subscriber.close()
        await self.redis_subscriber.wait_closed()

That’s what you should end up with:

# websockets/core/server.py
import asyncio
import json
import logging

import aioredis
from aiohttp import web, WSCloseCode

from django_aiohttp_websockets.websockets.core import views, settings


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(process)d - %(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
console.setFormatter(formatter)
logger.addHandler(console)


class WSApplication(web.Application):

    def __init__(self, **kwargs):
        super(WSApplication, self).__init__(**kwargs)
        self.tasks = []
        self.websockets = {}
        self.logger = logger

        self.on_shutdown.append(self._on_shutdown_handler)
        self.loop.run_until_complete(self._setup())

    async def _setup(self):
        self.router.add_get('/ws', views.WebSocketView)
        self.redis_subscriber = await aioredis.create_redis((settings.REDIS_HOST, settings.REDIS_PORT), loop=self.loop)
        self.tasks.append(self.loop.create_task(self.subscribe_to_channel(settings.WORKER_RESPONSE_TOPIC)))

    async def _on_shutdown_handler(self, app):
        for task in self.tasks:
            task.cancel()
            await task

        for ws in self.websockets:
            await ws.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')

        if self.redis_subscriber and not self.redis_subscriber.closed:
            self.redis_subscriber.close()
            await self.redis_subscriber.wait_closed()

    async def subscribe_to_channel(self, topic):
        self.logger.info('Subscribe to channel: %s', topic)
        try:
            channel, *_ = await self.redis_subscriber.subscribe(topic)

            while await channel.wait_message():
                try:
                    raw_msg = await channel.get()
                    msg = json.loads(raw_msg.decode('utf-8'))
                    # process message here

                except (json.JSONDecodeError, ValueError, Exception) as e:
                    self.logger.error('Exception while processing redis msg: %s', e)

        except asyncio.CancelledError:
            self.logger.error('CancelledError exception received. Unsubscribe from: %s', topic)
            await self.redis_subscriber.unsubscribe(topic)
# websockets/core/app.py
from django_aiohttp_websockets.websockets.core.server import WSApplication

app = WSApplication()
# websockets/core/settings.py
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
WORKER_RESPONSE_TOPIC = 'worker_response'
WORKER_PROCESS_TOPICS = ['worker_process_1']  # , 'worker_process_2', 'worker_process_3']

Due to the fact that we have moved the creation of WSApplication to the websockets/core/app.py file, the run command has changed:

gunicorn django_aiohttp_websockets.websockets.core.app:app --bind 0.0.0.0:8080 --worker-class aiohttp.worker.GunicornWebWorker

Link to changes

Processing of connection and client's messages

When a client connects, we will keep it in self.websockets of our WSApplication. As you can see in _on_shutdown_handler method, we look through the whole list of connections and close them.

To do this, define two methods in WSApplication class: handle_ws_connect и handle_ws_disconnect.

def handle_ws_connect(self, ws, view):
    self.websockets[ws] = {
        'view': view,
        'messages_ids': [],
        'session_data': {
            'user_pk': None
        }
    }
    self.logger.debug('[%s] Websocket was added to websocket list', id(ws))

def handle_ws_disconnect(self, ws):
    self.websockets.pop(ws, None)
    self.logger.debug('[%s] Websocket was removed from websockets list', id(ws))

Then add in _setup method of WSApplication class one more connection to redis, which will be used to send messages to worker process topic and write a method which will do it.

async def publish_message_to_worker(self, ws, msg):
    if not all(msg.get(key) for key in self.WS_MESSAGE_REQUIRED_KEYS):
        raise Exception('Missing required keys')

    msg_id = msg['uuid']
    publish_topic = random.choice(settings.WORKER_PROCESS_TOPICS)

    msg['session_data'] = self.websockets[ws]['session_data']
    self.websockets[ws]['messages_ids'].append(msg_id)
    self.logger.debug('[%s] Publish message with id \'%s\' to topic \'%s\'', id(ws), msg_id, publish_topic)
    await self.redis_publisher.publish_json(publish_topic, msg)

Now use these methods in websockets/core/views.py file.

# websockets/core/views.py
import json

from aiohttp import web, WSMsgType, WSCloseCode


class WebSocketView(web.View):

    def __init__(self, *args, **kwargs):
        super(WebSocketView, self).__init__(*args, **kwargs)
        self.app = self.request.app
        self.logger = self.app.logger

    async def get(self):
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)

        ws_id = id(ws)
        self.logger.debug('[%s] New websocket connection', ws_id)
        self.app.handle_ws_connect(ws, self)

        async for msg_raw in ws:
            if msg_raw.tp == WSMsgType.TEXT:
                try:
                    msg = json.loads(msg_raw.data)
                    self.logger.debug('[%s] Publish message %s to redis', ws_id, msg)
                    await self.app.publish_message_to_worker(ws, msg)
                except Exception as e:
                    self.logger.error('[%s] Invalid message format. Exception: %s', ws_id, e)
                    await ws.close(code=WSCloseCode.UNSUPPORTED_DATA, message=str(e))
                    break

            elif msg_raw.tp == WSMsgType.ERROR:
                self.logger.error('[%s] ERROR WS connection closed with exception: %s', ws_id, ws.exception())

        self.logger.debug('[%s] Websocket connection closed', ws_id)
        self.app.handle_ws_disconnect(ws)
        return ws

Link to changes

Write a handler

Before we start writing a handler, let’s have a look at what we need:

  • the ability to use Django ORM
  • the ability to specify which of the worker process topic subscribe to

To fulfill these conditions django management commands will suit perfectly. Let’s create one in websockets application and call it message_process_worker:

# websockets/management/commands/message_process_worker.py
from django.core.management import BaseCommand

from django_aiohttp_websockets.websockets.core import settings


class Command(BaseCommand):

    def add_arguments(self, parser):
        parser.add_argument('--host', type=str, default=settings.REDIS_HOST)
        parser.add_argument('--port', type=int, default=settings.REDIS_PORT)
        parser.add_argument('--subscribe_topic', type=str, required=True)

    def handle(self, *args, **options):
        pass

Then createwebsockets/core/worker.py file and write a class, which will connect to subscribe_topic and process messages, then publish them back to worker response topic andwebsockets/core/worker_message_handlers.pyfile in which we will specify a class with the method for processing the received message.

websockets/core/worker_message_handlers.py

class MessageProcessHandler(object):
    def __init__(self, logger):
        self.logger = logger

    def process_message(self, msg):
        return {'test': 'test'}
# websockets/core/worker.py
import asyncio
import json
import signal

import aioredis

from django_aiohttp_websockets.websockets.core import settings
from django_aiohttp_websockets.websockets.core.worker_message_handlers import MessageProcessHandler


class AioredisWorker(object):
    def __init__(self, host, port, subscribe_topic, logger, loop=None, **kwargs):
        self.logger = logger
        self.loop = loop or asyncio.get_event_loop()
        self.host = host
        self.port = port
        self.subscribe_topic = subscribe_topic
        self.redis_subscriber = None
        self.redis_publisher = None
        self.tasks = []        self.message_process_handler = MessageProcessHandler(logger=self.logger)

        self.loop.add_signal_handler(signal.SIGTERM, self.shutdown)
        self.loop.add_signal_handler(signal.SIGINT, self.shutdown)

    async def _shutdown(self):
        for task in self.tasks:
            task.cancel()
            await task

        for redis_conn in [self.redis_subscriber, self.redis_publisher]:
            if redis_conn and not redis_conn.closed:
                redis_conn.close()
                await redis_conn.wait_closed()

    def shutdown(self):
        self.logger.info('Shutdown initiated. Unsubscribing from all channels')
        self.loop.run_until_complete(self._shutdown())
        self.loop.stop()
        self.loop.close()

    async def subscribe_to_channel(self, ch):
        try:
            channel, *_ = await self.redis_subscriber.subscribe(ch)
            while (await channel.wait_message()):
                try:
                    raw_msg = await channel.get()
                    msg = json.loads(raw_msg.decode('utf-8'))
                    self.logger.debug('Processing message %s', msg)
                    response = self.message_process_handler.process_message(msg)
                    await self.redis_publisher.publish_json(settings.WORKER_RESPONSE_TOPIC, response)
                except Exception as e:
                    self.logger.error('Exception while processing redis msg: %s', e)

        except asyncio.CancelledError:
            self.logger.error('CancelledError exception received. Unsubscribe from channel %s', ch)
            await self.redis_subscriber.unsubscribe(ch)

    async def _run(self):
        self.logger.info('Redis connection at %s:%s. Subscribed to: %s.', self.host, self.port, self.subscribe_topic)
        self.redis_subscriber = await aioredis.create_redis((self.host, self.port), loop=self.loop)
        self.redis_publisher = await aioredis.create_redis((self.host, self.port), loop=self.loop)
        self.tasks.append(self.loop.create_task(self.subscribe_to_channel(self.subscribe_topic)))

    def run(self):
        self.loop.run_until_complete(self._run())
        self.loop.run_forever()

Now add AioredisWorker object’s creation and its setup to our management command.

# websockets/management/commands/message_process_worker.py
import logging

from django.core.management import BaseCommand

from django_aiohttp_websockets.websockets.core import settings
from django_aiohttp_websockets.websockets.core.worker import AioredisWorker


logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)

console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(process)d - %(message)s', datefmt='%Y-%m-%dT%H:%M:%S')  # noqa
console.setFormatter(formatter)
logger.addHandler(console)


class Command(BaseCommand):

    def add_arguments(self, parser):
        parser.add_argument('--host', type=str, default=settings.REDIS_HOST)
        parser.add_argument('--port', type=int, default=settings.REDIS_PORT)
        parser.add_argument('--subscribe_topic', type=str, required=True)

    def handle(self, *args, **options):
        options['logger'] = logger
        AioredisWorker(**options).run()

Link to changes

Implementation of messages processing

When implementing the messages processing, I am going to use django-rest-framework, as it initially has token authentication, and also because of the serializers. Of course, you can use something else.

First, connect django-rest-framework and create serializers for User and ChatMessage models.

# chat/serializers.py
from rest_framework import serializers

from django_aiohttp_websockets.users.serializers import UserSerializer
from django_aiohttp_websockets.chat.models import ChatMessage


class ChatMessageSerializer(serializers.ModelSerializer):
    room = serializers.SerializerMethodField()
    timestamp = serializers.SerializerMethodField()
    user = UserSerializer()

    class Meta:
        model = ChatMessage
        fields = ('id', 'user', 'room', 'timestamp', 'text', )

    def get_timestamp(self, instance):
        return instance.date_created.timestamp()

    def get_room(self, instance):
        return instance.room.pk.hex
# users/serializers.py
from rest_framework import serializers

from django.contrib.auth import get_user_model

User = get_user_model()


class UserSerializer(serializers.ModelSerializer):

    class Meta:
        model = User
        fields = ('username', )

Now add validation that received messages meet our format. They all must necessarily have uuid and action fields. If they don’t, we will return the error. To do this, we change the process_messagemethod, add attributes of REQUIRED_KEYS, ACTIONS and error_messages, and methods _validate_message and _error_response.

In case of an error in the processing, we will send a websocket-server a message with ERROR_RESPONSE_TYPE, in case of successful processing - SUCCESS_RESPONSE_TYPE. Websocket-server will be guided by these types and decide how to send the responses. Let’s specify them in websockets/core/utils.py file.

# websockets/core/utils.py
ERROR_RESPONSE_TYPE = 'error_response'
SUCCESS_RESPONSE_TYPE = 'success_response'
# websockets/core/worker_message_handlers.py
from django_aiohttp_websockets.websockets.core import utils


class MessageProcessHandler(object):
    REQUIRED_KEYS = ['action', 'uuid', ]
    ACTIONS = ['authenticate', 'select_room', 'new_message', ]
    error_messages = {
        'invalid_message_format': 'Some of required keys are absent or empty. Required keys: %s' % REQUIRED_KEYS,
        'invalid_payload': 'Invalid message action. Next actions are allowed: %s' % ACTIONS,
    }

    def __init__(self, logger):
        self.logger = logger

    def _error_response(self, msg, error_message):
        return {
            'type': utils.ERROR_RESPONSE_TYPE,
            'response': {
                'uuid': msg.get('uuid'),
                'action': msg.get('action'),
                'error_message': str(error_message),
                'status': 'error',
            }
        }

    def process_message(self, msg):
        try:
            self._validate_message(msg)
            response = getattr(self, 'process_%s' % msg['action'])(msg)
        except Exception as e:
            self.logger.error("Error occurred while processing action: %s", str(e))
            return self._error_response(msg, e)

        return response

    def _validate_message(self, msg):
        if not all(msg.get(key) for key in self.REQUIRED_KEYS):
            raise Exception(self.error_messages['invalid_message_format'])

        if msg['action'] not in self.ACTIONS:
            raise Exception(self.error_messages['invalid_payload'])

Message processing with action=authenticate

When we receive the message with action=authenticate we will check whether the user with this token exists or not. When the processing is successful, we will add session data, so that we could understand which user owns the connection. We will process the received session data and save it in websocket-server code. To enable this, add _success_response and process_authenticate methods to our WorkerHandler class, as well as error message about invalid token:

class MessageProcessHandler(object):
    ...
    error_messages = {
        ...
        'invalid_token': 'Invalid authentication token',
    }

    def _success_response(self, msg, response=None, send_to=None, session_data=None):
        if response is None:
            response = {}

        response.update({
            'uuid': msg.get('uuid'),
            'action': msg.get('action'),
            'status': 'success',
        })

        resp = {
            'type': utils.SUCCESS_RESPONSE_TYPE,
            'send_to': send_to,
            'session_data': session_data,
            'response': response
        }
        return resp

    def process_authenticate(self, msg):
        try:
            user = User.objects.get(auth_token__key=msg.get('token'))
        except User.DoesNotExist:
            raise Exception(self.error_messages['invalid_token'])

        return self._success_response(msg, session_data={'user_pk': user.pk})

Processing of message with action=select_room

When you receive a message with action=select_room we should return the list of recent messages of this chat room, but first we need to make sure that the user is authenticated and is a member of this room.

To do this specify _get_room and process_select_room methods and add validation of session data to _validate_message if action doesn’t equal authenticate.

class MessageProcessHandler(object):
    ...
    error_messages = {
        ...
        'authentication_required': 'Authentication required before sending any other messages',
        'invalid_room_id': 'Invalid room id',
    }

    def _validate_message(self, msg):
        ...
        if msg['action'] != 'authenticate' and not msg.get('session_data', {}).get('user_pk'):
            raise Exception(self.error_messages['authentication_required'])

    def _get_room(self, msg):
        try:
            return ChatRoom.objects.get(pk=msg.get('room'), users__pk=msg['session_data']['user_pk'])
        except ChatRoom.DoesNotExist:
            raise Exception(self.error_messages['invalid_room_id'])

    def process_select_room(self, msg):
        room = self._get_room(msg)
        messages = reversed(room.messages.all().order_by('-date_created')[:20])
        response = {
            'room': room.pk.hex,
            'room_messages': ChatMessageSerializer(messages, many=True).data,
        }
        return self._success_response(msg, response=response)

Processing of message with action=new_message

When you receive a message withaction=new_message, check whether the user is a member of this chat room and if there is a text of the message. These conditions fulfilled, we create a new message. In order to send information about a new message we add send_to key if the processing is successful, which will store the ids of all the users who should receive this information. Websocket-server will look for all of the connections of these users and send the response to them. To do this, add the text of the error message and process_new_message method.

class MessageProcessHandler(object):
    ...
    error_messages = {
        ...
        'empty_text': 'Message text can\'t be empty',
    }

    def process_new_message(self, msg):
        room = self._get_room(msg)
        send_to = list(room.users.all().values_list('id', flat=True))

        if not msg.get('text', '').strip():
            raise Exception(self.error_messages['empty_text'])

        chat_message = ChatMessage.objects.create(
            user_id=msg['session_data']['user_pk'],
            text=msg['text'],
            room=room,
        )

        response = {
            'room': room.pk.hex,
            'message': ChatMessageSerializer(chat_message).data,
        }
        return self._success_response(msg, response=response, send_to=send_to)

Link to changes

Send response to connections

Let’s consider the situations that may arise. In case of processing error, the connection which has sent us the message should get a response. Exactly the same situation is with messages about authentication and room selection. In other case, all connection that are included in send_to parameter should receive a response. Also, when we receive a successful response from workers, we have to update the session data.

Before sending a message to worker process topic we save it to uuid, i.e. when we receive a response we can find the connection to send a response to. If there is send_to in response, we have to send it to all suitable connections.

To do this, create methods _find_ws_by_message_uuid, _find_ws_by_send_to, _update_session and process_worker_response in WSApplication class.

class WSApplication(web.Application):
    ...
    def _find_ws_by_message_uuid(self, msg_uuid):
        for ws, ws_data in self.websockets.items():
            if msg_uuid in ws_data.get('messages_ids', []):
                return ws

    def _find_ws_by_send_to(self, send_to):
        websockets = []
        for ws, ws_data in self.websockets.items():
            if ws_data.get('session_data', {}).get('user_pk') in send_to:
                websockets.append(ws)
        return websockets

    def _update_session(self, ws, response_msg):
        if response_msg.get('session_data'):
            self.websockets[ws]['session_data'] = response_msg['session_data']

    async def process_worker_response(self, response_msg):
        response = response_msg['response']
        msg_uuid = response['uuid']
        send_to = response_msg.get('send_to')
        self.logger.debug('Processing response for msg with id \'%s\'', msg_uuid)

        ws = self._find_ws_by_message_uuid(msg_uuid)
        if response_msg['type'] == utils.ERROR_RESPONSE_TYPE:
            if ws:
                ws.send_str(json.dumps(response))

        elif response_msg['type'] == utils.SUCCESS_RESPONSE_TYPE:
            self._update_session(ws, response_msg)
            if not send_to:
                if ws:
                    ws.send_str(json.dumps(response))
            else:
                websockets = self._find_ws_by_send_to(send_to)
                for ws in websockets:
                    ws.send_str(json.dumps(response))

Link to changes

Summary

Ultimately, we have created a simple websocket-server, which can be used for writing a chat. If you have any questions please contact us by email and we will be happy to answer!

Projects source code is available on Repository.

Useful links

  1. Python asyncIO documentation
  2. aioHTTP documentation
  3. aioRedis documentation
  4. An example of a simple chat written in aiohttp asynchronous web framework