mirror of
https://github.com/mediacms-io/mediacms.git
synced 2024-12-12 01:50:39 +01:00
e9739bab45
* avoid calling post save signals * remove stale celery ids that prevent new services from starting
793 lines
28 KiB
Python
793 lines
28 KiB
Python
import json
|
|
import os
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import tempfile
|
|
from datetime import datetime, timedelta
|
|
|
|
from celery import Task
|
|
from celery import shared_task as task
|
|
from celery.exceptions import SoftTimeLimitExceeded
|
|
from celery.signals import task_revoked
|
|
|
|
# from celery.task.control import revoke
|
|
from celery.utils.log import get_task_logger
|
|
from django.conf import settings
|
|
from django.core.cache import cache
|
|
from django.core.files import File
|
|
from django.db.models import Q
|
|
|
|
from actions.models import USER_MEDIA_ACTIONS, MediaAction
|
|
from users.models import User
|
|
|
|
from .backends import FFmpegBackend
|
|
from .exceptions import VideoEncodingError
|
|
from .helpers import (
|
|
calculate_seconds,
|
|
create_temp_file,
|
|
get_file_name,
|
|
get_file_type,
|
|
media_file_info,
|
|
produce_ffmpeg_commands,
|
|
produce_friendly_token,
|
|
rm_file,
|
|
run_command,
|
|
)
|
|
from .methods import list_tasks, notify_users, pre_save_action
|
|
from .models import Category, EncodeProfile, Encoding, Media, Rating, Tag
|
|
|
|
logger = get_task_logger(__name__)
|
|
|
|
VALID_USER_ACTIONS = [action for action, name in USER_MEDIA_ACTIONS]
|
|
|
|
ERRORS_LIST = [
|
|
"Output file is empty, nothing was encoded",
|
|
"Invalid data found when processing input",
|
|
"Unable to find a suitable output format for",
|
|
]
|
|
|
|
|
|
@task(name="chunkize_media", bind=True, queue="short_tasks", soft_time_limit=60 * 30)
|
|
def chunkize_media(self, friendly_token, profiles, force=True):
|
|
"""Break media in chunks and start encoding tasks"""
|
|
|
|
profiles = [EncodeProfile.objects.get(id=profile) for profile in profiles]
|
|
media = Media.objects.get(friendly_token=friendly_token)
|
|
cwd = os.path.dirname(os.path.realpath(media.media_file.path))
|
|
file_name = media.media_file.path.split("/")[-1]
|
|
random_prefix = produce_friendly_token()
|
|
file_format = "{0}_{1}".format(random_prefix, file_name)
|
|
chunks_file_name = "%02d_{0}".format(file_format)
|
|
chunks_file_name += ".mkv"
|
|
cmd = [
|
|
settings.FFMPEG_COMMAND,
|
|
"-y",
|
|
"-i",
|
|
media.media_file.path,
|
|
"-c",
|
|
"copy",
|
|
"-f",
|
|
"segment",
|
|
"-segment_time",
|
|
str(settings.VIDEO_CHUNKS_DURATION),
|
|
chunks_file_name,
|
|
]
|
|
chunks = []
|
|
ret = run_command(cmd, cwd=cwd)
|
|
|
|
if "out" in ret.keys():
|
|
for line in ret.get("error").split("\n"):
|
|
ch = re.findall(r"Opening \'([\W\w]+)\' for writing", line)
|
|
if ch:
|
|
chunks.append(ch[0])
|
|
if not chunks:
|
|
# command completely failed to segment file.putting to normal encode
|
|
logger.info("Failed to break file {0} in chunks." " Putting to normal encode queue".format(friendly_token))
|
|
for profile in profiles:
|
|
if media.video_height and media.video_height < profile.resolution:
|
|
if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
|
|
continue
|
|
encoding = Encoding(media=media, profile=profile)
|
|
encoding.save()
|
|
enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
|
|
encode_media.delay(friendly_token, profile.id, encoding.id, enc_url, force=force)
|
|
return False
|
|
|
|
chunks = [os.path.join(cwd, ch) for ch in chunks]
|
|
to_profiles = []
|
|
chunks_dict = {}
|
|
# calculate once md5sums
|
|
for chunk in chunks:
|
|
cmd = ["md5sum", chunk]
|
|
stdout = run_command(cmd).get("out")
|
|
md5sum = stdout.strip().split()[0]
|
|
chunks_dict[chunk] = md5sum
|
|
|
|
for profile in profiles:
|
|
if media.video_height and media.video_height < profile.resolution:
|
|
if profile.resolution not in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
|
|
continue
|
|
to_profiles.append(profile)
|
|
|
|
for chunk in chunks:
|
|
encoding = Encoding(
|
|
media=media,
|
|
profile=profile,
|
|
chunk_file_path=chunk,
|
|
chunk=True,
|
|
chunks_info=json.dumps(chunks_dict),
|
|
md5sum=chunks_dict[chunk],
|
|
)
|
|
|
|
encoding.save()
|
|
enc_url = settings.SSL_FRONTEND_HOST + encoding.get_absolute_url()
|
|
if profile.resolution in settings.MINIMUM_RESOLUTIONS_TO_ENCODE:
|
|
priority = 0
|
|
else:
|
|
priority = 9
|
|
encode_media.apply_async(
|
|
args=[friendly_token, profile.id, encoding.id, enc_url],
|
|
kwargs={"force": force, "chunk": True, "chunk_file_path": chunk},
|
|
priority=priority,
|
|
)
|
|
|
|
logger.info("got {0} chunks and will encode to {1} profiles".format(len(chunks), to_profiles))
|
|
return True
|
|
|
|
|
|
class EncodingTask(Task):
|
|
def on_failure(self, exc, task_id, args, kwargs, einfo):
|
|
# mainly used to run some post failure steps
|
|
# we get here if a task is revoked
|
|
try:
|
|
if hasattr(self, "encoding"):
|
|
self.encoding.status = "fail"
|
|
self.encoding.save(update_fields=["status"])
|
|
kill_ffmpeg_process(self.encoding.temp_file)
|
|
if hasattr(self.encoding, "media"):
|
|
self.encoding.media.post_encode_actions()
|
|
except BaseException:
|
|
pass
|
|
return False
|
|
|
|
|
|
@task(
|
|
name="encode_media",
|
|
base=EncodingTask,
|
|
bind=True,
|
|
queue="long_tasks",
|
|
soft_time_limit=settings.CELERY_SOFT_TIME_LIMIT,
|
|
)
|
|
def encode_media(
|
|
self,
|
|
friendly_token,
|
|
profile_id,
|
|
encoding_id,
|
|
encoding_url,
|
|
force=True,
|
|
chunk=False,
|
|
chunk_file_path="",
|
|
):
|
|
"""Encode a media to given profile, using ffmpeg, storing progress"""
|
|
|
|
logger.info("Encode Media started, friendly token {0}, profile id {1}, force {2}".format(friendly_token, profile_id, force))
|
|
|
|
if self.request.id:
|
|
task_id = self.request.id
|
|
else:
|
|
task_id = None
|
|
try:
|
|
media = Media.objects.get(friendly_token=friendly_token)
|
|
profile = EncodeProfile.objects.get(id=profile_id)
|
|
except BaseException:
|
|
Encoding.objects.filter(id=encoding_id).delete()
|
|
return False
|
|
|
|
# break logic with chunk True/False
|
|
if chunk:
|
|
# TODO: in case a video is chunkized and this enters here many times
|
|
# it will always run since chunk_file_path is always different
|
|
# thus find a better way for this check
|
|
if Encoding.objects.filter(media=media, profile=profile, chunk_file_path=chunk_file_path).count() > 1 and force is False:
|
|
Encoding.objects.filter(id=encoding_id).delete()
|
|
return False
|
|
else:
|
|
try:
|
|
encoding = Encoding.objects.get(id=encoding_id)
|
|
encoding.status = "running"
|
|
Encoding.objects.filter(
|
|
media=media,
|
|
profile=profile,
|
|
chunk=True,
|
|
chunk_file_path=chunk_file_path,
|
|
).exclude(id=encoding_id).delete()
|
|
except BaseException:
|
|
encoding = Encoding(
|
|
media=media,
|
|
profile=profile,
|
|
status="running",
|
|
chunk=True,
|
|
chunk_file_path=chunk_file_path,
|
|
)
|
|
else:
|
|
if Encoding.objects.filter(media=media, profile=profile).count() > 1 and force is False:
|
|
Encoding.objects.filter(id=encoding_id).delete()
|
|
return False
|
|
else:
|
|
try:
|
|
encoding = Encoding.objects.get(id=encoding_id)
|
|
encoding.status = "running"
|
|
Encoding.objects.filter(media=media, profile=profile).exclude(id=encoding_id).delete()
|
|
except BaseException:
|
|
encoding = Encoding(media=media, profile=profile, status="running")
|
|
|
|
if task_id:
|
|
encoding.task_id = task_id
|
|
encoding.worker = "localhost"
|
|
encoding.retries = self.request.retries
|
|
encoding.save()
|
|
|
|
if profile.extension == "gif":
|
|
tf = create_temp_file(suffix=".gif")
|
|
# -ss 5 start from 5 second. -t 25 until 25 sec
|
|
command = [
|
|
settings.FFMPEG_COMMAND,
|
|
"-y",
|
|
"-ss",
|
|
"3",
|
|
"-i",
|
|
media.media_file.path,
|
|
"-hide_banner",
|
|
"-vf",
|
|
"scale=344:-1:flags=lanczos,fps=1",
|
|
"-t",
|
|
"25",
|
|
"-f",
|
|
"gif",
|
|
tf,
|
|
]
|
|
ret = run_command(command)
|
|
if os.path.exists(tf) and get_file_type(tf) == "image":
|
|
with open(tf, "rb") as f:
|
|
myfile = File(f)
|
|
encoding.status = "success"
|
|
encoding.media_file.save(content=myfile, name=tf)
|
|
rm_file(tf)
|
|
return True
|
|
else:
|
|
return False
|
|
|
|
if chunk:
|
|
original_media_path = chunk_file_path
|
|
else:
|
|
original_media_path = media.media_file.path
|
|
|
|
# if not media.duration:
|
|
# encoding.status = "fail"
|
|
# encoding.save(update_fields=["status"])
|
|
# return False
|
|
|
|
with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
|
|
tf = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
|
|
tfpass = create_temp_file(suffix=".{0}".format(profile.extension), dir=temp_dir)
|
|
ffmpeg_commands = produce_ffmpeg_commands(
|
|
original_media_path,
|
|
media.media_info,
|
|
resolution=profile.resolution,
|
|
codec=profile.codec,
|
|
output_filename=tf,
|
|
pass_file=tfpass,
|
|
chunk=chunk,
|
|
)
|
|
if not ffmpeg_commands:
|
|
encoding.status = "fail"
|
|
encoding.save(update_fields=["status"])
|
|
return False
|
|
|
|
encoding.temp_file = tf
|
|
encoding.commands = str(ffmpeg_commands)
|
|
|
|
encoding.save(update_fields=["temp_file", "commands", "task_id"])
|
|
|
|
# binding these, so they are available on on_failure
|
|
self.encoding = encoding
|
|
self.media = media
|
|
# can be one-pass or two-pass
|
|
for ffmpeg_command in ffmpeg_commands:
|
|
ffmpeg_command = [str(s) for s in ffmpeg_command]
|
|
encoding_backend = FFmpegBackend()
|
|
try:
|
|
encoding_command = encoding_backend.encode(ffmpeg_command)
|
|
duration, n_times = 0, 0
|
|
output = ""
|
|
while encoding_command:
|
|
try:
|
|
# TODO: understand an eternal loop
|
|
# eg h265 with mv4 file issue, and stop with error
|
|
output = next(encoding_command)
|
|
duration = calculate_seconds(output)
|
|
if duration:
|
|
percent = duration * 100 / media.duration
|
|
if n_times % 60 == 0:
|
|
encoding.progress = percent
|
|
try:
|
|
encoding.save(update_fields=["progress", "update_date"])
|
|
logger.info("Saved {0}".format(round(percent, 2)))
|
|
except BaseException:
|
|
pass
|
|
n_times += 1
|
|
except StopIteration:
|
|
break
|
|
except VideoEncodingError:
|
|
# ffmpeg error, or ffmpeg was killed
|
|
raise
|
|
except Exception as e:
|
|
try:
|
|
# output is empty, fail message is on the exception
|
|
output = e.message
|
|
except AttributeError:
|
|
output = ""
|
|
if isinstance(e, SoftTimeLimitExceeded):
|
|
kill_ffmpeg_process(encoding.temp_file)
|
|
encoding.logs = output
|
|
encoding.status = "fail"
|
|
encoding.save(update_fields=["status", "logs"])
|
|
raise_exception = True
|
|
# if this is an ffmpeg's valid error
|
|
# no need for the task to be re-run
|
|
# otherwise rerun task...
|
|
for error_msg in ERRORS_LIST:
|
|
if error_msg.lower() in output.lower():
|
|
raise_exception = False
|
|
if raise_exception:
|
|
raise self.retry(exc=e, countdown=5, max_retries=1)
|
|
|
|
encoding.logs = output
|
|
encoding.progress = 100
|
|
|
|
success = False
|
|
encoding.status = "fail"
|
|
if os.path.exists(tf) and os.path.getsize(tf) != 0:
|
|
ret = media_file_info(tf)
|
|
if ret.get("is_video") or ret.get("is_audio"):
|
|
encoding.status = "success"
|
|
success = True
|
|
|
|
with open(tf, "rb") as f:
|
|
myfile = File(f)
|
|
output_name = "{0}.{1}".format(get_file_name(original_media_path), profile.extension)
|
|
encoding.media_file.save(content=myfile, name=output_name)
|
|
encoding.total_run_time = (encoding.update_date - encoding.add_date).seconds
|
|
|
|
try:
|
|
encoding.save(update_fields=["status", "logs", "progress", "total_run_time"])
|
|
# this will raise a django.db.utils.DatabaseError error when task is revoked,
|
|
# since we delete the encoding at that stage
|
|
except BaseException:
|
|
pass
|
|
|
|
return success
|
|
|
|
|
|
@task(name="produce_sprite_from_video", queue="long_tasks")
|
|
def produce_sprite_from_video(friendly_token):
|
|
"""Produces a sprites file for a video, uses ffmpeg"""
|
|
|
|
try:
|
|
media = Media.objects.get(friendly_token=friendly_token)
|
|
except BaseException:
|
|
logger.info("failed to get media with friendly_token %s" % friendly_token)
|
|
return False
|
|
|
|
with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as tmpdirname:
|
|
try:
|
|
tmpdir_image_files = tmpdirname + "/img%03d.jpg"
|
|
output_name = tmpdirname + "/sprites.jpg"
|
|
cmd = "{0} -i {1} -f image2 -vf 'fps=1/10, scale=160:90' {2}&&files=$(ls {3}/img*.jpg | sort -t '-' -n -k 2 | tr '\n' ' ')&&convert $files -append {4}".format(
|
|
settings.FFMPEG_COMMAND,
|
|
media.media_file.path,
|
|
tmpdir_image_files,
|
|
tmpdirname,
|
|
output_name,
|
|
)
|
|
subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
|
|
if os.path.exists(output_name) and get_file_type(output_name) == "image":
|
|
with open(output_name, "rb") as f:
|
|
myfile = File(f)
|
|
media.sprites.save(
|
|
content=myfile,
|
|
name=get_file_name(media.media_file.path) + "sprites.jpg",
|
|
)
|
|
except BaseException:
|
|
pass
|
|
return True
|
|
|
|
|
|
@task(name="create_hls", queue="long_tasks")
|
|
def create_hls(friendly_token):
|
|
"""Creates HLS file for media, uses Bento4 mp4hls command"""
|
|
|
|
if not hasattr(settings, "MP4HLS_COMMAND"):
|
|
logger.info("Bento4 mp4hls command is missing from configuration")
|
|
return False
|
|
|
|
if not os.path.exists(settings.MP4HLS_COMMAND):
|
|
logger.info("Bento4 mp4hls command is missing")
|
|
return False
|
|
|
|
try:
|
|
media = Media.objects.get(friendly_token=friendly_token)
|
|
except BaseException:
|
|
logger.info("failed to get media with friendly_token %s" % friendly_token)
|
|
return False
|
|
|
|
p = media.uid.hex
|
|
output_dir = os.path.join(settings.HLS_DIR, p)
|
|
encodings = media.encodings.filter(profile__extension="mp4", status="success", chunk=False, profile__codec="h264")
|
|
if encodings:
|
|
existing_output_dir = None
|
|
if os.path.exists(output_dir):
|
|
existing_output_dir = output_dir
|
|
output_dir = os.path.join(settings.HLS_DIR, p + produce_friendly_token())
|
|
files = " ".join([f.media_file.path for f in encodings if f.media_file])
|
|
cmd = "{0} --segment-duration=4 --output-dir={1} {2}".format(settings.MP4HLS_COMMAND, output_dir, files)
|
|
subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
|
|
if existing_output_dir:
|
|
# override content with -T !
|
|
cmd = "cp -rT {0} {1}".format(output_dir, existing_output_dir)
|
|
subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
|
|
shutil.rmtree(output_dir)
|
|
output_dir = existing_output_dir
|
|
pp = os.path.join(output_dir, "master.m3u8")
|
|
if os.path.exists(pp):
|
|
if media.hls_file != pp:
|
|
media.hls_file = pp
|
|
media.save(update_fields=["hls_file"])
|
|
return True
|
|
|
|
|
|
@task(name="check_running_states", queue="short_tasks")
|
|
def check_running_states():
|
|
# Experimental - unused
|
|
"""Check stale running encodings and delete/reencode media"""
|
|
|
|
encodings = Encoding.objects.filter(status="running")
|
|
|
|
logger.info("got {0} encodings that are in state running".format(encodings.count()))
|
|
changed = 0
|
|
for encoding in encodings:
|
|
now = datetime.now(encoding.update_date.tzinfo)
|
|
if (now - encoding.update_date).seconds > settings.RUNNING_STATE_STALE:
|
|
media = encoding.media
|
|
profile = encoding.profile
|
|
# task_id = encoding.task_id
|
|
# terminate task
|
|
# TODO: not imported
|
|
# if task_id:
|
|
# revoke(task_id, terminate=True)
|
|
encoding.delete()
|
|
media.encode(profiles=[profile])
|
|
# TODO: allign with new code + chunksize...
|
|
changed += 1
|
|
if changed:
|
|
logger.info("changed from running to pending on {0} items".format(changed))
|
|
return True
|
|
|
|
|
|
@task(name="check_media_states", queue="short_tasks")
|
|
def check_media_states():
|
|
# Experimental - unused
|
|
# check encoding status of not success media
|
|
media = Media.objects.filter(Q(encoding_status="running") | Q(encoding_status="fail") | Q(encoding_status="pending"))
|
|
|
|
logger.info("got {0} media that are not in state success".format(media.count()))
|
|
|
|
changed = 0
|
|
for m in media:
|
|
m.set_encoding_status()
|
|
m.save(update_fields=["encoding_status"])
|
|
changed += 1
|
|
if changed:
|
|
logger.info("changed encoding status to {0} media items".format(changed))
|
|
return True
|
|
|
|
|
|
@task(name="check_pending_states", queue="short_tasks")
|
|
def check_pending_states():
|
|
# Experimental - unused
|
|
# check encoding profiles that are on state pending and not on a queue
|
|
encodings = Encoding.objects.filter(status="pending")
|
|
|
|
if not encodings:
|
|
return True
|
|
|
|
changed = 0
|
|
tasks = list_tasks()
|
|
task_ids = tasks["task_ids"]
|
|
media_profile_pairs = tasks["media_profile_pairs"]
|
|
for encoding in encodings:
|
|
if encoding.task_id and encoding.task_id in task_ids:
|
|
# encoding is in one of the active/reserved/scheduled tasks list
|
|
continue
|
|
elif (
|
|
encoding.media.friendly_token,
|
|
encoding.profile.id,
|
|
) in media_profile_pairs:
|
|
continue
|
|
# encoding is in one of the reserved/scheduled tasks list.
|
|
# has no task_id but will be run, so need to re-enter the queue
|
|
else:
|
|
media = encoding.media
|
|
profile = encoding.profile
|
|
encoding.delete()
|
|
media.encode(profiles=[profile], force=False)
|
|
changed += 1
|
|
if changed:
|
|
logger.info("set to the encode queue {0} encodings that were on pending state".format(changed))
|
|
return True
|
|
|
|
|
|
@task(name="check_missing_profiles", queue="short_tasks")
|
|
def check_missing_profiles():
|
|
# Experimental - unused
|
|
|
|
# check if video files have missing profiles. If so, add them
|
|
media = Media.objects.filter(media_type="video")
|
|
profiles = list(EncodeProfile.objects.all())
|
|
|
|
changed = 0
|
|
|
|
for m in media:
|
|
existing_profiles = [p.profile for p in m.encodings.all()]
|
|
missing_profiles = [p for p in profiles if p not in existing_profiles]
|
|
if missing_profiles:
|
|
m.encode(profiles=missing_profiles, force=False)
|
|
# since we call with force=False
|
|
# encode_media won't delete existing profiles
|
|
# if they appear on the meanwhile (eg on a big queue)
|
|
changed += 1
|
|
if changed:
|
|
logger.info("set to the encode queue {0} profiles".format(changed))
|
|
return True
|
|
|
|
|
|
@task(name="clear_sessions", queue="short_tasks")
|
|
def clear_sessions():
|
|
"""Clear expired sessions"""
|
|
|
|
try:
|
|
from importlib import import_module
|
|
|
|
from django.conf import settings
|
|
|
|
engine = import_module(settings.SESSION_ENGINE)
|
|
engine.SessionStore.clear_expired()
|
|
except BaseException:
|
|
return False
|
|
return True
|
|
|
|
|
|
@task(name="save_user_action", queue="short_tasks")
|
|
def save_user_action(user_or_session, friendly_token=None, action="watch", extra_info=None):
|
|
"""Short task that saves a user action"""
|
|
|
|
if action not in VALID_USER_ACTIONS:
|
|
return False
|
|
|
|
try:
|
|
media = Media.objects.get(friendly_token=friendly_token)
|
|
except BaseException:
|
|
return False
|
|
|
|
user = user_or_session.get("user_id")
|
|
session_key = user_or_session.get("user_session")
|
|
remote_ip = user_or_session.get("remote_ip_addr")
|
|
|
|
if user:
|
|
try:
|
|
user = User.objects.get(id=user)
|
|
except BaseException:
|
|
return False
|
|
|
|
if not (user or session_key):
|
|
return False
|
|
|
|
if action in ["like", "dislike", "watch", "report"]:
|
|
if not pre_save_action(
|
|
media=media,
|
|
user=user,
|
|
session_key=session_key,
|
|
action=action,
|
|
remote_ip=remote_ip,
|
|
):
|
|
return False
|
|
|
|
if action == "watch":
|
|
if user:
|
|
MediaAction.objects.filter(user=user, media=media, action="watch").delete()
|
|
else:
|
|
MediaAction.objects.filter(session_key=session_key, media=media, action="watch").delete()
|
|
if action == "rate":
|
|
try:
|
|
score = extra_info.get("score")
|
|
rating_category = extra_info.get("category_id")
|
|
except BaseException:
|
|
# TODO: better error handling?
|
|
return False
|
|
try:
|
|
rating = Rating.objects.filter(user=user, media=media, rating_category_id=rating_category).first()
|
|
if rating:
|
|
rating.score = score
|
|
rating.save(update_fields=["score"])
|
|
else:
|
|
rating = Rating.objects.create(
|
|
user=user,
|
|
media=media,
|
|
rating_category_id=rating_category,
|
|
score=score,
|
|
)
|
|
except Exception:
|
|
# TODO: more specific handling, for errors in score, or
|
|
# rating_category?
|
|
return False
|
|
|
|
ma = MediaAction(
|
|
user=user,
|
|
session_key=session_key,
|
|
media=media,
|
|
action=action,
|
|
extra_info=extra_info,
|
|
remote_ip=remote_ip,
|
|
)
|
|
ma.save()
|
|
|
|
if action == "watch":
|
|
media.views += 1
|
|
Media.objects.filter(friendly_token=friendly_token).update(views=media.views)
|
|
|
|
# update field without calling save, to avoid post_save signals being triggered
|
|
# same in other actions
|
|
|
|
elif action == "report":
|
|
media.reported_times += 1
|
|
|
|
if media.reported_times >= settings.REPORTED_TIMES_THRESHOLD:
|
|
media.state = "private"
|
|
media.save(update_fields=["reported_times", "state"])
|
|
|
|
notify_users(
|
|
friendly_token=media.friendly_token,
|
|
action="media_reported",
|
|
extra=extra_info,
|
|
)
|
|
elif action == "like":
|
|
media.likes += 1
|
|
Media.objects.filter(friendly_token=friendly_token).update(likes=media.likes)
|
|
elif action == "dislike":
|
|
media.dislikes += 1
|
|
Media.objects.filter(friendly_token=friendly_token).update(dislikes=media.dislikes)
|
|
|
|
return True
|
|
|
|
|
|
@task(name="get_list_of_popular_media", queue="long_tasks")
|
|
def get_list_of_popular_media():
|
|
"""Experimental task for preparing media listing
|
|
for index page / recommended section
|
|
calculate and return the top 50 popular media, based on two rules
|
|
X = the top 25 videos that have the most views during the last week
|
|
Y = the most recent 25 videos that have been liked over the last 6 months
|
|
"""
|
|
|
|
valid_media_x = {}
|
|
valid_media_y = {}
|
|
basic_query = Q(listable=True)
|
|
media_x = Media.objects.filter(basic_query).values("friendly_token")
|
|
|
|
period_x = datetime.now() - timedelta(days=7)
|
|
period_y = datetime.now() - timedelta(days=30 * 6)
|
|
|
|
for media in media_x:
|
|
ft = media["friendly_token"]
|
|
num = MediaAction.objects.filter(action_date__gte=period_x, action="watch", media__friendly_token=ft).count()
|
|
if num:
|
|
valid_media_x[ft] = num
|
|
num = MediaAction.objects.filter(action_date__gte=period_y, action="like", media__friendly_token=ft).count()
|
|
if num:
|
|
valid_media_y[ft] = num
|
|
|
|
x = sorted(valid_media_x.items(), key=lambda kv: kv[1], reverse=True)[:25]
|
|
y = sorted(valid_media_y.items(), key=lambda kv: kv[1], reverse=True)[:25]
|
|
|
|
media_ids = [a[0] for a in x]
|
|
media_ids.extend([a[0] for a in y])
|
|
media_ids = list(set(media_ids))
|
|
cache.set("popular_media_ids", media_ids, 60 * 60 * 12)
|
|
logger.info("saved popular media ids")
|
|
|
|
return True
|
|
|
|
|
|
@task(name="update_listings_thumbnails", queue="long_tasks")
|
|
def update_listings_thumbnails():
|
|
"""Populate listings_thumbnail field for models"""
|
|
|
|
# Categories
|
|
used_media = []
|
|
saved = 0
|
|
qs = Category.objects.filter().order_by("-media_count")
|
|
for object in qs:
|
|
media = Media.objects.exclude(friendly_token__in=used_media).filter(category=object, state="public", is_reviewed=True).order_by("-views").first()
|
|
if media:
|
|
object.listings_thumbnail = media.thumbnail_url
|
|
object.save(update_fields=["listings_thumbnail"])
|
|
used_media.append(media.friendly_token)
|
|
saved += 1
|
|
logger.info("updated {} categories".format(saved))
|
|
|
|
# Tags
|
|
used_media = []
|
|
saved = 0
|
|
qs = Tag.objects.filter().order_by("-media_count")
|
|
for object in qs:
|
|
media = Media.objects.exclude(friendly_token__in=used_media).filter(tags=object, state="public", is_reviewed=True).order_by("-views").first()
|
|
if media:
|
|
object.listings_thumbnail = media.thumbnail_url
|
|
object.save(update_fields=["listings_thumbnail"])
|
|
used_media.append(media.friendly_token)
|
|
saved += 1
|
|
logger.info("updated {} tags".format(saved))
|
|
|
|
return True
|
|
|
|
|
|
@task_revoked.connect
|
|
def task_sent_handler(sender=None, headers=None, body=None, **kwargs):
|
|
# For encode_media tasks that are revoked,
|
|
# ffmpeg command won't be stopped, since
|
|
# it got started by a subprocess.
|
|
# Need to stop that process
|
|
# Also, removing the Encoding object,
|
|
# since the task that would prepare it was killed
|
|
# Maybe add a killed state for Encoding objects
|
|
try:
|
|
uid = kwargs["request"].task_id
|
|
if uid:
|
|
encoding = Encoding.objects.get(task_id=uid)
|
|
encoding.delete()
|
|
logger.info("deleted the Encoding object")
|
|
if encoding.temp_file:
|
|
kill_ffmpeg_process(encoding.temp_file)
|
|
|
|
except BaseException:
|
|
pass
|
|
|
|
return True
|
|
|
|
|
|
def kill_ffmpeg_process(filepath):
|
|
# this is not ideal, ffmpeg pid could be linked to the Encoding object
|
|
cmd = "ps aux|grep 'ffmpeg'|grep %s|grep -v grep |awk '{print $2}'" % filepath
|
|
result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
|
|
pid = result.stdout.decode("utf-8").strip()
|
|
if pid:
|
|
cmd = "kill -9 %s" % pid
|
|
result = subprocess.run(cmd, stdout=subprocess.PIPE, shell=True)
|
|
return result
|
|
|
|
|
|
@task(name="remove_media_file", base=Task, queue="long_tasks")
|
|
def remove_media_file(media_file=None):
|
|
rm_file(media_file)
|
|
return True
|
|
|
|
|
|
# TODO LIST
|
|
# 1 chunks are deleted from original server when file is fully encoded.
|
|
# however need to enter this logic in cases of fail as well
|
|
# 2 script to delete chunks in fail status
|
|
# (and check for their encdings, and delete them as well, along with
|
|
# all chunks)
|
|
# 3 beat task, remove chunks
|