Accept Merge Request #318 判题服务器使用新的架构 : (new-arch -> dev)

Merge Request: 判题服务器使用新的架构
Created By: @virusdefender
Accepted By: @virusdefender
URL: https://coding.net/u/virusdefender/p/qduoj/git/merge/318
This commit is contained in:
virusdefender 2015-12-09 21:41:47 +08:00
commit 875416f618
44 changed files with 487 additions and 326 deletions

View File

@ -72,6 +72,5 @@ class UserProfile(models.Model):
school = models.CharField(max_length=200, blank=True, null=True) school = models.CharField(max_length=200, blank=True, null=True)
student_id = models.CharField(max_length=15, blank=True, null=True) student_id = models.CharField(max_length=15, blank=True, null=True)
class Meta: class Meta:
db_table = "user_profile" db_table = "user_profile"

View File

@ -7,7 +7,7 @@ from problem.models import AbstractProblem
from group.models import Group from group.models import Group
from utils.models import RichTextField from utils.models import RichTextField
from jsonfield import JSONField from jsonfield import JSONField
from judge.judger.result import result from judge.result import result
GROUP_CONTEST = 0 GROUP_CONTEST = 0

View File

@ -4,11 +4,11 @@ redis
django-redis-sessions django-redis-sessions
djangorestframework djangorestframework
django-rest-swagger django-rest-swagger
celery
gunicorn gunicorn
coverage coverage
django-extensions django-extensions
supervisor supervisor
pillow pillow
jsonfield jsonfield
Envelopes Envelopes
huey

View File

@ -23,4 +23,4 @@ serverurl=unix:///tmp/supervisor.sock ; use unix:// schem for a unix sockets.
[include] [include]
files=gunicorn.conf mq.conf files=gunicorn.conf task_queue.conf

View File

@ -1,12 +1,12 @@
[program:mq] [program:mq]
command=python manage.py runscript mq command=python manage.py run_huey
directory=/code/ directory=/code/
user=root user=root
numprocs=1 numprocs=1
stdout_logfile=/code/log/mq.log stdout_logfile=/code/log/task_queue.log
stderr_logfile=/code/log/mq.log stderr_logfile=/code/log/task_queue.log
autostart=true autostart=true
autorestart=true autorestart=true
startsecs=5 startsecs=5

View File

@ -1 +0,0 @@
# coding=utf-8

View File

@ -1,94 +0,0 @@
# coding=utf-8
import sys
import json
import MySQLdb
from client import JudgeClient
from language import languages
from compiler import compile_
from result import result
from settings import judger_workspace, submission_db
from logger import logger
# 简单的解析命令行参数
# 参数有 -solution_id -time_limit -memory_limit -test_case_id
# 获取到的值是['xxx.py', '-solution_id', '1111', '-time_limit', '1000', '-memory_limit', '100', '-test_case_id', 'aaaa']
args = sys.argv
submission_id = args[2]
time_limit = args[4]
memory_limit = args[6]
test_case_id = args[8]
def db_conn():
return MySQLdb.connect(db=submission_db["db"],
user=submission_db["user"],
passwd=submission_db["password"],
host=submission_db["host"],
port=submission_db["port"], charset="utf8")
conn = db_conn()
cur = conn.cursor()
cur.execute("select language, code from submission where id = %s", (submission_id,))
data = cur.fetchall()
if not data:
exit()
language_code = data[0][0]
code = data[0][1]
conn.close()
# 将代码写入文件
language = languages[language_code]
src_path = judger_workspace + "run/" + language["src_name"]
f = open(src_path, "w")
f.write(code.encode("utf8"))
f.close()
# 编译
try:
exe_path = compile_(language, src_path, judger_workspace + "run/")
except Exception as e:
print e
conn = db_conn()
cur = conn.cursor()
cur.execute("update submission set result=%s, info=%s where id=%s",
(result["compile_error"], str(e), submission_id))
conn.commit()
exit()
# 运行
try:
client = JudgeClient(language_code=language_code,
exe_path=exe_path,
max_cpu_time=int(time_limit),
max_real_time=int(time_limit) * 2,
max_memory=int(memory_limit),
test_case_dir=judger_workspace + "test_case/" + test_case_id + "/")
judge_result = {"result": result["accepted"], "info": client.run(), "accepted_answer_time": None}
for item in judge_result["info"]:
if item["result"]:
judge_result["result"] = item["result"]
break
else:
l = sorted(judge_result["info"], key=lambda k: k["cpu_time"])
judge_result["accepted_answer_time"] = l[-1]["cpu_time"]
except Exception as e:
logger.error(e)
conn = db_conn()
cur = conn.cursor()
cur.execute("update submission set result=%s, info=%s where id=%s", (result["system_error"], str(e), submission_id))
conn.commit()
exit()
conn = db_conn()
cur = conn.cursor()
cur.execute("update submission set result=%s, info=%s, accepted_answer_time=%s where id=%s",
(judge_result["result"], json.dumps(judge_result["info"]), judge_result["accepted_answer_time"],
submission_id))
conn.commit()
conn.close()

