1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RESTfulなDjangoのWebsocketライブラリを作った

Posted at

はじめに

DjangoのWebsocketライブラリのchannelsを拡張して,rest_frameworkの便利な機能であるserializerやpermissionをほぼ同じインターフェースで実現できるライブラリrest_framework_channelsを作成しました.作成過程や実装のポイントなどを備忘録も兼ねて書き殴っていけたらと思います.

経緯

とあるDjangoのプロジェクトでWebsocketを使用して双方向の通信を実現したい要件が出てきました.このDjangoのプロジェクトはrest_frameworkを用いたRESTAPIの仕様になっているため,できるだけSerializerやPermissionの機能を使いたいです.

channelsはDjangoのプロジェクトでWebSocketを導入できる素晴らしいライブラリです.しかし,あくまでもWebsocketを導入するためのライブラリであるため,Serializerなどの機能は一切用意されていないという問題があります.

Serializerを使いたい

serializerはDjangoのモデルとclientのリクエストをよしなに処理してくれます.
コーディングの記述量も劇的に減るため,RESTfulなプロジェクトでは是非とも使いたい機能です.

djangochannelsrestframework

はじめはこちらを使っていました.しかし,使っていくうちにConnection単位でしかSerializerやPermissionを設定できないということがわかりました.Connection後に複数のモデルを扱うケースが実運用ではほとんどだと思うので,この仕様は大きな欠点だと思います.一方で,非同期処理周りの実装は非常に参考になりそうです.幸いにもMITライセンスなので,これをベースとした新たなライブラリを作れそうだと思ったのがことのきっかけです.

Channels API

(実は後から見つけました.)が,最終更新が6年前なので最新のchannelsには対応していなさそうです.

It requires Python 2.7 or 3.x, Channels <=1.1.8.1, Django <=1.11, and Django Rest Framework 3.x

実際,対応バージョンが古すぎるので実用的ではなさそうです.

要件整理

以上を踏まえて,実現したいライブラリの要件は以下のようになります.

  • Serializer・Permissionを使えるようにする
  • Serializer・PermissionはConnection単位だけでなくConnection後のやり取りに対しても設定できるようにする
  • できるだけ既存の有名ライブラリの概念を導入する
    • django
    • channels
    • rest_framework

実装

紆余曲折ありましたが,何とか実装できました.ここでは機能といくつかの実装のポイントを紹介できたらと思います.

主な機能

action

actionはConnectionが確立した後のメッセージのやり取りで扱う概念です.(先述のdjangorestframeworkchannelsを踏襲しました.)Consumer内で指定されたactionに対応するメソッドを呼ぶことで,処理を分離することが可能です.
以下がactionの例です.actionは後述のようにDecoratorで定義できるようにしています.

class ParentConsumer(generics.GenericAsyncAPIConsumer):
    serializer_class = TestSerializer
    queryset = TestModel.objects.all()

    @async_action()
    def get_test(self):
        testmodel = self.get_object(self.action) # rest_frameworkのGenericsView同様にget_objectが実装されている
        ...
        return { 'id': testmodel.pk }, 200

このactionはConsumer内のHTTP Requestといえばわかりやすいでしょうか.
以下のメッセージを送ると,定義したget_testメソッドが非同期で呼ばれ

{
    'action': 'get_test',
}

以下のResponseが返ってきます.

{
    'errors': [],
    'data': {
        'id': 1,
    },
    'action': 'get_test',
    'route': '',
    'status': 200,
}

ActionHandler

ActionHandlerはrest_framework_channelsの肝となるクラスで,その名の通りactionを処理する役割を持ちます.

rest_framework_channelsのシーケンス図は以下のようになっていて,従来通りConsumerがConnectionの役割を担い,Consumer内で送られてきたメッセージのactionに基づき,処理を振り分けるようになっています.

image.png

Consumer内で定義した@async_action()に振り分けることで十分な気もしますが,これだと要件の

Serializer・PermissionはConnection単位だけでなくConnection後のやり取りに対しても設定できるようにする

を満たしません.そこでこの要件を満たすのに一役買うのがActionHandlerです.

ActionHandlerはViewやConsumerのようなもの(callback関数としてAsync API Action Handlerの略となる後述のaaahを返します.)です.Consumer内で以下のように定義することで,Connection後のやり取りに対してSerializerやPermissionを設定できるようにしています.既存のpathre_pathを用いて,まるでurlpatternsを定義するのと同じように,Consumer内部でのactionの振り分けを記述できます.我ながらナイスな実装です.

