首页 > 其他 > 详细

drf serializer 校验输入字段及添加返回字段

时间:2020-01-07 18:36:31      阅读:86      评论:0      收藏:0      [点我收藏+]

serializer.py

技术分享图片

 

 models.py

技术分享图片

 

 view.py

技术分享图片

 

 

 

serializer.py

class AnswerListSerializer(serializers.ModelSerializer):
    question_id = serializers.IntegerField(source=question.id, required=True)
    video_url = serializers.CharField(source=get_video_url, required=False)

    class Meta:
        model = Answer
        fields = (
            id,
            question_id,
            body,
            video_url,
            rtmp_key,
        )
        extra_kwargs = {
            id: {
                read_only: True,
            },
            body: {
                required: False,
            },
            rtmp_key: {
                required: False,
            },
        }

    def validate(self, data):
        rtmp_key = data.get(rtmp_key, None)
        video_url = data.pop(get_video_url, None)
        content = data.get(content,None)
        check_type = data.get(check_type,None)
        check_data = data.get(check_data,None)
        if not all([content,check_data,check_type]):
            raise BadRequestException(msg="Missed content or check_type or check_data")
        if rtmp_key is None and video_url is None:
            raise BadRequestException(msg="Missed rtmp_key or video_url")

        if rtmp_key is not None and video_url is not None:
            raise BadRequestException(msg="Can not set rtmp_key and video_url at same time")

        question_id = int(data.pop(question)[id])

        try:
            question = Question.objects.get(
                id=question_id,
            )
        except Exception:
            raise BadRequestException(msg="Question object with %d not exists" % question_id)
            # raise serializers.ValidationError("question object with %d not exists" % question_id) # noqa

        data[question] = question
        data[video_url] = video_url
        return data

    def create(self, validated_data):
        create_data = {
            application: validated_data.get(application, None),
            user: validated_data.get(user, None),
            question: validated_data.get(question, None)
        }
        try:
            answer, created = Answer.objects.get_or_create(**create_data)
        except Exception as e:
            print str(e)
            raise DatabaseErrorException(Database error when create answer)

        video_url = validated_data.get(video_url, None)
        if video_url:
            answer.video_url = video_url
        body = validated_data.get(body, None)
        rtmp_key = validated_data.get(rtmp_key, None)
        content = validated_data.get(content, None),
        check_type = validated_data.get(check_type, None),
        check_data = validated_data.get(check_data, None)
        if rtmp_key:
            answer.rtmp_key = rtmp_key
        if body:
            answer.body = body
        if content:
            answer.content = content
        if check_type:
            answer.check_type = check_type
        if check_data:
            answer.check_data = check_data
        answer.save()

        return answer

 

models.py