View File

@ -1 +0,0 @@
celery -A judge.controller worker -l DEBUG

View File

@ -1,9 +0,0 @@
# coding=utf-8
from __future__ import absolute_import
from celery import Celery, platforms
from .settings import redis_config
app = Celery("judge", broker='redis://%s:%s/%s' % (redis_config["host"], redis_config["port"], redis_config["db"]),
include=["judge.judger_controller.tasks"])
platforms.C_FORCE_ROOT =True

View File

@ -1,39 +0,0 @@
# coding=utf-8
"""
注意
此文件包含 celery 的部分配置但是 celery 并不是运行在docker 中的所以本配置文件中的 redis和 MySQL 的地址就应该是
运行 redis MySQL docker 容器的地址了怎么获取这个地址见帮助文档测试用例的路径和源代码路径同理
"""
import os
# 这个redis 是 celery 使用的,包括存储队列信息还有部分统计信息
redis_config = {
"host": os.environ.get("REDIS_PORT_6379_TCP_ADDR"),
"port": 6379,
"db": 0
}
# 判题的 docker 容器的配置参数
docker_config = {
"image_name": "judger",
"docker_path": "docker",
"shell": True
}
# 测试用例的路径,是主机上的实际路径
test_case_dir = "/root/test_case/"
# 源代码路径,也就是 manage.py 所在的实际路径
source_code_dir = "/root/qduoj/"
# 日志文件夹路径
log_dir = "/root/log/"
# 存储提交信息的数据库,是 celery 使用的,与 oj.settings/local_settings 等区分,那是 web 服务器访问的地址
submission_db = {
"host": os.environ.get("submission_db_host"),
"port": 3306,
"db": "oj_submission",
"user": "root",
"password": "root"
}

View File

@ -1,45 +0,0 @@
# coding=utf-8
import json
import redis
import MySQLdb
import subprocess
from ..judger.result import result
from ..judger_controller.celery import app
from settings import docker_config, source_code_dir, test_case_dir, log_dir, submission_db, redis_config
@app.task
def judge(submission_id, time_limit, memory_limit, test_case_id):
try:
command = "%s run --privileged --rm " \
"--link mysql " \
"-v %s:/var/judger/test_case/:ro " \
"-v %s:/var/judger/code/:ro " \
"-v %s:/var/judger/code/log/ " \
"--device /dev/null:/dev/null " \
"%s " \
"python judge/judger/run.py " \
"--solution_id %s --time_limit %s --memory_limit %s --test_case_id %s" % \
(docker_config["docker_path"],
test_case_dir,
source_code_dir,
log_dir,
docker_config["image_name"],
submission_id, str(time_limit), str(memory_limit), test_case_id)
subprocess.call(command, shell=docker_config["shell"])
except Exception as e:
conn = MySQLdb.connect(db=submission_db["db"],
user=submission_db["user"],
passwd=submission_db["password"],
host=submission_db["host"],
port=submission_db["port"],
charset="utf8")
cur = conn.cursor()
cur.execute("update submission set result=%s, info=%s where id=%s",
(result["system_error"], str(e), submission_id))
conn.commit()
conn.close()
r = redis.Redis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"])
r.decr("judge_queue_length")
r.lpush("queue", submission_id)

View File