from django.urls import path, re_path
from rest_framework_channels import generics
from rest_framework_channels.consumers import AsyncAPIConsumer
from rest_framework_channels.permissions import IsAuthenticated
from rest_framework_channels.decorators import async_action

class ChildActionHandler(generics.RetrieveAPIActionHandler):
    serializer_class = TestSerializer
    queryset = TestModel.objects.all()
    permission_classes = (IsAuthenticated,)

    @async_action
    def recieve_path(self, *args, **kwargs):
        return {'path': 'handled'}, 200

    @async_action
    def recieve_repath(self, *args, **kwargs):
        return {'repath': 'handled'}, 200

class ParentConsumer(AsyncAPIConsumer):
    # You can define the routing inside the consumer similar with original django's urlpatterns
    routepatterns = [
        path('test_child_route', ChildActionHandler.as_aaah()),
        re_path(
            r'test_child_route/(?P<pk>[-\w]+)/$',
            ChildActionHandler.as_aaah(),
        ),
    ]

以下のようなメッセージを送ると

{
    'action': 'receive_repath', 
    'route': 'test_child_route/1/',
}

receive_repathが呼ばれます.しかも,kwargspk=1が入った状態で呼ばれるため,元のdjangoと同じようなコーディングができるはずです.

generics

rest_framework同様のGenericsを用意しました.使い勝手はほぼ同じであるため,ほとんど違いを意識することなくWebsocketにSerializerやPermissionを導入できます!

以下が実装したGenericsとrest_frameworkの対応表です.

Action Handler Consumer rest_framework's Generics action
GenericAsyncAPIActionHandler GenericAsyncAPIConsumer GenericAPIView n/a
CreateAPIActionHandler CreateAPIConsumer CreateAPIView create
ListAPIActionHandler ListAPIConsumer ListAPIView list
RetrieveAPIActionHandler RetrieveAPIConsumer RetrieveAPIView retrieve
UpdateAPIActionHandler UpdateAPIConsumer UpdateAPIView update/partial_update
DestroyAPIActionHandler DestroyAPIConsumer DestroyAPIView remove
ListCreateAPIActionHandler ListCreateAPIConsumer ListCreateAPIView list/create
RetrieveUpdateAPIActionHandler RetrieveUpdateAPIConsumer RetrieveUpdateAPIView retrieve/update/partial_update
RetrieveDestroyAPIActionHandler RetrieveDestroyAPIConsumer RetrieveDestroyAPIView retrieve/remove
RetrieveUpdateDestroyAPIActionHandler RetrieveUpdateDestroyAPIConsumer RetrieveUpdateDestroyAPIView retrieve/update/partial_update/remove

Serializer

先の例のようにGenericsと組み合わせれば数行で処理を実装できたり,以下のようにget_objectget_serializerを用いて独自処理を定義できます.

class ActionHandler(generics.RetrieveAPIActionHandler):
    serializer_class = TestSerializer
    queryset = TestModel.objects.all()

    @async_action()
    def your_custom_action(self, *args, **kwargs):
        action = kwargs.get('action', 'your_custom_action')
        # get_object must recieve the action unlike original rest_framework
        instance = self.get_object(action)
        serializer = self.get_serializer(instance)

        # your logic here
        ...

        return serializer.data, 200

Permission

個人的に重要だと思っているPermissionです.こちらもオリジナル同様です.

from rest_framework_channels.handlers import AsyncAPIActionHandler
from rest_framework_channels.decorators import async_action
from rest_framework_channels.consumers import AsyncAPIConsumer
from rest_framework_channels.permissions import IsAuthenticated

class ChildActionHandler(AsyncAPIActionHandler):
    permission_classes = (IsAuthenticated,)

    @async_action()
    def your_custom_action(self, pk, *args, **kwargs):
        # Note: you should return the data and status code.
        return { 
            'message': 'your_custom_action will be handled',
            'model_id': pk
        }, 200

class ParentConsumer(AsyncAPIConsumer):
    routepatterns = [
        re_path(
            r'test_child_route/(?P<pk>[-\w]+)/$',
            ChildActionHandler.as_aaah(),
        ),
    ]

認証していないユーザーがyour_custom_actionを指定したメッセージを送信すると,

{
    'data': None,
    'action': 'your_custom_action',
    'route': 'test_child_route/',
    'status': 403,
}

403エラーでしっかり弾いてくれます.

ちなみにdjangorestframeworkchannelsの実装を拝借して,can_connectメソッドも用意しました.以下のようにBasePermissionを継承し,can_connectメソッドを上書きするとそもそものConnectionができるかを評価できます.

from rest_framework_channels.permissions import BasePermission

