remove celery and use dramatiq

This commit is contained in:
virusdefender 2019-03-11 16:21:29 +08:00
parent 1af50c0f4c
commit a5f0c8eb31
16 changed files with 65 additions and 61 deletions

View File

@ -1,14 +1,13 @@
import logging
from celery import shared_task
import dramatiq
from options.options import SysOptions
from utils.shortcuts import send_email
from utils.shortcuts import send_email, DRAMATIQ_WORKER_ARGS
logger = logging.getLogger(__name__)
@shared_task
@dramatiq.actor(**DRAMATIQ_WORKER_ARGS(max_retries=3))
def send_email_async(from_name, to_email, to_name, subject, content):
if not SysOptions.smtp_config:
return

View File

@ -302,11 +302,11 @@ class ApplyResetPasswordAPI(APIView):
"link": f"{SysOptions.website_base_url}/reset-password/{user.reset_password_token}"
}
email_html = render_to_string("reset_password_email.html", render_data)
send_email_async.delay(from_name=SysOptions.website_name_shortcut,
to_email=user.email,
to_name=user.username,
subject=f"Reset your password",
content=email_html)
send_email_async.send(from_name=SysOptions.website_name_shortcut,
to_email=user.email,
to_name=user.username,
subject=f"Reset your password",
content=email_html)
return self.success("Succeeded")

View File

@ -234,7 +234,7 @@ class DownloadContestSubmissions(APIView):
exclude_admin = request.GET.get("exclude_admin") == "1"
zip_path = self._dump_submissions(contest, exclude_admin)
delete_files.apply_async((zip_path,), countdown=300)
delete_files.send_with_options(args=(zip_path,), delay=300_000)
resp = FileResponse(open(zip_path, "rb"))
resp["Content-Type"] = "application/zip"
resp["Content-Disposition"] = f"attachment;filename={os.path.basename(zip_path)}"

View File

@ -1,6 +1,3 @@
amqp==2.4.2
billiard==3.5.0.5
celery==4.2.1
certifi==2019.3.9
chardet==3.0.4
coverage==4.5.3
@ -15,7 +12,6 @@ flake8-quotes==1.0.0
gunicorn==19.9.0
idna==2.8
jsonfield==2.0.2
kombu==4.4.0
mccabe==0.6.1
otpauth==1.0.1
Pillow==5.4.1
@ -30,5 +26,6 @@ redis==3.2.0
requests==2.21.0
six==1.12.0
urllib3==1.24.1
vine==1.2.0
XlsxWriter==1.1.5
django-dramatiq==0.5.0
dramatiq==1.3.0

View File

@ -38,12 +38,12 @@ startsecs=5
stopwaitsecs = 5
killasgroup=true
[program:celery]
command=celery -A oj worker -l warning --autoscale 2,%(ENV_MAX_WORKER_NUM)s
[program:dramatiq]
command=python3 manage.py rundramatiq --no-reload --processes %(ENV_MAX_WORKER_NUM)s --threads 4
directory=/app/
user=nobody
stdout_logfile=/data/log/celery.log
stderr_logfile=/data/log/celery.log
stdout_logfile=/data/log/dramatiq.log
stderr_logfile=/data/log/dramatiq.log
autostart=true
autorestart=true
startsecs=5

View File

@ -26,7 +26,7 @@ def process_pending_task():
# 防止循环引入
from judge.tasks import judge_task
data = json.loads(cache.rpop(CacheKey.waiting_queue).decode("utf-8"))
judge_task.delay(**data)
judge_task.send(**data)
class DispatcherBase(object):

View File

@ -1,12 +1,12 @@
from __future__ import absolute_import, unicode_literals
from celery import shared_task
import dramatiq
from account.models import User
from submission.models import Submission
from judge.dispatcher import JudgeDispatcher
from utils.shortcuts import DRAMATIQ_WORKER_ARGS
@shared_task
@dramatiq.actor(**DRAMATIQ_WORKER_ARGS())
def judge_task(submission_id, problem_id):
uid = Submission.objects.get(id=submission_id).user_id
if User.objects.get(id=uid).is_disabled:

View File

@ -1,6 +0,0 @@
from __future__ import absolute_import, unicode_literals
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ["celery_app"]

View File

@ -1,18 +0,0 @@
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the "celery" program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "oj.settings")
app = Celery("oj")
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object("django.conf:settings")
# load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
# app.autodiscover_tasks()

View File

