仕上がり
目次
1. 通知機能の実装
2. リアルタイムでFeedを更新する機能 (WebSockets + Channels)
3. DM機能の実装
1.通知機能の実装
1. python manage.py startapp notifications
2. settings.pyに追記
3. Notificationモデルの作成
4. リアルタイムでFeedを更新する機能 (WebSockets + Channels)
# backend/notifications/models.py
from django.db import models
from django.contrib.auth import get_user_model
User = get_user_model()
class Notification(models.Model):
recipient = models.ForeignKey(User, on_delete=models.CASCADE, related_name="notifications")
sender = models.ForeignKey(User, on_delete=models.CASCADE, related_name="sent_notifications", null=True, blank=True)
# 通知を発生させた人(likeやcommentした人など)
# 「すべての通知が「誰(ユーザー)から」発生したとは限らない」という設計上の理由でnull,blankを設定
message = models.CharField(max_length=255)
is_read = models.BooleanField(default=False)
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return f"Notification to {self.recipient.username} from {self.sender.username} : {self.message[:20]}"
4. Likeで通知の設定
# backend/likes/views.py
from rest_framework.views import APIView
from rest_framework import permissions, status
from rest_framework.response import Response
from tweets.models import Tweet
from notifications.models import Notification
from.serializers import LikeSerializer
from.models import Like
class LikeToggleAPIView(APIView):
permission_classes = [permissions.IsAuthenticated]
def post(self, request, tweet_id):
try:
tweet = Tweet.objects.get(id=tweet_id)
except Tweet.DoesNotExist:
return Response({"error": "Tweet not found."}, status=status.HTTP_404_NOT_FOUND)
like, created = Like.objects.get_or_create(user=request.user, tweet=tweet)
if created:
serializer = LikeSerializer(like)
# 通知を作成 ↓ ↓ ↓ ↓ ↓
Notification.objects.create(
recipient = tweet.user,
sender = request.user,
message = f"{request.user.username} liked your tweet!",
)
return Response({
"message": "Tweet liked successfully.",
"likes_count": tweet.likes.count(),
"like": serializer.data
}, status=status.HTTP_201_CREATED)
else:
return Response({"detail": "Already liked."}, status=status.HTTP_200_OK)
def delete(self, request, tweet_id):
try:
tweet = Tweet.objects.get(id=tweet_id)
like = Like.objects.get(user=request.user, tweet=tweet)
like.delete()
return Response({
"message":"Like removed.",
"likes_count": tweet.likes.count(),
}, status=status.HTTP_200_OK)
except Tweet.DoesNotExist:
return Response({"error": "Tweet not found."}, status=status.HTTP_404_NOT_FOUND)
except Like.DoesNotExist:
return Response({"error": "Like does not exist."}, status=status.HTTP_400_BAD_REQUEST)
5. 未読 / 既読の管理
APIで既読管理
# backend/notifications/views.py
from rest_framework import generics, permissions, status
from rest_framework.response import Response
from .models import Notification
from .serializers import NotificationSerializer
class NotificationListView(generics.ListAPIView):
permission_classes = [permissions.IsAuthenticated]
serializer_class = NotificationSerializer
def get_queryset(self):
# 未読だけを返す
return Notification.objects.filter(recipient=self.request.user, is_read=False).order_by("-created_at")
class MarkAllAsReadView(generics.UpdateAPIView):
permission_classes = [permissions.IsAuthenticated]
def update(self, request, *args, **kwargs):
Notification.objects.filter(recipient=self.request.user, is_read=False).update(is_read=True)
return Response({"detail": "All notifications marked as read"}, status=status.HTTP_200_OK)
class MarkOneAsReadView(generics.UpdateAPIView):
permission_classes = [permissions.IsAuthenticated]
serializer_class = NotificationSerializer # 必要なら
def patch(self, request, pk=None):
# pk で特定の通知を既読にする
notif = Notification.objects.filter(recipient=self.request.user, pk=pk).first()
if not notif:
return Response({"error": "Notification not found"}, status=status.HTTP_404_NOT_FOUND)
notif.is_read = True
notif.save()
return Response({"detail": f"Notification {pk} marked as read"})
6. ルーティングの設定
# backend/notifications/urls.py
from django.urls import path
from .views import NotificationListView, MarkAllAsReadView, MarkOneAsReadView
urlpatterns = [
path("notifications/", NotificationListView.as_view(), name="notification_list"),
path("notifications/mark_all/", MarkAllAsReadView.as_view(), name="mark_all_read"),
path("notifications/<int:pk>/mark_read/", MarkOneAsReadView.as_view(), name="mark_one_read"),
]
7. シリアライザの設定
# backend/notifications/serializers.py
from rest_framework import serializers
from.models import Notification
class NotificationSerializer(serializers.ModelSerializer):
sender_username = serializers.ReadOnlyField(source="sender.username")
recipient_username = serializers.ReadOnlyField(source="recipient.username")
class Meta:
model = Notification
fields = ["id", "sender_username", "recipient_username", "message", "is_read", "created_at"]
read_only_fields = ["id", "sender_username", "recipient_username", "created_at"]
解説
-
sender_username
/recipient_username
などを返す -
is_read
は既読操作の際に更新されるフィールド。書き込みOKにしたいなら read_only_fields から外す。
8. 通知一覧を取得(フロント)
// frontend/src/components/NotificationList.jsx
import React, { useState, useEffect } from "react";
import API from "../api";
function NotificationList() {
const [notifications, setNotifications] = useState([]);
const [error, setError] = useState("");
const [loading, setLoading] = useState(true);
useEffect(() => {
fetchNotifications();
}, []);
const fetchNotifications = async () => {
setLoading(true);
setError("");
try {
const res = await API.get("notifications/"); // GET /api/notifications/
setNotifications(res.data);
} catch (err) {
console.error("Error fetching notifications:", err);
setError("Failed to load notifications");
}
setLoading(false);
};
// 全件既読
const markAllAsRead = async () => {
try {
await API.patch("notifications/mark_all/"); // あるいはPUT,POST, etc. 形は自由
// 全件既読になったら state も反映
setNotifications((prev) => prev.map((n) => ({ ...n, is_read: true })));
} catch (err) {
console.error("Failed to mark all as read:", err);
}
};
// 1件既読
const markOneAsRead = async (id) => {
try {
await API.patch(`notifications/${id}/mark_read/`);
setNotifications((prev) =>
prev.map((n) => (n.id === id ? { ...n, is_read: true } : n))
);
} catch (err) {
console.error("Failed to mark notification as read:", err);
}
};
if (loading) return <p>Loading notifications...</p>;
if (error) return <p className="text-red-500">{error}</p>;
return (
<div className="bg-white p-4">
<h2 className="text-xl font-bold mb-4">Notifications</h2>
<button onClick={markAllAsRead} className="bg-blue-500 text-white px-3 py-1 rounded mb-2">
Mark All as Read
</button>
{notifications.length === 0 ? (
<p>No notifications</p>
) : (
notifications.map((notif) => (
<div key={notif.id} className="border-b py-2">
<p className="text-sm">
From: <strong>{notif.sender_username}</strong>
</p>
<p className="text-sm">{notif.message}</p>
<small className="text-gray-500">
{new Date(notif.created_at).toLocaleString()}
</small>
{!notif.is_read && (
<button
onClick={() => markOneAsRead(notif.id)}
className="text-blue-500 ml-4"
>
Mark as Read
</button>
)}
</div>
))
)}
</div>
);
}
export default NotificationList;
9. Navbarに埋め込む
// frontend/src/components/Navbar.jsx
import React, { useContext } from 'react';
import { Link } from 'react-router-dom';
import { AuthContext } from '../contexts/AuthContext';
function Navbar() {
const { user, logout } = useContext(AuthContext);
return (
<nav className='bg-blue-600 p-4 text-white flex justify-between'>
<div>
<Link to="/" className='mr-4'>Home</Link>
{user && <Link to="/tweets" className='mr-4'>Tweets</Link>}
</div>
<div>
{ user ? (
<>
<Link to={`/users/${user.username}/profile`} className='mr-4'>{user.username}</Link>
<Link to="/notifications" className='mr-4'>Notifications</Link>
<button onClick={logout} className='bg-red-500 px-3 py-1 rounded'>Logout</button>
</>
) : (
<>
<Link to="/login" className='mr-4'>Login</Link>
<Link to="/register" className='bg-green-500 px-3 py-1 rounded'>Register</Link>
</>
)}
</div>
</nav>
);
}
export default Navbar;
2. リアルタイムでFeedを更新する機能 (WebSockets + Channels)
まずは軽くWebSocketを理解する。
- やりたいこと
- ユーザーがツイートを投稿すると、他のユーザーのタイムラインにもすぐ反映される
- ブラウザをリロードしなくても、新着ツイートが画面に出てくる
- Django Channels + Redis + WebSocket を用いて実装
1. settings.py (Channels設定)
# backend/settings.py
INSTALLED_APPS = [
...
"channels", # Channelsを使う
"tweets", # ツイートアプリ
...
]
ASGI_APPLICATION = "backend.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [(REDIS_HOST, int(REDIS_PORT))],
},
},
}
2. .envでホストとPORTを設定
# backend/.end
REDIS_HOST=127.0.0.1
REDIS_PORT=6379
3. asgi.pyの設定
# backend/asgi.py
import os
import django
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import tweets.routing
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
django.setup()
application = get_asgi_application()
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": URLRouter(
tweets.routing.websocket_urlpatterns
),
})
4. TweetConsumerの設定
# backend/tweets/consumer.py
import json
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class TweetConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
print("WebSocket connected!")
# 全員が参加するグループ "tweets" としてみる
await self.channel_layer.group_add("tweets", self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard("tweets", self.channel_name)
async def receive_json(self, content):
# クライアントからメッセージを受け取ったときの処理
# 今回は特になにもしない
pass
async def tweet_message(self, event):
# 送られてきた新着ツイート情報をクライアントにブロードキャスト
await self.send_json({
"type": "tweet_message",
"tweet": event["tweet"],
})
5. routing.pyの設定
# backend/tweets/routing.py
from django.urls import re_path
from.consumer import TweetConsumer
websocket_urlpatterns = [
re_path(r'^ws/tweets/$', TweetConsumer.as_asgi()),
]
tweets/views.pyの設定
# backend/tweets/views.py
from rest_framework import viewsets, permissions
from.models import Tweet
from.serializers import TweetSerializer
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
# Create your views here.
class TweetViewSet(viewsets.ModelViewSet):
queryset = Tweet.objects.all().order_by("-created_at")
serializer_class = TweetSerializer
permission_classes = [permissions.IsAuthenticatedOrReadOnly]
def perform_create(self,serializer):
# 新規作成時に "user" を自動的にセット
tweet = serializer.save(user=self.request.user)
# 新しいツイートが投稿されたらWebsocketで通知
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
"tweets",
{
"type": "tweet_message",
"tweet": {
"id": tweet.id,
"content": tweet.content,
"username": tweet.user.username,
"created_at": str(tweet.created_at),
}
}
)
6. Feed.jsxの作成
// frontend/src/Feed.jsx
import React, { useState, useEffect, useRef } from "react";
import API from "../api";
function Feed() {
const [tweets, setTweets] = useState([]);
const [error, setError] = useState("");
const [loading, setLoading] = useState(true);
// WebSocketのリファレンス
const wsRef = useRef(null);
useEffect(() => {
API.get("tweets/")
.then((res) => {
setTweets(res.data);
})
.catch((err) => {
console.error(err);
setError("Failed to load tweets.");
})
.finally(() => setLoading(false));
// WebSocket接続
const wsUrl = `ws://localhost:8000/ws/tweets/`; // asgi.pyで設定
const socket = new WebSocket(wsUrl);
wsRef.current = socket;
socket.open = () => {
console.log("WebSocket connected for Feed!");
};
socket.onmessage = (e) => {
const data = JSON.parse(e.data);
console.log("New message from WS:", data);
if (data.type === "tweet_message") {
// 新しいツイートが届いた
setTweets((prev) => [data.tweet, ...prev]);
}
};
socket.onerror = (err) => {
console.error("WebSocket error:", err);
};
socket.onclose = () => {
console.log("WebSocket closed for Feed.");
// 必要に応じて再接続処理
};
// コンポーネントアンマウント時にソケット切断
return () => {
if (wsRef.current) {
wsRef.current.close();
}
};
}, []);
if (loading) return <p>Loading feed...</p>;
if (error) return <p className="text-red-500">{error}</p>;
return (
<div>
<h2>Real-time Feed</h2>
{tweets.map((tweet) => (
<div key={tweet.id} className="border p-2 mb-2">
<p><strong>{tweet.username}</strong> - {tweet.content}</p>
<small>{tweet.created_at}</small>
</div>
))}
</div>
);
}
export default Feed;
DM機能の実装
1. python manage.py startapp dm
2. settings.pyに追記
3. DirectMessageモデルの作成 & makemigration & migrate
# baxkend/dm/models.py
from django.db import models
from django.conf import settings
class DirectMessage(models.Model):
sender = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='sent_dms')
recipient = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, related_name='received_dms')
content = models.TextField()
created_at = models.DateTimeField(auto_now_add=True)
def __str__(self):
return f"DM from {self.sender} to {self.recipient}"
4. Consumerの作成
# dackend/dm/consumer.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
from django.contrib.auth import get_user_model
from asgiref.sync import sync_to_async
from .models import DirectMessage
User = get_user_model()
class DMConsumer(AsyncWebsocketConsumer):
# 1対1チャット用。
# ws://localhost:8000/ws/dm/<相手ユーザーID>/
async def connect(self):
# URLのパラメータとして user_id が来る想定
self.other_username = self.scope['url_route']['kwargs']['username']
self.user = self.scope['user'] # ログイン中のユーザー
print(f"DMConsumer connect: {self.user.username} -> username= {self.other_username}")
if self.user.is_anonymous:
# 未ログインなら接続拒否
print("User is anonymous.")
await self.close()
return
try:
recipient = await sync_to_async(User.objects.get)(username=self.other_username)
self.other_user_id = recipient.id
print(f"Recipient found: {recipient.username} (ID: {recipient.id})")
except User.DoesNotExist:
print(f"Error: Recipient with username {self.other_username} does not exist.")
await self.close()
return
self.room_name = f"dm_{min(self.user.id, self.other_user_id)}_{max(self.user.id, self.other_user_id)}"
print(f"Room name: {self.room_name}")
await self.channel_layer.group_add(self.room_name, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_name, self.channel_name)
async def receive(self, text_data=None, bytes_data=None):
# テキストメッセージを受け取る
if text_data is not None:
data = json.loads(text_data)
message_content = data.get('content')
# DBに保存
dm_obj = await self.save_dm(message_content)
if dm_obj is None:
print("Message was not saved due to an error.")
return
# 他方にもブロードキャスト
await self.channel_layer.group_send(
self.room_name,
{
'type': 'dm_message',
'dm_id': dm_obj.id,
'content': message_content,
'sender_id': self.user.id
}
)
async def dm_message(self, event):
"""
グループ内の全クライアントにメッセージを転送
"""
await self.send(json.dumps({
'dm_id': event['dm_id'],
'content': event['content'],
'sender_id': event['sender_id']
}))
@sync_to_async
def save_dm(self, content):
try:
recipient = User.objects.get(id=int(self.other_user_id))
print(f"Saving message: sender={self.user.id}, recipient={recipient.id}")
except User.DoesNotExist:
print(f"Error: recipient user {self.other_user_id} does not exist")
return None
return DirectMessage.objects.create(
sender=self.user,
recipient=recipient,
content=content
)
解説
1. モジュールのインポート
-
json
:WebSocket で送受信するデータを JSON 形式にエンコード・デコードするために使う -
AsyncWebsocketConsumer
:Django Channels の WebSocketコンシューマ(クライアントとの通信を処理するクラス) -
sync_to_async
:同期処理を非同期で実行するためのデコレーター(DBアクセスのため)
2. scopeとは?
- WebSocket では
self.scope
にリクエスト情報(ユーザー情報やURLのパラメータ)が入る -
self.scope['url_route']['kwargs']['username']
:URL に含まれる 相手ユーザーの ID を取得 - 例)
ws://localhost:8000/ws/dm/test/
→ username=test -
self.scope['user']
:現在ログインしている 自分のユーザー情報
3. group_add, group_discard, group_sendとは?
-
group_add()
:Django Channels のグループ機能で、複数の WebSocket 接続をまとめる -
self.room_name
という名前の チャットルームに参加 する -
accept()
で WebSocket 接続を受け入れる -
group_discard()
:チャットルーム(グループ)から退出 -
close_code
は切断理由(エラーコードなど) -
group_send()
を使って、チャットルームの全メンバーにメッセージを送信
4. save_dmメソッドについて
- Django の ORM は同期処理なので、そのままだと 非同期処理の receive() から使えない
-
@sync_to_async
を使って、同期処理(DB 書き込み)を非同期で実行する
5. routing.pyの作成
# backend/dm/routing.py
from django.urls import path
from .consumers import DMConsumer
websocket_urlpatterns = [
path("ws/dm/<str:username>/", DMConsumer.as_asgi()),
]
解説
- ws/dm// に対して DMConsumer を適用する
-
(?P<user_id>\d+)
→ user_id というパラメータを整数(\d+)
として取得 する正規表現 -
$
→ URL の終端を意味する(それ以降に余計な文字列があるとマッチしない)
6. asgiのへの追記
# backend/asgi.py
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
django.setup()
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from .middleware import JWTAuthMiddlewareStack
import tweets.routing
import dm.routing
application = get_asgi_application()
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": JWTAuthMiddlewareStack(
URLRouter(
tweets.routing.websocket_urlpatterns +
dm.routing.websocket_urlpatterns
)
),
})
解説
1. Djangoを読み込む
-
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "backend.settings")
- Djangoの設定ファイル(backend/settings.py)を環境変数として指定
- これにより、Djangoの各種設定(データベースやアプリの設定)が読み込まれる
-
django.setup()
- Djangoの初期化処理を実行し、モデルやアプリを使用できるようにする
2. モジュールについて
-
ProtocolTypeRouter
:- リクエストのプロトコル(HTTP, WebSocket など)に応じて処理を分岐するルーター
-
URLRouter
:- WebSocketのURLルーティング を定義するために使用
-
get_asgi_application()
:- DjangoのASGIアプリケーション(HTTP用)を取得する関数
-
JWTAuthMiddlewareStack
:- WebSocket通信でJWT認証を行うミドルウェア(
backend/middleware.py
に定義されている)
- WebSocket通信でJWT認証を行うミドルウェア(
7. DMコンポーネントの作成
// frontend/src/components/DM.jsx
import React, { useState, useEffect, useRef, useCallback, useContext } from "react";
import { useParams } from "react-router-dom";
import API from "../api";
import { connectDMWebSocket } from "../api/dm";
import { AuthContext } from "../contexts/AuthContext";
function DMPage() {
const { username } = useParams(); // URLパラメータの相手ユーザーID
const { user } = useContext(AuthContext);
const currentUserId = user ? user.id : null;
const [messages, setMessages] = useState([]);
const [inputContent, setInputContent] = useState("");
const socketRef = useRef(null);
// useCallback を使ってfetchDMHistoryをメモ化
const fetchDMHistory = useCallback(async () => {
if(!username) return;
try {
const res = await API.get(`dm/?username=${username}`);
setMessages(res.data);
} catch (err) {
console.error("Failed to fetch DM history:", err);
}
}, [username]); // `userId` を依存関係に追加
// fetchDMHistoryをuseEffectの依存配列に追加
useEffect(() => {
fetchDMHistory();
}, [fetchDMHistory]);
// 2) WebSocket接続
useEffect(() => {
const socket = connectDMWebSocket(username); // WebSocket接続関数を使用
socketRef.current = socket;
socket.onopen = () => {
console.log("DM WebSocket connected");
};
socket.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log("DM message received:", data);
if (data.dm_id) {
setMessages((prev) => [...prev, data]);
}
};
socket.onerror = (err) => {
console.error("DM WebSocket error:", err);
};
socket.onclose = () => {
console.log("DM WebSocket closed");
};
return () => {
if (socketRef.current) {
socketRef.current.close();
}
};
}, [username]);
// 3) メッセージ送信
const sendMessage = () => {
if (!socketRef.current || !inputContent.trim()) return;
socketRef.current.send(JSON.stringify({ content: inputContent }));
setInputContent("");
};
return (
<div className="container mx-auto p-4">
<h2 className="text-xl font-bold mb-4">DM with User {username}</h2>
<div className="border p-4 mb-4 "style={{ height: '300px', overflowY: 'auto' }}>
{messages.map((msg, idx) => {
console.log("message obj =>", msg);
console.log(" msg.sender =>", msg.sender, "type:", typeof msg.sender);
console.log(" currentUserId =>", currentUserId, "type:", typeof currentUserId);
const isMyMessage = (String(msg.sender) === String(currentUserId));
console.log("isMyMessage =>", isMyMessage);
return(
<div key={idx} className={`flex w-full mb-2 ${isMyMessage ? "justify-end" : "justify-start"}`}>
<div className={`rounded p-2 max-w-sm ${isMyMessage ? "bg-blue-500 text-white" : "bg-gray-300 text-black"}`}>
{msg.content}
</div>
</div>
);
})}
</div>
<div className="flex items-center gap-2">
<input
className="flex-1 p-2 border rounded"
type="text"
value={inputContent}
onChange={(e) => setInputContent(e.target.value)}
onKeyDown={(e) => {
if (e.key === "Enter") {
e.preventDefault(); // エンターでフォームが送信されないようにする
sendMessage();
}
}}
/>
<button
className="bg-blue-500 hover:bg-blue-600 text-white px-4 py-2 rounded"
onClick={sendMessage}
>
Send
</button>
</div>
</div>
);
}
export default DMPage;
8. frontend/src/utils/auth.jsを作成
アクセストークンを取得する関数を作成
// frontend/src/utils/auth.js
export function getAccessToken() {
return localStorage.getItem("access_token");
}
9. frontend/src/api/dm.jsの作成
// frontend/src/api/dm.js
import { getAccessToken } from "../utils/auth";
export function connectDMWebSocket(username) {
const token = getAccessToken(); // JWTトークンを取得
const wsUrl = `ws://localhost:8000/ws/dm/${username}/?token=${token}`;
console.log("Connect WebSocket:", wsUrl);
return new WebSocket(wsUrl);
}
10. backend/middleware.pyの作成
Django Channels
の WebSocket
接続時に JWT 認証
を行うためのmiddleware.py
です。これにより、クライアントが WebSocket 接続を開始するときに、URL のクエリパラメータに含まれるトークンを検証し、認証済みのユーザー情報を scope["user"]
にセットする仕組みになっています。
# backend/backend/middleware.py
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.tokens import AccessToken
from django.contrib.auth import get_user_model
from channels.auth import AuthMiddlewareStack
User = get_user_model()
class JWTAuthMiddleware:
"""
WebSocketリクエストでJWT認証を行うカスタムミドルウェア
"""
def __init__(self, inner):
self.inner = inner
async def __call__(self, scope, receive, send):
# クエリ文字列から token を取得
query_string = parse_qs(scope["query_string"].decode())
token = query_string.get("token", [None])[0]
if token:
try:
access_token = AccessToken(token)
user = await database_sync_to_async(User.objects.get)(id=access_token["user_id"])
scope["user"] = user
except Exception as e:
print(f"failed JWT: {e}")
scope["user"] = AnonymousUser()
else:
scope["user"] = AnonymousUser()
return await self.inner(scope, receive, send)
# Django Channels のミドルウェアとして適用
def JWTAuthMiddlewareStack(inner):
return JWTAuthMiddleware(AuthMiddlewareStack(inner))
解説
1. モジュールについて
parse_qs:
- URL のクエリ文字列をパース(解析)して、辞書形式に変換する関数
- これを使って、WebSocket 接続時に URL に含まれるトークンを抽出
database_sync_to_async:
- Django の同期的なデータベース操作(例えば、ユーザーの取得)を非同期コード内で実行できるようにするためのユーティリティ
AnonymousUser:
- 認証されていない(未ログイン)ユーザーを表す Django のクラス
- トークンが無かったり、認証に失敗した場合に
scope["user"]
に設定
AccessToken:
-
Django REST Framework Simple JWT
のトークンクラス - トークンの検証や、トークンに埋め込まれたペイロード(ここでは
username
など)を抽出するために使用
AuthMiddlewareStack:
-
Django Channels
に標準で用意されている認証ミドルウェアスタック
-このスタックは、通常の HTTP リクエストの認証処理に似た方法で、WebSocket の接続に対してもユーザー認証情報を設定しますが、今回はそれをラップするために使用
2. innerについて
inner
というのは、このミドルウェアがラップしている次のASGIアプリケーションのことを指します。
ASGIミドルウェアはチェーン状に連結され、各ミドルウェアがリクエスト処理の前後で追加の処理を実行。その際、ミドルウェア自体は「次に呼び出すべきアプリケーション」を内部に保持。これを inner
と呼ぶ。
3. query_string
-
scope["query_string"]
はバイト列(b"token=xxxxxx")なので、 .decode() して文字列化 -
parse_qs()
でクエリ文字列を辞書型に変換 -
例
: "token=abcd1234" → {'token': ['abcd1234']}
4. JWT の検証とユーザー取得
AccessToken(token):
-受け取ったトークンを AccessToken
クラスに渡し、トークンの検証とデコードを実行。
- これにより、トークンのペイロードから
username
などの情報を取得。
database_sync_to_async(User.objects.get)(id=access_token["user_id"]):
- トークンから取得した
username
を使って、User モデルから該当するユーザーを取得。 - これは同期処理なので、
database_sync_to_async
を使って非同期環境で呼び出せるように対応。 - 結果、認証に成功したユーザーオブジェクトが取得。
scope["user"] = user:
- 取得したユーザーオブジェクトを、接続の
scope
に設定。 - その後、接続先のコンシューマ(例: TweetConsumer や DMConsumer)では、この
scope["user"]
を使って認証済みのユーザー情報にアクセス可能。