Full steps to reproduce:
-
Start Django.
-
Start a Celery worker:
python manage.py celery worker --app=celery_worker:app -Ofair -n W1
-
Upload a URL-list file; the view loops over the URL list and sends each URL to
the fetch_article task.
-
The worker processes the tasks as expected.
-
Upload another URL-list file.
-
The worker takes no action (the new tasks are never consumed).
views.py:
@csrf_exempt
def upload(request):
    """Create a Job from an uploaded CSV of URLs and enqueue one fetch task per row.

    Expects a POST with a 'job_name' field and exactly one uploaded file whose
    first row is a header and whose first column contains URLs.  Returns a JSON
    error response when either precondition fails; otherwise returns OK with the
    number of tasks enqueued.
    """
    job_name = request.POST.get('job_name')
    if not job_name:
        return JsonResponse(JsonStatus.Error)
    if len(request.FILES) != 1:
        return JsonResponse(JsonStatus.Error)
    # request.FILES.values() is a view object on Python 3 and cannot be
    # indexed with [0]; next(iter(...)) works on both Python 2 and 3.
    yq_data = next(iter(request.FILES.values()))
    job = Job.objects.create(name=job_name)
    reader = csv.reader(yq_data, delimiter=',')
    task_count = 0
    # Skip the header row; the default of None tolerates an empty file
    # instead of raising StopIteration.
    next(reader, None)
    for row in reader:
        if not row:
            continue  # skip blank lines
        url = row[0].strip()
        if not url:
            continue  # skip rows with an empty URL cell
        fetch_article.delay(job.id, url)
        task_count += 1
    job.task_count = task_count
    job.save()
    return JsonResponse(JsonStatus.OK, msg=task_count)
tasks.py:
@shared_task()
def fetch_article(job_id, url):
    """Fetch and process a single article URL, persisting the result.

    An Article row is saved in every case.  On success its title and content
    are filled in; on any processing failure its status is set to 2 and the
    error message is recorded instead.
    """
    logger.info(u'fetch_article:%s' % url)
    processor_cls = get_processor_cls(url)
    article = Article(job_id=job_id, url=url)
    try:
        processor = processor_cls(url)
        title, text = processor.process()
        article.title = title
        article.content = text
    except Exception as e:
        article.status = 2  # assumes 2 == "error" status — TODO confirm against the Article model
        # Store a string, not the exception object, so the text field gets a
        # stable representation rather than whatever the ORM coerces it to.
        article.error = str(e)
        # logger.exception records the full traceback, which logger.error
        # with only the message text would lose.
        logger.exception(u'fetch_article:%s error:%s' % (url, e))
    article.save()