class Answer(models.Model):
    user = models.ForeignKey(
        User,
        related_name=answers,
        null=True,
    )
    application = models.ForeignKey(
        Application,
        related_name=answers,
        null=True,
    )
    question = models.ForeignKey(
        Question,
        related_name=answers,
    )
    created_date = models.DateTimeField(
        auto_now_add=True,
    )
    update_date = models.DateTimeField(
        auto_now=True,
    )
    body = models.CharField(
        max_length=10240,
        null=True,
        blank=True,
    )
    video_url = models.CharField(
        max_length=1024,
        null=True,
        blank=True,
    )
    unified_video_url = models.CharField(
        max_length=1024,
        null=True,
        blank=True,
    )
    rtmp_key = models.CharField(
        max_length=256,
        null=True,
        blank=True,
    )
    audio_url = models.CharField(
        max_length=1024,
        null=True,
        blank=True,
    )
    ANSWER_RATING_SCORE_DEFAULT = -1
    ANSWER_RATING_SCORE_ACCEPT_MIN = 0
    ANSWER_RATING_SCORE_ACCEPT_MAX = 5
    rating_score = models.IntegerField(
        default=ANSWER_RATING_SCORE_DEFAULT,
        validators=[MinValueValidator(ANSWER_RATING_SCORE_DEFAULT),
                    MaxValueValidator(ANSWER_RATING_SCORE_ACCEPT_MAX)],
    )
    rater = models.ForeignKey(
        User,
        related_name=rating_applications,
        null=True,
        blank=True,
    )

    ANSWER_FRAME_STATUS_NOT_YET = 0
    ANSWER_FRAME_STATUS_FAILED = 1
    ANSWER_FRAME_STATUS_SUCCESS = 2
    ANSWER_FRAME_STATUS_CHOICES = (
        (ANSWER_FRAME_STATUS_NOT_YET, Not Yet),
        (ANSWER_FRAME_STATUS_FAILED, Failed),
        (ANSWER_FRAME_STATUS_SUCCESS, Success),
    )
    answer_frame_status = models.PositiveSmallIntegerField(
        default=ANSWER_FRAME_STATUS_NOT_YET,
        choices=ANSWER_FRAME_STATUS_CHOICES,
    )
    content = models.TextField(
        verbose_name="test content",
        default=""
    )
    APPLICATION_TYPE_SIMPLE = 0
    APPLICATION_TYPE_MANY = 1
    APPLICATION_TYPE_CHOICES = (
        (APPLICATION_TYPE_SIMPLE, simple select),
        (APPLICATION_TYPE_MANY, many select),
    )
    check_type = models.PositiveSmallIntegerField(
        choices=APPLICATION_TYPE_CHOICES,
        default=APPLICATION_TYPE_SIMPLE,
    )
    check_data = models.CharField(
        max_length=10,
        verbose_name="选择结果",
        default="",
        blank=True
    )

    def __unicode__(self):
        return %s-%s-%s % (self.user.username, self.question.title, self.application.position.title)

    @classmethod
    def is_a_valid_rating_score(_cls, rating_score):
        return rating_score in range(_cls.ANSWER_RATING_SCORE_ACCEPT_MIN,
                                     _cls.ANSWER_RATING_SCORE_ACCEPT_MAX + 1)

    class Meta:
        unique_together = (user, application, question, )

    def get_video_url(self):
        # use video_url as first priority
        # if user upload video via rtmp, rtmp playlist key will first store in "rtmp_key" field
        # then, rtmp playlist will be transcoded into mp4 format and store in "video_url" field
        if self.video_url is not None and self.video_url != ‘‘:
            return self.video_url

        if self.rtmp_key is not None and self.rtmp_key != ‘‘:
            url = ‘‘
            try:
                channel, key = tuple(self.rtmp_key.split(/))
                url = get_signed_playlist_url(channel, key)
            except Exception:
                pass

            return url

        return ‘‘

    def get_mp4_video_url(self):
        if self.video_url is None or self.video_url == ‘‘:
            return None

        return self.video_url

    def get_rtmp_key(self):
        if self.rtmp_key is None or self.rtmp_key == ‘‘:
            return None

        return self.rtmp_key

    def get_author_obj(self):
        return self.application.user

    def get_question_id(self):
        return self.question.id

    def get_question_title(self):
        return self.question.title

    def get_question_competency_name(self):
        return self.question.get_competency_name()

    def get_latest_emotion_score(self):
        if self.emotion_scores.count() > 0:
            return self.emotion_scores.all().order_by(-created_date)[0]
        return None

    def get_emotion_score_overall(self):
        emotion_score = self.get_latest_emotion_score()
        if emotion_score:
            return emotion_score.overall

        return None

    def get_frame_score(self):
        emotion_score = self.get_latest_emotion_score()
        if emotion_score:
            return emotion_score.frame_score

        return {}

    def is_answer_frame_status_success(self):
        return self.answer_frame_status == Answer.ANSWER_FRAME_STATUS_SUCCESS

    def set_frame_status(self, frame_status):
        self.answer_frame_status = frame_status
        self.save()

    def set_frame_status_fail(self):
        return self.set_frame_status(Answer.ANSWER_FRAME_STATUS_FAILED)

    def set_frame_status_success(self):
        return self.set_frame_status(Answer.ANSWER_FRAME_STATUS_SUCCESS)

    def get_no_emotion_data_frames(self):
        if getattr(self, video_frames, None) is None:
            return None

        return self.video_frames.filter(azure_fer=None)

    def get_lang(self):
        return self.get_question().get_answer_lang()

    def get_question(self):
        return self.question

    # def can_emotion_score_be_calculated(self):
    #     frames = self.video_frames.all()
    #     for frame in frames:
    #         if frame.azure_emotion_result is None:
    #             return False
    #
    #     if self.get_question_template_competency_emotion_factor is None:
    #         return False
    #
    #     return True

    def get_frames(self):
        return self.video_frames.all().order_by(ts_index)

    def get_emotion_raw(self):
        frames = self.get_frames()
        emotion_raw = []
        last_non_empty_emotion_dict = {}
        for frame in frames:
            current_frame_emotion_dict = frame.azure_emotion_result
            if current_frame_emotion_dict is None:
                return []

            if not current_frame_emotion_dict:
                if not last_non_empty_emotion_dict:
                    current_frame_emotion_dict = {uneutral: 0.0, usadness: 0.00, uhappiness: 0.0, udisgust: 0.0, uanger: 0.0, usurprise: 0.0, ufear: 0.0, ucontempt: 0.0}
                else:
                    current_frame_emotion_dict = last_non_empty_emotion_dict
            else:
                last_non_empty_emotion_dict = current_frame_emotion_dict

            emotion_raw.append(current_frame_emotion_dict)

        return emotion_raw

    def get_question_template_competency_emotion_factor(self):
        return self.question.get_competency_emotion_factor()

    def calculate_emotion_score(self):
        frames = self.get_frames()
        for frame in frames:
            if frame.azure_emotion_result is None:
                return {}
        worker_logger.debug(frame is ok)
        q_c_e_factor = self.get_question_template_competency_emotion_factor()
        if q_c_e_factor is None:
            return {}
        worker_logger.debug(q_c_e_factor is ok)
        positive_score_list = []
        negative_score_list = []
        positive_factor = dict_to_list_by_sorted_key(q_c_e_factor.positive)
        positive_factor_np = np.array(positive_factor)
        negative_factor = dict_to_list_by_sorted_key(q_c_e_factor.negative)
        negative_factor_np = np.array(negative_factor)

        last_non_empty_emotion_sorted = []
        for frame in frames:
            frame_emotion_data = dict_to_list_by_sorted_key(frame.azure_emotion_result)
            if len(frame_emotion_data) != settings.AZURE_EMOTION_DATA_DIM:
                if not last_non_empty_emotion_sorted:
                    frame_emotion_data = [0] * settings.AZURE_EMOTION_DATA_DIM
                else:
                    frame_emotion_data = last_non_empty_emotion_sorted
            else:
                last_non_empty_emotion_sorted = frame_emotion_data

            e_data_np = np.array([frame_emotion_data])
            positive_score_list.append(np.around(np.dot(e_data_np, positive_factor_np)[0], decimals=2))
            negative_score_list.append(np.around(np.dot(e_data_np, negative_factor_np)[0], decimals=2))

        return {
            positive: positive_score_list,
            negative: negative_score_list,
        }

    def get_question_template_emotion_competency_model(self):
        return self.question.get_emotion_competency_model()

    def get_question_template_nlp_competency_model(self):
        return self.question.get_nlp_competency_model()

    def get_question_competency(self):
        return self.question.get_competency()

    def get_reference_position(self):
        return self.question.get_refer_position()

    def get_position(self):
        return self.question.get_position()

    def get_question_competency_wmd_reference(self):
        position = self.get_position()
        if not position:
            return None

        competency = self.get_question_competency()
        if not competency:
            return None

        from leapin.nlp.models import CompetencyWmdReference
        com_wmd_ref = None
        try:
            com_wmd_ref = CompetencyWmdReference.objects.get(
                position=position,
                competency=competency
            )
        except Exception:
            pass

        return com_wmd_ref

    def get_competency_score(self):
        scores = self.competency_scores.all()
        if scores.count() > 0:
            return scores[0]

        return None

    def score(self, model_type_relation):
        worker_logger.debug(Answer %d Competency Score: start % self.id)
        competency = self.get_question_competency()
        if competency is None:
            worker_logger.debug(Answer %d Competency Score: no competency found % self.id)
            worker_logger.debug(Answer %d Competency Score: end % self.id)
            return {}

        from leapin.leapinbox.models import AnswerCompetencyScore
        competency_score = None
        try:
            competency_score = AnswerCompetencyScore.objects.get(
                answer=self,
                competency=competency
            )
        except ObjectDoesNotExist:
            emotion_score = 0.0
            nlp_score = 0.0
            for model_type in model_type_relation.keys():
                if model_type == emotion:
                    worker_logger.debug(Answer %d Emotion Competency Score: start % self.id)
                    emotion_score = self.score_competency_emotion()
                    if emotion_score is None:
                        worker_logger.debug(Answer %d Emotion Competency Score: error % self.id)
                        return {}
                    worker_logger.debug(Answer %d Emotion Competency Score: end % self.id)
                elif model_type == nlp:
                    worker_logger.debug(Answer %d NLP Competency Score: start % self.id)
                    nlp_score = self.score_competency_nlp()
                    if nlp_score is None:
                        worker_logger.debug(Answer %d NLP Competency Score: error % self.id)
                        return {}
                    worker_logger.debug(Answer %d NLP Competency Score: end % self.id)

            competency_score = AnswerCompetencyScore.objects.create(
                answer=self,
                competency=competency,
                emotion_score=emotion_score,
                nlp_score=nlp_score
            )

        return competency_score

    def score_competency(self):
        worker_logger.debug(Answer %d Competency Score: start % self.id)
        competency = self.get_question_competency()
        if competency is None:
            worker_logger.debug(Answer %d Competency Score: no competency found % self.id)
            worker_logger.debug(Answer %d Competency Score: end % self.id)
            return {}

        worker_logger.debug(Answer %d Emotion Competency Score: start % self.id)
        emotion_score = self.score_competency_emotion()
        worker_logger.debug(Answer %d Emotion Competency Score: end % self.id)

        worker_logger.debug(Answer %d NLP Competency Score: start % self.id)
        nlp_score = self.score_competency_nlp()
        worker_logger.debug(Answer %d NLP Competency Score: end % self.id)

        return {
            competency: competency,
            emotion_score: emotion_score,
            nlp_score: nlp_score
        }

    def score_competency_emotion(self):
        frames = self.get_frames()
        frames_emotion_data_list = []
        for frame in frames:
            if frame.azure_emotion_result is None:
                return None
            if len(frame.azure_emotion_result) > 0:
                frames_emotion_data_list.append(frame.azure_emotion_result)
        worker_logger.debug(Answer %d Emotion Competency Score: answer frames is ok % self.id)
        q_c_s_m = self.get_question_template_emotion_competency_model()
        if q_c_s_m is None:
            return None
        worker_logger.debug(Answer %d Emotion Competency Score: q_c_s_m is ok % self.id)
        # e_data_list = np.array(frames_emotion_data_list)
        # input = np.mean(e_data_list, axis=0)
        score = q_c_s_m.score_competency(frames_emotion_data_list)
        return score

    def get_nlp_data(self):
        return getattr(self, nlp_data, None)

    def score_competency_nlp(self):
        answer_nlp_data_record = self.get_nlp_data()
        if not answer_nlp_data_record:
            return None

        answer_nlp_data = answer_nlp_data_record.get_data()
        if not answer_nlp_data:
            return None

        worker_logger.debug(Answer %d NLP Competency Score: answer nlp data is ok % self.id)
        q_c_s_m = self.get_question_template_nlp_competency_model()
        if q_c_s_m is None:
            return None
        worker_logger.debug(Answer %d NLP Competency Score: q_c_s_m is ok % self.id)

        score = q_c_s_m.score_competency(answer_nlp_data)
        return score


    def m3u8_to_mp4(self):
        if self.get_mp4_video_url() is not None:
            return self

        if not self.rtmp_key:
            raise RtmpKeyNotValid(rtmp_key is invalid)

        mp4_video_url = m3u8_to_mp4(self.rtmp_key)
        self.video_url = mp4_video_url
        self.save()

        return self

    def convert_video_audio_to_mp3(self):
        if oss_object_exists(self.unified_video_url):
            return self

        if not self.video_url:
            raise VideoUrlNotValid(video url is invalid)

        unified_video_url = unified_video(self.video_url)
        self.unified_video_url = unified_video_url
        self.save()

        return self

    def frame_video(self):
        if self.is_answer_frame_status_success():
            return self

        if not self.unified_video_url:
            raise VideoUrlNotValid(unified video url is invalid)

        prefix = application-%d/answer-%d % (self.application.id, self.id)
        frame_list = mts_frame_video(self.unified_video_url, prefix)
        if len(frame_list) == 0:
            self.set_frame_status_fail()
            raise FrameVideoRequestFail(frame video api fail)

        from leapin.emotion.models import (
            AnswerEmotionFrame,
        )
        ts_index = 1
        for frame in frame_list:
            answer_emotion_frame, created = AnswerEmotionFrame.objects.get_or_create(
                answer=self,
                ts_index=ts_index,
            )
            answer_emotion_frame.aliyun_oss_key = frame
            answer_emotion_frame.save()
            ts_index = ts_index + 1

        self.set_frame_status_success()
        return self

    def gen_frames_data(self):
        if not self.is_answer_frame_status_success():
            raise AnswerFrameNotValid(answer has invalid frames)

        answer_frames = self.get_no_emotion_data_frames()
        if answer_frames is None:
            return self

        for frame in answer_frames:
            if frame.is_emotion_data_got():
                continue

            emotion_api_type = free
            position_settings = self.get_question().get_position().get_position_settings()
            if position_settings:
                api_type = position_settings.emotion_api_type

            face_info = get_frame_face_info(
                get_oss_object_signed_url(frame.aliyun_oss_key),
                api_type=emotion_api_type
            )
            face_attr = face_info.get(faceAttributes, {})
            emotion = face_attr.get(emotion, {})
            frame.azure_fer = face_info
            frame.azure_emotion_result = emotion
            frame.save()

        return self

    def video_to_audio(self):
        if not self.unified_video_url:
            raise VideoUrlNotValid(unified video url is invalid)

        if self.audio_url:
            return self

        audio_url = interview_video_to_audio(self.unified_video_url)
        self.audio_url = audio_url
        self.save()
        return self

    def audio_to_text(self):
        if not self.audio_url:
            raise VideoUrlNotValid(audio url is invalid)

        if self.body:
            return self

        self.body = speech2text(self.audio_url, self.get_lang())
        self.save()
        return self

    def get_body_lines(self):
        spliter = .
        if self.get_lang() == zh:
            spliter = u

        return self.body.split(spliter)

    def get_wmd_reference_score(self, body_lines):
        wmd_reference = self.get_question_competency_wmd_reference()
        if not wmd_reference:
            return None

        return wmd_reference.get_wmd_reference_score(body_lines)

    def gen_nlp_data(self):
        positon = self.get_position()
        if not positon.is_nlp_support():
            return

        if not self.body:
            raise Exception(answer body is none)

        sentimental = sentence_setimental(self.body, self.get_lang())
        s_length = sentence_length(self.body, self.get_lang())
        s_ratio = 1.0
        answer_video_length = self.get_frames().count()
        if answer_video_length > 0:
            s_ratio = float(s_length)/float(answer_video_length)

        body_lines = self.get_body_lines()
        wmd_reference_score = self.get_wmd_reference_score(body_lines)
        if not wmd_reference_score:
            raise Exception(wmd reference score error. Answer %d % self.id)

        from leapin.nlp.models import AnswerNLP
        a_nlp, created = AnswerNLP.objects.get_or_create(answer=self)
        a_nlp.sentimental_score = sentimental
        a_nlp.words_length = s_length
        a_nlp.words_ratio = s_ratio
        a_nlp.wmd_instance = wmd_reference_score
        a_nlp.save()
        return self