@ -7,16 +7,16 @@ languages = {
"src_name": "main.c", "src_name": "main.c",
"code": 1, "code": 1,
"syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k", "syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k",
"compile_command": "gcc -DONLINE_JUDGE -O2 -w -std=c99 {src_path} -lm -o {exe_path}main", "compile_command": "gcc -DONLINE_JUDGE -O2 -w -std=c99 {src_path} -lm -o {exe_path}/main",
"execute_command": "{exe_path}main" "execute_command": "{exe_path}/main"
}, },
2: { 2: {
"name": "cpp", "name": "cpp",
"src_name": "main.cpp", "src_name": "main.cpp",
"code": 2, "code": 2,
"syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k", "syscalls": "!execve:k,flock:k,ptrace:k,sync:k,fdatasync:k,fsync:k,msync,sync_file_range:k,syncfs:k,unshare:k,setns:k,clone:k,query_module:k,sysinfo:k,syslog:k,sysfs:k",
"compile_command": "g++ -DONLINE_JUDGE -O2 -w -std=c++11 {src_path} -lm -o {exe_path}main", "compile_command": "g++ -DONLINE_JUDGE -O2 -w -std=c++11 {src_path} -lm -o {exe_path}/main",
"execute_command": "{exe_path}main" "execute_command": "{exe_path}/main"
}, },
3: { 3: {
"name": "java", "name": "java",

65
judge/runner.py Normal file
View File

@ -0,0 +1,65 @@
# coding=utf-8
import os
import socket
import shutil
from client import JudgeClient
from language import languages
from compiler import compile_
from result import result
from settings import judger_workspace
class JudgeInstanceRunner(object):
def run(self, token, submission_id, language_code, code, time_limit, memory_limit, test_case_id):
language = languages[language_code]
host_name = socket.gethostname()
judge_base_path = os.path.join(judger_workspace, "run", submission_id)
if not token or token != os.environ.get("rpc_token"):
return {"code": 2, "data": {"error": "Invalid token", "server": host_name}}
try:
os.mkdir(judge_base_path)
os.chmod(judge_base_path, 0777)
# 将代码写入文件
src_path = os.path.join(judge_base_path, language["src_name"])
f = open(src_path, "w")
f.write(code.encode("utf8"))
f.close()
except Exception as e:
shutil.rmtree(judge_base_path, ignore_errors=True)
return {"code": 2, "data": {"error": str(e), "server": host_name}}
# 编译
try:
exe_path = compile_(language, src_path, judge_base_path)
except Exception as e:
shutil.rmtree(judge_base_path, ignore_errors=True)
return {"code": 1, "data": {"error": str(e), "server": host_name}}
# 运行
try:
client = JudgeClient(language_code=language_code,
exe_path=exe_path,
max_cpu_time=int(time_limit),
max_real_time=int(time_limit) * 2,
max_memory=int(memory_limit),
test_case_dir=judger_workspace + "test_case/" + test_case_id + "/")
judge_result = {"result": result["accepted"], "info": client.run(),
"accepted_answer_time": None, "server": host_name}
for item in judge_result["info"]:
if item["result"]:
judge_result["result"] = item["result"]
break
else:
l = sorted(judge_result["info"], key=lambda k: k["cpu_time"])
judge_result["accepted_answer_time"] = l[-1]["cpu_time"]
return {"code": 0, "data": judge_result}
except Exception as e:
return {"code": 2, "data": {"error": str(e), "server": host_name}}
finally:
shutil.rmtree(judge_base_path, ignore_errors=True)

13
judge/server.py Normal file
View File

@ -0,0 +1,13 @@
# coding=utf-8
import SocketServer
from SimpleXMLRPCServer import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from runner import JudgeInstanceRunner
class AsyncXMLRPCServer(SocketServer.ThreadingMixIn, SimpleXMLRPCServer):
pass
server = AsyncXMLRPCServer(('0.0.0.0', 8080), SimpleXMLRPCRequestHandler, allow_none=True)
server.register_instance(JudgeInstanceRunner())
server.serve_forever()

View File

@ -0,0 +1,41 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
]
operations = [
migrations.CreateModel(
name='JudgeServer',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('ip', models.GenericIPAddressField()),
('port', models.IntegerField()),
('max_instance_number', models.IntegerField()),
('left_instance_number', models.IntegerField()),
('workload', models.IntegerField(default=0)),
('token', models.CharField(max_length=30)),
('lock', models.BooleanField(default=False)),
('status', models.BooleanField(default=True)),
],
options={
'db_table': 'judge_server',
},
),
migrations.CreateModel(
name='JudgeWaitingQueue',
fields=[
('id', models.AutoField(verbose_name='ID', serialize=False, auto_created=True, primary_key=True)),
('submission_id', models.CharField(max_length=40)),
('create_time', models.DateTimeField(auto_now_add=True)),
],
options={
'db_table': 'judge_waiting_queue',
},
),
]

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('judge_dispatcher', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='judgewaitingqueue',
name='memory_limit',
field=models.IntegerField(default=1),
preserve_default=False,
),
migrations.AddField(
model_name='judgewaitingqueue',
name='test_case_id',
field=models.CharField(default=1, max_length=40),
preserve_default=False,
),
migrations.AddField(
model_name='judgewaitingqueue',
name='time_limit',
field=models.IntegerField(default=1),
preserve_default=False,
),
]

