diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 00000000..96ae5ca5 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +venv +.idea +.git +.DS_Store diff --git a/judge/dispatcher.py b/judge/dispatcher.py index 5466df85..65bc11db 100644 --- a/judge/dispatcher.py +++ b/judge/dispatcher.py @@ -4,7 +4,7 @@ import logging from urllib.parse import urljoin import requests -from django.db import transaction +from django.db import transaction, IntegrityError from django.db.models import F from account.models import User @@ -29,6 +29,27 @@ def process_pending_task(): judge_task.send(**data) +class ChooseJudgeServer: + def __init__(self): + self.server = None + + def __enter__(self) -> [JudgeServer, None]: + with transaction.atomic(): + servers = JudgeServer.objects.select_for_update().filter(is_disabled=False).order_by("task_number") + servers = [s for s in servers if s.status == "normal"] + for server in servers: + if server.task_number <= server.cpu_core * 2: + server.task_number = F("task_number") + 1 + server.save() + self.server = server + return server + return None + + def __exit__(self, exc_type, exc_val, exc_tb): + if self.server: + JudgeServer.objects.filter(id=self.server.id).update(task_number=F("task_number") - 1) + + class DispatcherBase(object): def __init__(self): self.token = hashlib.sha256(SysOptions.judge_server_token.encode("utf-8")).hexdigest() @@ -42,25 +63,6 @@ class DispatcherBase(object): except Exception as e: logger.exception(e) - @staticmethod - def choose_judge_server(): - with transaction.atomic(): - servers = JudgeServer.objects.select_for_update().filter(is_disabled=False).order_by("task_number") - servers = [s for s in servers if s.status == "normal"] - for server in servers: - if server.task_number <= server.cpu_core * 2: - server.task_number = F("task_number") + 1 - server.save() - return server - - @staticmethod - def release_judge_server(judge_server_id): - with transaction.atomic(): - # 使用原子操作, 同时因为use和release中间间隔了判题过程,需要重新查询一下 - server = JudgeServer.objects.get(id=judge_server_id) - server.task_number = F("task_number") - 1 - server.save() - class SPJCompiler(DispatcherBase): def __init__(self, spj_code, spj_version, spj_language): @@ -74,13 +76,14 @@ class SPJCompiler(DispatcherBase): } def compile_spj(self): - server = self.choose_judge_server() - if not server: - return "No available judge_server" - result = self._request(urljoin(server.service_url, "compile_spj"), data=self.data) - self.release_judge_server(server.id) - if result["err"]: - return result["data"] + with ChooseJudgeServer() as server: + if not server: + return "No available judge_server" + result = self._request(urljoin(server.service_url, "compile_spj"), data=self.data) + if not result: + return "Failed to call judge server" + if result["err"]: + return result["data"] class JudgeDispatcher(DispatcherBase): @@ -118,12 +121,6 @@ class JudgeDispatcher(DispatcherBase): self.submission.statistic_info["score"] = score def judge(self): - server = self.choose_judge_server() - if not server: - data = {"submission_id": self.submission.id, "problem_id": self.problem.id} - cache.lpush(CacheKey.waiting_queue, json.dumps(data)) - return - language = self.submission.language sub_config = list(filter(lambda item: language == item["name"], SysOptions.languages))[0] spj_config = {} @@ -152,9 +149,18 @@ class JudgeDispatcher(DispatcherBase): "spj_src": self.problem.spj_code } - Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.JUDGING) + with ChooseJudgeServer() as server: + if not server: + data = {"submission_id": self.submission.id, "problem_id": self.problem.id} + cache.lpush(CacheKey.waiting_queue, json.dumps(data)) + return + Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.JUDGING) + resp = self._request(urljoin(server.service_url, "/judge"), data=data) + + if not resp: + Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.SYSTEM_ERROR) + return - resp = self._request(urljoin(server.service_url, "/judge"), data=data) if resp["err"]: self.submission.result = JudgeStatus.COMPILE_ERROR self.submission.statistic_info["err_info"] = resp["data"] @@ -173,7 +179,6 @@ class JudgeDispatcher(DispatcherBase): else: self.submission.result = JudgeStatus.PARTIALLY_ACCEPTED self.submission.save() - self.release_judge_server(server.id) if self.contest_id: if self.contest.status != ContestStatus.CONTEST_UNDERWAY or \ @@ -322,15 +327,25 @@ class JudgeDispatcher(DispatcherBase): def update_contest_rank(self): if self.contest.rule_type == ContestRuleType.OI or self.contest.real_time_rank: cache.delete(f"{CacheKey.contest_rank_cache}:{self.contest.id}") + with transaction.atomic(): if self.contest.rule_type == ContestRuleType.ACM: - acm_rank, _ = ACMContestRank.objects.select_for_update(). \ - get_or_create(user_id=self.submission.user_id, contest=self.contest) - self._update_acm_contest_rank(acm_rank) + model = ACMContestRank + func = self._update_acm_contest_rank else: - oi_rank, _ = OIContestRank.objects.select_for_update(). \ - get_or_create(user_id=self.submission.user_id, contest=self.contest) - self._update_oi_contest_rank(oi_rank) + model = OIContestRank + func = self._update_oi_contest_rank + + try: + # todo unique index + # func 也不是安全的 + rank = model.objects.get(user_id=self.submission.user_id, contest=self.contest) + except ACMContestRank.DoesNotExist: + try: + rank = model.objects.create(user_id=self.submission.user_id, contest=self.contest) + except IntegrityError: + rank = model.objects.get(user_id=self.submission.user_id, contest=self.contest) + func(rank) def _update_acm_contest_rank(self, rank): info = rank.submission_info.get(str(self.submission.problem_id)) diff --git a/utils/views.py b/utils/views.py index faced3ab..ce30c717 100644 --- a/utils/views.py +++ b/utils/views.py @@ -72,4 +72,4 @@ class SimditorFileUploadAPIView(CSRFExemptAPIView): "success": True, "msg": "Success", "file_path": f"{settings.UPLOAD_PREFIX}/{file_name}", - "file_name": file.name}) \ No newline at end of file + "file_name": file.name})