views.py

class AnswerListView(generics.ListCreateAPIView):
    serializer_class = AnswerListSerializer
    permission_classes = (IsAuthenticated, IsCandidate,)

    def get_queryset(self):
        user = self.request.user
        application_id = self.kwargs[application_id]
        return Answer.objects.filter(user=user, application_id=application_id)

    def perform_create(self, serializer):
        application_id = self.kwargs[application_id]
        question_id = self.request.data.get(question_id)
        try:
            application = Application.objects.get(id=int(application_id))
        except Exception:
            raise BadRequestException(Application %s not exists % str(application_id))
        if not application.is_position_contains_question_by_id(question_id):
            raise BadRequestException(Question %s is not belong to position of Application %s %
                                      (str(question_id),
                                       str(application.id)))

        serializer.save(user=self.request.user, application=application)

    def post(self, request, *args, **kwargs):
        """
        <h4>Introduction</h4>
        <p>接口用于应聘者按问题提交answer</p>
        <br>
        <h4>Request</h4>
        <ul>
            <li> question_id - answer对应的问题id </li>
            <li> video_url - 直接上传oss的video地址 </li>
            <li> rtmp_key - 推流的oss点播文件key(前端缓存的) </li>
        </ul>
        <p>其中,video_url和rtmp_key是互斥参数,两者同时只能有一个。</p>
        <br>
        <h4>Response</h4>
        <p>返回中的video_url后端已做处理:</p>
        <ul>
            <li> 如果是使用直接上传oss的,则该字段返回video的url </li>
            <li> 如果是使用推流上传的,则该字段返回点播文件的加密播放地址,有效期为12个小时 </li>
        </ul>
        """
        video_url = request.data.get(video_url, None)
        rtmp_key = request.data.get(rtmp_key, None)
        rtmp_playlist = request.user.get_oss_rtmp_playlist_by_rtmp_key(rtmp_key)
        if rtmp_playlist is None and video_url is None:
            raise BadRequestException(invalid rtmp_key)

        return self.create(request, *args, **kwargs)

    def get(self, request, *args, **kwargs):
        return self.list(request, *args, **kwargs)

 

urls.py

    url(^application/(?P<application_id>\d+)/answers/$, csrf_exempt(views.AnswerListView.as_view())),  # noqa

drf serializer 校验输入字段及添加返回字段

原文:https://www.cnblogs.com/tangda/p/12162889.html

(0)
(0)
   
举报
评论 一句话评论(0
关于我们 - 联系我们 - 留言反馈 - 联系我们:wmxa8@hotmail.com
© 2014 bubuko.com 版权所有
打开技术之扣,分享程序人生!