View File

View File

@ -0,0 +1,44 @@
# coding=utf-8
from django.db import models
class JudgeServer(models.Model):
ip = models.GenericIPAddressField()
port = models.IntegerField()
# 这个服务器最大可能运行的判题实例数量
max_instance_number = models.IntegerField()
left_instance_number = models.IntegerField()
workload = models.IntegerField(default=0)
token = models.CharField(max_length=30)
# 进行测试用例同步的时候加锁
lock = models.BooleanField(default=False)
# status 为 false 的时候代表不使用这个服务器
status = models.BooleanField(default=True)
def use_judge_instance(self):
# 因为use 和 release 中间是判题时间,可能这个 model 的数据已经被修改了所以不能直接使用self.xxx否则取到的是旧数据
server = JudgeServer.objects.select_for_update().get(id=self.id)
server.left_instance_number -= 1
server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100)
server.save()
def release_judge_instance(self):
# 使用原子操作
server = JudgeServer.objects.select_for_update().get(id=self.id)
server.left_instance_number += 1
server.workload = 100 - int(float(server.left_instance_number) / server.max_instance_number * 100)
server.save()
class Meta:
db_table = "judge_server"
class JudgeWaitingQueue(models.Model):
submission_id = models.CharField(max_length=40)
time_limit = models.IntegerField()
memory_limit = models.IntegerField()
test_case_id = models.CharField(max_length=40)
create_time = models.DateTimeField(auto_now_add=True)
class Meta:
db_table = "judge_waiting_queue"

View File

@ -0,0 +1,24 @@
# coding=utf-8
import xmlrpclib
import httplib
class TimeoutHTTPConnection(httplib.HTTPConnection):
def __init__(self, host, timeout=10):
httplib.HTTPConnection.__init__(self, host, timeout=timeout)
class TimeoutTransport(xmlrpclib.Transport):
def __init__(self, timeout=10, *args, **kwargs):
xmlrpclib.Transport.__init__(self, *args, **kwargs)
self.timeout = timeout
def make_connection(self, host):
conn = TimeoutHTTPConnection(host, self.timeout)
return conn
class TimeoutServerProxy(xmlrpclib.ServerProxy):
def __init__(self, uri, timeout=10, *args, **kwargs):
kwargs['transport'] = TimeoutTransport(timeout=timeout, use_datetime=kwargs.get('use_datetime', 0))
xmlrpclib.ServerProxy.__init__(self, uri, *args, **kwargs)

149
judge_dispatcher/tasks.py Normal file
View File