class StrictAuth(BasePermission):
    async def can_connect(
        self, scope: dict[str, Any], handler: AsyncActionHandler, message=None
    ) -> bool:
        
        return check_have_authority(scope['user'])

Pagination

ページネーションは大きなモデルの一覧を知りたい時に便利な機能です.こちらもこんな感じでほぼ同じ使い方で実装できます.

from rest_framework.pagination import PageNumberPagination
from rest_framework_channels import generics
from rest_framework_channels.consumers import AsyncAPIConsumer

class TestPagination(PageNumberPagination):
    page_size = 10
    page_size_query_param = 'page_size'
    max_page_size = 100

class ChildActionHandler(generics.ListAPIActionHandler):
    serializer_class = TestSerializer
    queryset = TestModel.objects.all()
    pagination_class = TestPagination

class ParentConsumer(AsyncAPIConsumer):

    routepatterns = [
        path('test_child_route/', ChildActionHandler.as_aaah()),
    ]

以下のメッセージを送ると

{
    'action': 'list', # pagination will be used in list action at the almost cases
    'route': 'test_child_route/?page=4',
}

しっかり指定数分を返してくれます.

{
    'errors': [],
    'action': 'list',
    'route': 'test_child_route/?page=4',
    'status': 200,
    'data': {
        'count': 100,
        'next': 'test_child_route/?page=5',
        'previous': 'test_child_route/?page=3',
        'results': [
            {
                'id': 31,
                'title': 'Title31',
                'content': 'Content31',
            },
            {
                'id': 32,
                'title': 'Title32',
                'content': 'Content32',
            },
            ...

            {
                'id': 40,
                'title': 'Title40',
                'content': 'Content40',
            },
        ]
    }
}

実装のポイント

今回の実装で勉強になった点を実装のポイントとして書いていきたいと思います.

metaclass

Pythonのmetaclassではこちらの記事にあるようにクラスの定義チェックを行うことができます.

メタクラスを利用することで,クラスを実装した時点でそのクラスの定義チェックを行えます
メタクラスの使いどころ
(中略)
クラス変数のチェックをしたい(今回のコード例)
インスタンス生成時に必ず特定の関数を実行したい.(デシリアライズ時にジェネリックに行えるよう,シリアライズ時にクラスの存在を登録したいときなど)

今回は上記性質を活かして,ActionHandler内で
①どのメソッドがActionを処理するメソッドか(=@async_actionのdecoratorで囲まれているか)を保存する,
routepatternsに定義されたルーティング情報の登録を行う
ことをしています.

これらは1回の処理で十分で,保守的な観点からクラス変数として共通の情報を持たせた方が良いです.そのような理由からmetaclassとして定義することが望ましいということですね.
一見__init____new__で良いじゃないかと思うかもしれませんが,__init____new__だとインスタンスを生成する時に毎回行われてしまいますので,やはり今回はmetaclassが適任のようです.

class APIActionHandlerMetaclass(type):
    """
    Metaclass that records action and route methods
    """

    def __new__(mcs, name, bases, body):
        cls = type.__new__(mcs, name, bases, body)

        cls.available_actions = {}
        cls.routing = RoutingManager()
        # ①の処理
        for method_name in dir(cls):
            attr = getattr(cls, method_name)
            is_action = getattr(attr, 'is_action', False)
            if is_action:
                kwargs = getattr(attr, 'kwargs', {})
                name = kwargs.get('name', method_name)
                cls.available_actions[name] = method_name
        # ②の処理
        for route in getattr(cls, 'routepatterns', []):

            pattern = route.pattern
            if isinstance(pattern, RegexPattern):
                arg = pattern._regex
            elif isinstance(pattern, RoutePattern):
                arg = pattern._route
            else:
                raise ValueError(f'Unsupported pattern type: {type(pattern)}')
            route.pattern = pattern.__class__(arg, pattern.name, is_endpoint=False)

            if not route.callback and isinstance(route, URLResolver):
                raise ImproperlyConfigured(f'{route}: include() is not supported.')

            assert isinstance(route, URLPattern)
            cls.routing.append(route)

        return cls

class AsyncActionHandler(metaclass=APIActionHandlerMetaclass):
    """
    Action Handler class

    Note: This class is "Action Handler" NOT Consumer
    When you want to use this as consumer, use consumers.AsyncAPIConsumer instead
    """
    ...

decorator

どのメソッドがActionを処理するメソッドか

を判断するためにdecoratorを用いています.

こちらの「デコレータに引数を渡す」と同じ処理を行っています.今回は引数は不要ですが,今後追加する可能性があるので,下記の実装のようにネストさせてasync_action**kwargsがdecoratorに渡される引数,action_wrapperfuncが実際に囲まれたメソッド(=Actionを処理する関数),そのメソッドをWebsocketの処理としてAsync化しているのがasync_functionということになります.(ややこしい)

