1、session认证
..... login(request, user) #登录成功 # 登录之后获取获取最新的session_key session_key = request.session.session_key # 删除非当前用户session_key的记录 for session in Session.objects.filter(~Q(session_key=session_key), expire_date__gte=timezone.now()): data = session.get_decoded() if data.get(‘_auth_user_id‘, None) == str(request.user.id): session.delete()
2、jwt认证
如果是JWT认证模式,比较麻烦,个人的解决方法是,每个用户登录后,在redis中存储用户的jwt_token. key是用户的id,value是用户最新的jwt_token. 因为用的django-jwt库,所以这里定义了自己的LoginJWT, 继承JSONWebTokenAPIView.
主要代码改动,只是在登录后,把最新的jwt_token存入redis cache.
class LoginJWT(JSONWebTokenAPIView): serializer_class = JSONWebTokenSerializer def post(self, request, *args, **kwargs): serializer = self.get_serializer(data=request.data) if serializer.is_valid(): user = serializer.object.get(‘user‘) or request.user token = serializer.object.get(‘token‘) response_data = jwt_response_payload_handler(token, user, request) response = Response(response_data) if api_settings.JWT_AUTH_COOKIE: expiration = (datetime.utcnow() + api_settings.JWT_EXPIRATION_DELTA) response.set_cookie(api_settings.JWT_AUTH_COOKIE, token, expires=expiration, httponly=True) user.token = token user.save() k = "{}".format(user.id) cache.set(k, token, TOKEN_EXPIRE) return response return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
如果有推送机制的话,可以在此处添加推送机制,把新用户登录的消息推送给登录过的客户端。
这里我没采用推送,而是,在某个经常访问的DRF API中定义验证,验证当前请求带上来的jwt_token是否是redis中存储的最新的,如果不是,则返回用户,请求失败,请重新登录的消息。
比如在某个viewsets.GenericViewSet中,authentication_classes包含自定义的myJWTAuth
authentication_classes = (myJWTAuth, SessionAuthentication)
myJWTAuth代码:
class myJWTAuth(JSONWebTokenAuthentication): def authenticate(self, request): """ Returns a two-tuple of `User` and token if a valid signature has been supplied using JWT-based authentication. Otherwise returns `None`. """ jwt_value = self.get_jwt_value(request) if jwt_value is None: return None try: payload = jwt_decode_handler(jwt_value) except jwt.ExpiredSignature: msg = _(‘Signature has expired.‘) raise exceptions.AuthenticationFailed(msg) except jwt.DecodeError: msg = _(‘Error decoding signature.‘) raise exceptions.AuthenticationFailed(msg) except jwt.InvalidTokenError: raise exceptions.AuthenticationFailed() user = self.authenticate_credentials(payload) k = "{}".format(user.id) jwt_header=request.META.get(‘HTTP_AUTHORIZATION‘,None) try: jwt_str=jwt_header.split()[1] except: jwt_str="" if cache.has_key(k): cache_key = cache.get(k) if jwt_str==cache_key: cache.set(k, cache_key, TOKEN_EXPIRE) return (user, jwt_value) raise exceptions.AuthenticationFailed()
参考连接:https://blog.csdn.net/u014633966/article/details/85414656
原文:https://www.cnblogs.com/majianyu/p/10490264.html