diff --git a/account/models.py b/account/models.py index 395fc275..b99c6917 100644 --- a/account/models.py +++ b/account/models.py @@ -72,6 +72,5 @@ class UserProfile(models.Model): school = models.CharField(max_length=200, blank=True, null=True) student_id = models.CharField(max_length=15, blank=True, null=True) - class Meta: db_table = "user_profile" diff --git a/contest/models.py b/contest/models.py index 51f5ea50..591b54af 100644 --- a/contest/models.py +++ b/contest/models.py @@ -7,7 +7,7 @@ from problem.models import AbstractProblem from group.models import Group from utils.models import RichTextField from jsonfield import JSONField -from judge.judger.result import result +from judge.result import result GROUP_CONTEST = 0 diff --git a/judge/judger/__init__.py b/db1.sqlite3 similarity index 100% rename from judge/judger/__init__.py rename to db1.sqlite3 diff --git a/dockerfiles/oj_web_server/requirements.txt b/dockerfiles/oj_web_server/requirements.txt index 7ad8718e..56676a92 100644 --- a/dockerfiles/oj_web_server/requirements.txt +++ b/dockerfiles/oj_web_server/requirements.txt @@ -4,11 +4,11 @@ redis django-redis-sessions djangorestframework django-rest-swagger -celery gunicorn coverage django-extensions supervisor pillow jsonfield -Envelopes \ No newline at end of file +Envelopes +huey \ No newline at end of file diff --git a/dockerfiles/oj_web_server/supervisord.conf b/dockerfiles/oj_web_server/supervisord.conf index 420b65f4..dec4bf49 100644 --- a/dockerfiles/oj_web_server/supervisord.conf +++ b/dockerfiles/oj_web_server/supervisord.conf @@ -23,4 +23,4 @@ serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets. [include] -files=gunicorn.conf mq.conf \ No newline at end of file +files=gunicorn.conf task_queue.conf \ No newline at end of file diff --git a/dockerfiles/oj_web_server/mq.conf b/dockerfiles/oj_web_server/task_queue.conf similarity index 53% rename from dockerfiles/oj_web_server/mq.conf rename to dockerfiles/oj_web_server/task_queue.conf index ae1797c6..39f837e0 100644 --- a/dockerfiles/oj_web_server/mq.conf +++ b/dockerfiles/oj_web_server/task_queue.conf @@ -1,12 +1,12 @@ [program:mq] -command=python manage.py runscript mq +command=python manage.py run_huey directory=/code/ user=root numprocs=1 -stdout_logfile=/code/log/mq.log -stderr_logfile=/code/log/mq.log +stdout_logfile=/code/log/task_queue.log +stderr_logfile=/code/log/task_queue.log autostart=true autorestart=true startsecs=5 diff --git a/judge/__init__.py b/judge/__init__.py index 9bad5790..e69de29b 100644 --- a/judge/__init__.py +++ b/judge/__init__.py @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/judge/judger/client.py b/judge/client.py similarity index 100% rename from judge/judger/client.py rename to judge/client.py diff --git a/judge/judger/compiler.py b/judge/compiler.py similarity index 100% rename from judge/judger/compiler.py rename to judge/compiler.py diff --git a/judge/judger/judge_exceptions.py b/judge/judge_exceptions.py similarity index 100% rename from judge/judger/judge_exceptions.py rename to judge/judge_exceptions.py diff --git a/judge/judger/run.py b/judge/judger/run.py deleted file mode 100644 index 8aeb5476..00000000 --- a/judge/judger/run.py +++ /dev/null @@ -1,94 +0,0 @@ -# coding=utf-8 -import sys -import json -import MySQLdb - -from client import JudgeClient -from language import languages -from compiler import compile_ -from result import result -from settings import judger_workspace, submission_db -from logger import logger - - -# 简单的解析命令行参数 -# 参数有 -solution_id -time_limit -memory_limit -test_case_id -# 获取到的值是['xxx.py', '-solution_id', '1111', '-time_limit', '1000', '-memory_limit', '100', '-test_case_id', 'aaaa'] -args = sys.argv -submission_id = args[2] -time_limit = args[4] -memory_limit = args[6] -test_case_id = args[8] - - -def db_conn(): - return MySQLdb.connect(db=submission_db["db"], - user=submission_db["user"], - passwd=submission_db["password"], - host=submission_db["host"], - port=submission_db["port"], charset="utf8") - - -conn = db_conn() -cur = conn.cursor() -cur.execute("select language, code from submission where id = %s", (submission_id,)) -data = cur.fetchall() -if not data: - exit() -language_code = data[0][0] -code = data[0][1] - -conn.close() - -# 将代码写入文件 -language = languages[language_code] -src_path = judger_workspace + "run/" + language["src_name"] -f = open(src_path, "w") -f.write(code.encode("utf8")) -f.close() - -# 编译 -try: - exe_path = compile_(language, src_path, judger_workspace + "run/") -except Exception as e: - print e - conn = db_conn() - cur = conn.cursor() - cur.execute("update submission set result=%s, info=%s where id=%s", - (result["compile_error"], str(e), submission_id)) - conn.commit() - exit() - -# 运行 -try: - client = JudgeClient(language_code=language_code, - exe_path=exe_path, - max_cpu_time=int(time_limit), - max_real_time=int(time_limit) * 2, - max_memory=int(memory_limit), - test_case_dir=judger_workspace + "test_case/" + test_case_id + "/") - judge_result = {"result": result["accepted"], "info": client.run(), "accepted_answer_time": None} - - for item in judge_result["info"]: - if item["result"]: - judge_result["result"] = item["result"] - break - else: - l = sorted(judge_result["info"], key=lambda k: k["cpu_time"]) - judge_result["accepted_answer_time"] = l[-1]["cpu_time"] - -except Exception as e: - logger.error(e) - conn = db_conn() - cur = conn.cursor() - cur.execute("update submission set result=%s, info=%s where id=%s", (result["system_error"], str(e), submission_id)) - conn.commit() - exit() - -conn = db_conn() -cur = conn.cursor() -cur.execute("update submission set result=%s, info=%s, accepted_answer_time=%s where id=%s", - (judge_result["result"], json.dumps(judge_result["info"]), judge_result["accepted_answer_time"], - submission_id)) -conn.commit() -conn.close() diff --git a/judge/judger_controller/README.md b/judge/judger_controller/README.md deleted file mode 100644 index 9dd56032..00000000 --- a/judge/judger_controller/README.md +++ /dev/null @@ -1 +0,0 @@ -celery -A judge.controller worker -l DEBUG \ No newline at end of file diff --git a/judge/judger_controller/celery.py b/judge/judger_controller/celery.py deleted file mode 100644 index 4c64ab04..00000000 --- a/judge/judger_controller/celery.py +++ /dev/null @@ -1,9 +0,0 @@ -# coding=utf-8 -from __future__ import absolute_import -from celery import Celery, platforms -from .settings import redis_config - -app = Celery("judge", broker='redis://%s:%s/%s' % (redis_config["host"], redis_config["port"], redis_config["db"]), - include=["judge.judger_controller.tasks"]) - -platforms.C_FORCE_ROOT =True diff --git a/judge/judger_controller/settings.py b/judge/judger_controller/settings.py deleted file mode 100644 index 4d48340e..00000000 --- a/judge/judger_controller/settings.py +++ /dev/null @@ -1,39 +0,0 @@ -# coding=utf-8 -""" -注意: -此文件包含 celery 的部分配置,但是 celery 并不是运行在docker 中的,所以本配置文件中的 redis和 MySQL 的地址就应该是 -运行 redis 和 MySQL 的 docker 容器的地址了。怎么获取这个地址见帮助文档。测试用例的路径和源代码路径同理。 -""" -import os -# 这个redis 是 celery 使用的,包括存储队列信息还有部分统计信息 -redis_config = { - "host": os.environ.get("REDIS_PORT_6379_TCP_ADDR"), - "port": 6379, - "db": 0 -} - - -# 判题的 docker 容器的配置参数 -docker_config = { - "image_name": "judger", - "docker_path": "docker", - "shell": True -} - - -# 测试用例的路径,是主机上的实际路径 -test_case_dir = "/root/test_case/" -# 源代码路径,也就是 manage.py 所在的实际路径 -source_code_dir = "/root/qduoj/" -# 日志文件夹路径 -log_dir = "/root/log/" - - -# 存储提交信息的数据库,是 celery 使用的,与 oj.settings/local_settings 等区分,那是 web 服务器访问的地址 -submission_db = { - "host": os.environ.get("submission_db_host"), - "port": 3306, - "db": "oj_submission", - "user": "root", - "password": "root" -} diff --git a/judge/judger_controller/tasks.py b/judge/judger_controller/tasks.py deleted file mode 100644 index 2219d96e..00000000 --- a/judge/judger_controller/tasks.py +++ /dev/null @@ -1,45 +0,0 @@ -# coding=utf-8 -import json -import redis -import MySQLdb -import subprocess -from ..judger.result import result -from ..judger_controller.celery import app -from settings import docker_config, source_code_dir, test_case_dir, log_dir, submission_db, redis_config - - -@app.task -def judge(submission_id, time_limit, memory_limit, test_case_id): - try: - command = "%s run --privileged --rm " \ - "--link mysql " \ - "-v %s:/var/judger/test_case/:ro " \ - "-v %s:/var/judger/code/:ro " \ - "-v %s:/var/judger/code/log/ " \ - "--device /dev/null:/dev/null " \ - "%s " \ - "python judge/judger/run.py " \ - "--solution_id %s --time_limit %s --memory_limit %s --test_case_id %s" % \ - (docker_config["docker_path"], - test_case_dir, - source_code_dir, - log_dir, - docker_config["image_name"], - submission_id, str(time_limit), str(memory_limit), test_case_id) - subprocess.call(command, shell=docker_config["shell"]) - except Exception as e: - conn = MySQLdb.connect(db=submission_db["db"], - user=submission_db["user"], - passwd=submission_db["password"], - host=submission_db["host"], - port=submission_db["port"], - charset="utf8") - - cur = conn.cursor() - cur.execute("update submission set result=%s, info=%s where id=%s", - (result["system_error"], str(e), submission_id)) - conn.commit() - conn.close() - r = redis.Redis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) - r.decr("judge_queue_length") - r.lpush("queue", submission_id) diff --git a/judge/judger/language.py b/judge/language.py similarity index 87% rename from judge/judger/language.py rename to judge/language.py index c7a14eb4..e6406566 100644 --- a/judge/judger/language.py +++ b/judge/language.py @@ -7,16 +7,16 @@ languages = { "src_name": "main.c", "code": 1, "syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k", - "compile_command": "gcc -DONLINE_JUDGE -O2 -w -std=c99 {src_path} -lm -o {exe_path}main", - "execute_command": "{exe_path}main" + "compile_command": "gcc -DONLINE_JUDGE -O2 -w -std=c99 {src_path} -lm -o {exe_path}/main", + "execute_command": "{exe_path}/main" }, 2: { "name": "cpp", "src_name": "main.cpp", "code": 2, "syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k", - "compile_command": "g++ -DONLINE_JUDGE -O2 -w -std=c++11 {src_path} -lm -o {exe_path}main", - "execute_command": "{exe_path}main" + "compile_command": "g++ -DONLINE_JUDGE -O2 -w -std=c++11 {src_path} -lm -o {exe_path}/main", + "execute_command": "{exe_path}/main" }, 3: { "name": "java", diff --git a/judge/judger/logger.py b/judge/logger.py similarity index 100% rename from judge/judger/logger.py rename to judge/logger.py diff --git a/judge/judger/result.py b/judge/result.py similarity index 100% rename from judge/judger/result.py rename to judge/result.py diff --git a/judge/runner.py b/judge/runner.py new file mode 100644 index 00000000..5ca5c205 --- /dev/null +++ b/judge/runner.py @@ -0,0 +1,65 @@ +# coding=utf-8 +import os +import socket +import shutil + +from client import JudgeClient +from language import languages +from compiler import compile_ +from result import result +from settings import judger_workspace + + +class JudgeInstanceRunner(object): + + def run(self, token, submission_id, language_code, code, time_limit, memory_limit, test_case_id): + language = languages[language_code] + host_name = socket.gethostname() + judge_base_path = os.path.join(judger_workspace, "run", submission_id) + + if not token or token != os.environ.get("rpc_token"): + return {"code": 2, "data": {"error": "Invalid token", "server": host_name}} + + try: + os.mkdir(judge_base_path) + os.chmod(judge_base_path, 0777) + + # 将代码写入文件 + src_path = os.path.join(judge_base_path, language["src_name"]) + f = open(src_path, "w") + f.write(code.encode("utf8")) + f.close() + except Exception as e: + shutil.rmtree(judge_base_path, ignore_errors=True) + return {"code": 2, "data": {"error": str(e), "server": host_name}} + + # 编译 + try: + exe_path = compile_(language, src_path, judge_base_path) + except Exception as e: + shutil.rmtree(judge_base_path, ignore_errors=True) + return {"code": 1, "data": {"error": str(e), "server": host_name}} + + # 运行 + try: + client = JudgeClient(language_code=language_code, + exe_path=exe_path, + max_cpu_time=int(time_limit), + max_real_time=int(time_limit) * 2, + max_memory=int(memory_limit), + test_case_dir=judger_workspace + "test_case/" + test_case_id + "/") + judge_result = {"result": result["accepted"], "info": client.run(), + "accepted_answer_time": None, "server": host_name} + + for item in judge_result["info"]: + if item["result"]: + judge_result["result"] = item["result"] + break + else: + l = sorted(judge_result["info"], key=lambda k: k["cpu_time"]) + judge_result["accepted_answer_time"] = l[-1]["cpu_time"] + return {"code": 0, "data": judge_result} + except Exception as e: + return {"code": 2, "data": {"error": str(e), "server": host_name}} + finally: + shutil.rmtree(judge_base_path, ignore_errors=True) \ No newline at end of file diff --git a/judge/server.py b/judge/server.py new file mode 100644 index 00000000..477cc1e4 --- /dev/null +++ b/judge/server.py @@ -0,0 +1,13 @@ +# coding=utf-8 +import SocketServer +from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler +from runner import JudgeInstanceRunner + + +class AsyncXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer): + pass + + +server = AsyncXMLRPCServer(('0.0.0.0', 8080), SimpleXMLRPCRequestHandler, allow_none=True) +server.register_instance(JudgeInstanceRunner()) +server.serve_forever() \ No newline at end of file diff --git a/judge/judger/settings.py b/judge/settings.py similarity index 100% rename from judge/judger/settings.py rename to judge/settings.py diff --git a/judge/judger/utils.py b/judge/utils.py similarity index 100% rename from judge/judger/utils.py rename to judge/utils.py diff --git a/judge/judger_controller/__init__.py b/judge_dispatcher/__init__.py similarity index 100% rename from judge/judger_controller/__init__.py rename to judge_dispatcher/__init__.py diff --git a/judge_dispatcher/migrations/0001_initial.py b/judge_dispatcher/migrations/0001_initial.py new file mode 100644 index 00000000..e85330ed --- /dev/null +++ b/judge_dispatcher/migrations/0001_initial.py @@ -0,0 +1,41 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ] + + operations = [ + migrations.CreateModel( + name='JudgeServer', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('ip', models.GenericIPAddressField()), + ('port', models.IntegerField()), + ('max_instance_number', models.IntegerField()), + ('left_instance_number', models.IntegerField()), + ('workload', models.IntegerField(default=0)), + ('token', models.CharField(max_length=30)), + ('lock', models.BooleanField(default=False)), + ('status', models.BooleanField(default=True)), + ], + options={ + 'db_table': 'judge_server', + }, + ), + migrations.CreateModel( + name='JudgeWaitingQueue', + fields=[ + ('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)), + ('submission_id', models.CharField(max_length=40)), + ('create_time', models.DateTimeField(auto_now_add=True)), + ], + options={ + 'db_table': 'judge_waiting_queue', + }, + ), + ] diff --git a/judge_dispatcher/migrations/0002_auto_20151207_2310.py b/judge_dispatcher/migrations/0002_auto_20151207_2310.py new file mode 100644 index 00000000..1597919c --- /dev/null +++ b/judge_dispatcher/migrations/0002_auto_20151207_2310.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('judge_dispatcher', '0001_initial'), + ] + + operations = [ + migrations.AddField( + model_name='judgewaitingqueue', + name='memory_limit', + field=models.IntegerField(default=1), + preserve_default=False, + ), + migrations.AddField( + model_name='judgewaitingqueue', + name='test_case_id', + field=models.CharField(default=1, max_length=40), + preserve_default=False, + ), + migrations.AddField( + model_name='judgewaitingqueue', + name='time_limit', + field=models.IntegerField(default=1), + preserve_default=False, + ), + ] diff --git a/judge_dispatcher/migrations/__init__.py b/judge_dispatcher/migrations/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/judge_dispatcher/models.py b/judge_dispatcher/models.py new file mode 100644 index 00000000..7ec5af8f --- /dev/null +++ b/judge_dispatcher/models.py @@ -0,0 +1,44 @@ +# coding=utf-8 +from django.db import models + + +class JudgeServer(models.Model): + ip = models.GenericIPAddressField() + port = models.IntegerField() + # 这个服务器最大可能运行的判题实例数量 + max_instance_number = models.IntegerField() + left_instance_number = models.IntegerField() + workload = models.IntegerField(default=0) + token = models.CharField(max_length=30) + # 进行测试用例同步的时候加锁 + lock = models.BooleanField(default=False) + # status 为 false 的时候代表不使用这个服务器 + status = models.BooleanField(default=True) + + def use_judge_instance(self): + # 因为use 和 release 中间是判题时间,可能这个 model 的数据已经被修改了,所以不能直接使用self.xxx,否则取到的是旧数据 + server = JudgeServer.objects.select_for_update().get(id=self.id) + server.left_instance_number -= 1 + server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) + server.save() + + def release_judge_instance(self): + # 使用原子操作 + server = JudgeServer.objects.select_for_update().get(id=self.id) + server.left_instance_number += 1 + server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100) + server.save() + + class Meta: + db_table = "judge_server" + + +class JudgeWaitingQueue(models.Model): + submission_id = models.CharField(max_length=40) + time_limit = models.IntegerField() + memory_limit = models.IntegerField() + test_case_id = models.CharField(max_length=40) + create_time = models.DateTimeField(auto_now_add=True) + + class Meta: + db_table = "judge_waiting_queue" diff --git a/judge_dispatcher/rpc_client.py b/judge_dispatcher/rpc_client.py new file mode 100644 index 00000000..c095cb5e --- /dev/null +++ b/judge_dispatcher/rpc_client.py @@ -0,0 +1,24 @@ +# coding=utf-8 +import xmlrpclib +import httplib + + +class TimeoutHTTPConnection(httplib.HTTPConnection): + def __init__(self, host, timeout=10): + httplib.HTTPConnection.__init__(self, host, timeout=timeout) + + +class TimeoutTransport(xmlrpclib.Transport): + def __init__(self, timeout=10, *args, **kwargs): + xmlrpclib.Transport.__init__(self, *args, **kwargs) + self.timeout = timeout + + def make_connection(self, host): + conn = TimeoutHTTPConnection(host, self.timeout) + return conn + + +class TimeoutServerProxy(xmlrpclib.ServerProxy): + def __init__(self, uri, timeout=10, *args, **kwargs): + kwargs['transport'] = TimeoutTransport(timeout=timeout, use_datetime=kwargs.get('use_datetime', 0)) + xmlrpclib.ServerProxy.__init__(self, uri, *args, **kwargs) diff --git a/judge_dispatcher/tasks.py b/judge_dispatcher/tasks.py new file mode 100644 index 00000000..244c9074 --- /dev/null +++ b/judge_dispatcher/tasks.py @@ -0,0 +1,149 @@ +# coding=utf-8 +import json +import logging +import time + +from django.db import transaction + +from rpc_client import TimeoutServerProxy + +from judge.result import result +from contest.models import ContestProblem, ContestRank, Contest, CONTEST_UNDERWAY +from problem.models import Problem +from submission.models import Submission +from account.models import User +from utils.cache import get_cache_redis + +from .models import JudgeServer, JudgeWaitingQueue + +logger = logging.getLogger("app_info") + + +class JudgeDispatcher(object): + def __init__(self, submission, time_limit, memory_limit, test_case_id): + self.submission = submission + self.time_limit = time_limit + self.memory_limit = memory_limit + self.test_case_id = test_case_id + self.user = User.objects.get(id=submission.user_id) + + def choose_judge_server(self): + servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload") + if servers.exists(): + return servers.first() + + def judge(self, is_waiting_task=False): + self.submission.judge_start_time = int(time.time() * 1000) + + with transaction.atomic(): + judge_server = self.choose_judge_server() + + # 如果没有合适的判题服务器,就放入等待队列中等待判题 + if not judge_server: + JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit, + memory_limit=self.memory_limit, test_case_id=self.test_case_id) + return + + judge_server.use_judge_instance() + + try: + s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20) + + data = s.run(judge_server.token, self.submission.id, self.submission.language, + self.submission.code, self.time_limit, self.memory_limit, self.test_case_id) + # 编译错误 + if data["code"] == 1: + self.submission.result = result["compile_error"] + self.submission.info = data["data"]["error"] + # system error + elif data["code"] == 2: + self.submission.result = result["system_error"] + self.submission.info = data["data"]["error"] + elif data["code"] == 0: + self.submission.result = data["data"]["result"] + self.submission.info = json.dumps(data["data"]["info"]) + self.submission.accepted_answer_time = data["data"]["accepted_answer_time"] + except Exception as e: + self.submission.result = result["system_error"] + self.submission.info = str(e) + finally: + with transaction.atomic(): + judge_server.release_judge_instance() + + self.submission.judge_end_time = int(time.time() * 1000) + self.submission.save() + + if self.submission.contest_id: + self.update_contest_problem_status() + else: + self.update_problem_status() + + with transaction.atomic(): + waiting_submissions = JudgeWaitingQueue.objects.select_for_update().all() + if waiting_submissions.exists(): + # 防止循环依赖 + from submission.tasks import _judge + + waiting_submission = waiting_submissions.first() + + submission = Submission.objects.get(id=waiting_submission.submission_id) + waiting_submission.delete() + + _judge(submission, time_limit=waiting_submission.time_limit, + memory_limit=waiting_submission.memory_limit, test_case_id=waiting_submission.test_case_id, + is_waiting_task=True) + + def update_problem_status(self): + problem = Problem.objects.get(id=self.submission.problem_id) + + # 更新普通题目的计数器 + problem.add_submission_number() + + # 更新用户做题状态 + problems_status = self.user.problems_status + if "problems" not in problems_status: + problems_status["problems"] = {} + if self.submission.result == result["accepted"]: + problem.add_ac_number() + problems_status["problems"][str(problem.id)] = 1 + else: + problems_status["problems"][str(problem.id)] = 2 + self.user.problems_status = problems_status + self.user.save() + # 普通题目的话,到这里就结束了 + + def update_contest_problem_status(self): + # 能运行到这里的都是比赛题目 + contest = Contest.objects.get(id=self.submission.contest_id) + if contest.status != CONTEST_UNDERWAY: + logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + self.submission.id) + return + with transaction.atomic(): + contest_problem = ContestProblem.objects.select_for_update().get(contest=contest, id=self.submission.problem_id) + + contest_problem.add_submission_number() + + # todo 事务 + problems_status = self.user.problems_status + if "contest_problems" not in problems_status: + problems_status["contest_problems"] = {} + if self.submission.result == result["accepted"]: + contest_problem.add_ac_number() + problems_status["contest_problems"][str(contest_problem.id)] = 1 + else: + problems_status["contest_problems"][str(contest_problem.id)] = 0 + self.user.problems_status = problems_status + self.user.save() + + self.update_contest_rank(contest) + + def update_contest_rank(self, contest): + if contest.real_time_rank: + get_cache_redis().delete(str(contest.id) + "_rank_cache") + + with transaction.atomic(): + try: + contest_rank = ContestRank.objects.select_for_update().get(contest=contest, user=self.user) + contest_rank.update_rank(self.submission) + except ContestRank.DoesNotExist: + ContestRank.objects.create(contest=contest, user=self.user).update_rank(self.submission) diff --git a/monitor/views.py b/monitor/views.py index 15c62b4c..f502579f 100644 --- a/monitor/views.py +++ b/monitor/views.py @@ -2,15 +2,15 @@ import redis import datetime from rest_framework.views import APIView -from judge.judger.result import result -from judge.judger_controller.settings import redis_config +from judge.result import result +from django.conf import settings from utils.shortcuts import success_response from submission.models import Submission class QueueLengthMonitorAPIView(APIView): def get(self, request): - r = redis.Redis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) + r = redis.Redis(host=settings.redis_config["host"], port=settings.redis_config["port"], db=settings.redis_config["db"]) waiting_number = r.get("judge_queue_length") if waiting_number is None: waiting_number = 0 diff --git a/mq/__init__.py b/mq/__init__.py deleted file mode 100644 index 9bad5790..00000000 --- a/mq/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/mq/models.py b/mq/models.py deleted file mode 100644 index 9bad5790..00000000 --- a/mq/models.py +++ /dev/null @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/mq/scripts/__init__.py b/mq/scripts/__init__.py deleted file mode 100644 index 9bad5790..00000000 --- a/mq/scripts/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# coding=utf-8 diff --git a/mq/scripts/mq.py b/mq/scripts/mq.py deleted file mode 100644 index a3b15e7e..00000000 --- a/mq/scripts/mq.py +++ /dev/null @@ -1,104 +0,0 @@ -# coding=utf-8 -import logging - -import redis - -from django.db import transaction - -from judge.judger_controller.settings import redis_config -from judge.judger.result import result -from submission.models import Submission -from problem.models import Problem -from utils.cache import get_cache_redis -from contest.models import ContestProblem, Contest, CONTEST_UNDERWAY, ContestRank -from account.models import User - -logger = logging.getLogger("app_info") - - -class MessageQueue(object): - def __init__(self): - self.conn = redis.StrictRedis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) - self.queue = 'queue' - - def listen_task(self): - while True: - submission_id = self.conn.blpop(self.queue, 0)[1] - logger.debug("receive submission_id: " + submission_id) - - try: - submission = Submission.objects.get(id=submission_id) - except Submission.DoesNotExist: - logger.warning("Submission does not exist, submission_id: " + submission_id) - continue - - # 更新该用户的解题状态用 - try: - user = User.objects.get(pk=submission.user_id) - except User.DoesNotExist: - logger.warning("Submission user does not exist, submission_id: " + submission_id) - continue - - if not submission.contest_id: - try: - problem = Problem.objects.get(id=submission.problem_id) - except Problem.DoesNotExist: - logger.warning("Submission problem does not exist, submission_id: " + submission_id) - continue - - problems_status = user.problems_status - - # 更新普通题目的计数器 - problem.add_submission_number() - if "problems" not in problems_status: - problems_status["problems"] = {} - if submission.result == result["accepted"]: - problem.add_ac_number() - problems_status["problems"][str(problem.id)] = 1 - else: - problems_status["problems"][str(problem.id)] = 2 - user.problems_status = problems_status - user.save() - # 普通题目的话,到这里就结束了 - continue - - # 能运行到这里的都是比赛题目 - try: - contest = Contest.objects.get(id=submission.contest_id) - if contest.status != CONTEST_UNDERWAY: - logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + submission_id) - continue - contest_problem = ContestProblem.objects.get(contest=contest, id=submission.problem_id) - except Contest.DoesNotExist: - logger.warning("Submission contest does not exist, submission_id: " + submission_id) - continue - except ContestProblem.DoesNotExist: - logger.warning("Submission problem does not exist, submission_id: " + submission_id) - continue - - # 如果比赛现在不是封榜状态,删除比赛的排名缓存 - if contest.real_time_rank: - get_cache_redis().delete(str(contest.id) + "_rank_cache") - - with transaction.atomic(): - try: - contest_rank = ContestRank.objects.get(contest=contest, user=user) - contest_rank.update_rank(submission) - except ContestRank.DoesNotExist: - ContestRank.objects.create(contest=contest, user=user).update_rank(submission) - - problems_status = user.problems_status - - contest_problem.add_submission_number() - if "contest_problems" not in problems_status: - problems_status["contest_problems"] = {} - if submission.result == result["accepted"]: - contest_problem.add_ac_number() - problems_status["contest_problems"][str(contest_problem.id)] = 1 - else: - problems_status["contest_problems"][str(contest_problem.id)] = 0 - user.problems_status = problems_status - user.save() - -logger.debug("Start message queue") -MessageQueue().listen_task() diff --git a/oj/__init__.py b/oj/__init__.py index c19c0794..f4206374 100644 --- a/oj/__init__.py +++ b/oj/__init__.py @@ -6,4 +6,4 @@ \___/ |_| |_||_||_||_| |_| \___| \___/ \__,_| \__,_| \__, | \___| |_.__/ \__, | \__, | \__,_| \__,_| |___/ |___/ |_| https://github.com/QingdaoU/OnlineJudge -""" \ No newline at end of file +""" diff --git a/oj/local_settings.py b/oj/local_settings.py index a08ae7e2..82bfe2ed 100644 --- a/oj/local_settings.py +++ b/oj/local_settings.py @@ -22,6 +22,12 @@ REDIS_CACHE = { "db": 1 } +REDIS_QUEUE = { + "host": "127.0.0.1", + "port": 6379, + "db": 2 +} + DEBUG = True ALLOWED_HOSTS = [] diff --git a/oj/server_settings.py b/oj/server_settings.py index bba27c6f..cd86c16f 100644 --- a/oj/server_settings.py +++ b/oj/server_settings.py @@ -31,6 +31,13 @@ REDIS_CACHE = { "db": 1 } +REDIS_QUEUE = { + "host": os.environ.get("REDIS_PORT_6379_TCP_ADDR", "127.0.0.1"), + "port": 6379, + "db": 2 +} + + DEBUG = False ALLOWED_HOSTS = ['*'] diff --git a/oj/settings.py b/oj/settings.py index de88dfbc..1d3cc378 100644 --- a/oj/settings.py +++ b/oj/settings.py @@ -22,6 +22,12 @@ if ENV == "local": elif ENV == "server": from .server_settings import * +BROKER_BACKEND = "redis" +CELERY_ACCEPT_CONTENT = ['json'] +CELERY_TASK_SERIALIZER = 'json' +CELERY_RESULT_SERIALIZER = 'json' + + BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) @@ -48,11 +54,12 @@ INSTALLED_APPS = ( 'problem', 'admin', 'submission', - 'mq', 'contest', + 'judge', + 'judge_dispatcher', - 'django_extensions', 'rest_framework', + 'huey.djhuey', ) if DEBUG: @@ -185,7 +192,19 @@ WEBSITE_INFO = {"website_name": "qduoj", "website_footer": u"青岛大学信息工程学院 创新实验室 京ICP备15062075号-1", "url": "https://qduoj.com"} + +HUEY = { + 'backend': 'huey.backends.redis_backend', + 'name': 'task_queue', + 'connection': {'host': REDIS_QUEUE["host"], 'port': REDIS_QUEUE["port"], 'db': REDIS_QUEUE["db"]}, + 'always_eager': False, # Defaults to False when running via manage.py run_huey + # Options to pass into the consumer when running ``manage.py run_huey`` + 'consumer_options': {'workers': 50}, +} + + SMTP_CONFIG = {"smtp_server": "smtp.mxhichina.com", "email": "noreply@qduoj.com", "password": os.environ.get("smtp_password", "111111"), "tls": False} + diff --git a/runJudge.sh b/runJudge.sh deleted file mode 100755 index ba25da83..00000000 --- a/runJudge.sh +++ /dev/null @@ -1 +0,0 @@ -nohup celery -A judge.judger_controller worker -l DEBUG & diff --git a/submission/migrations/0007_auto_20151207_1645.py b/submission/migrations/0007_auto_20151207_1645.py new file mode 100644 index 00000000..a053d98c --- /dev/null +++ b/submission/migrations/0007_auto_20151207_1645.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('submission', '0006_submission_shared'), + ] + + operations = [ + migrations.AddField( + model_name='submission', + name='judge_end_time', + field=models.IntegerField(null=True, blank=True), + ), + migrations.AddField( + model_name='submission', + name='judge_start_time', + field=models.IntegerField(null=True, blank=True), + ), + ] diff --git a/submission/migrations/0008_auto_20151208_2106.py b/submission/migrations/0008_auto_20151208_2106.py new file mode 100644 index 00000000..b3a85763 --- /dev/null +++ b/submission/migrations/0008_auto_20151208_2106.py @@ -0,0 +1,25 @@ +# -*- coding: utf-8 -*- +# Generated by Django 1.9 on 2015-12-08 13:06 +from __future__ import unicode_literals + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('submission', '0007_auto_20151207_1645'), + ] + + operations = [ + migrations.AlterField( + model_name='submission', + name='judge_end_time', + field=models.BigIntegerField(blank=True, null=True), + ), + migrations.AlterField( + model_name='submission', + name='judge_start_time', + field=models.BigIntegerField(blank=True, null=True), + ), + ] diff --git a/submission/models.py b/submission/models.py index b28bd5ab..fc1a0708 100644 --- a/submission/models.py +++ b/submission/models.py @@ -1,13 +1,17 @@ # coding=utf-8 from django.db import models from utils.shortcuts import rand_str -from judge.judger.result import result +from judge.result import result class Submission(models.Model): id = models.CharField(max_length=32, default=rand_str, primary_key=True, db_index=True) user_id = models.IntegerField(db_index=True) create_time = models.DateTimeField(auto_now_add=True) + # 判题开始时间 + judge_start_time = models.BigIntegerField(blank=True, null=True) + # 判题结束时间 + judge_end_time = models.BigIntegerField(blank=True, null=True) result = models.IntegerField(default=result["waiting"]) language = models.IntegerField() code = models.TextField() @@ -24,3 +28,6 @@ class Submission(models.Model): class Meta: db_table = "submission" + + def __unicode__(self): + return self.id diff --git a/submission/tasks.py b/submission/tasks.py new file mode 100644 index 00000000..25476e36 --- /dev/null +++ b/submission/tasks.py @@ -0,0 +1,9 @@ +# coding=utf-8 +from huey.djhuey import task + +from judge_dispatcher.tasks import JudgeDispatcher + + +@task() +def _judge(submission, time_limit, memory_limit, test_case_id, is_waiting_task=False): + JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge(is_waiting_task) \ No newline at end of file diff --git a/submission/views.py b/submission/views.py index 1f2acb16..ccda8acb 100644 --- a/submission/views.py +++ b/submission/views.py @@ -7,14 +7,13 @@ from django.shortcuts import render from django.core.paginator import Paginator from rest_framework.views import APIView -from judge.judger_controller.tasks import judge from account.decorators import login_required, super_admin_required from account.models import SUPER_ADMIN, User from problem.models import Problem from contest.models import ContestProblem, Contest from contest.decorators import check_user_contest_permission from utils.shortcuts import serializer_invalid_response, error_response, success_response, error_page, paginate -from utils.cache import get_cache_redis +from .tasks import _judge from .models import Submission from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, SubmissionhareSerializer, SubmissionRejudgeSerializer, @@ -23,11 +22,6 @@ from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, logger = logging.getLogger("app_info") -def _judge(submission_id, time_limit, memory_limit, test_case_id): - judge.delay(submission_id, time_limit, memory_limit, test_case_id) - get_cache_redis().incr("judge_queue_length") - - class SubmissionAPIView(APIView): @login_required def post(self, request): @@ -49,7 +43,7 @@ class SubmissionAPIView(APIView): problem_id=problem.id) try: - _judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") @@ -94,7 +88,7 @@ class ContestSubmissionAPIView(APIView): code=data["code"], problem_id=problem.id) try: - _judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败") @@ -279,7 +273,7 @@ class SubmissionRejudgeAdminAPIView(APIView): except Problem.DoesNotExist: return error_response(u"题目不存在") try: - _judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) + _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id) except Exception as e: logger.error(e) return error_response(u"提交判题任务失败")