@ -0,0 +1,149 @@
# coding=utf-8
import json
import logging
import time
from django.db import transaction
from rpc_client import TimeoutServerProxy
from judge.result import result
from contest.models import ContestProblem, ContestRank, Contest, CONTEST_UNDERWAY
from problem.models import Problem
from submission.models import Submission
from account.models import User
from utils.cache import get_cache_redis
from .models import JudgeServer, JudgeWaitingQueue
logger = logging.getLogger("app_info")
class JudgeDispatcher(object):
def __init__(self, submission, time_limit, memory_limit, test_case_id):
self.submission = submission
self.time_limit = time_limit
self.memory_limit = memory_limit
self.test_case_id = test_case_id
self.user = User.objects.get(id=submission.user_id)
def choose_judge_server(self):
servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload")
if servers.exists():
return servers.first()
def judge(self, is_waiting_task=False):
self.submission.judge_start_time = int(time.time() * 1000)
with transaction.atomic():
judge_server = self.choose_judge_server()
# 如果没有合适的判题服务器,就放入等待队列中等待判题
if not judge_server:
JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit,
memory_limit=self.memory_limit, test_case_id=self.test_case_id)
return
judge_server.use_judge_instance()
try:
s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20)
data = s.run(judge_server.token, self.submission.id, self.submission.language,
self.submission.code, self.time_limit, self.memory_limit, self.test_case_id)
# 编译错误
if data["code"] == 1:
self.submission.result = result["compile_error"]
self.submission.info = data["data"]["error"]
# system error
elif data["code"] == 2:
self.submission.result = result["system_error"]
self.submission.info = data["data"]["error"]
elif data["code"] == 0:
self.submission.result = data["data"]["result"]
self.submission.info = json.dumps(data["data"]["info"])
self.submission.accepted_answer_time = data["data"]["accepted_answer_time"]
except Exception as e:
self.submission.result = result["system_error"]
self.submission.info = str(e)
finally:
with transaction.atomic():
judge_server.release_judge_instance()
self.submission.judge_end_time = int(time.time() * 1000)
self.submission.save()
if self.submission.contest_id:
self.update_contest_problem_status()
else:
self.update_problem_status()
with transaction.atomic():
waiting_submissions = JudgeWaitingQueue.objects.select_for_update().all()
if waiting_submissions.exists():
# 防止循环依赖
from submission.tasks import _judge
waiting_submission = waiting_submissions.first()
submission = Submission.objects.get(id=waiting_submission.submission_id)
waiting_submission.delete()
_judge(submission, time_limit=waiting_submission.time_limit,
memory_limit=waiting_submission.memory_limit, test_case_id=waiting_submission.test_case_id,
is_waiting_task=True)
def update_problem_status(self):
problem = Problem.objects.get(id=self.submission.problem_id)
# 更新普通题目的计数器
problem.add_submission_number()
# 更新用户做题状态
problems_status = self.user.problems_status
if "problems" not in problems_status:
problems_status["problems"] = {}
if self.submission.result == result["accepted"]:
problem.add_ac_number()
problems_status["problems"][str(problem.id)] = 1
else:
problems_status["problems"][str(problem.id)] = 2
self.user.problems_status = problems_status
self.user.save()
# 普通题目的话,到这里就结束了
def update_contest_problem_status(self):
# 能运行到这里的都是比赛题目
contest = Contest.objects.get(id=self.submission.contest_id)
if contest.status != CONTEST_UNDERWAY:
logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + self.submission.id)
return
with transaction.atomic():
contest_problem = ContestProblem.objects.select_for_update().get(contest=contest, id=self.submission.problem_id)
contest_problem.add_submission_number()
# todo 事务
problems_status = self.user.problems_status
if "contest_problems" not in problems_status:
problems_status["contest_problems"] = {}
if self.submission.result == result["accepted"]:
contest_problem.add_ac_number()
problems_status["contest_problems"][str(contest_problem.id)] = 1
else:
problems_status["contest_problems"][str(contest_problem.id)] = 0
self.user.problems_status = problems_status
self.user.save()
self.update_contest_rank(contest)
def update_contest_rank(self, contest):
if contest.real_time_rank:
get_cache_redis().delete(str(contest.id) + "_rank_cache")
with transaction.atomic():
try:
contest_rank = ContestRank.objects.select_for_update().get(contest=contest, user=self.user)
contest_rank.update_rank(self.submission)
except ContestRank.DoesNotExist:
ContestRank.objects.create(contest=contest, user=self.user).update_rank(self.submission)

View File

