In this article we consider one way to add a WebSocket-based chat to an existing Django project using aiohttp and aioredis.
Introduction
A server written in aiohttp is single-threaded, so any blocking operation blocks the entire event loop. It follows that we cannot use the Django ORM directly in this process.
A common solution to this problem is to use a message queue. On a client's request we publish a message to one of the worker process topics. A handler subscribed to that topic processes the message and publishes the response to the worker response topic. The WebSocket server receives the response, processes it and sends it to the relevant connections.
For simplicity, the example uses Redis PUB/SUB.
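The round trip described above can be sketched without Redis at all. The snippet below is only an illustration: two `asyncio.Queue` objects stand in for the worker process topic and the worker response topic, and the function names are hypothetical, not part of the project.

```python
import asyncio
import json


async def worker(process_topic, response_topic):
    """Stand-in for the Django worker: read one request, publish a response."""
    msg = json.loads(await process_topic.get())
    response = {"uuid": msg["uuid"], "action": msg["action"], "status": "success"}
    await response_topic.put(json.dumps(response))


async def websocket_server(process_topic, response_topic, client_msg):
    """Stand-in for the aiohttp side: publish the request, wait for the response."""
    await process_topic.put(json.dumps(client_msg))
    return json.loads(await response_topic.get())


async def main():
    # In the real project these would be Redis PUB/SUB channels.
    process_topic = asyncio.Queue()
    response_topic = asyncio.Queue()
    worker_task = asyncio.ensure_future(worker(process_topic, response_topic))
    reply = await websocket_server(
        process_topic, response_topic, {"uuid": "1", "action": "authenticate"}
    )
    await worker_task
    return reply


reply = asyncio.run(main())
print(reply)
```

The WebSocket side never touches the database; it only forwards JSON and waits for the answer, which is what keeps the event loop free of blocking calls.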

Protocol
Before writing the web server we need to design the communication protocol between client and server. In our example the chat has several rooms on the same page, so we need the following messages:
- authentication;
- active room selection;
- a new message.
Authentication
For simplicity, we will use token authentication. Each user has a unique token, and when a client connects to the WebSocket server we check that the token is present and valid.
The client will send the following message:
{
"uuid": uuid, // unique message identifier
"action": "authenticate",
"token": str
}
In response the client will receive:
{
"uuid": uuid,
"action": "authenticate",
"status": "success/error", // message processing status
"error_message": str // error message in case of status=error
}
Active room selection
The message about room selection is needed to receive the latest posts from this chat and then display them for the user. When the user changes the room, the client sends the following message:
{
"uuid": uuid, // unique message identifier
"action": "select_room",
"room": int // id of the room selected by user
}
And receives as the response:
{
"uuid": uuid, // unique identifier of message received
"action": "select_room",
"room": int, // id of the room selected by user
"status": "success/error", // message processing status
"error_message": str, // error message in case of status=error
"room_messages": list of room messages // last N messages in case of status=success
}
A new message
After room selection the user is able to send messages. Client will send the following message:
{
"uuid": uuid, // unique message identifier
"action": "new_message",
"room": int, // id of the room selected by user
"text": str // message text
}
Since all chat rooms are assumed to be on a single page, on successful processing we send the response to all users of this room. This lets the client mark rooms that have new messages and even show short previews.
{
"uuid": uuid, // unique message identifier
"action": "new_message",
"status": "success", // message processing status
"room": int, // id of the room selected by user
"message": dict (serialized message)
}
When processing fails, the user receives an error response:
{
"uuid": uuid, // unique identifier of message received
"action": "new_message",
"room": int, // id of the room selected by user
"status": "error",
"error_message": str // error message in case of status=error
}
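On the client side, the three requests above differ only in their `action` and payload fields. A minimal sketch of message builders, assuming the uuid is generated by the client with Python's `uuid` module (the helper names are hypothetical):

```python
import json
import uuid


def build_message(action, **fields):
    """Serialize a client message; every message carries a fresh uuid."""
    payload = {"uuid": str(uuid.uuid4()), "action": action}
    payload.update(fields)
    return json.dumps(payload)


def authenticate(token):
    return build_message("authenticate", token=token)


def select_room(room):
    return build_message("select_room", room=room)


def new_message(room, text):
    return build_message("new_message", room=room, text=text)


example = json.loads(new_message(1, "Hello"))
print(sorted(example))
```

The uuid is what later allows the server to match a worker response to the connection that sent the request.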
Now that a simple protocol is designed, let's proceed with the implementation.
Implementation
Django application for chat
First of all we create a Django application and add the chat room and message models to it. Then we create a migration and apply it. This is what we should get:
# models.py
import uuid

from django.conf import settings
from django.db import models
from django.utils.translation import gettext_lazy as _


class ChatRoom(models.Model):
    id = models.UUIDField(verbose_name=_('Room ID'), primary_key=True, default=uuid.uuid4, editable=False)
    date_created = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
    users = models.ManyToManyField(settings.AUTH_USER_MODEL, verbose_name=_('User'))

    def __str__(self):
        return 'Room {}'.format(self.id)


class ChatMessage(models.Model):
    # on_delete is a required argument since Django 2.0
    user = models.ForeignKey(settings.AUTH_USER_MODEL, verbose_name=_('User'), on_delete=models.CASCADE)
    room = models.ForeignKey('chat.ChatRoom', verbose_name=_('Chat room ID'), related_name='messages',
                             on_delete=models.CASCADE)
    date_created = models.DateTimeField(verbose_name=_('Created'), auto_now_add=True)
    text = models.TextField(verbose_name=_('Message'))

    def __str__(self):
        return 'Message from {}'.format(self.user)
Write a simple websocket-server
Now let's create a new Django application called websockets with a core package inside it, containing empty server.py, settings.py and views.py files. You should end up with the following structure of files and folders:
websockets/
|-- admin.py
|-- core
| |-- __init__.py
| |-- server.py
| |-- settings.py
| `-- views.py
|-- apps.py
|-- __init__.py
|-- models.py
|-- tests.py
`-- views.py
Then we write a simple WebSocket server in the websockets/core/server.py file and a connection handler in the websockets/core/views.py file, and try to run it.
# websockets/core/server.py
from aiohttp import web

from django_aiohttp_websockets.websockets.core import views

app = web.Application()
app.router.add_get('/ws', views.WebSocketView)

# websockets/core/views.py
from aiohttp import web, WSMsgType


class WebSocketView(web.View):
    def __init__(self, *args, **kwargs):
        super(WebSocketView, self).__init__(*args, **kwargs)
        self.app = self.request.app

    async def get(self):
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)
        async for msg in ws:
            if msg.type == WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                else:
                    # send_str is a coroutine since aiohttp 3.0 and must be awaited
                    await ws.send_str(msg.data + '/answer')
            elif msg.type == WSMsgType.ERROR:
                print('ws connection closed with exception %s' % ws.exception())
        print('websocket connection closed')
        return ws
We will run websocket-server with the help of gunicorn. To do this, go to the root folder of the project and run the command:
gunicorn django_aiohttp_websockets.websockets.core.server:app --bind localhost:8080 --worker-class aiohttp.worker.GunicornWebWorker
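Verify that everything works by opening http://localhost:8080/ws in a browser. A browser sends a plain HTTP request rather than a WebSocket handshake, so the server answers with an error like this, which confirms that the handler is reachable:
No WebSocket UPGRADE hdr: None
Can "Upgrade" only to "WebSocket".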
Subscribe to worker_response topic
Before subscribing to a topic, create a WSApplication class inherited from aiohttp.web.Application and define the methods to be executed at setup and shutdown. Add the Redis connection settings and the topic names to the websockets/core/settings.py file and move the creation of the application object to the app.py file.
At setup we need to create a connection to Redis and add a task that processes incoming messages from the worker response topic. To do this, we define the methods _setup and subscribe_to_channel.
async def _setup(self):
    self.router.add_get('/ws', views.WebSocketView)
    self.redis_subscriber = await aioredis.create_redis((settings.REDIS_HOST, settings.REDIS_PORT), loop=self.loop)
    self.tasks.append(self.loop.create_task(self.subscribe_to_channel(settings.WORKER_RESPONSE_TOPIC)))

async def subscribe_to_channel(self, topic):
    self.logger.info('Subscribe to channel: %s', topic)
    try:
        channel, *_ = await self.redis_subscriber.subscribe(topic)
        while await channel.wait_message():
            try:
                raw_msg = await channel.get()
                msg = json.loads(raw_msg.decode('utf-8'))
                # process message here
            except (json.JSONDecodeError, ValueError, Exception) as e:
                self.logger.error('Exception while processing redis msg: %s', e)
    except asyncio.CancelledError:
        self.logger.error('CancelledError exception received. Unsubscribe from: %s', topic)
        await self.redis_subscriber.unsubscribe(topic)
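The subscriber relies on task cancellation for cleanup: cancelling the task raises `asyncio.CancelledError` at the point where the coroutine is waiting, and the `except` branch gets a chance to unsubscribe. The sketch below shows the same pattern with an `asyncio.Queue` standing in for the Redis channel; the `events` list is only there to make the order of steps visible.

```python
import asyncio

events = []


async def subscribe_to_channel(topic, queue):
    events.append("subscribed:" + topic)
    try:
        while True:
            msg = await queue.get()  # cancellation is delivered here
            events.append("got:" + msg)
    except asyncio.CancelledError:
        # The real server would call redis_subscriber.unsubscribe(topic) here.
        events.append("unsubscribed:" + topic)


async def main():
    queue = asyncio.Queue()
    task = asyncio.ensure_future(subscribe_to_channel("worker_response", queue))
    await queue.put("hello")
    await asyncio.sleep(0.01)  # give the subscriber time to consume the message
    task.cancel()
    await task  # completes normally because the coroutine handled the cancellation


asyncio.run(main())
print(events)
```

Because the coroutine swallows the `CancelledError` after cleaning up, awaiting the cancelled task does not raise, which is what the shutdown handler depends on.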
At shutdown we need to close all connections and let all tasks finish. To do this, we define a method _on_shutdown_handler and register it to be called on the shutdown signal with self.on_shutdown.append(self._on_shutdown_handler). In the _on_shutdown_handler method we cancel all added tasks, then go through the whole list of connections and close them.
async def _on_shutdown_handler(self, app):
    for task in self.tasks:
        task.cancel()
        await task
    # Iterate over a copy: closing a socket may remove it from self.websockets
    for ws in list(self.websockets):
        await ws.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')
    if self.redis_subscriber and not self.redis_subscriber.closed:
        self.redis_subscriber.close()
        await self.redis_subscriber.wait_closed()
That’s what you should end up with:
# websockets/core/server.py
import asyncio
import json
import logging

import aioredis
from aiohttp import web, WSCloseCode

from django_aiohttp_websockets.websockets.core import views, settings

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(process)d - %(message)s', datefmt='%Y-%m-%dT%H:%M:%S')
console.setFormatter(formatter)
logger.addHandler(console)


class WSApplication(web.Application):
    def __init__(self, **kwargs):
        super(WSApplication, self).__init__(**kwargs)
        self.tasks = []
        self.websockets = {}
        self.logger = logger
        self.on_shutdown.append(self._on_shutdown_handler)
        self.loop.run_until_complete(self._setup())

    async def _setup(self):
        self.router.add_get('/ws', views.WebSocketView)
        self.redis_subscriber = await aioredis.create_redis((settings.REDIS_HOST, settings.REDIS_PORT), loop=self.loop)
        self.tasks.append(self.loop.create_task(self.subscribe_to_channel(settings.WORKER_RESPONSE_TOPIC)))

    async def _on_shutdown_handler(self, app):
        for task in self.tasks:
            task.cancel()
            await task
        # Iterate over a copy: closing a socket may remove it from self.websockets
        for ws in list(self.websockets):
            await ws.close(code=WSCloseCode.GOING_AWAY, message='Server shutdown')
        if self.redis_subscriber and not self.redis_subscriber.closed:
            self.redis_subscriber.close()
            await self.redis_subscriber.wait_closed()

    async def subscribe_to_channel(self, topic):
        self.logger.info('Subscribe to channel: %s', topic)
        try:
            channel, *_ = await self.redis_subscriber.subscribe(topic)
            while await channel.wait_message():
                try:
                    raw_msg = await channel.get()
                    msg = json.loads(raw_msg.decode('utf-8'))
                    # process message here
                except (json.JSONDecodeError, ValueError, Exception) as e:
                    self.logger.error('Exception while processing redis msg: %s', e)
        except asyncio.CancelledError:
            self.logger.error('CancelledError exception received. Unsubscribe from: %s', topic)
            await self.redis_subscriber.unsubscribe(topic)

# websockets/core/app.py
from django_aiohttp_websockets.websockets.core.server import WSApplication

app = WSApplication()

# websockets/core/settings.py
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
WORKER_RESPONSE_TOPIC = 'worker_response'
WORKER_PROCESS_TOPICS = ['worker_process_1']  # , 'worker_process_2', 'worker_process_3']
Because we have moved the creation of WSApplication to the websockets/core/app.py file, the run command has changed:
gunicorn django_aiohttp_websockets.websockets.core.app:app --bind 0.0.0.0:8080 --worker-class aiohttp.worker.GunicornWebWorker
Processing of connection and client's messages
When a client connects, we keep the connection in self.websockets of our WSApplication. As you can see in the _on_shutdown_handler method, we go through the whole list of connections and close them.
To maintain this list, define two methods in the WSApplication class: handle_ws_connect and handle_ws_disconnect.
def handle_ws_connect(self, ws, view):
    self.websockets[ws] = {
        'view': view,
        'messages_ids': [],
        'session_data': {
            'user_pk': None
        }
    }
    self.logger.debug('[%s] Websocket was added to websocket list', id(ws))

def handle_ws_disconnect(self, ws):
    self.websockets.pop(ws, None)
    self.logger.debug('[%s] Websocket was removed from websockets list', id(ws))
Then add one more Redis connection in the _setup method of the WSApplication class. It will be used to send messages to the worker process topics. Also write a method that publishes them.
# Requires `import random` at the top of server.py. WS_MESSAGE_REQUIRED_KEYS and
# self.redis_publisher are not shown in the article and must be defined on WSApplication.
async def publish_message_to_worker(self, ws, msg):
    if not all(msg.get(key) for key in self.WS_MESSAGE_REQUIRED_KEYS):
        raise Exception('Missing required keys')
    msg_id = msg['uuid']
    publish_topic = random.choice(settings.WORKER_PROCESS_TOPICS)
    msg['session_data'] = self.websockets[ws]['session_data']
    self.websockets[ws]['messages_ids'].append(msg_id)
    self.logger.debug('[%s] Publish message with id \'%s\' to topic \'%s\'', id(ws), msg_id, publish_topic)
    await self.redis_publisher.publish_json(publish_topic, msg)
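The publishing step can be tried in isolation with a stub in place of Redis. In this sketch `FakePublisher` and the module-level `REQUIRED_KEYS` are assumptions made for the demo (the article does not show the value of WS_MESSAGE_REQUIRED_KEYS), and the connection is represented by a plain string key.

```python
import asyncio
import random

WORKER_PROCESS_TOPICS = ["worker_process_1", "worker_process_2"]
REQUIRED_KEYS = ("uuid", "action")  # assumed value, mirrors the worker-side check


class FakePublisher:
    """Stub with the same publish_json(topic, msg) shape as the Redis publisher."""

    def __init__(self):
        self.published = []

    async def publish_json(self, topic, msg):
        self.published.append((topic, msg))


async def publish_message_to_worker(connections, publisher, ws, msg):
    if not all(msg.get(key) for key in REQUIRED_KEYS):
        raise ValueError("Missing required keys")
    topic = random.choice(WORKER_PROCESS_TOPICS)  # naive load balancing
    msg["session_data"] = connections[ws]["session_data"]
    connections[ws]["messages_ids"].append(msg["uuid"])  # remembered for the reply
    await publisher.publish_json(topic, msg)
    return topic


connections = {"ws-1": {"messages_ids": [], "session_data": {"user_pk": None}}}
publisher = FakePublisher()
topic = asyncio.run(
    publish_message_to_worker(
        connections, publisher, "ws-1", {"uuid": "abc", "action": "authenticate"}
    )
)
print(topic, connections["ws-1"]["messages_ids"])
```

Attaching the session data to every outgoing message is what lets the worker stay stateless: it never needs to know which connection a request came from.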
Now use these methods in the websockets/core/views.py file.
# websockets/core/views.py
import json

from aiohttp import web, WSMsgType, WSCloseCode


class WebSocketView(web.View):
    def __init__(self, *args, **kwargs):
        super(WebSocketView, self).__init__(*args, **kwargs)
        self.app = self.request.app
        self.logger = self.app.logger

    async def get(self):
        ws = web.WebSocketResponse()
        await ws.prepare(self.request)
        ws_id = id(ws)
        self.logger.debug('[%s] New websocket connection', ws_id)
        self.app.handle_ws_connect(ws, self)
        async for msg_raw in ws:
            # `type` replaces the deprecated `tp` attribute used by older aiohttp versions
            if msg_raw.type == WSMsgType.TEXT:
                try:
                    msg = json.loads(msg_raw.data)
                    self.logger.debug('[%s] Publish message %s to redis', ws_id, msg)
                    await self.app.publish_message_to_worker(ws, msg)
                except Exception as e:
                    self.logger.error('[%s] Invalid message format. Exception: %s', ws_id, e)
                    await ws.close(code=WSCloseCode.UNSUPPORTED_DATA, message=str(e))
                    break
            elif msg_raw.type == WSMsgType.ERROR:
                self.logger.error('[%s] ERROR WS connection closed with exception: %s', ws_id, ws.exception())
        self.logger.debug('[%s] Websocket connection closed', ws_id)
        self.app.handle_ws_disconnect(ws)
        return ws
Write a handler
Before we start writing a handler, let's have a look at what we need:
- the ability to use the Django ORM
- the ability to specify which worker process topic to subscribe to
Django management commands meet both conditions perfectly. Let's create one in the websockets application and call it message_process_worker:
# websockets/management/commands/message_process_worker.py
from django.core.management import BaseCommand

from django_aiohttp_websockets.websockets.core import settings


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument('--host', type=str, default=settings.REDIS_HOST)
        parser.add_argument('--port', type=int, default=settings.REDIS_PORT)
        parser.add_argument('--subscribe_topic', type=str, required=True)

    def handle(self, *args, **options):
        pass
Then create the websockets/core/worker.py file and write a class that subscribes to subscribe_topic, processes messages and publishes the results back to the worker response topic. Also create the websockets/core/worker_message_handlers.py file, in which we define a class with a method for processing the received message.
# websockets/core/worker_message_handlers.py
class MessageProcessHandler(object):
    def __init__(self, logger):
        self.logger = logger

    def process_message(self, msg):
        return {'test': 'test'}
# websockets/core/worker.py
import asyncio
import json
import signal

import aioredis

from django_aiohttp_websockets.websockets.core import settings
from django_aiohttp_websockets.websockets.core.worker_message_handlers import MessageProcessHandler


class AioredisWorker(object):
    def __init__(self, host, port, subscribe_topic, logger, loop=None, **kwargs):
        self.logger = logger
        self.loop = loop or asyncio.get_event_loop()
        self.host = host
        self.port = port
        self.subscribe_topic = subscribe_topic
        self.redis_subscriber = None
        self.redis_publisher = None
        self.tasks = []
        self.message_process_handler = MessageProcessHandler(logger=self.logger)
        self.loop.add_signal_handler(signal.SIGTERM, self.shutdown)
        self.loop.add_signal_handler(signal.SIGINT, self.shutdown)

    async def _shutdown(self):
        for task in self.tasks:
            task.cancel()
            await task
        for redis_conn in [self.redis_subscriber, self.redis_publisher]:
            if redis_conn and not redis_conn.closed:
                redis_conn.close()
                await redis_conn.wait_closed()

    def shutdown(self):
        self.logger.info('Shutdown initiated. Unsubscribing from all channels')
        # Signal handlers run inside the already running loop, so the cleanup is
        # scheduled as a task; calling run_until_complete() here would raise.
        cleanup = self.loop.create_task(self._shutdown())
        cleanup.add_done_callback(lambda _: self.loop.stop())

    async def subscribe_to_channel(self, ch):
        try:
            channel, *_ = await self.redis_subscriber.subscribe(ch)
            while (await channel.wait_message()):
                try:
                    raw_msg = await channel.get()
                    msg = json.loads(raw_msg.decode('utf-8'))
                    self.logger.debug('Processing message %s', msg)
                    response = self.message_process_handler.process_message(msg)
                    await self.redis_publisher.publish_json(settings.WORKER_RESPONSE_TOPIC, response)
                except Exception as e:
                    self.logger.error('Exception while processing redis msg: %s', e)
        except asyncio.CancelledError:
            self.logger.error('CancelledError exception received. Unsubscribe from channel %s', ch)
            await self.redis_subscriber.unsubscribe(ch)

    async def _run(self):
        self.logger.info('Redis connection at %s:%s. Subscribed to: %s.', self.host, self.port, self.subscribe_topic)
        self.redis_subscriber = await aioredis.create_redis((self.host, self.port), loop=self.loop)
        self.redis_publisher = await aioredis.create_redis((self.host, self.port), loop=self.loop)
        self.tasks.append(self.loop.create_task(self.subscribe_to_channel(self.subscribe_topic)))

    def run(self):
        self.loop.run_until_complete(self._run())
        self.loop.run_forever()
Now add AioredisWorker object’s creation and its setup to our management command.
# websockets/management/commands/message_process_worker.py
import logging

from django.core.management import BaseCommand

from django_aiohttp_websockets.websockets.core import settings
from django_aiohttp_websockets.websockets.core.worker import AioredisWorker

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
console = logging.StreamHandler()
console.setLevel(logging.DEBUG)
formatter = logging.Formatter(fmt='%(asctime)s - %(levelname)s - %(process)d - %(message)s', datefmt='%Y-%m-%dT%H:%M:%S')  # noqa
console.setFormatter(formatter)
logger.addHandler(console)


class Command(BaseCommand):
    def add_arguments(self, parser):
        parser.add_argument('--host', type=str, default=settings.REDIS_HOST)
        parser.add_argument('--port', type=int, default=settings.REDIS_PORT)
        parser.add_argument('--subscribe_topic', type=str, required=True)

    def handle(self, *args, **options):
        options['logger'] = logger
        AioredisWorker(**options).run()
Implementation of messages processing
To implement message processing I am going to use django-rest-framework, because it provides token authentication out of the box and because of its serializers. Of course, you can use something else.
First, connect django-rest-framework and create serializers for User and ChatMessage models.
# chat/serializers.py
from rest_framework import serializers

from django_aiohttp_websockets.users.serializers import UserSerializer
from django_aiohttp_websockets.chat.models import ChatMessage


class ChatMessageSerializer(serializers.ModelSerializer):
    room = serializers.SerializerMethodField()
    timestamp = serializers.SerializerMethodField()
    user = UserSerializer()

    class Meta:
        model = ChatMessage
        fields = ('id', 'user', 'room', 'timestamp', 'text', )

    def get_timestamp(self, instance):
        return instance.date_created.timestamp()

    def get_room(self, instance):
        return instance.room.pk.hex

# users/serializers.py
from rest_framework import serializers
from django.contrib.auth import get_user_model

User = get_user_model()


class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ('username', )
Now add validation that received messages match our format. Every message must have the uuid and action fields. If they are missing, we return an error. To do this, we change the process_message method, add the REQUIRED_KEYS, ACTIONS and error_messages attributes, and add the _validate_message and _error_response methods.
If processing fails, we send the WebSocket server a message with ERROR_RESPONSE_TYPE, and if it succeeds, a message with SUCCESS_RESPONSE_TYPE. The WebSocket server uses these types to decide how to deliver the responses. Let's define them in the websockets/core/utils.py file.
# websockets/core/utils.py
ERROR_RESPONSE_TYPE = 'error_response'
SUCCESS_RESPONSE_TYPE = 'success_response'

# websockets/core/worker_message_handlers.py
from django_aiohttp_websockets.websockets.core import utils


class MessageProcessHandler(object):
    REQUIRED_KEYS = ['action', 'uuid', ]
    ACTIONS = ['authenticate', 'select_room', 'new_message', ]
    error_messages = {
        'invalid_message_format': 'Some of required keys are absent or empty. Required keys: %s' % REQUIRED_KEYS,
        'invalid_payload': 'Invalid message action. Next actions are allowed: %s' % ACTIONS,
    }

    def __init__(self, logger):
        self.logger = logger

    def _error_response(self, msg, error_message):
        return {
            'type': utils.ERROR_RESPONSE_TYPE,
            'response': {
                'uuid': msg.get('uuid'),
                'action': msg.get('action'),
                'error_message': str(error_message),
                'status': 'error',
            }
        }

    def process_message(self, msg):
        try:
            self._validate_message(msg)
            response = getattr(self, 'process_%s' % msg['action'])(msg)
        except Exception as e:
            self.logger.error("Error occurred while processing action: %s", str(e))
            return self._error_response(msg, e)
        return response

    def _validate_message(self, msg):
        if not all(msg.get(key) for key in self.REQUIRED_KEYS):
            raise Exception(self.error_messages['invalid_message_format'])
        if msg['action'] not in self.ACTIONS:
            raise Exception(self.error_messages['invalid_payload'])
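The handler dispatches by building a method name from the action, so the ACTIONS whitelist does more than validate input: without it, a client could send an action such as `message` and make the handler call `process_message` on itself. A reduced sketch of the same pattern, where the `ping` action is hypothetical and exists only for this demo:

```python
class Handler:
    REQUIRED_KEYS = ["action", "uuid"]
    ACTIONS = ["ping"]  # hypothetical action, used only in this demo

    def process_message(self, msg):
        try:
            self._validate_message(msg)
            # Dispatch to process_<action>; safe only because of the whitelist.
            return getattr(self, "process_%s" % msg["action"])(msg)
        except Exception as exc:
            return {
                "uuid": msg.get("uuid"),
                "action": msg.get("action"),
                "status": "error",
                "error_message": str(exc),
            }

    def _validate_message(self, msg):
        if not all(msg.get(key) for key in self.REQUIRED_KEYS):
            raise ValueError("Required keys are absent or empty")
        if msg["action"] not in self.ACTIONS:
            raise ValueError("Invalid message action")

    def process_ping(self, msg):
        return {"uuid": msg["uuid"], "action": msg["action"], "status": "success"}


handler = Handler()
ok = handler.process_message({"uuid": "1", "action": "ping"})
bad_action = handler.process_message({"uuid": "2", "action": "message"})
missing = handler.process_message({"action": "ping"})
print(ok["status"], bad_action["status"], missing["status"])
```

Adding a new action then takes two steps: list it in ACTIONS and define the matching `process_<action>` method.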
Message processing with action=authenticate
When we receive a message with action=authenticate we check whether a user with this token exists. If processing succeeds, we add session data to the response, so that we know which user owns the connection. The WebSocket server code will process the received session data and save it. To enable this, add the _success_response and process_authenticate methods to our MessageProcessHandler class, as well as an error message about an invalid token:
from django.contrib.auth import get_user_model

User = get_user_model()


class MessageProcessHandler(object):
    ...
    error_messages = {
        ...
        'invalid_token': 'Invalid authentication token',
    }

    def _success_response(self, msg, response=None, send_to=None, session_data=None):
        if response is None:
            response = {}
        response.update({
            'uuid': msg.get('uuid'),
            'action': msg.get('action'),
            'status': 'success',
        })
        resp = {
            'type': utils.SUCCESS_RESPONSE_TYPE,
            'send_to': send_to,
            'session_data': session_data,
            'response': response
        }
        return resp

    def process_authenticate(self, msg):
        try:
            user = User.objects.get(auth_token__key=msg.get('token'))
        except User.DoesNotExist:
            raise Exception(self.error_messages['invalid_token'])
        return self._success_response(msg, session_data={'user_pk': user.pk})
Processing of message with action=select_room
When we receive a message with action=select_room we should return the list of recent messages of this chat room, but first we need to make sure that the user is authenticated and is a member of this room.
To do this, define the _get_room and process_select_room methods and add validation of session data to _validate_message for every action other than authenticate.
from django_aiohttp_websockets.chat.models import ChatRoom
from django_aiohttp_websockets.chat.serializers import ChatMessageSerializer


class MessageProcessHandler(object):
    ...
    error_messages = {
        ...
        'authentication_required': 'Authentication required before sending any other messages',
        'invalid_room_id': 'Invalid room id',
    }

    def _validate_message(self, msg):
        ...
        if msg['action'] != 'authenticate' and not msg.get('session_data', {}).get('user_pk'):
            raise Exception(self.error_messages['authentication_required'])

    def _get_room(self, msg):
        try:
            return ChatRoom.objects.get(pk=msg.get('room'), users__pk=msg['session_data']['user_pk'])
        except ChatRoom.DoesNotExist:
            raise Exception(self.error_messages['invalid_room_id'])

    def process_select_room(self, msg):
        room = self._get_room(msg)
        # Take the 20 newest messages, then restore chronological order
        messages = reversed(room.messages.all().order_by('-date_created')[:20])
        response = {
            'room': room.pk.hex,
            'room_messages': ChatMessageSerializer(messages, many=True).data,
        }
        return self._success_response(msg, response=response)
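The query in process_select_room sorts newest first, takes a slice and then reverses it, so the client receives the latest messages in chronological order. The same idea on plain dictionaries, as a sketch that needs no database (the sample data is made up):

```python
from datetime import datetime, timedelta

start = datetime(2020, 1, 1)
# Five sample messages, one minute apart; id 5 is the newest.
messages = [
    {"id": i, "date_created": start + timedelta(minutes=i)} for i in range(1, 6)
]


def last_messages(items, limit):
    """Return the `limit` newest items, oldest of them first."""
    newest_first = sorted(items, key=lambda m: m["date_created"], reverse=True)[:limit]
    return list(reversed(newest_first))


ids = [m["id"] for m in last_messages(messages, 3)]
print(ids)  # [3, 4, 5]
```

Sorting in ascending order and slicing from the start would return the oldest messages instead, which is why the descending sort comes first.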
Processing of message with action=new_message
When we receive a message with action=new_message, we check whether the user is a member of this chat room and whether the message text is present. If both conditions hold, we create a new message. To deliver information about the new message, we add a send_to key to a successful response. It stores the ids of all the users who should receive this information. The WebSocket server will look up all connections of these users and send the response to them. To do this, add the error message text and the process_new_message method.
from django_aiohttp_websockets.chat.models import ChatMessage


class MessageProcessHandler(object):
    ...
    error_messages = {
        ...
        'empty_text': 'Message text can\'t be empty',
    }

    def process_new_message(self, msg):
        room = self._get_room(msg)
        send_to = list(room.users.all().values_list('id', flat=True))
        if not msg.get('text', '').strip():
            raise Exception(self.error_messages['empty_text'])
        chat_message = ChatMessage.objects.create(
            user_id=msg['session_data']['user_pk'],
            text=msg['text'],
            room=room,
        )
        response = {
            'room': room.pk.hex,
            'message': ChatMessageSerializer(chat_message).data,
        }
        return self._success_response(msg, response=response, send_to=send_to)
Send response to connections
Let's consider the situations that may arise. If processing fails, only the connection that sent the message should get a response. The same applies to successful authentication and room selection messages. Otherwise, all connections of the users listed in the send_to parameter should receive the response. Also, when we receive a successful response from a worker, we have to update the session data.
Before publishing a message to a worker process topic we store its uuid with the connection, so when the response arrives we can find the connection to reply to. If the response contains send_to, we have to send it to all matching connections.
To do this, create the methods _find_ws_by_message_uuid, _find_ws_by_send_to, _update_session and process_worker_response in the WSApplication class.
# Requires `utils` to be imported from django_aiohttp_websockets.websockets.core in server.py
class WSApplication(web.Application):
    ...

    def _find_ws_by_message_uuid(self, msg_uuid):
        for ws, ws_data in self.websockets.items():
            if msg_uuid in ws_data.get('messages_ids', []):
                return ws

    def _find_ws_by_send_to(self, send_to):
        websockets = []
        for ws, ws_data in self.websockets.items():
            if ws_data.get('session_data', {}).get('user_pk') in send_to:
                websockets.append(ws)
        return websockets

    def _update_session(self, ws, response_msg):
        # ws is None when the sender has already disconnected
        if ws is not None and response_msg.get('session_data'):
            self.websockets[ws]['session_data'] = response_msg['session_data']

    async def process_worker_response(self, response_msg):
        response = response_msg['response']
        msg_uuid = response['uuid']
        send_to = response_msg.get('send_to')
        self.logger.debug('Processing response for msg with id \'%s\'', msg_uuid)
        ws = self._find_ws_by_message_uuid(msg_uuid)
        if response_msg['type'] == utils.ERROR_RESPONSE_TYPE:
            if ws:
                # send_str is a coroutine since aiohttp 3.0 and must be awaited
                await ws.send_str(json.dumps(response))
        elif response_msg['type'] == utils.SUCCESS_RESPONSE_TYPE:
            self._update_session(ws, response_msg)
            if not send_to:
                if ws:
                    await ws.send_str(json.dumps(response))
            else:
                websockets = self._find_ws_by_send_to(send_to)
                for ws in websockets:
                    await ws.send_str(json.dumps(response))
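The routing rules can be checked without a running server. In the sketch below `FakeWebSocket` is a stand-in that only records what it was asked to send, and `route` is a simplified, hypothetical version of process_worker_response that leaves out the session update.

```python
import asyncio
import json


class FakeWebSocket:
    """Records outgoing messages instead of writing to a socket."""

    def __init__(self, name):
        self.name = name
        self.sent = []

    async def send_str(self, data):
        self.sent.append(json.loads(data))


def find_ws_by_message_uuid(connections, msg_uuid):
    for ws, data in connections.items():
        if msg_uuid in data["messages_ids"]:
            return ws
    return None


def find_ws_by_send_to(connections, send_to):
    return [ws for ws, data in connections.items()
            if data["session_data"]["user_pk"] in send_to]


async def route(connections, response_msg):
    response = response_msg["response"]
    send_to = response_msg.get("send_to")
    if send_to:
        targets = find_ws_by_send_to(connections, send_to)  # broadcast
    else:
        origin = find_ws_by_message_uuid(connections, response["uuid"])
        targets = [origin] if origin else []  # reply to the sender only
    for ws in targets:
        await ws.send_str(json.dumps(response))
    return [ws.name for ws in targets]


alice, bob, carol = FakeWebSocket("alice"), FakeWebSocket("bob"), FakeWebSocket("carol")
connections = {
    alice: {"messages_ids": ["m1"], "session_data": {"user_pk": 1}},
    bob: {"messages_ids": [], "session_data": {"user_pk": 2}},
    carol: {"messages_ids": [], "session_data": {"user_pk": 3}},
}
broadcast = asyncio.run(route(connections, {
    "type": "success_response", "send_to": [1, 2],
    "response": {"uuid": "m1", "action": "new_message", "status": "success"},
}))
direct = asyncio.run(route(connections, {
    "type": "error_response",
    "response": {"uuid": "m1", "action": "select_room", "status": "error"},
}))
print(broadcast, direct)
```

Both lookups scan every connection, which is fine for a demo. With many connections, an index from message uuid and user id to connections would avoid the linear search.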
Summary
Ultimately, we have created a simple websocket-server, which can be used for writing a chat. If you have any questions please contact us by email and we will be happy to answer!
The project's source code is available in the repository.