OnlineJudge/judge/dispatcher.py

245 lines
11 KiB
Python
Raw Normal View History

2017-05-08 12:37:54 +00:00
import hashlib
2017-08-15 12:02:36 +00:00
import json
2017-05-08 12:37:54 +00:00
import logging
from urllib.parse import urljoin
2017-08-15 12:02:36 +00:00
import requests
from django.core.cache import cache
2017-05-08 12:37:54 +00:00
from django.db import transaction
from django.db.models import F
from account.models import User
2017-05-08 12:37:54 +00:00
from conf.models import JudgeServer, JudgeServerToken
from contest.models import ContestRuleType, ACMContestRank, OIContestRank
2017-08-15 12:02:36 +00:00
from judge.languages import languages
from problem.models import Problem, ProblemRuleType, ContestProblem
from submission.models import JudgeStatus, Submission
2017-08-15 13:05:41 +00:00
from utils.cache import judge_cache
2017-08-15 12:32:14 +00:00
from utils.constants import CacheKey
2017-05-08 12:37:54 +00:00
logger = logging.getLogger(__name__)
def process_pending_task():
    """Continue with the work left in the waiting queue.

    Pops one JSON-encoded entry from ``CacheKey.waiting_queue`` and hands its
    fields to ``judge_task.delay`` as keyword arguments. Does nothing when the
    queue is empty.
    """
    # Pop first and test the result instead of checking llen() beforehand:
    # another worker may empty the queue between the two calls, in which case
    # the pop yields nothing and the old code crashed on ``.decode``.
    # Assumes the cache client returns None for an empty list, as redis-py
    # does -- TODO confirm for judge_cache.
    raw = judge_cache.rpop(CacheKey.waiting_queue)
    if raw is None:
        return

    # Imported here to avoid a circular import.
    from judge.tasks import judge_task

    data = json.loads(raw.decode("utf-8"))
    judge_task.delay(**data)
