python-3.x, django, django-registration

Django Rest Framework: How to calculate percentage of video duration?


In my project, I have a video section in which I want to calculate the percentage of the video that the user has watched. The details of a video can be accessed through the URL below.

URL: video/video_id, output:

"video": {
                "id": "84e7288c-dc09-44aa-850c-08546a98ffde",
                "deleted": null,
                "datetime_created": "02/04/2022 06:56 AM",
                "datetime_updated": "02/04/2022 06:56 AM",
                "video_name": "video name3",
                "description": "description about video",
                "duration": "00:33:20",
                "create_date": "02/04/2022 06:56 AM",
                "video_type": "micro",
                "file_url": "https://vimeo.com/216763352",
                "general_status": "high",
                "review_status": "draft",
                "video_number": "VD6129",
                "created_by": null
            },

"duration": "00:33:20" is the total duration of video. How to calculate the percentage of video time that the user is watching, if passing the time in total seconds like

{
    "time":200
}

Solution

  • Well, I have recently done something like that: my task was to calculate the watched video time and give points accordingly. I was also asked not to give percentage and points again for video time that had already been watched, so I used an interval-merging algorithm to merge overlapping intervals. You can customize the code for your own requirements, because you may not need to give users points or make the calculation fully accurate and free of double counting like I did. Here is what I have done:

    models.py

    from django.db import models
    from django.apps import apps
    from django.db.models import Sum, F, Subquery
    from django.db.models.functions import Coalesce
    from django.utils.translation import ugettext_lazy as _
    from django.dispatch import receiver
    from django.db.models.signals import post_save
    
    from common.models import BaseUser
    from helpers.models import BaseModel
    
    
    class LessonTypeChoices(models.TextChoices):
        """
        Kinds of lesson, used as the ``choices`` of the ``type`` fields on
        Lesson and LessonProgress.
        """

        # Each member is (stored value, translatable label).
        video = "video", _("Video")
        task = "task", _("Task")
        exam = "exam", _("Exam")
        book = "book", _("Book")
        audiobook = "audiobook", _("Audio book")
    
    
    class Lesson(BaseModel):
        """
        A single unit of a course (video, task, exam, book or audio book).
        """

        # Owning course. Lessons are reachable from it as ``course.lessons`` and
        # are deleted together with it (CASCADE).
        course = models.ForeignKey(
            "courses.Course",
            on_delete=models.CASCADE,
            related_name="lessons",
            verbose_name=_("course"),
        )
        # One of LessonTypeChoices.
        # NOTE(review): the attribute name shadows the builtin ``type`` inside
        # the class body; renaming it would change the schema, so it is kept.
        type = models.CharField(_("type"), max_length=32, choices=LessonTypeChoices.choices)
        title = models.CharField(_("title"), max_length=256)
        description = models.TextField(_("description"))
        # Points the lesson is worth; LessonProgress.calculate_points scales
        # this value by the progress percentage.
        points = models.IntegerField(_("points"), default=0)
        # Position inside the course; LessonProgress.set_locked walks the
        # lessons sorted by this field.
        order = models.IntegerField(_("order"), default=0)

        def __str__(self):
            # Human-readable label: "<title> - <type>".
            return f"{self.title} - {self.type}"

        class Meta:
            db_table = "lesson"
            verbose_name = _("lesson")
            verbose_name_plural = _("lessons")
    
    
    @receiver(post_save, sender=Lesson)
    def add_course_point(sender, instance, created, **kwargs):
        """
        Recompute the owning course's ``points`` as the sum of its lessons' points.

        Runs after every Lesson save; the ``created`` flag is not consulted, so
        both new and edited lessons trigger the recalculation. The total is
        computed in the database through a subquery, not in Python.
        """
        # Resolved lazily through the app registry instead of a module import.
        course_model = apps.get_model("courses.Course")
        owning_course = course_model.objects.filter(id=instance.course_id)

        # Single-row, single-column queryset holding the summed lesson points.
        lesson_points_total = owning_course.annotate(
            sum_points=Sum("lessons__points")
        ).values("sum_points")

        owning_course.update(points=Subquery(lesson_points_total))
    
    class LessonProgress(BaseModel):
        """
        Progress of one user on one lesson.

        There is at most one row per (lesson, user) pair (``unique_together``
        in Meta). The static helpers on this class hold the percentage/points
        arithmetic used by the submit serializer.
        """

        # Lesson being tracked; rows are reachable as ``lesson.lesson_progress``.
        lesson = models.ForeignKey(
            Lesson,
            on_delete=models.CASCADE,
            related_name="lesson_progress",
            verbose_name=_("lesson"),
        )
        # User the progress belongs to; rows are reachable as
        # ``user.lesson_progress``.
        user = models.ForeignKey(
            BaseUser,
            on_delete=models.CASCADE,
            related_name="lesson_progress",
            verbose_name=_("user"),
        )
        # One of LessonTypeChoices (same choice set as Lesson.type).
        type = models.CharField(_("type"), max_length=32, choices=LessonTypeChoices.choices)
        # Accumulated progress percentage; the helpers below add to it.
        percentage = models.FloatField(_("progress percentage"), default=0)
        # Points earned so far on this lesson.
        points = models.FloatField(_("points"), default=0)
        # New rows start locked; set_locked() flips this flag.
        is_locked = models.BooleanField(_("lesson is locked"), default=True)
        is_completed = models.BooleanField(_("lesson is completed"), default=False)
        # Last reported position; exam_video_progress_points() overwrites it
        # with the value it is given.
        last_progress = models.IntegerField(_("last progress"), default=0)

        class Meta:
            unique_together = ("lesson", "user")
            db_table = "lesson_progress"
            verbose_name = _("lesson progress")
            verbose_name_plural = _("lesson progress")
    
        def set_locked(self, instance, user):
            """
            Recompute the locked flag of ``user``'s progress rows for a course.

            ``instance`` is the course whose ``lessons`` are walked in ``order``.
            The first two lessons are always unlocked. Every later lesson is
            unlocked when the user's summed percentage divided by
            ``index - 1`` (0-based index) is at least 80, and locked otherwise.

            Only rows belonging to ``user`` are updated; the percentage is
            computed for that user, so it must not change other users' rows.
            """
            percentage = self.get_percentage(instance, user)

            for index, lesson in enumerate(instance.lessons.all().order_by("order")):
                # ``index - 1`` is never zero here: indexes 0 and 1 short-circuit.
                is_unlocked = index in (0, 1) or percentage / (index - 1) >= 80
                lesson.lesson_progress.filter(user=user).update(
                    is_locked=not is_unlocked
                )
    
        @staticmethod
        def get_percentage(instance, user):
            """
            Return the sum of ``user``'s progress percentages over a course.

            ``instance`` is the course; its ``lessons`` are aggregated and only
            progress rows belonging to ``user`` are counted. The result is the
            sum, not an average; set_locked() does the division itself.
            Returns ``0.0`` when the user has no progress rows.
            """
            totals = instance.lessons.all().aggregate(
                percentage=Coalesce(
                    Sum(
                        "lesson_progress__percentage",
                        filter=models.Q(lesson_progress__user=user),
                    ),
                    0.0,
                    # Explicit output type: a float sum combined with a bare
                    # integer fallback is rejected as "mixed types" by
                    # Django >= 3.2.
                    output_field=models.FloatField(),
                )
            )
            return totals.get("percentage")
    
        @staticmethod
        def is_complete(percentage):
            """
            decide whether lesson is completed or not
            """
            if percentage >= 90:
                return True
            return False
    
        @staticmethod
        def calculate_percentage(length, progress):
            """
            Calculate the watched video or submitted exam answer percentage
            """
            return progress / length * 100
    
        @staticmethod
        def calculate_points(
            instance,
            percentage,
        ):
            """
            Calculate earned points by lesson progress percentage
            """
            return instance.points / 100 * percentage
    
        @staticmethod
        def user_points(user, points, is_expired):
            """
            Add points to user
            """
            if not is_expired:
                user.score += points
                user.save()
    
        @staticmethod
        def task_progress_points(instance, percentage, is_completed, points, is_expired):
            """
            Update task lesson progress -> percentage, is_completed, points
            """
            instance.percentage += percentage
            instance.is_completed = is_completed
    
            if not is_expired:
                instance.points += points
    
            instance.save()
    
        @staticmethod
        def exam_video_progress_points(
            instance, percentage, last_progress, is_completed, points, is_expired
        ):
            """
            Update exam video lesson progress -> percentage, last_progress, is_completed, points
            """
            instance.percentage += percentage
            instance.last_progress = last_progress
            instance.is_completed = is_completed
    
            if not is_expired:
                instance.points += points
    
            instance.save()
    
        @staticmethod
        def merge(intervals, progress_before):
            """
            Collapse overlapping watched intervals and return the newly watched time.

            ``intervals`` is a LessonVideoProgress queryset that must be ordered
            by ``start_progress``: each row is compared only with the last
            merged interval. Rows that overlap or touch are fused into one; all
            original rows are then deleted and replaced by the merged ones.

            ``progress_before`` is the summed ``progress`` of the rows before
            the latest interval was added. The return value is the difference
            between the merged total and that figure, as a float, i.e. the
            watched time that had not been counted before.
            """
            # Imported locally so the module's import block stays untouched.
            from django.db import transaction

            merged = []

            for progress in intervals:
                last = merged[-1] if merged else None

                if last is not None and progress.start_progress <= last.end_progress:
                    # Overlaps (or touches) the previous interval: extend it.
                    last.end_progress = max(last.end_progress, progress.end_progress)
                    # Duration is measured from this merged interval's own
                    # start, not from the start of the first interval.
                    last.progress = last.end_progress - last.start_progress
                    last.video_length = max(last.video_length, progress.video_length)
                else:
                    # Disjoint from everything so far: start a new interval.
                    merged.append(
                        LessonVideoProgress(
                            lesson_progress_id=progress.lesson_progress_id,
                            video_length=progress.video_length,
                            start_progress=progress.start_progress,
                            end_progress=progress.end_progress,
                            progress=progress.end_progress - progress.start_progress,
                        )
                    )

            # Replace the stored rows in one transaction so a failure between
            # the delete and the insert cannot lose the user's history.
            with transaction.atomic():
                intervals.delete()
                LessonVideoProgress.objects.bulk_create(merged)

            # aggregate() runs a fresh query, so it sees the re-created rows;
            # Sum yields None when there are no rows at all.
            progress_after = (
                intervals.aggregate(progress_sum=Sum("progress")).get("progress_sum")
                or 0
            )

            return float(progress_after - progress_before)
    
        @staticmethod
        def progress_before_merge(queryset):
            """
            Return the summed ``progress`` of the given video-progress rows.

            Called before a new interval is stored, so merge() can later work
            out how much watched time the new interval added. Returns zero
            when there are no rows.
            """
            totals = queryset.aggregate(
                progress_sum=Coalesce(
                    Sum("progress"),
                    0,
                    # Explicit output type: a decimal sum combined with a bare
                    # integer fallback is rejected as "mixed types" by
                    # Django >= 3.2.
                    output_field=models.DecimalField(),
                )
            )
            return totals.get("progress_sum")
    
        @staticmethod
        def create_lesson_video_progress(
            progress_id, video_length, start_progress, end_progress
        ):
            """
            Store one watched interval for the given lesson progress row.

            ``progress`` is saved as ``end_progress - start_progress`` so the
            row is consistent with the rows merge() writes, instead of being
            left at the field default of 0.
            """
            LessonVideoProgress.objects.create(
                lesson_progress_id=progress_id,
                video_length=video_length,
                start_progress=start_progress,
                end_progress=end_progress,
                progress=end_progress - start_progress,
            )
    
        @staticmethod
        def round_percentage(percentage, progress_percentage):
            # round the given percentages to 100 in case it exceeds it
            return percentage - ((progress_percentage + percentage) - 100)
    
    
    @receiver(post_save, sender=LessonProgress)
    def run_lessons(sender, instance, **kwargs):
        """
        After a LessonProgress row is saved, refresh the lock state of the
        lessons in its course for the row's user.
        """
        if not instance.pk:
            return

        course = instance.lesson.course
        instance.set_locked(course, instance.user)
    
    
    class LessonVideoProgress(BaseModel):
        """
        One watched interval of a video, attached to a LessonProgress row.

        LessonProgress.merge() deletes overlapping rows and re-creates them as
        merged intervals.
        """

        # Parent progress row; intervals are reachable from it as
        # ``lesson_progress.lesson_video_progress``.
        lesson_progress = models.ForeignKey(
            LessonProgress, on_delete=models.CASCADE, related_name="lesson_video_progress"
        )
        # Length of the video as submitted together with the interval.
        # NOTE(review): presumably seconds, like the other values; confirm.
        video_length = models.DecimalField(
            _("video length"), max_digits=19, decimal_places=3
        )
        # Duration covered by the interval; merge() stores
        # end_progress - start_progress here.
        progress = models.DecimalField(
            _("progress duration"), default=0, max_digits=19, decimal_places=3
        )
        # Position in the video where the watched interval begins.
        start_progress = models.DecimalField(
            _("start progress"), max_digits=19, decimal_places=3
        )
        # Position in the video where the watched interval ends.
        end_progress = models.DecimalField(
            _("end progress"), max_digits=19, decimal_places=3
        )

        class Meta:
            db_table = "lesson_video_progress"
            verbose_name = _("lesson video progress")
            verbose_name_plural = _("lesson video progress")

        def __str__(self):
            # Shown as the covered duration.
            return f"{self.progress}"
    

    views.py

    class LessonVideoExamSubmitView(generics.GenericAPIView):
        """
        Endpoint that accepts a watched video interval and records the progress.
        """

        queryset = LessonProgress.objects.all()
        serializer_class = serializers.LessonVideoExamSubmitSerializer

        def post(self, request, *args, **kwargs):
            """
            Validate the submitted interval, store it and echo the data back.

            Invalid input raises a validation error instead of returning.
            """
            submission = self.serializer_class(
                context={"request": request}, data=request.data
            )
            submission.is_valid(raise_exception=True)
            submission.save()

            payload = submission.data
            return Response(payload)
    

    serializers.py

    class LessonVideoExamSubmitSerializer(serializers.Serializer):
        """
        Record one watched interval of a video lesson and credit the progress.

        Input: the lesson id, the video length, the start and end of the
        watched interval, and whether the lesson is expired (expired lessons
        earn no points). The requesting user is read from
        ``self.context["request"]``.
        """

        lesson_id = serializers.IntegerField()
        video_length = serializers.FloatField()
        start_progress = serializers.FloatField()
        end_progress = serializers.FloatField()
        is_expired = serializers.BooleanField(default=False)

        def validate(self, attrs):
            """
            Reject input that would break the percentage arithmetic.

            A non-positive video length would cause a division by zero, and
            an interval that ends before it starts has negative duration.
            """
            if attrs["video_length"] <= 0:
                raise serializers.ValidationError(
                    {"video_length": "Video length must be greater than zero."}
                )
            if attrs["end_progress"] < attrs["start_progress"]:
                raise serializers.ValidationError(
                    {"end_progress": "End progress must not be before start progress."}
                )
            return attrs

        def create(self, validated_data):
            """
            Store the interval, merge it with earlier ones and update progress.

            Only watched time that was not counted before adds percentage and
            points. Returns ``validated_data`` unchanged.

            Raises LessonProgress.DoesNotExist when the user has no progress
            row for the given lesson.
            """
            video_length = validated_data["video_length"]
            start_progress = validated_data["start_progress"]
            end_progress = validated_data["end_progress"]
            is_expired = validated_data["is_expired"]

            # get lesson progress for given lesson id
            progress = LessonProgress.objects.get(
                lesson_id=validated_data["lesson_id"], user=self.context["request"].user
            )

            # get the current video progress before merging with other progresses
            progress_before_merge = progress.progress_before_merge(
                progress.lesson_video_progress
            )

            # create new video progress for given intervals
            progress.create_lesson_video_progress(
                progress.id,
                video_length,
                start_progress,
                end_progress,
            )

            # merge created video progress objects with existing objects in the
            # database; the result is the newly watched duration only
            progress_merge = progress.merge(
                progress.lesson_video_progress.all().order_by("start_progress"),
                progress_before_merge,
            )

            # pass video length and merged duration and get watched video percentage
            percentage = progress.calculate_percentage(video_length, progress_merge)

            if progress.percentage + percentage >= 100:
                # cap the addition so the stored percentage ends at exactly 100
                percentage = progress.round_percentage(percentage, progress.percentage)

            # get points to the lesson by passing the percentage
            points = progress.calculate_points(progress.lesson, percentage)

            # add points to user
            progress.user_points(progress.user, points, is_expired)

            # get completed status by sum of progress percentages
            is_completed = progress.is_complete(progress.percentage + percentage)

            # update lesson progress object
            progress.exam_video_progress_points(
                progress,
                percentage,
                end_progress,
                is_completed,
                points,
                is_expired,
            )

            return validated_data
    

    NOTE: you can optimize this and remove the parts that are unnecessary for your own video calculation logic. The same approach can also be used for audio progress calculation. I don't usually share source code, but I did here because I really want to help. Cheers!