OnlineJudge/problem/views.py

322 lines
12 KiB
Python
Raw Normal View History

# coding=utf-8
import zipfile
import re
import os
2015-08-10 06:30:06 +00:00
import hashlib
import json
import logging
from django.shortcuts import render
from django.db.models import Q, Count
2015-08-14 08:38:31 +00:00
from django.core.paginator import Paginator
from django.utils.timezone import now
from django.conf import settings
from rest_framework.views import APIView
from account.models import SUPER_ADMIN
from account.decorators import super_admin_required
from utils.shortcuts import (serializer_invalid_response, error_response,
success_response, paginate, rand_str, error_page)
2015-08-10 09:59:39 +00:00
from .serizalizers import (CreateProblemSerializer, EditProblemSerializer, ProblemSerializer,
ProblemTagSerializer)
from .models import Problem, ProblemTag
from .decorators import check_user_problem_permission
logger = logging.getLogger("app_info")
2015-09-17 01:44:31 +00:00
def problem_page(request, problem_id):
"""
前台题目详情页
"""
try:
problem = Problem.objects.get(id=problem_id, visible=True)
except Problem.DoesNotExist:
return error_page(request, u"题目不存在")
return render(request, "oj/problem/problem.html", {"problem": problem, "samples": json.loads(problem.samples)})
2015-08-16 08:52:26 +00:00
class ProblemTagAdminAPIView(APIView):
"""
获取所有标签的列表
"""
2015-08-16 08:52:26 +00:00
def get(self, request):
return success_response(ProblemTagSerializer(ProblemTag.objects.all(), many=True).data)
class ProblemAdminAPIView(APIView):
@super_admin_required
def post(self, request):
"""
题目发布json api接口
---
request_serializer: CreateProblemSerializer
response_serializer: ProblemSerializer
"""
serializer = CreateProblemSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
2015-10-10 10:49:34 +00:00
try:
Problem.objects.get(title=data["title"])
2015-10-10 10:49:34 +00:00
return error_response(u"添加失败,存在重复的题目")
except Problem.DoesNotExist:
pass
problem = Problem.objects.create(title=data["title"],
description=data["description"],
input_description=data["input_description"],
output_description=data["output_description"],
test_case_id=data["test_case_id"],
source=data["source"],
samples=json.dumps(data["samples"]),
time_limit=data["time_limit"],
memory_limit=data["memory_limit"],
difficulty=data["difficulty"],
created_by=request.user,
hint=data["hint"],
visible=data["visible"])
for tag in data["tags"]:
try:
tag = ProblemTag.objects.get(name=tag)
except ProblemTag.DoesNotExist:
tag = ProblemTag.objects.create(name=tag)
problem.tags.add(tag)
return success_response(ProblemSerializer(problem).data)
else:
return serializer_invalid_response(serializer)
@check_user_problem_permission
def put(self, request):
"""
题目编辑json api接口
---
request_serializer: EditProblemSerializer
response_serializer: ProblemSerializer
"""
serializer = EditProblemSerializer(data=request.data)
if serializer.is_valid():
data = serializer.data
problem = Problem.objects.get(id=data["id"])
problem.title = data["title"]
problem.description = data["description"]
problem.input_description = data["input_description"]
problem.output_description = data["output_description"]
problem.test_case_id = data["test_case_id"]
problem.source = data["source"]
problem.time_limit = data["time_limit"]
problem.memory_limit = data["memory_limit"]
problem.difficulty = data["difficulty"]
problem.samples = json.dumps(data["samples"])
problem.hint = data["hint"]
problem.visible = data["visible"]
problem.last_update_time = now()
# 删除原有的标签的对应关系
problem.tags.remove(*problem.tags.all())
# 重新添加所有的标签
2015-08-15 05:54:30 +00:00
for tag in data["tags"]:
try:
tag = ProblemTag.objects.get(name=tag)
except ProblemTag.DoesNotExist:
tag = ProblemTag.objects.create(name=tag)
problem.tags.add(tag)
problem.save()
return success_response(ProblemSerializer(problem).data)
else:
return serializer_invalid_response(serializer)
def get(self, request):
"""
题目分页json api接口
---
response_serializer: ProblemSerializer
"""
2015-08-15 05:54:30 +00:00
problem_id = request.GET.get("problem_id", None)
if problem_id:
try:
# 普通管理员只能获取自己创建的题目
# 超级管理员可以获取全部的题目
2015-08-15 05:54:30 +00:00
problem = Problem.objects.get(id=problem_id)
if request.user.admin_type != SUPER_ADMIN and problem.created_by != request.user:
return error_response(u"题目不存在")
2015-08-15 05:54:30 +00:00
return success_response(ProblemSerializer(problem).data)
except Problem.DoesNotExist:
return error_response(u"题目不存在")
# 获取问题列表
problems = Problem.objects.all().order_by("-create_time")
if request.user.admin_type != SUPER_ADMIN:
problems = problems.filter(created_by=request.user)
visible = request.GET.get("visible", None)
if visible:
problems = problems.filter(visible=(visible == "true"))
keyword = request.GET.get("keyword", None)
if keyword:
problems = problems.filter(Q(title__contains=keyword) |
Q(description__contains=keyword))
return paginate(request, problems, ProblemSerializer)
class TestCaseUploadAPIView(APIView):
"""
上传题目的测试用例
"""
def _is_legal_test_case_file_name(self, file_name):
# 正整数开头的 .in 或者.out 结尾的
regex = r"^[1-9]\d*\.(in|out)$"
return re.compile(regex).match(file_name) is not None
def post(self, request):
if "file" not in request.FILES:
return error_response(u"文件上传失败")
f = request.FILES["file"]
tmp_zip = "/tmp/" + rand_str() + ".zip"
try:
with open(tmp_zip, "wb") as test_case_zip:
for chunk in f:
test_case_zip.write(chunk)
except IOError as e:
logger.error(e)
return error_response(u"上传失败")
2015-10-17 12:00:40 +00:00
try:
test_case_file = zipfile.ZipFile(tmp_zip, 'r')
except Exception:
return error_response(u"解压失败")
name_list = test_case_file.namelist()
# 如果文件是直接打包的那么name_list 就是["1.in", "1.out"]这样的
if len(name_list) % 2 == 1:
return error_response(u"测试用例文件格式错误,文件数目为奇数")
for index in range(1, len(name_list) / 2 + 1):
if not (str(index) + ".in" in name_list and str(index) + ".out" in name_list):
return error_response(u"测试用例文件格式错误,缺少" + str(index) + u".in/.out文件")
problem_test_dir = rand_str()
test_case_dir = settings.TEST_CASE_DIR + problem_test_dir + "/"
# 得到了合法的测试用例文件列表 然后去解压缩
os.mkdir(test_case_dir)
for name in name_list:
f = open(test_case_dir + name, "wb")
try:
f.write(test_case_file.read(name).replace("\r\n", "\n"))
except MemoryError:
return error_response(u"单个测试数据体积过大!")
finally:
f.close()
name_list.sort()
file_info = {"test_case_number": len(name_list) / 2, "test_cases": {}}
# 计算输出文件的md5
for i in range(1, len(name_list) / 2 + 1):
md5 = hashlib.md5()
striped_md5 = hashlib.md5()
f = open(test_case_dir + str(i) + ".out", "r")
# 完整文件的md5
while True:
data = f.read(2 ** 8)
if not data:
break
md5.update(data)
# 删除标准输出最后的空格和换行
# 这时只能一次全部读入了,分块读的话,没办法确定文件结尾
f.seek(0)
striped_md5.update(f.read().rstrip())
file_info["test_cases"][str(i)] = {"input_name": str(i) + ".in",
"output_name": str(i) + ".out",
"output_md5": md5.hexdigest(),
"striped_output_md5": striped_md5.hexdigest(),
"input_size": os.path.getsize(test_case_dir + str(i) + ".in"),
"output_size": os.path.getsize(test_case_dir + str(i) + ".out")}
# 写入配置文件
with open(test_case_dir + "info", "w") as f:
f.write(json.dumps(file_info))
return success_response({"test_case_id": problem_test_dir,
"file_list": file_info["test_cases"]})
2015-08-14 08:38:31 +00:00
def get(self, request):
test_case_id = request.GET.get("test_case_id", None)
if not test_case_id:
return error_response(u"参数错误")
test_case_config = settings.TEST_CASE_DIR + test_case_id + "/info"
try:
f = open(test_case_config)
config = json.loads(f.read())
f.close()
except Exception as e:
return error_response(u"读取测试用例出错")
return success_response({"file_list": config["test_cases"]})
2015-08-14 08:38:31 +00:00
def problem_list_page(request, page=1):
"""
前台的问题列表
"""
2015-08-15 05:54:30 +00:00
# 正常情况
problems = Problem.objects.filter(visible=True)
2015-08-15 05:54:30 +00:00
# 搜索的情况
keyword = request.GET.get("keyword", None)
if keyword:
problems = problems.filter(Q(title__contains=keyword) | Q(description__contains=keyword))
2015-08-15 05:54:30 +00:00
2015-09-11 15:22:42 +00:00
difficulty_order = request.GET.get("order_by", None)
if difficulty_order:
if difficulty_order[0] == "-":
problems = problems.order_by("-difficulty")
difficulty_order = "difficulty"
else:
problems = problems.order_by("difficulty")
difficulty_order = "-difficulty"
else:
difficulty_order = "difficulty"
2015-08-15 05:54:30 +00:00
# 按照标签筛选
tag_text = request.GET.get("tag", None)
if tag_text:
try:
tag = ProblemTag.objects.get(name=tag_text)
except ProblemTag.DoesNotExist:
return error_page(request, u"标签不存在")
problems = tag.problem_set.all().filter(visible=True)
2015-08-15 05:54:30 +00:00
2015-08-14 08:38:31 +00:00
paginator = Paginator(problems, 20)
try:
current_page = paginator.page(int(page))
except Exception:
2015-08-15 05:54:30 +00:00
return error_page(request, u"不存在的页码")
2015-08-14 08:38:31 +00:00
previous_page = next_page = None
try:
previous_page = current_page.previous_page_number()
except Exception:
pass
try:
next_page = current_page.next_page_number()
except Exception:
pass
# 右侧标签列表 按照关联的题目的数量排序 排除题目数量为0的
tags = ProblemTag.objects.annotate(problem_number=Count("problem")).filter(problem_number__gt=0).order_by(
"-problem_number")
2015-08-15 05:54:30 +00:00
return render(request, "oj/problem/problem_list.html",
{"problems": current_page, "page": int(page),
"previous_page": previous_page, "next_page": next_page,
"keyword": keyword, "tag": tag_text,
2015-09-11 15:22:42 +00:00
"tags": tags, "difficulty_order": difficulty_order})