そして,decorator内で渡されるfuncis_actionkwargsをセットし,この情報から先述のmetaclassでメソッドがactionを処理するものかを判定しているわけです.

ちなみに,wrapsはメタ情報を正しく書きかえるdecoratorです.参考

import asyncio
from functools import wraps

from channels.db import database_sync_to_async


def async_action(**kwargs):
    """Set the method as async action.
    Note that use `async_to_sync`
    if you call the method decorated by this decorator in sync method
    """

    def action_wrapper(func):
        func.is_action = True
        func.kwargs = kwargs

        if asyncio.iscoroutinefunction(func):
            return func

        # convert sync to async function
        @wraps(func)
        async def async_function(self, *args, **kwargs):
            response = await database_sync_to_async(func)(self, *args, **kwargs)
            return response

        async_function.is_action = True
        async_function.kwargs = kwargs

        return async_function

    return action_wrapper

as_aaah

rest_framework_channelsはConsumer(=ActionHandlerでもある)内に定義されたroutepatternsを先述のmetaclass内で解析し,Actionの処理を振り分けるルーティング機能を持っています.このroutepatternsはDjangoのurlpatternsとほぼ同様にpathre_pathを用いれるようにしています.これもナイスな実装です

from rest_framework.pagination import PageNumberPagination
from rest_framework_channels import generics
from rest_framework_channels.consumers import AsyncAPIConsumer

class TestPagination(PageNumberPagination):
    page_size = 10
    page_size_query_param = 'page_size'
    max_page_size = 100

class ChildActionHandler(generics.ListAPIActionHandler):
    serializer_class = TestSerializer
    queryset = TestModel.objects.all()
    pagination_class = TestPagination

class ParentConsumer(AsyncAPIConsumer):

    routepatterns = [
        path('test_child_route/', ChildActionHandler.as_aaah()),
    ]

このpathの2番目の引数に渡しているas_aaahがActionHandler用に新たに導入したものになります.

pathre_pathは,2番目の引数にCallback関数を受け付けるようになっていて(公式ドキュメントではviewとなっています),最終的にURLPatternインスタンスを返す関数です.

このURLPatternをこちらのRoutingManagerで,与えられたrouteにマッチするものがあるかを判定します.マッチするものがあれば,引数として渡したCallback関数がURLPatterncallback変数に保存されているので,そちらを呼び出しているという仕組みです.
この処理はchannelsのルーティングのコードを参考にしていて,おそらくDjangoのルーティングも同様の処理を行っているのではないかと思います.

(余談ですが,channelsのルーティングではincludeを使えないので,使えるようにするPRを出しています.マージされると嬉しい...)

from __future__ import annotations

from typing import TYPE_CHECKING, Any, Callable

from django.urls import URLPattern
from django.urls.exceptions import Resolver404
from rest_framework.exceptions import NotFound

if TYPE_CHECKING:
    from .handlers import AsyncActionHandler


class RoutingManager:
    def __init__(self):
        self.routes: list[URLPattern] = []

    def append(self, pattern: URLPattern) -> None:
        """Add new route to be managed

        Parameters
        ----------
        pattern: URLPattern
            The URLPattern instance
        """
        self.routes.append(pattern)

    # Any is intended for avoiding vscode's testing error due to circular import
    async def resolve(
        self, route: str, scope: dict, receive: Callable, send: Callable, **kwargs
    ) -> AsyncActionHandler:
        """Resolve a given route

        Parameters
        ----------
        route : str
            The route path to be resolved
        scope : dict
            The scope dict
        receive : Callable
            The recieve function to be passed into a matched action handler
        send : Callable
            The send function to be passed into a matched action handler

        Returns
        -------
        AsyncActionHandler
            The matched action handler
        """

        for routing in self.routes:
            try:
                match = routing.pattern.match(route)
                if match:
                    new_path, args, kwargs = match

                    # Add args or kwargs into the scope
                    outer = scope.get('url_route', {})
                    handler = routing.callback
                    return await handler(
                        dict(
                            scope,
                            path_remaining=new_path,
                            url_route={
                                'args': outer.get('args', ()) + args,
                                'kwargs': {**outer.get('kwargs', {}), **kwargs},
                            },
                            route=route
                        ),
                        receive,
                        send,
                    )
            except Resolver404:
                pass

        raise NotFound(f'No route found: {route}')

そして,as_aaahは以下のようになっていて,ActionHandlerのclassmethodとして定義しています.