@ -2,15 +2,15 @@
import redis import redis
import datetime import datetime
from rest_framework.views import APIView from rest_framework.views import APIView
from judge.judger.result import result from judge.result import result
from judge.judger_controller.settings import redis_config from django.conf import settings
from utils.shortcuts import success_response from utils.shortcuts import success_response
from submission.models import Submission from submission.models import Submission
class QueueLengthMonitorAPIView(APIView): class QueueLengthMonitorAPIView(APIView):
def get(self, request): def get(self, request):
r = redis.Redis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"]) r = redis.Redis(host=settings.redis_config["host"], port=settings.redis_config["port"], db=settings.redis_config["db"])
waiting_number = r.get("judge_queue_length") waiting_number = r.get("judge_queue_length")
if waiting_number is None: if waiting_number is None:
waiting_number = 0 waiting_number = 0

View File

@ -1 +0,0 @@
# coding=utf-8

View File

@ -1 +0,0 @@
# coding=utf-8

View File

@ -1 +0,0 @@
# coding=utf-8

View File

@ -1,104 +0,0 @@
# coding=utf-8
import logging
import redis
from django.db import transaction
from judge.judger_controller.settings import redis_config
from judge.judger.result import result
from submission.models import Submission
from problem.models import Problem
from utils.cache import get_cache_redis
from contest.models import ContestProblem, Contest, CONTEST_UNDERWAY, ContestRank
from account.models import User
logger = logging.getLogger("app_info")
class MessageQueue(object):
def __init__(self):
self.conn = redis.StrictRedis(host=redis_config["host"], port=redis_config["port"], db=redis_config["db"])
self.queue = 'queue'
def listen_task(self):
while True:
submission_id = self.conn.blpop(self.queue, 0)[1]
logger.debug("receive submission_id: " + submission_id)
try:
submission = Submission.objects.get(id=submission_id)
except Submission.DoesNotExist:
logger.warning("Submission does not exist, submission_id: " + submission_id)
continue
# 更新该用户的解题状态用
try:
user = User.objects.get(pk=submission.user_id)
except User.DoesNotExist:
logger.warning("Submission user does not exist, submission_id: " + submission_id)
continue
if not submission.contest_id:
try:
problem = Problem.objects.get(id=submission.problem_id)
except Problem.DoesNotExist:
logger.warning("Submission problem does not exist, submission_id: " + submission_id)
continue
problems_status = user.problems_status
# 更新普通题目的计数器
problem.add_submission_number()
if "problems" not in problems_status:
problems_status["problems"] = {}
if submission.result == result["accepted"]:
problem.add_ac_number()
problems_status["problems"][str(problem.id)] = 1
else:
problems_status["problems"][str(problem.id)] = 2
user.problems_status = problems_status
user.save()
# 普通题目的话,到这里就结束了
continue
# 能运行到这里的都是比赛题目
try:
contest = Contest.objects.get(id=submission.contest_id)
if contest.status != CONTEST_UNDERWAY:
logger.info("Contest debug mode, id: " + str(contest.id) + ", submission id: " + submission_id)
continue
contest_problem = ContestProblem.objects.get(contest=contest, id=submission.problem_id)
except Contest.DoesNotExist:
logger.warning("Submission contest does not exist, submission_id: " + submission_id)
continue
except ContestProblem.DoesNotExist:
logger.warning("Submission problem does not exist, submission_id: " + submission_id)
continue
# 如果比赛现在不是封榜状态,删除比赛的排名缓存
if contest.real_time_rank:
get_cache_redis().delete(str(contest.id) + "_rank_cache")
with transaction.atomic():
try:
contest_rank = ContestRank.objects.get(contest=contest, user=user)
contest_rank.update_rank(submission)
except ContestRank.DoesNotExist:
ContestRank.objects.create(contest=contest, user=user).update_rank(submission)
problems_status = user.problems_status
contest_problem.add_submission_number()
if "contest_problems" not in problems_status:
problems_status["contest_problems"] = {}
if submission.result == result["accepted"]:
contest_problem.add_ac_number()
problems_status["contest_problems"][str(contest_problem.id)] = 1
else:
problems_status["contest_problems"][str(contest_problem.id)] = 0
user.problems_status = problems_status
user.save()
logger.debug("Start message queue")
MessageQueue().listen_task()

View File

