はじめに
DjangoのWebsocketライブラリのchannelsを拡張して,rest_frameworkの便利な機能であるserializerやpermissionをほぼ同じインターフェースで実現できるライブラリrest_framework_channelsを作成しました.作成過程や実装のポイントなどを備忘録も兼ねて書き殴っていけたらと思います.
経緯
とあるDjangoのプロジェクトでWebsocketを使用して双方向の通信を実現したい要件が出てきました.このDjangoのプロジェクトはrest_frameworkを用いたRESTAPIの仕様になっているため,できるだけSerializerやPermissionの機能を使いたいです.
channelsはDjangoのプロジェクトでWebSocketを導入できる素晴らしいライブラリです.しかし,あくまでもWebsocketを導入するためのライブラリであるため,Serializerなどの機能は一切用意されていないという問題があります.
Serializerを使いたい
serializerはDjangoのモデルとclientのリクエストをよしなに処理してくれます.
コーディングの記述量も劇的に減るため,RESTfulなプロジェクトでは是非とも使いたい機能です.
djangochannelsrestframework
はじめはこちらを使っていました.しかし,使っていくうちにConnection単位でしかSerializerやPermissionを設定できないということがわかりました.Connection後に複数のモデルを扱うケースが実運用ではほとんどだと思うので,この仕様は大きな欠点だと思います.一方で,非同期処理周りの実装は非常に参考になりそうです.幸いにもMITライセンスなので,これをベースとした新たなライブラリを作れそうだと思ったのがことのきっかけです.
Channels API
(実は後から見つけました.)が,最終更新が6年前なので最新のchannelsには対応していなさそうです.
It requires Python 2.7 or 3.x, Channels <=1.1.8.1, Django <=1.11, and Django Rest Framework 3.x
実際,対応バージョンが古すぎるので実用的ではなさそうです.
要件整理
以上を踏まえて,実現したいライブラリの要件は以下のようになります.
- Serializer・Permissionを使えるようにする
- Serializer・PermissionはConnection単位だけでなくConnection後のやり取りに対しても設定できるようにする
- できるだけ既存の有名ライブラリの概念を導入する
- django
- channels
- rest_framework
実装
紆余曲折ありましたが,何とか実装できました.ここでは機能といくつかの実装のポイントを紹介できたらと思います.
主な機能
action
actionはConnectionが確立した後のメッセージのやり取りで扱う概念です.(先述のdjangorestframeworkchannelsを踏襲しました.)Consumer内で指定されたactionに対応するメソッドを呼ぶことで,処理を分離することが可能です.
以下がactionの例です.actionは後述のようにDecoratorで定義できるようにしています.
class ParentConsumer(generics.GenericAsyncAPIConsumer):
serializer_class = TestSerializer
queryset = TestModel.objects.all()
@async_action()
def get_test(self):
testmodel = self.get_object(self.action) # rest_frameworkのGenericsView同様にget_objectが実装されている
...
return { 'id': testmodel.pk }, 200
このactionはConsumer内のHTTP Requestといえばわかりやすいでしょうか.
以下のメッセージを送ると,定義したget_test
メソッドが非同期で呼ばれ
{
'action': 'get_test',
}
以下のResponseが返ってきます.
{
'errors': [],
'data': {
'id': 1,
},
'action': 'get_test',
'route': '',
'status': 200,
}
ActionHandler
ActionHandler
はrest_framework_channelsの肝となるクラスで,その名の通りactionを処理する役割を持ちます.
rest_framework_channelsのシーケンス図は以下のようになっていて,従来通りConsumerがConnectionの役割を担い,Consumer内で送られてきたメッセージのactionに基づき,処理を振り分けるようになっています.
Consumer内で定義した@async_action()
に振り分けることで十分な気もしますが,これだと要件の
Serializer・PermissionはConnection単位だけでなくConnection後のやり取りに対しても設定できるようにする
を満たしません.そこでこの要件を満たすのに一役買うのがActionHandler
です.
ActionHandler
はViewやConsumerのようなもの(callback関数としてAsync API Action Handlerの略となる後述のaaahを返します.)です.Consumer内で以下のように定義することで,Connection後のやり取りに対してSerializerやPermissionを設定できるようにしています.既存のpath
やre_path
を用いて,まるでurlpatterns
を定義するのと同じように,Consumer内部でのactionの振り分けを記述できます.我ながらナイスな実装です.
from django.urls import path, re_path
from rest_framework_channels import generics
from rest_framework_channels.consumers import AsyncAPIConsumer
from rest_framework_channels.permissions import IsAuthenticated
from rest_framework_channels.decorators import async_action
class ChildActionHandler(generics.RetrieveAPIActionHandler):
serializer_class = TestSerializer
queryset = TestModel.objects.all()
permission_classes = (IsAuthenticated,)
@async_action
def recieve_path(self, *args, **kwargs):
return {'path': 'handled'}, 200
@async_action
def recieve_repath(self, *args, **kwargs):
return {'repath': 'handled'}, 200
class ParentConsumer(AsyncAPIConsumer):
# You can define the routing inside the consumer similar with original django's urlpatterns
routepatterns = [
path('test_child_route', ChildActionHandler.as_aaah()),
re_path(
r'test_child_route/(?P<pk>[-\w]+)/$',
ChildActionHandler.as_aaah(),
),
]
以下のようなメッセージを送ると
{
'action': 'receive_repath',
'route': 'test_child_route/1/',
}
receive_repath
が呼ばれます.しかも,kwargs
にpk=1
が入った状態で呼ばれるため,元のdjangoと同じようなコーディングができるはずです.
generics
rest_framework同様のGenericsを用意しました.使い勝手はほぼ同じであるため,ほとんど違いを意識することなくWebsocketにSerializerやPermissionを導入できます!
以下が実装したGenericsとrest_frameworkの対応表です.
Action Handler | Consumer | rest_framework's Generics | action |
---|---|---|---|
GenericAsyncAPIActionHandler | GenericAsyncAPIConsumer | GenericAPIView | n/a |
CreateAPIActionHandler | CreateAPIConsumer | CreateAPIView | create |
ListAPIActionHandler | ListAPIConsumer | ListAPIView | list |
RetrieveAPIActionHandler | RetrieveAPIConsumer | RetrieveAPIView | retrieve |
UpdateAPIActionHandler | UpdateAPIConsumer | UpdateAPIView | update/partial_update |
DestroyAPIActionHandler | DestroyAPIConsumer | DestroyAPIView | remove |
ListCreateAPIActionHandler | ListCreateAPIConsumer | ListCreateAPIView | list/create |
RetrieveUpdateAPIActionHandler | RetrieveUpdateAPIConsumer | RetrieveUpdateAPIView | retrieve/update/partial_update |
RetrieveDestroyAPIActionHandler | RetrieveDestroyAPIConsumer | RetrieveDestroyAPIView | retrieve/remove |
RetrieveUpdateDestroyAPIActionHandler | RetrieveUpdateDestroyAPIConsumer | RetrieveUpdateDestroyAPIView | retrieve/update/partial_update/remove |
Serializer
先の例のようにGenericsと組み合わせれば数行で処理を実装できたり,以下のようにget_object
やget_serializer
を用いて独自処理を定義できます.
class ActionHandler(generics.RetrieveAPIActionHandler):
serializer_class = TestSerializer
queryset = TestModel.objects.all()
@async_action()
def your_custom_action(self, *args, **kwargs):
action = kwargs.get('action', 'your_custom_action')
# get_object must recieve the action unlike original rest_framework
instance = self.get_object(action)
serializer = self.get_serializer(instance)
# your logic here
...
return serializer.data, 200
Permission
個人的に重要だと思っているPermissionです.こちらもオリジナル同様です.
from rest_framework_channels.handlers import AsyncAPIActionHandler
from rest_framework_channels.decorators import async_action
from rest_framework_channels.consumers import AsyncAPIConsumer
from rest_framework_channels.permissions import IsAuthenticated
class ChildActionHandler(AsyncAPIActionHandler):
permission_classes = (IsAuthenticated,)
@async_action()
def your_custom_action(self, pk, *args, **kwargs):
# Note: you should return the data and status code.
return {
'message': 'your_custom_action will be handled',
'model_id': pk
}, 200
class ParentConsumer(AsyncAPIConsumer):
routepatterns = [
re_path(
r'test_child_route/(?P<pk>[-\w]+)/$',
ChildActionHandler.as_aaah(),
),
]
認証していないユーザーがyour_custom_action
を指定したメッセージを送信すると,
{
'data': None,
'action': 'your_custom_action',
'route': 'test_child_route/',
'status': 403,
}
403エラーでしっかり弾いてくれます.
ちなみにdjangorestframeworkchannelsの実装を拝借して,can_connect
メソッドも用意しました.以下のようにBasePermission
を継承し,can_connect
メソッドを上書きするとそもそものConnectionができるかを評価できます.
from rest_framework_channels.permissions import BasePermission
class StrictAuth(BasePermission):
async def can_connect(
self, scope: dict[str, Any], handler: AsyncActionHandler, message=None
) -> bool:
return check_have_authority(scope['user'])
Pagination
ページネーションは大きなモデルの一覧を知りたい時に便利な機能です.こちらもこんな感じでほぼ同じ使い方で実装できます.
from rest_framework.pagination import PageNumberPagination
from rest_framework_channels import generics
from rest_framework_channels.consumers import AsyncAPIConsumer
class TestPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 100
class ChildActionHandler(generics.ListAPIActionHandler):
serializer_class = TestSerializer
queryset = TestModel.objects.all()
pagination_class = TestPagination
class ParentConsumer(AsyncAPIConsumer):
routepatterns = [
path('test_child_route/', ChildActionHandler.as_aaah()),
]
以下のメッセージを送ると
{
'action': 'list', # pagination will be used in list action at the almost cases
'route': 'test_child_route/?page=4',
}
しっかり指定数分を返してくれます.
{
'errors': [],
'action': 'list',
'route': 'test_child_route/?page=4',
'status': 200,
'data': {
'count': 100,
'next': 'test_child_route/?page=5',
'previous': 'test_child_route/?page=3',
'results': [
{
'id': 31,
'title': 'Title31',
'content': 'Content31',
},
{
'id': 32,
'title': 'Title32',
'content': 'Content32',
},
...
{
'id': 40,
'title': 'Title40',
'content': 'Content40',
},
]
}
}
実装のポイント
今回の実装で勉強になった点を実装のポイントとして書いていきたいと思います.
metaclass
Pythonのmetaclassではこちらの記事にあるようにクラスの定義チェックを行うことができます.
メタクラスを利用することで,クラスを実装した時点でそのクラスの定義チェックを行えます
メタクラスの使いどころ
(中略)
クラス変数のチェックをしたい(今回のコード例)
インスタンス生成時に必ず特定の関数を実行したい.(デシリアライズ時にジェネリックに行えるよう,シリアライズ時にクラスの存在を登録したいときなど)
今回は上記性質を活かして,ActionHandler
内で
①どのメソッドがActionを処理するメソッドか(=@async_action
のdecoratorで囲まれているか)を保存する,
②routepatterns
に定義されたルーティング情報の登録を行う
ことをしています.
これらは1回の処理で十分で,保守的な観点からクラス変数として共通の情報を持たせた方が良いです.そのような理由からmetaclassとして定義することが望ましいということですね.
一見__init__
や__new__
で良いじゃないかと思うかもしれませんが,__init__
や__new__
だとインスタンスを生成する時に毎回行われてしまいますので,やはり今回はmetaclassが適任のようです.
class APIActionHandlerMetaclass(type):
"""
Metaclass that records action and route methods
"""
def __new__(mcs, name, bases, body):
cls = type.__new__(mcs, name, bases, body)
cls.available_actions = {}
cls.routing = RoutingManager()
# ①の処理
for method_name in dir(cls):
attr = getattr(cls, method_name)
is_action = getattr(attr, 'is_action', False)
if is_action:
kwargs = getattr(attr, 'kwargs', {})
name = kwargs.get('name', method_name)
cls.available_actions[name] = method_name
# ②の処理
for route in getattr(cls, 'routepatterns', []):
pattern = route.pattern
if isinstance(pattern, RegexPattern):
arg = pattern._regex
elif isinstance(pattern, RoutePattern):
arg = pattern._route
else:
raise ValueError(f'Unsupported pattern type: {type(pattern)}')
route.pattern = pattern.__class__(arg, pattern.name, is_endpoint=False)
if not route.callback and isinstance(route, URLResolver):
raise ImproperlyConfigured(f'{route}: include() is not supported.')
assert isinstance(route, URLPattern)
cls.routing.append(route)
return cls
class AsyncActionHandler(metaclass=APIActionHandlerMetaclass):
"""
Action Handler class
Note: This class is "Action Handler" NOT Consumer
When you want to use this as consumer, use consumers.AsyncAPIConsumer instead
"""
...
decorator
どのメソッドがActionを処理するメソッドか
を判断するためにdecoratorを用いています.
こちらの「デコレータに引数を渡す」と同じ処理を行っています.今回は引数は不要ですが,今後追加する可能性があるので,下記の実装のようにネストさせてasync_action
の**kwargs
がdecoratorに渡される引数,action_wrapper
のfunc
が実際に囲まれたメソッド(=Actionを処理する関数),そのメソッドをWebsocketの処理としてAsync化しているのがasync_function
ということになります.(ややこしい)
そして,decorator内で渡されるfunc
にis_action
とkwargs
をセットし,この情報から先述のmetaclassでメソッドがactionを処理するものかを判定しているわけです.
ちなみに,wraps
はメタ情報を正しく書きかえるdecoratorです.参考
import asyncio
from functools import wraps
from channels.db import database_sync_to_async
def async_action(**kwargs):
"""Set the method as async action.
Note that use `async_to_sync`
if you call the method decorated by this decorator in sync method
"""
def action_wrapper(func):
func.is_action = True
func.kwargs = kwargs
if asyncio.iscoroutinefunction(func):
return func
# convert sync to async function
@wraps(func)
async def async_function(self, *args, **kwargs):
response = await database_sync_to_async(func)(self, *args, **kwargs)
return response
async_function.is_action = True
async_function.kwargs = kwargs
return async_function
return action_wrapper
as_aaah
rest_framework_channelsはConsumer(=ActionHandlerでもある)内に定義されたroutepatterns
を先述のmetaclass内で解析し,Actionの処理を振り分けるルーティング機能を持っています.このroutepatterns
はDjangoのurlpatterns
とほぼ同様にpath
とre_path
を用いれるようにしています.これもナイスな実装です
from rest_framework.pagination import PageNumberPagination
from rest_framework_channels import generics
from rest_framework_channels.consumers import AsyncAPIConsumer
class TestPagination(PageNumberPagination):
page_size = 10
page_size_query_param = 'page_size'
max_page_size = 100
class ChildActionHandler(generics.ListAPIActionHandler):
serializer_class = TestSerializer
queryset = TestModel.objects.all()
pagination_class = TestPagination
class ParentConsumer(AsyncAPIConsumer):
routepatterns = [
path('test_child_route/', ChildActionHandler.as_aaah()),
]
このpath
の2番目の引数に渡しているas_aaah
がActionHandler用に新たに導入したものになります.
path
やre_path
は,2番目の引数にCallback関数を受け付けるようになっていて(公式ドキュメントではviewとなっています),最終的にURLPattern
インスタンスを返す関数です.
このURLPattern
をこちらのRoutingManager
で,与えられたroute
にマッチするものがあるかを判定します.マッチするものがあれば,引数として渡したCallback関数がURLPattern
のcallback
変数に保存されているので,そちらを呼び出しているという仕組みです.
この処理はchannelsのルーティングのコードを参考にしていて,おそらくDjangoのルーティングも同様の処理を行っているのではないかと思います.
(余談ですが,channelsのルーティングではinclude
を使えないので,使えるようにするPRを出しています.マージされると嬉しい...)
from __future__ import annotations
from typing import TYPE_CHECKING, Any, Callable
from django.urls import URLPattern
from django.urls.exceptions import Resolver404
from rest_framework.exceptions import NotFound
if TYPE_CHECKING:
from .handlers import AsyncActionHandler
class RoutingManager:
def __init__(self):
self.routes: list[URLPattern] = []
def append(self, pattern: URLPattern) -> None:
"""Add new route to be managed
Parameters
----------
pattern: URLPattern
The URLPattern instance
"""
self.routes.append(pattern)
# Any is intended for avoiding vscode's testing error due to circular import
async def resolve(
self, route: str, scope: dict, receive: Callable, send: Callable, **kwargs
) -> AsyncActionHandler:
"""Resolve a given route
Parameters
----------
route : str
The route path to be resolved
scope : dict
The scope dict
receive : Callable
The recieve function to be passed into a matched action handler
send : Callable
The send function to be passed into a matched action handler
Returns
-------
AsyncActionHandler
The matched action handler
"""
for routing in self.routes:
try:
match = routing.pattern.match(route)
if match:
new_path, args, kwargs = match
# Add args or kwargs into the scope
outer = scope.get('url_route', {})
handler = routing.callback
return await handler(
dict(
scope,
path_remaining=new_path,
url_route={
'args': outer.get('args', ()) + args,
'kwargs': {**outer.get('kwargs', {}), **kwargs},
},
route=route
),
receive,
send,
)
except Resolver404:
pass
raise NotFound(f'No route found: {route}')
そして,as_aaah
は以下のようになっていて,ActionHandlerのclassmethodとして定義しています.
class AsyncActionHandler(metaclass=APIActionHandlerMetaclass):
"""
Action Handler class
Note: This class is "Action Handler" NOT Consumer
When you want to use this as consumer, use consumers.AsyncAPIConsumer instead
"""
...
@classmethod
def as_aaah(cls, **initkwargs) -> Self:
"""
Return an Async API Action Handler (not scream) single callable that
instantiates a action handler instance per scope.
Similar in purpose to Django's as_view().
initkwargs will be used to instantiate the action handler instance.
"""
async def app(scope, receive, send):
handler = cls(**initkwargs)
return await handler(scope, receive, send)
app.handler_class = cls
app.handler_initkwargs = initkwargs
# take name and docstring from class
update_wrapper(app, cls, updated=())
return app
ポイントは自身のActionHandlerを返すのではなく,非同期に自身のインスタンスを作成できる関数のapp
関数を返している点です.これにより,Callback関数にこのapp
が保存されることになります.マッチした際に以下の部分でActionHandlerのインスタンスが作成されるようになっていて,Connection時のscope
情報やメッセージのroute
情報などをインスタンスに保存できるという仕組みです.
async def resolve(
self, route: str, scope: dict, receive: Callable, send: Callable, **kwargs
) -> AsyncActionHandler:
...
return await handler(
dict(
scope,
path_remaining=new_path,
url_route={
'args': outer.get('args', ()) + args,
'kwargs': {**outer.get('kwargs', {}), **kwargs},
},
route=route
),
receive,
send,
)
sphinxでのドキュメンテーション
今回,ドキュメンテーションもそれっぽいものにするために少し頑張ってみました.よくあるsphinxを導入しています.英語が伝わるかは知りませんが,それっぽさは十分です.
cd sphinx
sudo apt-get -y install plantuml
pip install \
sphinx
myst_parser # markdownでの記述を可能にする拡張機能
sphinx_rtd_theme # モダンなUIにする拡張機能
sphinxcontrib-plantuml # plantumlの描画を可能にする拡張機能
sphinx-autobuild # 自動Buildする拡張機能
sphinxcontrib-napoleon # Docstringを読子m流
以下のコマンドで初期設定を行いました.基本的にこのまま進めて良かったはずです.
cd sphinx
sphinx-quickstart
出来上がるconf.py
に以下のように設定して
# Configuration file for the Sphinx documentation builder.
#
# For the full list of built-in configuration values, see the documentation:
# https://www.sphinx-doc.org/en/master/usage/configuration.html
import os
import sys
sys.path.insert(0, os.path.abspath('../'))
import rest_framework_channels
# -- Project information -----------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information
project = 'rest_framework_channels'
copyright = '2024, jjjkkkjjj'
author = 'jjjkkkjjj'
release = rest_framework_channels.__version__
# -- General configuration ---------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.napoleon',
'sphinx.ext.githubpages',
'sphinx.ext.autosectionlabel',
'myst_parser',
'sphinxcontrib.plantuml',
]
plantuml_output_format = 'png'
plantuml_syntax_error_image = True
templates_path = ['_templates']
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store', 'tests']
source_suffix = {
'.rst': 'restructuredtext',
'.md': 'markdown',
}
# -- Options for HTML output -------------------------------------------------
# https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output
html_theme = 'sphinx_rtd_theme'
html_static_path = ['_static']
ソースコードからドキュメントを自動生成するようにします.
sphinx-apidoc -f -o ./source ../rest_framework_channels
そして,sphinx
フォルダ直下のindex.rst
をindex.md
に変えて適宜ファイルを読み込めるようにしました.
# rest_framework_channels documentation
% rest_framework_channels documentation master file, created by
% sphinx-quickstart on Mon Aug 12 15:53:43 2024.
% You can adapt this file completely to your liking, but it should at least
% contain the root `toctree` directive.
```{toctree}
:maxdepth: 2
:caption: Contents
./docs/00_introduction/index
./docs/01_installation/index
./docs/02_rest_framework/index
./source/modules
```
## Indices and tables
* {ref}`genindex`
* {ref}`modindex`
* {ref}`search`
```{include} ../README.md
```
Markdownでdirectiveを設定するには
.. toctree::
:maxdepth: 2
:caption: Contents:
のような場合
```{toctree}
:maxdepth: 2
:caption: Contents
```
とすれば良いそうです.
おわりに
まだプロジェクトで動くかは試していないのでこれからな部分はあると思います.しかし,テストは通っていますし,正しく運用できたら結構便利なライブラリとなっているのではないかなと思っています.もし良かったら是非使っていただき,バグなどを報告いただけると嬉しいなと思います.
また,今回の開発を通してPythonの結構コアな部分も理解できて良かったなと思います.