from ipaddress import ip_network import dateutil.parser from utils.api import APIView, validate_serializer from utils.cache import cache from utils.constants import CacheKey from account.decorators import check_contest_permission, ensure_created_by from ..models import Contest, ContestAnnouncement, ACMContestRank from ..serializers import (ContestAnnouncementSerializer, ContestAdminSerializer, CreateConetestSeriaizer, CreateContestAnnouncementSerializer, EditConetestSeriaizer, EditContestAnnouncementSerializer, ACMContesHelperSerializer, ) class ContestAPI(APIView): @validate_serializer(CreateConetestSeriaizer) def post(self, request): data = request.data data["start_time"] = dateutil.parser.parse(data["start_time"]) data["end_time"] = dateutil.parser.parse(data["end_time"]) data["created_by"] = request.user if data["end_time"] <= data["start_time"]: return self.error("Start time must occur earlier than end time") if data.get("password") and data["password"] == "": data["password"] = None for ip_range in data["allowed_ip_ranges"]: try: ip_network(ip_range, strict=False) except ValueError: return self.error(f"{ip_range} is not a valid cidr network") contest = Contest.objects.create(**data) return self.success(ContestAdminSerializer(contest).data) @validate_serializer(EditConetestSeriaizer) def put(self, request): data = request.data try: contest = Contest.objects.get(id=data.pop("id")) ensure_created_by(contest, request.user) except Contest.DoesNotExist: return self.error("Contest does not exist") data["start_time"] = dateutil.parser.parse(data["start_time"]) data["end_time"] = dateutil.parser.parse(data["end_time"]) if data["end_time"] <= data["start_time"]: return self.error("Start time must occur earlier than end time") if not data["password"]: data["password"] = None for ip_range in data["allowed_ip_ranges"]: try: ip_network(ip_range, strict=False) except ValueError as e: return self.error(f"{ip_range} is not a valid cidr network") if not contest.real_time_rank and data.get("real_time_rank"): cache_key = f"{CacheKey.contest_rank_cache}:{contest.id}" cache.delete(cache_key) for k, v in data.items(): setattr(contest, k, v) contest.save() return self.success(ContestAdminSerializer(contest).data) def get(self, request): contest_id = request.GET.get("id") if contest_id: try: contest = Contest.objects.get(id=contest_id) ensure_created_by(contest, request.user) return self.success(ContestAdminSerializer(contest).data) except Contest.DoesNotExist: return self.error("Contest does not exist") contests = Contest.objects.all().order_by("-create_time") if request.user.is_admin(): contests = contests.filter(created_by=request.user) keyword = request.GET.get("keyword") if keyword: contests = contests.filter(title__contains=keyword) return self.success(self.paginate_data(request, contests, ContestAdminSerializer)) class ContestAnnouncementAPI(APIView): @validate_serializer(CreateContestAnnouncementSerializer) def post(self, request): """ Create one contest_announcement. """ data = request.data try: contest = Contest.objects.get(id=data.pop("contest_id")) ensure_created_by(contest, request.user) data["contest"] = contest data["created_by"] = request.user except Contest.DoesNotExist: return self.error("Contest does not exist") announcement = ContestAnnouncement.objects.create(**data) return self.success(ContestAnnouncementSerializer(announcement).data) @validate_serializer(EditContestAnnouncementSerializer) def put(self, request): """ update contest_announcement """ data = request.data try: contest_announcement = ContestAnnouncement.objects.get(id=data.pop("id")) ensure_created_by(contest_announcement, request.user) except ContestAnnouncement.DoesNotExist: return self.error("Contest announcement does not exist") for k, v in data.items(): setattr(contest_announcement, k, v) contest_announcement.save() return self.success() def delete(self, request): """ Delete one contest_announcement. """ contest_announcement_id = request.GET.get("id") if contest_announcement_id: if request.user.is_admin(): ContestAnnouncement.objects.filter(id=contest_announcement_id, contest__created_by=request.user).delete() else: ContestAnnouncement.objects.filter(id=contest_announcement_id).delete() return self.success() def get(self, request): """ Get one contest_announcement or contest_announcement list. """ contest_announcement_id = request.GET.get("id") if contest_announcement_id: try: contest_announcement = ContestAnnouncement.objects.get(id=contest_announcement_id) ensure_created_by(contest_announcement, request.user) return self.success(ContestAnnouncementSerializer(contest_announcement).data) except ContestAnnouncement.DoesNotExist: return self.error("Contest announcement does not exist") contest_id = request.GET.get("contest_id") if not contest_id: return self.error("Parameter error") contest_announcements = ContestAnnouncement.objects.filter(contest_id=contest_id) if request.user.is_admin(): contest_announcements = contest_announcements.filter(created_by=request.user) keyword = request.GET.get("keyword") if keyword: contest_announcements = contest_announcements.filter(title__contains=keyword) return self.success(ContestAnnouncementSerializer(contest_announcements, many=True).data) class ACMContestHelper(APIView): @check_contest_permission(check_type="ranks") def get(self, request): ranks = ACMContestRank.objects.filter(contest=self.contest, accepted_number__gt=0) \ .values("id", "user__username", "user__userprofile__real_name", "submission_info") results = [] for rank in ranks: for problem_id, info in rank["submission_info"].items(): if info["is_ac"]: results.append({ "id": rank["id"], "username": rank["user__username"], "real_name": rank["user__userprofile__real_name"], "problem_id": problem_id, "ac_info": info, "checked": info.get("checked", False) }) results.sort(key=lambda x: -x["ac_info"]["ac_time"]) return self.success(results) @check_contest_permission(check_type="ranks") @validate_serializer(ACMContesHelperSerializer) def put(self, request): data = request.data try: rank = ACMContestRank.objects.get(pk=data["rank_id"]) except ACMContestRank.DoesNotExist: return self.error("Rank id does not exist") problem_rank_status = rank.submission_info.get(data["problem_id"]) if not problem_rank_status: return self.error("Problem id does not exist") problem_rank_status["checked"] = data["checked"] rank.save(update_fields=("submission_info",)) return self.success()