@ -33,7 +33,8 @@ VENDOR_APPS = (
'django.contrib.messages',
'django.contrib.staticfiles',
'rest_framework',
'raven.contrib.django.raven_compat'
'raven.contrib.django.raven_compat',
'django_dramatiq',
)
LOCAL_APPS = (
'account',
@ -164,6 +165,11 @@ LOGGING = {
'level': 'ERROR',
'propagate': True,
},
'dramatiq': {
'handlers': LOGGING_HANDLERS,
'level': 'DEBUG',
'propagate': False,
},
'': {
'handlers': LOGGING_HANDLERS,
'level': 'WARNING',
@ -202,11 +208,32 @@ CACHES = {
SESSION_ENGINE = "django.contrib.sessions.backends.cache"
SESSION_CACHE_ALIAS = "default"
CELERY_RESULT_BACKEND = f"{REDIS_URL}/2"
BROKER_URL = f"{REDIS_URL}/3"
CELERY_TASK_SOFT_TIME_LIMIT = CELERY_TASK_TIME_LIMIT = 180
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
DRAMATIQ_BROKER = {
"BROKER": "dramatiq.brokers.redis.RedisBroker",
"OPTIONS": {
"url": f"{REDIS_URL}/4",
},
"MIDDLEWARE": [
# "dramatiq.middleware.Prometheus",
"dramatiq.middleware.AgeLimit",
"dramatiq.middleware.TimeLimit",
"dramatiq.middleware.Callbacks",
"dramatiq.middleware.Retries",
# "django_dramatiq.middleware.AdminMiddleware",
"django_dramatiq.middleware.DbConnectionsMiddleware"
]
}
DRAMATIQ_RESULT_BACKEND = {
"BACKEND": "dramatiq.results.backends.redis.RedisBackend",
"BACKEND_OPTIONS": {
"url": f"{REDIS_URL}/4",
},
"MIDDLEWARE_OPTIONS": {
"result_ttl": None
}
}
RAVEN_CONFIG = {
'dsn': 'https://b200023b8aed4d708fb593c5e0a6ad3d:1fddaba168f84fcf97e0d549faaeaff0@sentry.io/263057'
}

View File

@ -300,8 +300,6 @@ class ProblemAPI(ProblemBase):
except Problem.DoesNotExist:
return self.error("Problem does not exists")
ensure_created_by(problem, request.user)
if Submission.objects.filter(problem=problem).exists():
return self.error("Can't delete the problem as it has submissions")
d = os.path.join(settings.TEST_CASE_DIR, problem.test_case_id)
if os.path.isdir(d):
shutil.rmtree(d, ignore_errors=True)
@ -541,7 +539,7 @@ class ExportProblemAPI(APIView):
with zipfile.ZipFile(path, "w") as zip_file:
for index, problem in enumerate(problems):
self.process_one_problem(zip_file=zip_file, user=request.user, problem=problem, index=index + 1)
delete_files.apply_async((path,), countdown=300)
delete_files.send_with_options(args=(path,), delay=300_000)
resp = FileResponse(open(path, "rb"))
resp["Content-Type"] = "application/zip"
resp["Content-Disposition"] = f"attachment;filename=problem-export.zip"

View File

@ -18,5 +18,5 @@ class SubmissionRejudgeAPI(APIView):
submission.statistic_info = {}
submission.save()
judge_task.delay(submission.id, submission.problem.id)
judge_task.send(submission.id, submission.problem.id)
return self.success()

View File

@ -80,7 +80,7 @@ class SubmissionAPI(APIView):
contest_id=data.get("contest_id"))
# use this for debug
# JudgeDispatcher(submission.id, problem.id).judge()
judge_task.delay(submission.id, problem.id)
judge_task.send(submission.id, problem.id)
if hide_id:
return self.success()
else:

View File

@ -1,7 +1,6 @@
import functools
import json
import logging
from collections import OrderedDict
from django.http import HttpResponse, QueryDict
from django.utils.decorators import method_decorator
@ -98,6 +97,8 @@ class APIView(View):
elif isinstance(errors, list):
return self.extract_errors(errors[0], key)
return key, errors
def invalid_serializer(self, serializer):
key, error = self.extract_errors(serializer.errors)
if key == "non_field_errors":

View File

@ -81,3 +81,7 @@ def send_email(smtp_config, from_name, to_email, to_name, subject, content):
def get_env(name, default=""):
return os.environ.get(name, default)
def DRAMATIQ_WORKER_ARGS(time_limit=3600_000, max_retries=0, max_age=7200_000):
return {"max_retries": max_retries, "time_limit": time_limit, "max_age": max_age}

View File

@ -1,8 +1,10 @@
import os
from celery import shared_task
import dramatiq
from utils.shortcuts import DRAMATIQ_WORKER_ARGS
@shared_task
@dramatiq.actor(**DRAMATIQ_WORKER_ARGS())
def delete_files(*args):
for item in args:
try: