每次有判题任务完成的时候,递归调用自己判断还有没有等待的任务

This commit is contained in:
virusdefender 2015-12-07 23:20:27 +08:00
parent 6bce16b853
commit 1337b26d50
5 changed files with 73 additions and 12 deletions

View File

@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('judge_dispatcher', '0001_initial'),
]
operations = [
migrations.AddField(
model_name='judgewaitingqueue',
name='memory_limit',
field=models.IntegerField(default=1),
preserve_default=False,
),
migrations.AddField(
model_name='judgewaitingqueue',
name='test_case_id',
field=models.CharField(default=1, max_length=40),
preserve_default=False,
),
migrations.AddField(
model_name='judgewaitingqueue',
name='time_limit',
field=models.IntegerField(default=1),
preserve_default=False,
),
]

View File

@ -17,7 +17,14 @@ class JudgeServer(models.Model):
def use_judge_instance(self):
self.left_instance_number -= 1
self.workload = 100 - int(self.left_instance_number / self.max_instance_number)
self.workload = 100 - int(float(self.left_instance_number) / self.max_instance_number * 100)
print self.left_instance_number, self.workload
self.save()
def release_judge_instance(self):
self.left_instance_number += 1
self.workload = 100 - int(float(self.left_instance_number) / self.max_instance_number * 100)
print self.left_instance_number, self.workload
self.save()
class Meta:
@ -26,6 +33,9 @@ class JudgeServer(models.Model):
class JudgeWaitingQueue(models.Model):
submission_id = models.CharField(max_length=40)
time_limit = models.IntegerField()
memory_limit = models.IntegerField()
test_case_id = models.CharField(max_length=40)
create_time = models.DateTimeField(auto_now_add=True)
class Meta:

View File

@ -13,7 +13,8 @@ from problem.models import Problem
from submission.models import Submission
from account.models import User
from utils.cache import get_cache_redis
from .models import JudgeServer,JudgeWaitingQueue
from .models import JudgeServer, JudgeWaitingQueue
logger = logging.getLogger("app_info")
@ -25,25 +26,31 @@ class JudgeDispatcher(object):
self.memory_limit = memory_limit
self.test_case_id = test_case_id
self.user = User.objects.get(id=submission.user_id)
def choose_judge_server(self):
servers = JudgeServer.objects.filter(workload__lt=100, lock=False, status=True).order_by("-workload")
if servers.exists():
return servers[0]
def judge(self):
server = servers[0]
server.use_judge_instance()
return server
def judge(self, is_waiting_task=False):
self.submission.judge_start_time = int(time.time() * 1000)
try:
judge_server = self.choose_judge_server()
# 如果没有合适的判题服务器,就放入等待队列中等待判题
if not judge_server:
JudgeWaitingQueue.objects.create(submission_id=self.submission.id)
if not is_waiting_task:
JudgeWaitingQueue.objects.create(submission_id=self.submission.id, time_limit=self.time_limit,
memory_limit=self.memory_limit, test_case_id=self.test_case_id)
return
s = TimeoutServerProxy("http://" + judge_server.ip + ":" + str(judge_server.port), timeout=20)
data = s.run(judge_server.token, self.submission.id, self.submission.language,
self.submission.code, self.time_limit, self.memory_limit, self.test_case_id)
judge_server.release_judge_instance()
# 编译错误
if data["code"] == 1:
self.submission.result = result["compile_error"]
@ -67,7 +74,19 @@ class JudgeDispatcher(object):
self.update_contest_problem_status()
else:
self.update_problem_status()
if is_waiting_task:
JudgeWaitingQueue.objects.filter(submission_id=self.submission.id).delete()
waiting_submissions = JudgeWaitingQueue.objects.all()
if waiting_submissions.exists():
submission = waiting_submissions.first()
# 防止循环依赖
from submission.tasks import _judge
_judge(Submission.objects.get(id=submission.submission_id), time_limit=submission.time_limit,
memory_limit=submission.memory_limit, test_case_id=submission.test_case_id,
is_waiting_task=True)
def update_problem_status(self):
problem = Problem.objects.get(id=self.submission.problem_id)
@ -109,7 +128,7 @@ class JudgeDispatcher(object):
self.user.save()
self.update_contest_rank(contest)
def update_contest_rank(self, contest):
if contest.real_time_rank:
get_cache_redis().delete(str(contest.id) + "_rank_cache")

View File

@ -5,5 +5,5 @@ from judge_dispatcher.tasks import JudgeDispatcher
@task()
def _judge(submission, time_limit, memory_limit, test_case_id):
JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge()
def _judge(submission, time_limit, memory_limit, test_case_id, is_waiting_task=False):
JudgeDispatcher(submission, time_limit, memory_limit, test_case_id).judge(is_waiting_task)

View File

@ -13,7 +13,7 @@ from problem.models import Problem
from contest.models import ContestProblem, Contest
from contest.decorators import check_user_contest_permission
from utils.shortcuts import serializer_invalid_response, error_response, success_response, error_page, paginate
from .task import _judge
from .tasks import _judge
from .models import Submission
from .serializers import (CreateSubmissionSerializer, SubmissionSerializer,
SubmissionhareSerializer, SubmissionRejudgeSerializer,