@ -6,4 +6,4 @@
\___/ |_| |_||_||_||_| |_| \___| \___/ \__,_| \__,_| \__, | \___| |_.__/ \__, | \__, | \__,_| \__,_| \___/ |_| |_||_||_||_| |_| \___| \___/ \__,_| \__,_| \__, | \___| |_.__/ \__, | \__, | \__,_| \__,_|
|___/ |___/ |_| |___/ |___/ |_|
https://github.com/QingdaoU/OnlineJudge https://github.com/QingdaoU/OnlineJudge
""" """

View File

@ -22,6 +22,12 @@ REDIS_CACHE = {
"db": 1 "db": 1
} }
REDIS_QUEUE = {
"host": "127.0.0.1",
"port": 6379,
"db": 2
}
DEBUG = True DEBUG = True
ALLOWED_HOSTS = [] ALLOWED_HOSTS = []

View File

@ -31,6 +31,13 @@ REDIS_CACHE = {
"db": 1 "db": 1
} }
REDIS_QUEUE = {
"host": os.environ.get("REDIS_PORT_6379_TCP_ADDR", "127.0.0.1"),
"port": 6379,
"db": 2
}
DEBUG = False DEBUG = False
ALLOWED_HOSTS = ['*'] ALLOWED_HOSTS = ['*']

View File

@ -22,6 +22,12 @@ if ENV == "local":
elif ENV == "server": elif ENV == "server":
from .server_settings import * from .server_settings import *
BROKER_BACKEND = "redis"
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
@ -48,11 +54,12 @@ INSTALLED_APPS = (
'problem', 'problem',
'admin', 'admin',
'submission', 'submission',
'mq',
'contest', 'contest',
'judge',
'judge_dispatcher',
'django_extensions',
'rest_framework', 'rest_framework',
'huey.djhuey',
) )
if DEBUG: if DEBUG:
@ -185,7 +192,19 @@ WEBSITE_INFO = {"website_name": "qduoj",
"website_footer": u"青岛大学信息工程学院 创新实验室 <a href=\"http://www.miibeian.gov.cn/\">京ICP备15062075号-1</a>", "website_footer": u"青岛大学信息工程学院 创新实验室 <a href=\"http://www.miibeian.gov.cn/\">京ICP备15062075号-1</a>",
"url": "https://qduoj.com"} "url": "https://qduoj.com"}
HUEY = {
'backend': 'huey.backends.redis_backend',
'name': 'task_queue',
'connection': {'host': REDIS_QUEUE["host"], 'port': REDIS_QUEUE["port"], 'db': REDIS_QUEUE["db"]},
'always_eager': False, # Defaults to False when running via manage.py run_huey
# Options to pass into the consumer when running ``manage.py run_huey``
'consumer_options': {'workers': 50},
}
SMTP_CONFIG = {"smtp_server": "smtp.mxhichina.com", SMTP_CONFIG = {"smtp_server": "smtp.mxhichina.com",
"email": "noreply@qduoj.com", "email": "noreply@qduoj.com",
"password": os.environ.get("smtp_password", "111111"), "password": os.environ.get("smtp_password", "111111"),
"tls": False} "tls": False}

View File

@ -1 +0,0 @@
nohup celery -A judge.judger_controller worker -l DEBUG &

View File

@ -0,0 +1,24 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('submission', '0006_submission_shared'),
]
operations = [
migrations.AddField(
model_name='submission',
name='judge_end_time',
field=models.IntegerField(null=True, blank=True),
),
migrations.AddField(
model_name='submission',
name='judge_start_time',
field=models.IntegerField(null=True, blank=True),
),
]

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2015-12-08 13:06
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('submission', '0007_auto_20151207_1645'),
]
operations = [
migrations.AlterField(
model_name='submission',
name='judge_end_time',
field=models.BigIntegerField(blank=True, null=True),
),
migrations.AlterField(
model_name='submission',
name='judge_start_time',
field=models.BigIntegerField(blank=True, null=True),
),
]

View File

