# -*- coding: utf-8 -*-
"""URL routes for the login and logout views of the ``myauth`` app."""
from django.conf.urls import patterns, url

# 'myauth.login' is the view prefix handed to patterns(): the string view
# names below ('do_login', 'do_logout') are looked up relative to it.
urlpatterns = patterns(
    'myauth.login',
    url(r'^login/$', 'do_login', name='login'),
    url(r'^logout/$', 'do_logout', name='logout'),
)
Jms_Users 继承自 Django 的 AbstractUser 类,并且 settings 中已将认证后台使用的 User 模型指定为 AUTH_USER_MODEL = 'juser.User',所以无需再另外保存 Django 自带的 User 对象(见代码中被注释掉的部分)。这里的认证后端只是在 token 验证通过后直接返回用户,真正的认证过程在 login 视图(do_login)中实现。
# -*- coding: utf-8 -*-
# author: ArthurMok
from django.contrib.auth.models import User as Contrib_Users
from juser.models import User as Jms_Users
from jumpserver.settings import DJANGO_AUTH_TOKEN
from itsdangerous import BadData
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer


class MyAuthBackend(object):
    """Authentication backend that accepts a signed, time-limited token.

    The user is expected to have been verified by the external unified
    login interface already; this backend only checks the token produced
    after that step and maps it to a ``Jms_Users`` row.
    """

    def authenticate(self, auth_token=None, token=None):
        """Return the ``Jms_Users`` instance named by ``auth_token``.

        :param auth_token: username serialized and signed with
            ``DJANGO_AUTH_TOKEN`` through ``Serializer.dumps``.
        :param token: shared secret; must equal ``DJANGO_AUTH_TOKEN``.
        :returns: the matching user, or ``None`` when the secret does not
            match, the signed token cannot be loaded, or no such user
            exists.
        """
        # Reject early: nothing below can succeed without both credentials,
        # and this avoids a database query for requests that will fail.
        if auth_token is None or token != DJANGO_AUTH_TOKEN:
            return None

        try:
            username = Serializer(DJANGO_AUTH_TOKEN).loads(auth_token)
        except BadData:
            # Tampered, malformed or expired token: treat it as a failed
            # login instead of letting the exception escape to the view.
            return None

        try:
            user = Jms_Users.objects.get(username=username)
        except Jms_Users.DoesNotExist:
            return None

        # Legacy approach, kept for reference: mirror the account into
        # django.contrib.auth's own User table and return that object.
        # It is not needed because the Jms_Users object is returned directly.
        # try:
        #     contrib_user = Contrib_Users.objects.get(username=user.username)
        # except Contrib_Users.DoesNotExist:
        #     # create the Django user when it does not exist yet
        #     contrib_user = Contrib_Users(username=user.username, password=auth_token)
        #     contrib_user.is_staff = True
        #     contrib_user.save()
        # return contrib_user
        return user

    def get_user(self, user_id):
        """Return the ``Jms_Users`` row with primary key ``user_id``, or ``None``."""
        try:
            return Jms_Users.objects.get(pk=user_id)
        except Jms_Users.DoesNotExist:
            return None