class AsyncActionHandler(metaclass=APIActionHandlerMetaclass):
    """
    Action Handler class

    Note: This class is "Action Handler" NOT Consumer
    When you want to use this as consumer, use consumers.AsyncAPIConsumer instead
    """

    ...
    
    @classmethod
    def as_aaah(cls, **initkwargs) -> Self:
        """
        Return an Async API Action Handler (not scream) single callable that
        instantiates a action handler instance per scope.
        Similar in purpose to Django's as_view().

        initkwargs will be used to instantiate the action handler instance.
        """

        async def app(scope, receive, send):
            handler = cls(**initkwargs)
            return await handler(scope, receive, send)

        app.handler_class = cls
        app.handler_initkwargs = initkwargs

        # take name and docstring from class
        update_wrapper(app, cls, updated=())
        return app

ポイントは自身のActionHandlerを返すのではなく,非同期に自身のインスタンスを作成できる関数のapp関数を返している点です.これにより,Callback関数にこのappが保存されることになります.マッチした際に以下の部分でActionHandlerのインスタンスが作成されるようになっていて,Connection時のscope情報やメッセージのroute情報などをインスタンスに保存できるという仕組みです.

    async def resolve(
        self, route: str, scope: dict, receive: Callable, send: Callable, **kwargs
    ) -> AsyncActionHandler:

        ...

                return await handler(
                        dict(
                            scope,
                            path_remaining=new_path,
                            url_route={
                                'args': outer.get('args', ()) + args,
                                'kwargs': {**outer.get('kwargs', {}), **kwargs},
                            },
                            route=route
                        ),
                        receive,
                        send,
                    )

sphinxでのドキュメンテーション

今回,ドキュメンテーションもそれっぽいものにするために少し頑張ってみました.よくあるsphinxを導入しています.英語が伝わるかは知りませんが,それっぽさは十分です.

cd sphinx
sudo apt-get -y install plantuml
pip install \
sphinx 
myst_parser # markdownでの記述を可能にする拡張機能
sphinx_rtd_theme # モダンなUIにする拡張機能
sphinxcontrib-plantuml # plantumlの描画を可能にする拡張機能
sphinx-autobuild # 自動Buildする拡張機能
sphinxcontrib-napoleon # Docstringを読子m流

以下のコマンドで初期設定を行いました.基本的にこのまま進めて良かったはずです.

cd sphinx
sphinx-quickstart

出来上がるconf.pyに以下のように設定して

conf.py
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
import os
import sys

sys.path.insert(0, os.path.abspath('../'))
import rest_framework_channels


# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information

project = 'rest_framework_channels'
copyright = '2024, jjjkkkjjj'
author = 'jjjkkkjjj'
release = rest_framework_channels.__version__

# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration

extensions = [
    'sphinx.ext.autodoc',
    'sphinx.ext.napoleon',
    'sphinx.ext.githubpages',
    'sphinx.ext.autosectionlabel',
    'myst_parser',
    'sphinxcontrib.plantuml',
]
plantuml_output_format = 'png'
plantuml_syntax_error_image = True

templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'tests']

source_suffix = {
    '.rst': 'restructuredtext',
    '.md': 'markdown',
}

# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output

html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']

ソースコードからドキュメントを自動生成するようにします.

sphinx-apidoc -f -o ./source ../rest_framework_channels

そして,sphinxフォルダ直下のindex.rstindex.mdに変えて適宜ファイルを読み込めるようにしました.

index.md
# rest_framework_channels documentation

% rest_framework_channels documentation master file, created by
% sphinx-quickstart on Mon Aug 12 15:53:43 2024.
% You can adapt this file completely to your liking, but it should at least
% contain the root `toctree` directive.

```{toctree}
:maxdepth: 2
:caption: Contents

./docs/00_introduction/index
./docs/01_installation/index
./docs/02_rest_framework/index
./source/modules
```

## Indices and tables

* {ref}`genindex`
* {ref}`modindex`
* {ref}`search`

```{include} ../README.md
```

Markdownでdirectiveを設定するには

rst
.. toctree::
   :maxdepth: 2
   :caption: Contents:

のような場合

Markdown
```{toctree}
:maxdepth: 2
:caption: Contents
```

とすれば良いそうです.

おわりに

まだプロジェクトで動くかは試していないのでこれからな部分はあると思います.しかし,テストは通っていますし,正しく運用できたら結構便利なライブラリとなっているのではないかなと思っています.もし良かったら是非使っていただき,バグなどを報告いただけると嬉しいなと思います.

また,今回の開発を通してPythonの結構コアな部分も理解できて良かったなと思います.

参考

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?