@ -1,13 +1,17 @@
# coding=utf-8 # coding=utf-8
from django.db import models from django.db import models
from utils.shortcuts import rand_str from utils.shortcuts import rand_str
from judge.judger.result import result from judge.result import result
class Submission(models.Model): class Submission(models.Model):
id = models.CharField(max_length=32, default=rand_str, primary_key=True, db_index=True) id = models.CharField(max_length=32, default=rand_str, primary_key=True, db_index=True)
user_id = models.IntegerField(db_index=True) user_id = models.IntegerField(db_index=True)
create_time = models.DateTimeField(auto_now_add=True) create_time = models.DateTimeField(auto_now_add=True)
# 判题开始时间
judge_start_time = models.BigIntegerField(blank=True, null=True)
# 判题结束时间
judge_end_time = models.BigIntegerField(blank=True, null=True)
result = models.IntegerField(default=result["waiting"]) result = models.IntegerField(default=result["waiting"])
language = models.IntegerField() language = models.IntegerField()
code = models.TextField() code = models.TextField()
@ -24,3 +28,6 @@ class Submission(models.Model):
class Meta: class Meta:
db_table = "submission" db_table = "submission"
def __unicode__(self):
return self.id

9
submission/tasks.py Normal file
View File

@ -0,0 +1,9 @@
# coding=utf-8
from huey.djhuey import task
from judge_dispatcher.tasks import JudgeDispatcher
@task()
def _judge(submission, time_limit, memory_limit, test_case_id, is_waiting_task=False):
JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge(is_waiting_task)

View File

@ -7,14 +7,13 @@ from django.shortcuts import render
from django.core.paginator import Paginator from django.core.paginator import Paginator
from rest_framework.views import APIView from rest_framework.views import APIView
from judge.judger_controller.tasks import judge
from account.decorators import login_required, super_admin_required from account.decorators import login_required, super_admin_required
from account.models import SUPER_ADMIN, User from account.models import SUPER_ADMIN, User
from problem.models import Problem from problem.models import Problem
from contest.models import ContestProblem, Contest from contest.models import ContestProblem, Contest
from contest.decorators import check_user_contest_permission from contest.decorators import check_user_contest_permission
from utils.shortcuts import serializer_invalid_response, error_response, success_response, error_page, paginate from utils.shortcuts import serializer_invalid_response, error_response, success_response, error_page, paginate
from utils.cache import get_cache_redis from .tasks import _judge
from .models import Submission from .models import Submission
from .serializers import (CreateSubmissionSerializer, SubmissionSerializer, from .serializers import (CreateSubmissionSerializer, SubmissionSerializer,
SubmissionhareSerializer, SubmissionRejudgeSerializer, SubmissionhareSerializer, SubmissionRejudgeSerializer,
@ -23,11 +22,6 @@ from .serializers import (CreateSubmissionSerializer, SubmissionSerializer,
logger = logging.getLogger("app_info") logger = logging.getLogger("app_info")
def _judge(submission_id, time_limit, memory_limit, test_case_id):
judge.delay(submission_id, time_limit, memory_limit, test_case_id)
get_cache_redis().incr("judge_queue_length")
class SubmissionAPIView(APIView): class SubmissionAPIView(APIView):
@login_required @login_required
def post(self, request): def post(self, request):
@ -49,7 +43,7 @@ class SubmissionAPIView(APIView):
problem_id=problem.id) problem_id=problem.id)
try: try:
_judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id)
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
return error_response(u"提交判题任务失败") return error_response(u"提交判题任务失败")
@ -94,7 +88,7 @@ class ContestSubmissionAPIView(APIView):
code=data["code"], code=data["code"],
problem_id=problem.id) problem_id=problem.id)
try: try:
_judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id)
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
return error_response(u"提交判题任务失败") return error_response(u"提交判题任务失败")
@ -279,7 +273,7 @@ class SubmissionRejudgeAdminAPIView(APIView):
except Problem.DoesNotExist: except Problem.DoesNotExist:
return error_response(u"题目不存在") return error_response(u"题目不存在")
try: try:
_judge(submission.id, problem.time_limit, problem.memory_limit, problem.test_case_id) _judge(submission, problem.time_limit, problem.memory_limit, problem.test_case_id)
except Exception as e: except Exception as e:
logger.error(e) logger.error(e)
return error_response(u"提交判题任务失败") return error_response(u"提交判题任务失败")