调用认证接口 SSO_URL 完成用户认证并获取用户信息,然后保存用户信息。这里主要列出 _add_user 和 do_login 两个函数,其余认证接口的调用过程从略。
def _add_user(request, username, name, mobile, email, department):
    """Create the local account for ``username`` unless it already exists.

    :param request: current request; not used by this function.
    :param username: login name of the user.
    :param name: display name passed to ``db_add_user``.
    :param mobile: phone number, stored as ``phone``.
    :param email: e-mail address; when empty it falls back to
        ``username@EMAIL_DOMAIN``.
    :param department: department passed to ``db_add_user``.
    :returns: ``True`` when the user already exists or was created,
        ``False`` when creation failed with ``IndexError``.
    """
    # objects.get() raises DoesNotExist for a missing row, which made the
    # creation path unreachable; ask the database whether the row exists.
    if Jms_Users.objects.filter(username=username).exists():
        return True

    # NOTE(review): these local names are handed to user_add_mail through
    # locals() below; presumably it looks some of them up by name, so do not
    # rename them without checking that helper.
    password = PyCrypt.gen_rand_pass(16)
    groups = []
    admin_groups = []
    role = 'CU'
    uuid_r = uuid.uuid4().hex
    ssh_key_pwd = PyCrypt.gen_rand_pass(16)
    if not email:
        email = username + '@' + EMAIL_DOMAIN
    is_active = True
    send_mail_need = True

    try:
        user = db_add_user(username=username, name=name, phone=mobile,
                           department=department, password=password,
                           email=email, role=role, uuid=uuid_r,
                           groups=groups, admin_groups=admin_groups,
                           ssh_key_pwd=ssh_key_pwd, is_active=is_active,
                           date_joined=datetime.datetime.now())
        server_add_user(username=username, ssh_key_pwd=ssh_key_pwd)
        # Re-read the row so the object used below reflects what was stored.
        user = get_object(Jms_Users, username=username)
        # The former "if groups:" lookup loop was dropped: groups is always
        # empty here and its result was never used.
    except IndexError as e:
        error = u'添加用户 %s 失败 %s ' % (username, e)
        logger.error(error)
        return False
    else:
        user_add_mail(user, kwargs=locals())
        msg = get_display_msg(user, password=password,
                              ssh_key_pwd=ssh_key_pwd,
                              send_mail_need=send_mail_need)
        logger.info(msg)
        return True
def do_login(request):
    """Log the user in through the external SSO service.

    Flow:
      * an already authenticated user is redirected to ``index``;
      * a request carrying a ``token`` query parameter has that token
        exchanged via ``_sso_token`` / ``_sso_user_info``; on success the
        local account is ensured, the Django session is opened,
        ``session['role_id']`` is set and the user is redirected to ``index``;
      * in every other case the user is redirected to the SSO login page,
        with this view's URL as the return address.

    :param request: incoming ``HttpRequest``.
    :returns: an ``HttpResponseRedirect``, or a 403 response when SSO
        succeeded but no local user could be authenticated.
    """
    # Imported locally so this view does not depend on the module's import
    # block being extended.
    from django.http import HttpResponseForbidden

    if request.user.is_authenticated():
        return HttpResponseRedirect(reverse('index', args=()))

    user_info = None
    tmp_token = request.GET.get('token')
    if tmp_token:
        token = _sso_token(request, tmp_token)
        if token:
            # A truthy result means the SSO service accepted the token and
            # returned the user's profile.
            user_info = _sso_user_info(request, token)

    if not user_info:
        # NOTE(review): the scheme is hard-coded to http; confirm this is
        # still right when the site is served over https.
        local_login_url = 'http://' + request.get_host() + reverse('login')
        sso_login_url = SSO_LOGIN_URL % (SSO_URL, local_login_url)
        return HttpResponseRedirect(sso_login_url)

    _add_user(request, user_info['um'], user_info['name'],
              user_info['mobile'], user_info['email'],
              user_info['department'])

    auth_token = Serializer(DJANGO_AUTH_TOKEN).dumps(user_info['um'])
    contrib_user = authenticate(auth_token=auth_token,
                                token=DJANGO_AUTH_TOKEN)
    if contrib_user is None:
        # authenticate() found no usable account (for instance because
        # _add_user failed). Calling login() with None would crash, and
        # bouncing back to the SSO page could redirect forever.
        return HttpResponseForbidden(
            'Login failed: no local account is available for this user.')

    login(request, contrib_user)

    # Session role id by account role; any other role gets 0.
    role_ids = {'SU': 2, 'GA': 1}
    request.session['role_id'] = role_ids.get(contrib_user.role, 0)
    return HttpResponseRedirect(reverse('index', args=()))
重新开发Jumpserver用户认证模块,调用独立认证接口(二)
原文:http://www.cnblogs.com/mageguoshi/p/5755154.html