2017-05-08 12:37:54 +00:00
class JudgeDispatcher(object):
    """Send one submission to a judge server and record the outcome.

    After judging, the submission's result and statistics are saved, the
    problem's counters are updated, and then either the contest ranking (for
    contest submissions) or the user's profile (otherwise) is updated.
    """

    # Seconds added to an ACM contestant's total time for every rejected
    # submission that preceded the accepted one.
    ACM_PENALTY_SECONDS = 20 * 60

    def __init__(self, submission_id, problem_id):
        """Load the token, the submission and the problem it targets.

        :param submission_id: primary key of the ``Submission`` to judge.
        :param problem_id: value of the problem's ``_id`` field; looked up on
            ``ContestProblem`` when the submission belongs to a contest and
            on ``Problem`` otherwise.
        """
        token = JudgeServerToken.objects.first().token
        # Judge servers receive the SHA-256 hex digest, not the raw token.
        self.token = hashlib.sha256(token.encode("utf-8")).hexdigest()
        self.redis_conn = judge_cache
        self.submission = Submission.objects.get(pk=submission_id)
        if self.submission.contest_id:
            self.problem = (
                ContestProblem.objects.select_related("contest")
                .get(_id=problem_id, contest_id=self.submission.contest_id)
            )
            self.contest = self.problem.contest
        else:
            self.problem = Problem.objects.get(_id=problem_id)
            # Always define the attribute so later code can test it safely.
            self.contest = None

    def _request(self, url, data=None):
        """POST ``data`` as JSON to ``url`` and return the decoded JSON reply.

        :param url: full URL of the judge-server endpoint.
        :param data: JSON-serialisable payload; omitted from the request when
            falsy.
        :returns: the decoded response, or ``None`` if the request or the
            decoding failed (the failure is logged with its traceback).
        """
        kwargs = {"headers": {"X-Judge-Server-Token": self.token,
                              "Content-Type": "application/json"}}
        if data:
            kwargs["data"] = json.dumps(data)
        try:
            return requests.post(url, **kwargs).json()
        except Exception as e:
            # ``e.with_traceback()`` needs a traceback argument and raised
            # TypeError here; logger.exception records message and traceback.
            logger.exception(e)
            return None

    @staticmethod
    def choose_judge_server():
        """Reserve the least-loaded judge server whose status is "normal".

        :returns: the chosen ``JudgeServer`` with its ``task_number``
            incremented in the database, or ``None`` if no server qualifies.
        """
        with transaction.atomic():
            # TODO: use more reasonable way
            servers = JudgeServer.objects.select_for_update().all().order_by("task_number")
            servers = [s for s in servers if s.status == "normal"]
            if not servers:
                return None
            server = servers[0]
            # Increment the same counter the query orders by. The previous
            # code wrote the result to ``used_instance_number``, so
            # ``task_number`` never changed.
            server.task_number = F("task_number") + 1
            server.save()
            return server

    @staticmethod
    def release_judge_res(judge_server_id):
        """Give back the slot reserved by :meth:`choose_judge_server`.

        :param judge_server_id: primary key of the ``JudgeServer`` to release.
        """
        with transaction.atomic():
            # Atomic update. Judging happens between reserve and release, so
            # the row has to be fetched again here.
            server = JudgeServer.objects.select_for_update().get(id=judge_server_id)
            server.task_number = F("task_number") - 1
            server.save()

    def _build_judge_data(self, output):
        """Assemble the payload sent to the judge server's ``/judge`` endpoint."""
        # Indexing keeps the IndexError raised for an unknown language.
        sub_config = [item for item in languages
                      if item["name"] == self.submission.language][0]
        spj_config = {}
        if self.problem.spj_code:
            for lang in languages:
                if lang["name"] == self.problem.spj_language:
                    spj_config = lang["spj"]
                    break
        return {
            "language_config": sub_config["config"],
            "src": self.submission.code,
            "max_cpu_time": self.problem.time_limit,
            "max_memory": 1024 * 1024 * self.problem.memory_limit,
            "test_case_id": self.problem.test_case_id,
            "output": output,
            "spj_version": self.problem.spj_version,
            "spj_config": spj_config.get("config"),
            "spj_compile_config": spj_config.get("compile"),
            "spj_src": self.problem.spj_code
        }

    def _record_judge_response(self, resp):
        """Copy the judge server's reply onto ``self.submission`` (unsaved)."""
        self.submission.info = resp
        if resp["err"]:
            self.submission.result = JudgeStatus.COMPILE_ERROR
            self.submission.statistic_info["err_info"] = resp["data"]
            return

        cases = resp["data"]
        # Time and memory are stored as the maximum over all test cases.
        self.submission.statistic_info["time_cost"] = max(x["cpu_time"] for x in cases)
        self.submission.statistic_info["memory_cost"] = max(x["memory"] for x in cases)
        # TODO: OI statistic_info["score"]

        failed_cases = [case for case in cases if case["result"] != 0]
        # All test cases passed -> accepted. Otherwise: ACM problems take the
        # status of the first failed case; OI problems do the same only when
        # every case failed, and are "partially accepted" when some passed.
        if not failed_cases:
            self.submission.result = JudgeStatus.ACCEPTED
        elif self.problem.rule_type == ProblemRuleType.ACM or len(failed_cases) == len(cases):
            self.submission.result = failed_cases[0]["result"]
        else:
            self.submission.result = JudgeStatus.PARTIALLY_ACCEPTED

    def judge(self, output=False):
        """Judge the submission, or queue it when no judge server is free.

        :param output: forwarded to the judge server as the ``output`` field
            of the request.
        """
        server = self.choose_judge_server()
        if not server:
            data = {"submission_id": self.submission.id, "problem_id": self.problem._id}
            self.redis_conn.lpush(CacheKey.waiting_queue, json.dumps(data))
            return

        try:
            data = self._build_judge_data(output)
            Submission.objects.filter(id=self.submission.id).update(result=JudgeStatus.JUDGING)
            resp = self._request(urljoin(server.service_url, "/judge"), data=data)
        finally:
            # Release the slot even when building or sending the request
            # fails; otherwise the server's task counter only ever grows.
            self.release_judge_res(server.id)

        if resp is None:
            # NOTE(review): the submission stays in JUDGING here. If
            # JudgeStatus has a system-error value it should be stored
            # instead -- confirm against submission.models.
            logger.error("No usable response from judge server %s for submission %s",
                         server.service_url, self.submission.id)
        else:
            self._record_judge_response(resp)
            self.submission.save()
            self.update_problem_status()
            if self.submission.contest_id:
                self.update_contest_rank()
            else:
                self.update_user_profile()

        # Judging is finished; try to handle whatever is left in the queue.
        process_pending_task()

    def compile_spj(self, service_url, src, spj_version, spj_compile_config, test_case_id):
        """Ask a judge server to compile a special-judge program.

        :returns: the decoded reply of the ``compile_spj`` endpoint, or
            ``None`` if the request failed.
        """
        data = {"src": src, "spj_version": spj_version,
                "spj_compile_config": spj_compile_config,
                "test_case_id": test_case_id}
        return self._request(urljoin(service_url, "compile_spj"), data=data)

    def update_problem_status(self):
        """Update the problem's counters and per-result statistics."""
        self.problem.add_submission_number()
        if self.submission.result == JudgeStatus.ACCEPTED:
            self.problem.add_ac_number()

        with transaction.atomic():
            if self.submission.contest_id:
                problem = ContestProblem.objects.select_for_update() \
                    .get(_id=self.problem._id, contest_id=self.contest.id)
            else:
                # Lock the row like the contest branch does. The previous
                # ``select_related()`` took no lock, so concurrent
                # read-modify-write cycles could lose counts.
                problem = Problem.objects.select_for_update().get(_id=self.problem._id)
            info = problem.statistic_info
            # Counters are keyed by the result's string form.
            result = str(self.submission.result)
            info[result] = info.get(result, 0) + 1
            problem.statistic_info = info
            problem.save(update_fields=["statistic_info"])

    def update_user_profile(self):
        """Record this submission in the submitting user's profile."""
        with transaction.atomic():
            user = User.objects.select_for_update().get(id=self.submission.user_id)
            user_profile = user.userprofile
            user_profile.add_submission_number()
            problems_status = user_profile.problems_status
            problem_id = str(self.problem._id)
            result = self.submission.result

            if self.problem.rule_type == ProblemRuleType.ACM:
                already_accepted = (
                    problem_id in problems_status
                    and problems_status[problem_id]["status"] == JudgeStatus.ACCEPTED
                )
                # A problem that is already accepted is left untouched.
                if not already_accepted:
                    if result == JudgeStatus.ACCEPTED:
                        user_profile.add_accepted_problem_number()
                    problems_status.setdefault(problem_id, {})["status"] = result
            else:
                # judge() does not compute a score yet (see its TODO), so
                # fall back to 0 instead of raising KeyError.
                score = self.submission.statistic_info.get("score", 0)
                if problem_id not in problems_status:
                    user_profile.add_score(score)
                else:
                    # Add this score and subtract the previous one.
                    user_profile.add_score(score, problems_status[problem_id]["score"])
                problems_status[problem_id] = {"score": score}

            user_profile.problems_status = problems_status
            user_profile.save(update_fields=["problems_status"])

    def update_contest_rank(self):
        """Update the user's ranking row for the submission's contest."""
        if self.contest.real_time_rank:
            # Drop the cached ranking for this contest.
            cache.delete(str(self.contest.id) + "_rank_cache")
        with transaction.atomic():
            if self.contest.rule_type == ContestRuleType.ACM:
                acm_rank, _ = ACMContestRank.objects.select_for_update(). \
                    get_or_create(user_id=self.submission.user_id, contest=self.contest)
                self._update_acm_contest_rank(acm_rank)
            else:
                oi_rank, _ = OIContestRank.objects.select_for_update(). \
                    get_or_create(user_id=self.submission.user_id, contest=self.contest)
                self._update_oi_contest_rank(oi_rank)

    def _update_acm_contest_rank(self, rank):
        """Apply this submission to an ACM ranking row and save it.

        :param rank: the user's ``ACMContestRank`` row for the contest.
        """
        key = str(self.submission.problem_id)
        info = rank.submission_info.get(key)
        if info and info["is_ac"]:
            # Already solved: later submissions do not affect the ranking.
            return
        if not info:
            # First submission for this problem.
            info = {"is_ac": False, "ac_time": 0, "error_number": 0, "is_first_ac": False}

        rank.submission_number += 1
        if self.submission.result == JudgeStatus.ACCEPTED:
            rank.accepted_number += 1
            info["is_ac"] = True
            info["ac_time"] = (self.submission.create_time - self.contest.start_time).total_seconds()
            rank.total_time += info["ac_time"] + info["error_number"] * self.ACM_PENALTY_SECONDS
            # The problem was modified earlier (accepted counter), so it is
            # fetched again rather than read from self.problem.
            problem = ContestProblem.objects.get(contest_id=self.contest.id, _id=self.problem._id)
            if problem.accepted_number == 1:
                info["is_first_ac"] = True
        else:
            info["error_number"] += 1

        rank.submission_info[key] = info
        rank.save()

    def _update_oi_contest_rank(self, rank):
        """Placeholder for OI contest ranking; currently does nothing."""
        pass