(1)源码入口
rest_framework_jwt.views.ObtainJSONWebToken 的 父类 JSONWebTokenAPIView 的 post 方法
PS:只有post方法 接受username 与 password请求
(2)校验方式
post方法将请求数据交给 rest_framework_jwt.serializer.JSONWebTokenSerializer 处理 PS 完成数据的校验,会走序列化类的 全局钩子校验规则,校验得到登录用户并签发token存储在序列化对象中
(3)核心源码
def validate(self, attrs): # 账号密码字典 credentials = { self.username_field: attrs.get(self.username_field), ‘password‘: attrs.get(‘password‘) } if all(credentials.values()): # 签发token第1步:用账号密码得到user对象 user = authenticate(**credentials) if user: if not user.is_active: msg = _(‘User account is disabled.‘) raise serializers.ValidationError(msg) # 签发token第2步:通过user得到payload,payload包含着用户信息与过期时间 payload = jwt_payload_handler(user) # 在视图类中,可以通过 序列化对象.object.get(‘user‘或者‘token‘) 拿到user和token return { # 签发token第3步:通过payload签发出token ‘token‘: jwt_encode_handler(payload), ‘user‘: user } else: msg = _(‘Unable to log in with provided credentials.‘) raise serializers.ValidationError(msg) else: msg = _(‘Must include "{username_field}" and "password".‘) msg = msg.format(username_field=self.username_field) raise serializers.ValidationError(msg)
(1)通过username password 生成user对象
(2)通过user对象生成载荷(payload)
jwt_payload_handler(user) => payload
from rest_framework_jwt.serializers import jwt_payload_handler
(3)通过载荷签发token
jwt_encode_handler(payload) => token from rest_framework_jwt.serializers import jwt_encode_handler
(1)前提:一个配置JWT认证的视图类 就必须提供token进行认证 在认证的过程中完成token的校验
rest_framework_jwt.authentication.JSONWebTokenAuthentication 的 父类 BaseJSONWebTokenAuthentication 的 authenticate 方法
# 请求头拿认证信息jwt-token => 通过反爬小规则确定有用的token => payload => user
(2)核心源码
rest_framework_jwt.authentication.BaseJSONWebTokenAuthentication的authenticate(self, request)方法
def authenticate(self, request): """ Returns a two-tuple of `User` and token if a valid signature has been supplied using JWT-based authentication. Otherwise returns `None`. """ # 带有反爬小规则的获取token:前台必须按 "jwt token字符串" 方式提交 # 校验user第1步:从请求头 HTTP_AUTHORIZATION 中拿token,并提取 jwt_value = self.get_jwt_value(request) # 游客 if jwt_value is None: return None # 校验 try: # 校验user第2步:token => payload payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = _(‘Signature has expired.‘) raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = _(‘Error decoding signature.‘) raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() # 校验user第3步:token => payload user = self.authenticate_credentials(payload) return (user, jwt_value)
(3)检验token
(1)从请求头中获取token
(2)根据token解析出payload(载荷)
jwt_decode_handler(token) => payloay from rest_framework_jwt.authentication import jwt_decode_handler
(3)根据载荷解析出用户
self.authenticate_credentials(payload) => user
继承drf-jwt的BaseJSONWebTokenAuthentication,拿到父级的authenticate_credentials方法
(1)model层
# 模型层 from django.db import models from django.contrib.auth.models import AbstractUser class User(AbstractUser): mobile = models.CharField(max_length=11, unique=True) class Meta: db_table = ‘api_user‘ verbose_name = ‘用户表‘ verbose_name_plural = verbose_name def __str__(self): return self.username
(2)api/serializers序列化层
import re from rest_framework_jwt.serializers import jwt_payload_handler # 生成载荷 from rest_framework_jwt.serializers import jwt_encode_handler # 解析user from . import models # 导入模型层 from rest_framework import serializers # 序列化组件 class UserModelSerializer(serializers.ModelSerializer): # 定义校验类
# 自定义反序列字段:一定要设置write_only,只参与反序列化,不会与model类字段映射
user = serializers.CharField(write_only=True) pwd = serializers.CharField(write_only=True) class Meta: model = models.User fields = [‘user‘, ‘pwd‘, ‘username‘, ‘mobile‘, ‘email‘] extra_kwargs = { ‘username‘: { ‘read_only‘: True }, ‘mobile‘: { ‘read_only‘: True }, ‘email‘: { ‘read_only‘: True }, } def validate(self, attrs): # 设置校验的数据 user = attrs.get(‘user‘) pwd = attrs.get(‘pwd‘) # 设置校验的方式 if re.match(r‘.*@.*‘, user): # 匹配邮箱号 进行邮箱登录 user_obj = models.User.objects.filter(email=user).first() elif re.match(r‘1[3-9][0-9]{9}‘, user): # 匹配13-19开头的手机号 进行手机登录 user_obj = models.User.objects.filter(mobile=user).first() # 用户名登录 else: user_obj = models.User.objects.filter(username=user).first() if user_obj and user_obj.check_password(pwd): # 判断是否有用户与传入的密码是否正确 payload = jwt_payload_handler(user_obj) # 生成载荷 print(payload) # {‘user_id‘: 1, ‘username‘: ‘admin‘, ‘exp‘: datetime.datetime(2019, 10, 23, 12, 37, 52, 240300), ‘email‘: ‘1234@qq.com‘} token = jwt_encode_handler(payload) # 生成token print(token) # eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VyX2lkIjoxLCJ1c2VybmFtZSI6ImFkbWluIiwiZXhwIjoxNTcxODM0MjcyLCJlbWFpbCI6IjEyMzRAcXEuY29tIn0.zLeCBgT6j6zi2g59iTIGIB5ELSmiWmgf8IdMk1mZors self.user = user_obj # 将用户传给后端 self.token = token # 将token传入给后端 return attrs # 上述条件 都不满足 直接抛出异常 raise serializers.ValidationError( { ‘msg‘: "数据错误" } )
‘‘‘
1:设置自定义反序列化字段
2:设置全局校验钩子
3:通过前台输入的数据 对数据进行匹配生成user_obj
4:将生成的user_obj 生成载荷
5:调用生成的载荷生成token
6:将生成user_obj 与 token传入后端
‘‘‘
(3)路由层
url(r‘^login/$‘, views.Login.as_view()),
(4)视图层
from . import serializers class Login(APIView): permission_classes = [] # 不进行权限校验 authentication_classes = [] # 不进行认证校验 def post(self, request, *args, **kwargs): user_ser = serializers.UserModelSerializer(data=request.data) # 拿到传入的数据进行反序列化校验
user_ser.is_valid(raise_exception=True) # 序列化类校验得到登录用户与token存放在序列化对象中 token = user_ser.token user_data = serializers.UserModelSerializer(user_ser.user).data # 获取传入的数据 进行序列化 return Response( { ‘status‘: 0, ‘msg‘: "测试成功", ‘results‘: user_data, ‘token‘: token } )
‘‘‘
1:获取前台传入的数据 进行反序列化 拿到user_ser
2:判断数据是否有效 如果无效直接抛出异常
3:在将前台传入的数据进行序列化 传入给前台展示
‘‘‘
(1)api/
import jwt from rest_framework_jwt.authentication import BaseJSONWebTokenAuthentication from rest_framework_jwt.authentication import jwt_decode_handler from rest_framework.exceptions import AuthenticationFailed class JWTAuthentication(BaseJSONWebTokenAuthentication): def authenticate(self, request): jwt_token = request.META.get(‘HTTP_AUTHORIZATION‘) # 自定义校验规则:auth token jwt token = self.parse_jwt_token(jwt_token) if token is None: return None try: # token => payload payload = jwt_decode_handler(token) except jwt.ExpiredSignature: raise AuthenticationFailed(‘token已过期‘) except: raise AuthenticationFailed(‘非法用户‘) # payload => user user = self.authenticate_credentials(payload) return (user, token) # 自定义校验规则:auth token jwt,auth为前盐,jwt为后盐 def parse_jwt_token(self, jwt_token): tokens = jwt_token.split() if len(tokens) != 3 or tokens[0].lower() != ‘auth‘ or tokens[2].lower() != ‘jwt‘: return None return tokens[1]
(2)视图层
from rest_framework.views import APIView from utils.response import APIResponse # 必须登录后才能访问 - 通过了认证权限组件 from rest_framework.permissions import IsAuthenticated # 自定义jwt校验规则 from .authentications import JWTAuthentication class UserDetail(APIView): authentication_classes = [JWTAuthentication] permission_classes = [IsAuthenticated] def get(self, request, *args, **kwargs): return APIResponse(results={‘username‘: request.user.username})
from django.contrib import admin from . import models # 自定义User表,admin后台管理,采用密文密码 from django.contrib.auth.admin import UserAdmin class MyUserAdmin(UserAdmin): add_fieldsets = ( (None, { ‘classes‘: (‘wide‘,), ‘fields‘: (‘username‘, ‘password1‘, ‘password2‘, ‘mobile‘, ‘email‘), }), ) admin.site.register(models.User, MyUserAdmin)
(1)模型层
class Car(models.Model): name = models.CharField(max_length=16, unique=True, verbose_name=‘车名‘) price = models.DecimalField(max_digits=10, decimal_places=2, verbose_name=‘价格‘) brand = models.CharField(max_length=16, verbose_name=‘品牌‘) class Meta: db_table = ‘api_car‘ verbose_name = ‘汽车表‘ verbose_name_plural = verbose_name def __str__(self): return self.name
(2)admin注册
admin.site.register(models.Car)
(3)序列化层api/
class CarModelSerializer(serializers.ModelSerializer): class Meta: model = models.Car fields = [‘name‘, ‘price‘, ‘brand‘]
(4)搜索过滤组件
from rest_framework.generics import ListAPIView # 进行群查 from rest_framework.filters import SearchFilter # 导入搜索组件 from . import models class CarListAPIView(ListAPIView): queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer filter_backends = [SearchFilter] # 局部过滤 search_fields = [‘name‘,‘price‘] # 设置查找的字段
(5)排序过滤组件
from rest_framework.generics import ListAPIView # 第一步:drf的OrderingFilter - 排序过滤 from rest_framework.filters import OrderingFilter class CarListAPIView(ListAPIView): queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 第二步:局部配置 过滤类 们(全局配置用DEFAULT_FILTER_BACKENDS) filter_backends = [OrderingFilter] # 第三步:OrderingFilter过滤类依赖的过滤条件 => 接口:/cars/?ordering=... ordering_fields = [‘pk‘, ‘price‘] # eg:/cars/?ordering=-price,pk,先按price降序,如果出现price相同,再按pk升序
(6)分页组件
api/pahenations
from rest_framework.pagination import PageNumberPagination class MyPageNumberPagination(PageNumberPagination): # ?page=页码 page_query_param = ‘page‘ # ?page=页面 下默认一页显示的条数 page_size = 3 # ?page=页面&page_size=条数 用户自定义一页显示的条数 page_size_query_param = ‘page_size‘ # 用户自定义一页显示的条数最大限制:数值超过5也只显示5条 max_page_size = 5
视图层
from rest_framework.generics import ListAPIView class CarListAPIView(ListAPIView): # 如果queryset没有过滤条件,就必须 .all(),不然分页会出问题 queryset = models.Car.objects.all() serializer_class = serializers.CarModelSerializer # 分页组件 - 给视图类配置分页类即可 - 分页类需要自定义,继承drf提供的分页类即可 pagination_class = pagenations.MyPageNumberPagination
原文:https://www.cnblogs.com/SR-Program/p/11728757.html