RBAC (Role-Based Access Control): 基于角色的访问控制
Django中的Auth组件采用的是权限六表: User, Group, Permission, UserGroup关系表, UserPermission关系表, GroupPermission关系表
‘‘‘
# ...\d_proj\api\models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class MyUser(AbstractUser):
    """Custom user model: Django's AbstractUser plus a unique mobile number.

    NOTE(review): for Django's auth machinery to use this model, the project
    settings presumably need ``AUTH_USER_MODEL`` pointing at it -- confirm in
    settings.py.
    """

    # Phone number, at most 11 characters, unique across all users.
    mobile = models.CharField(max_length=11, verbose_name='电话号码', unique=True)

    class Meta:
        # Explicit table name and the plural label used for the model.
        db_table = 'old_boy_user'
        verbose_name_plural = '用户表'

    def __str__(self):
        """Return the username as the human-readable representation."""
        return self.username
‘‘‘
‘‘‘
# ...\d_proj\api\admin.py
from django.contrib import admin
from .models import MyUser
from django.contrib.auth.admin import UserAdmin
class MyUserAdmin(UserAdmin):
    """Admin configuration for MyUser, built on Django's stock UserAdmin."""

    # Fields that must be supplied when adding a user through the admin site.
    add_fieldsets = (
        (None, {
            'classes': ('wide',),
            'fields': ('username', 'password1', 'password2', 'is_staff', 'mobile'),
        }),
    )
    # Columns shown when the admin site lists the user table.
    list_display = ('username', 'email', 'mobile', 'is_staff')


# Expose MyUser in the admin site with the customised admin class.
admin.site.register(MyUser, MyUserAdmin)
‘‘‘
‘‘‘
# ...\Lib\site-packages\rest_framework\views.py
from rest_framework.settings import api_settings
...
class APIView(View):
...
authentication_classes = api_settings.DEFAULT_AUTHENTICATION_CLASSES
...
def get_authenticators(self):
return [auth() for auth in self.authentication_classes] # 返回包含身份认证类的对象的列表
...
def perform_authentication(self, request): # request为新的request对象
request.user # .user表示调用@property装饰的方法
...
def initialize_request(self, request, *args, **kwargs):
...
return Request(
request,
...,
authenticators=self.get_authenticators(), # 将包含身份认证类的对象的列表传给Request类进行实例化
...
)
def initial(self, request, *args, **kwargs):
...
self.perform_authentication(request) # 进入身份认证
...
def dispatch(self, request, *args, **kwargs):
...
request = self.initialize_request(request, *args, **kwargs)
...
try:
self.initial(request, *args, **kwargs) # 进入三大认证
...
# ...\Lib\site-packages\rest_framework\request.py
...
class Request:
def __init__(self, request, ..., authenticators=None,
...):
...
self.authenticators = authenticators or ()
...
@property
def user(self): # 通过request.user获取的是request._user的值
if not hasattr(self, ‘_user‘):
with wrap_attributeerrors():
self._authenticate()
return self._user
@user.setter # 通过request.user = value, 设置的是request._user = value
def user(self, value):
self._user = value
self._request.user = value # 兼容原request对象
...
def _authenticate(self):
for authenticator in self.authenticators: # self为request对象
try:
user_auth_tuple = authenticator.authenticate(self) # 认证器调用authenticate(authenticator, request)方法得到(user, auth)元组
except exceptions.APIException:
self._not_authenticated() # 将访问者作匿名用户处理
raise
if user_auth_tuple is not None:
self._authenticator = authenticator
self.user, self.auth = user_auth_tuple
return
self._not_authenticated() # 将访问者作匿名用户处理
‘‘‘
‘‘‘
# ...\Lib\site-packages\rest_framework\settings.py
from django.conf import settings # django的settings对象作了配置的插拔式设计
...
DEFAULTS = {
...,
‘DEFAULT_AUTHENTICATION_CLASSES‘: [
‘rest_framework.authentication.SessionAuthentication‘,
‘rest_framework.authentication.BasicAuthentication‘
],
...,
}
...
class APISettings:
def __init__(self, user_settings=None, defaults=None, import_strings=None):
...
self.defaults = defaults or DEFAULTS
...
@property
def user_settings(self):
if not hasattr(self, ‘_user_settings‘):
self._user_settings = getattr(settings, ‘REST_FRAMEWORK‘, {})
return self._user_settings
def __getattr__(self, attr):
if attr not in self.defaults:
raise AttributeError("Invalid API setting: ‘%s‘" % attr)
try:
val = self.user_settings[attr]
except KeyError:
val = self.defaults[attr]
...
api_settings = APISettings(None, DEFAULTS, IMPORT_STRINGS)
‘‘‘
‘‘‘
# Demo: how an Authorization header value is encoded and split into parts.
auth = 'Basic cql'
auth = auth.encode('utf8')
print(auth)  # b'Basic cql'
auth = auth.split()
print(auth)  # [b'Basic', b'cql']
print(auth[0])  # b'Basic'
print(auth[0].lower())  # b'basic'

