# flake8: noqa import os import sys import re import json import django sys.path.append("../") os.environ.setdefault("DJANGO_SETTINGS_MODULE", "oj.settings") django.setup() from django.conf import settings from account.models import User, UserProfile, AdminType, ProblemPermission from problem.models import Problem, ProblemTag, ProblemDifficulty, ProblemRuleType admin_type_map = { 0: AdminType.REGULAR_USER, 1: AdminType.ADMIN, 2: AdminType.SUPER_ADMIN } languages_map = { 1: "C", 2: "C++", 3: "Java" } email_regex = re.compile(r"(^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+$)") # pk -> name tags = {} # pk -> user obj users = {} problems = [] def get_input_result(): while True: resp = input() if resp not in ["yes", "no"]: print("Please input yes or no") continue return resp == "yes" def set_problem_display_id_prefix(): while True: print("Please input a prefix which will be used in all the imported problem's displayID") print( "For example, if your input is 'old'(no quote), the problems' display id will be old1, old2, old3..\ninput:", end="") resp = input() if resp.strip(): return resp.strip() else: print("Empty prefix detected, sure to do that? (yes/no)") if get_input_result(): return "" def get_test_case_score(test_case_id): info_path = os.path.join(settings.TEST_CASE_DIR, test_case_id, "info") if not os.path.exists(info_path): return [] with open(info_path, "r") as info_file: info = json.load(info_file) test_case_score = [] for test_case in info["test_cases"].values(): test_case_score.append({"input_name": test_case["input_name"], "output_name": test_case.get("output_name", "-"), "score": 0}) return test_case_score def import_users(): i = 0 print("Find %d users in old data." % len(users.keys())) print("import users now? (yes/no)") if get_input_result(): for data in users.values(): if not email_regex.match(data["email"]): print("%s will not be created due to invalid email: %s" % (data["username"], data["email"])) continue data["username"] = data["username"].lower() user, created = User.objects.get_or_create(username=data["username"]) if not created: print("%s already exists, omitted" % user.username) continue user.password = data["password"] user.email = data["email"] admin_type = admin_type_map[data["admin_type"]] user.admin_type = admin_type if admin_type == AdminType.ADMIN: user.problem_permission = ProblemPermission.OWN elif admin_type == AdminType.SUPER_ADMIN: user.problem_permission = ProblemPermission.ALL user.save() UserProfile.objects.create(user=user, real_name=data["real_name"]) i += 1 print("%s imported successfully" % user.username) print("%d users have successfully imported\n" % i) def import_tags(): i = 0 print("\nFind these tags in old data:") print(", ".join(tags.values()), '\n') print("import tags now? (yes/no)") if get_input_result(): for tagname in tags.values(): tag, created = ProblemTag.objects.get_or_create(name=tagname) if not created: print("%s already exists, omitted" % tagname) else: print("%s tag created successfully" % tagname) i += 1 print("%d tags have successfully imported\n" % i) else: print("Problem depends on problem_tags and users, exit..") exit(1) def import_problems(): i = 0 print("\nFind %d problems in old data" % len(problems)) prefix = set_problem_display_id_prefix() print("import problems using prefix: %s? (yes/no)" % prefix) if get_input_result(): default_creator = User.objects.first() for data in problems: try: creator_id = \ User.objects.filter(username=users[data["created_by"]]["username"]).values_list("id", flat=True)[0] except (User.DoesNotExist, IndexError): print("The origin creator does not exist, set it to default_creator") creator_id = default_creator.id data["created_by_id"] = creator_id data.pop("created_by") data["_id"] = prefix + str(data.pop("id")) data["difficulty"] = ProblemDifficulty.Mid if data["spj_language"]: data["spj_language"] = languages_map[data["spj_language"]] data["samples"] = json.loads(data["samples"]) data["languages"] = ["C", "C++"] test_case_score = get_test_case_score(data["test_case_id"]) if not test_case_score: print("%s test_case files don't exist, omitted" % data["title"]) continue data["test_case_score"] = test_case_score data["rule_type"] = ProblemRuleType.ACM data["template"] = {} data["visible"] = False data.pop("total_submit_number") data.pop("total_accepted_number") tag_ids = data.pop("tags") problem = Problem.objects.create(**data) problem.create_time = data["create_time"] problem.save() for tag_id in tag_ids: tag, _ = ProblemTag.objects.get_or_create(name=tags[tag_id]) problem.tags.add(tag) i += 1 print("%s imported successfully" % data["title"]) print("%d problems have successfully imported" % i) if __name__ == "__main__": if len(sys.argv) == 1: print("Usage: python3 %s [old_data_path]" % sys.argv[0]) exit(0) data_path = sys.argv[1] if not os.path.isfile(data_path): print("Data file does not exist") exit(1) with open(data_path, "r") as data_file: old_data = json.load(data_file) print("Read old data successfully.\n") for obj in old_data: if obj["model"] == "problem.problemtag": tags[obj["pk"]] = obj["fields"]["name"] elif obj["model"] == "account.user": users[obj["pk"]] = obj["fields"] elif obj["model"] == "problem.problem": obj["fields"]["id"] = obj["pk"] problems.append(obj["fields"]) import_users() import_tags() import_problems()