# str.split(sep=None, maxsplit=-1): sep is the delimiter, maxsplit limits the
# number of splits.
str1 = "C S\nG\tO"
# With no sep given, runs of whitespace (space, \n, \t) act as the delimiter.
print(str1.split())  # ['C', 'S', 'G', 'O']
‘‘‘
‘‘‘
# ...\Lib\site-packages\rest_framework\authentication.py
import base64
from django.contrib.auth import authenticate
...
def get_authorization_header(request):
    """Return the request's Authorization header value as bytes.

    Reads ``HTTP_AUTHORIZATION`` from ``request.META``; when the key is
    missing an empty bytes object is returned. A ``str`` value is encoded
    with ``HTTP_HEADER_ENCODING`` so callers always receive bytes.
    """
    auth = request.META.get('HTTP_AUTHORIZATION', b'')
    if isinstance(auth, str):
        # Normalise text header values to bytes.
        auth = auth.encode(HTTP_HEADER_ENCODING)
    return auth
class BasicAuthentication(BaseAuthentication):
...
def authenticate(self, request):
auth = get_authorization_header(request).split() # 从请求头中获取二进制形式的Authorization值
# 如果没有携带Authorization值或者Authorization值不以basic开头, 则表示访问者身份为游客, 直接通过身份认证
if not auth or auth[0].lower() != b‘basic‘:
return None
# 如果Authorization值以basic开头但校验失败则抛异常
if len(auth) == 1:
msg = _(‘Invalid basic header. No credentials provided.‘)
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = _(‘Invalid basic header. Credentials string should not contain spaces.‘)
raise exceptions.AuthenticationFailed(msg)
try:
auth_parts = base64.b64decode(auth[1]).decode(HTTP_HEADER_ENCODING).partition(‘:‘)
except (TypeError, UnicodeDecodeError, binascii.Error):
msg = _(‘Invalid basic header. Credentials not correctly base64 encoded.‘)
raise exceptions.AuthenticationFailed(msg)
userid, password = auth_parts[0], auth_parts[2] # 通过解析Authorization值中token字符串得到userid和password
return self.authenticate_credentials(userid, password, request) # 进入userid和password的校验
def authenticate_credentials(self, userid, password, request=None):
# 将userid和password处理成字典形式
credentials = {
get_user_model().USERNAME_FIELD: userid,
‘password‘: password
}
user = authenticate(request=request, **credentials) # 借助auth模块的authenticate方法校验userid和password得到user对象
if user is None:
raise exceptions.AuthenticationFailed(_(‘Invalid username/password.‘))
if not user.is_active:
raise exceptions.AuthenticationFailed(_(‘User inactive or deleted.‘))
return (user, None)
‘‘‘
安装: pip install djangorestframework-jwt
优点:
Nginx: 负载均衡, 动静分离(CDN)
数据库读写分离
可逆加密与不可逆加密: 不可逆加密是输入明文后由系统直接经过加密算法处理成密文, 这种加密后的数据是无法被解密的
对称加密与非对称加密: 对称加密的加密密钥与解密密钥相同, 非对称加密有一对不同的密钥用于加密和解密
// Header
{
"alg": "HS256",
"typ": "JWT"
}
// Payload
{
... // 包含校验账号密码后得到的用户信息和过期时间
}
// Signature
Signature = Base64Url(HS256(Base64Url(Header) + "." + Base64Url(Payload), secretKey))
// JWT
JWT = Base64Url(Header) + "." + Base64Url(Payload) + "." + Signature
‘‘‘
# ...\Lib\site-packages\rest_framework_jwt\views.py
...
jwt_response_payload_handler = api_settings.JWT_RESPONSE_PAYLOAD_HANDLER # ‘rest_framework_jwt.utils.jwt_response_payload_handler‘
class JSONWebTokenAPIView(APIView):
# 签发接口不进行身份认证和权限认证
permission_classes = ()
authentication_classes = ()
...
def post(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
if serializer.is_valid():
user = serializer.object.get(‘user‘) or request.user
token = serializer.object.get(‘token‘)
response_data = jwt_response_payload_handler(token, user, request)
response = Response(response_data)
...
return response
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class ObtainJSONWebToken(JSONWebTokenAPIView):
    """JSONWebTokenAPIView bound to JSONWebTokenSerializer."""

    serializer_class = JSONWebTokenSerializer
# ...\Lib\site-packages\rest_framework_jwt\utils.py
def jwt_response_payload_handler(token, user=None, request=None):
    """Build the response body for a token request.

    Only ``token`` is placed in the returned dict; ``user`` and ``request``
    are accepted but not used by this implementation.
    """
    return {
        'token': token
    }
# ...\Lib\site-packages\rest_framework_jwt\serializers.py
...
from django.contrib.auth import authenticate
from .compat import Serializer
from rest_framework_jwt.settings import api_settings
jwt_payload_handler = api_settings.JWT_PAYLOAD_HANDLER
jwt_encode_handler = api_settings.JWT_ENCODE_HANDLER
class JSONWebTokenSerializer(Serializer):
def __init__(self, *args, **kwargs):
super(JSONWebTokenSerializer, self).__init__(*args, **kwargs)
# 定义JSONWebTokenSerializer类操作的字段
self.fields[self.username_field] = serializers.CharField()
self.fields[‘password‘] = PasswordField(write_only=True)
...
def validate(self, attrs):
# 将username和password处理成字典的形式
credentials = {
self.username_field: attrs.get(self.username_field),
‘password‘: attrs.get(‘password‘)
}
if all(credentials.values()):
user = authenticate(**credentials) # 借助auth模块的authenticate方法校验username和password得到user对象
...
payload = jwt_payload_handler(user) # 将user对象进行处理后得到payload
return {
‘token‘: jwt_encode_handler(payload), # 将payload进行处理后得到token
‘user‘: user
}
...
# ...\Lib\site-packages\rest_framework_jwt\compat.py
class Serializer(serializers.Serializer):
    """Serializer that exposes ``validated_data`` under the alias ``object``."""

    @property
    def object(self):
        """Return the serializer's validated data."""
        return self.validated_data
‘‘‘
‘‘‘
# ...\d_proj\api\urls.py
...
urlpatterns = [
    # Login route, handled by JwtLoginAPIView.
    url(r'^login/$', views.JwtLoginAPIView.as_view()),
]
# ...\d_proj\api\views.py
from rest_framework_jwt.views import JSONWebTokenAPIView
from .my_serializers import JwtLoginSerializer
class JwtLoginAPIView(JSONWebTokenAPIView):
    """JSONWebTokenAPIView that validates requests with JwtLoginSerializer."""

    serializer_class = JwtLoginSerializer
# ...\d_proj\api\my_serializers.py
from rest_framework import serializers
import re
from . import models
from rest_framework_jwt.serializers import jwt_payload_handler, jwt_encode_handler
class JwtLoginSerializer(serializers.Serializer):
    """Validate login credentials and build a JWT for the matching user.

    The ``username`` field may hold an e-mail address, a mobile number or a
    plain username; ``_authenticate`` picks the lookup column from its shape.
    After successful validation ``self.object`` holds
    ``{'token': ..., 'user': ...}``.
    """

    # Fields handled by this serializer.
    username = serializers.CharField()
    password = serializers.CharField()

    def validate(self, attrs):
        """Authenticate the user, sign a token and store both on ``self.object``.

        Returns ``attrs`` unchanged. Raises ``serializers.ValidationError``
        (via ``_authenticate``) when the credentials are rejected.
        """
        user = self._authenticate(attrs)
        payload = jwt_payload_handler(user)
        token = jwt_encode_handler(payload)
        self.object = {'token': token, 'user': user}
        return attrs

    def _authenticate(self, attrs):
        """Return the MyUser matching the submitted credentials.

        Raises ``serializers.ValidationError`` when no user matches, the
        password is wrong, or the account is inactive. All three cases share
        one message so the response does not reveal which check failed.
        """
        username = attrs.get('username')
        password = attrs.get('password')
        # Choose the lookup column from the shape of the submitted identifier.
        if re.match(r'.*@.*', username):
            lookup = {'email': username}
        elif re.match(r'^1[3-9][0-9]{9}$', username):
            lookup = {'mobile': username}
        else:
            lookup = {'username': username}
        # NOTE(review): if e-mail addresses are not unique in the user table,
        # first() picks one of several matches -- confirm the intended rule.
        user = models.MyUser.objects.filter(**lookup).first()
        # check_password() alone does not look at is_active, so test it here;
        # otherwise a deactivated account could still obtain a token.
        if not user or not user.check_password(password) or not user.is_active:
            raise serializers.ValidationError({'detail': '用户信息错误'})
        return user
‘‘‘
‘‘‘
# ...\Lib\site-packages\rest_framework_jwt\settings.py
...
from django.conf import settings # django的settings对象作了配置的插拔式设计
from rest_framework.settings import APISettings
USER_SETTINGS = getattr(settings, ‘JWT_AUTH‘, None)
...
api_settings = APISettings(USER_SETTINGS, DEFAULTS, IMPORT_STRINGS)
‘‘‘
‘‘‘
# ...\d_proj\api\urls.py
...
from rest_framework_jwt.views import RefreshJSONWebToken, VerifyJSONWebToken

urlpatterns = [
    # Each route is served by the view whose name matches it: refresh/ by the
    # refresh view, verify/ by the verify view.
    url(r'refresh/$', RefreshJSONWebToken.as_view()),
    url(r'verify/$', VerifyJSONWebToken.as_view()),
]
# ...\d_proj\d_proj\settings.py
...
import datetime
JWT_AUTH = {
    # Token lifetime: a token stops verifying 300 seconds after it is issued,
    # so clients must refresh within that window.
    'JWT_EXPIRATION_DELTA': datetime.timedelta(seconds=300),
    # Allow tokens to be refreshed.
    'JWT_ALLOW_REFRESH': True,
    # Refresh window: after seven days a token can no longer be refreshed.
    # Refreshing also performs the verification step.
    'JWT_REFRESH_EXPIRATION_DELTA': datetime.timedelta(days=7),
}
‘‘‘
步骤:
‘‘‘
# ...\d_proj\d_proj\settings.py
REST_FRAMEWORK = {
    # ... other DRF settings omitted ...
    # Authenticate every request with drf-jwt's JSON Web Token class.
    'DEFAULT_AUTHENTICATION_CLASSES': [
        'rest_framework_jwt.authentication.JSONWebTokenAuthentication'
    ],
    # ... other DRF settings omitted ...
}
‘‘‘
自定义auth模块绑定的用户表, drf身份认证, drf配置文件, drf内置的身份认证类, drf-jwt
原文:https://www.cnblogs.com/-406454833/p/12708899.html