Initial commit

Ben Martin 2025-05-10 13:10:49 +01:00
commit 68f6e85c78
Signed by: ben
GPG key ID: 859A655FCD290E4A
17 changed files with 1286 additions and 0 deletions


@@ -0,0 +1,7 @@
import os
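# Prefetch and concurrency are pinned to 1 so each worker runs a single transcode at a time.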
worker_prefetch_multiplier = 1
worker_concurrency = 1
broker_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")
result_backend = os.environ.get("REDIS_URL", "redis://localhost:6379/0")


@@ -0,0 +1,93 @@
from collections.abc import Awaitable, Callable
from dataclasses import asdict, dataclass
from json import JSONDecoder, JSONEncoder
from pathlib import Path
from typing import Any, Dict, List, cast
from magic import Magic
from redis.asyncio import Redis
from redis.asyncio.connection import ConnectionPool
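# Owns a shared redis-py connection pool; usable as an async context manager so the pool is disconnected on exit.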
class RedisManager:
def __init__(self, connection_url: str):
self.connection_pool = ConnectionPool.from_url(connection_url)
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.close()
async def close(self):
await cast(Awaitable, self.connection_pool.disconnect())
def get_client(self):
return Redis(connection_pool=self.connection_pool)
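# A media file identified by its inode; `paths` may hold several locations (e.g. hard links) for the same underlying file.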
@dataclass
class MediaDTO:
inode: int
paths: list[Path]
is_transcoded: bool = False
def open(self):
return open(self.paths[0], "rb")
def size(self) -> int:
return self.paths[0].stat().st_size
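# Persists MediaDTOs as RedisJSON documents under "media_info:<inode>".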
class MediaDAO:
def __init__(self, redis_manager: RedisManager):
self.redis_client = redis_manager.get_client()
async def get_media_by_inode(self, inode: int) -> MediaDTO:
result = await cast(Awaitable[MediaDTO], self.redis_client.json(encoder=json_encoder, decoder=json_decoder).get(f"media_info:{inode}"))
if not result:
raise ValueError(f"Media with inode {inode} not found")
return result
async def get_all_inodes(self) -> list[int]:
keys = await self.redis_client.keys("media_info:*")
# Keys look like "media_info:<inode>", so the inode is the second segment.
return [int(key.decode().split(":")[1]) for key in keys]
async def set_media(self, media: MediaDTO) -> None:
await cast(Awaitable[int], self.redis_client.json(encoder=json_encoder, decoder=json_decoder).set(f"media_info:{media.inode}", "$", asdict(media)))
async def batch_set_media(self, media_list: list[MediaDTO]) -> None:
async with self.redis_client.pipeline() as pipe:
for media in media_list:
await cast(Awaitable[int], pipe.json(encoder=json_encoder, decoder=json_decoder).set(f"media_info:{media.inode}", "$", asdict(media)))
await pipe.execute()
async def delete_media(self, media: MediaDTO) -> None:
await cast(Awaitable[int], self.redis_client.delete(f"media_info:{media.inode}"))
async def mark_as_transcoded(self, media: MediaDTO) -> None:
await cast(Awaitable[int], self.redis_client.json(encoder=json_encoder, decoder=json_decoder).set(f"media_info:{media.inode}", "$.is_transcoded", True))
async def is_transcoded(self, inode: int) -> bool:
return await cast(Awaitable[List[bool]], self.redis_client.json(encoder=json_encoder, decoder=json_decoder).get(f"media_info:{inode}", "$.is_transcoded")) == [True]
class JSONEncoderImpl(JSONEncoder):
def default(self, obj):
if isinstance(obj, Path):
return obj.as_uri()
return super().default(obj)
def object_hook(data: Dict[str, Any]):
return MediaDTO(
paths=[Path.from_uri(v) for v in data["paths"]],
inode=int(data["inode"]),
is_transcoded=data["is_transcoded"],
)
json_encoder = JSONEncoderImpl()
json_decoder = JSONDecoder(object_hook=object_hook)
mime_detector = Magic(mime=True)
def is_media_file(path: Path) -> bool:
return mime_detector.from_file(str(path)).startswith("video/")


@@ -0,0 +1,54 @@
from argparse import ArgumentParser
from pathlib import Path
from typing import Generator
import asyncio
from auto_transcoder import tasks
from auto_transcoder.model import MediaDAO, MediaDTO, RedisManager
from auto_transcoder.web import run as run_web_app
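# Yield every file under `path`, skipping anything already inside the recycle bin.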
def walk_directory(path: Path, recycle_bin: Path | None) -> Generator[Path, None, None]:
for root, _, files in path.walk():
if recycle_bin and root.is_relative_to(recycle_bin):
continue
for file_name in files:
yield Path(root, file_name)
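# Scan the watched directory, group files by inode (hard links collapse to one entry), record any
# not-yet-transcoded media in Redis, queue a Celery transcode task per inode, then serve the web UI.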
async def main(directory_to_watch: Path, redis_connection_url: str, recycle_bin: Path | None = None):
print(f"Watching {directory_to_watch} for media files.")
print(f"Original files will be moved to recycle bin: {recycle_bin}.")
if not directory_to_watch.exists():
print(f"Directory {directory_to_watch} does not exist")
exit(1)
media_files_by_inode: dict[int, list[Path]] = {}
for path in walk_directory(directory_to_watch, recycle_bin):
inode = path.stat().st_ino
if inode not in media_files_by_inode:
media_files_by_inode[inode] = []
media_files_by_inode[inode].append(path)
async with RedisManager(redis_connection_url) as redis_manager:
media_dao = MediaDAO(redis_manager)
media_files_by_inode = {k: v for k, v in media_files_by_inode.items() if not await media_dao.is_transcoded(k)}
await media_dao.batch_set_media([MediaDTO(inode=i, paths=p, is_transcoded=False) for i, p in media_files_by_inode.items()])
for inode in media_files_by_inode.keys():
print(f"Sent transcode task for inode {inode}")
tasks.transcode_media_task.delay(inode, recycle_bin.as_uri() if recycle_bin else None)
run_web_app(redis_manager=redis_manager, host='0.0.0.0', port=5000, use_reloader=False)
if __name__ == '__main__':
parser = ArgumentParser(description='Auto Transcoder Server')
parser.add_argument('directory_to_watch', type=str, help='Directory to watch for media files')
parser.add_argument('--recycle-bin', type=str, default=None, help='Recycle bin directory for original files')
# `args.redis_connection_url` is read below, so the argument has to be declared; the default matches the rest of the project.
parser.add_argument('--redis-connection-url', type=str, default='redis://localhost:6379/0', help='Redis connection URL')
args = parser.parse_args()
recycle_bin = Path(args.recycle_bin) if args.recycle_bin else None
asyncio.run(main(Path(args.directory_to_watch), args.redis_connection_url, recycle_bin))


@@ -0,0 +1,58 @@
import os
from pathlib import Path
import shutil
import subprocess
import tempfile
from typing import List
from celery.utils.log import get_task_logger
from auto_transcoder.model import MediaDTO
logger = get_task_logger(__name__)
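# Pipe the first path through ffmpeg (scale to at most 1920px wide, SVT-AV1 video, Opus audio,
# WebVTT subtitles, webm container) into a temp file; keep the original if the result is not smaller.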
async def transcode_media(media: MediaDTO, recycle_bin: Path | None = None) -> List[MediaDTO] | None:
file_path = media.paths[0]
fd, temp_name = tempfile.mkstemp()
os.close(fd)  # mkstemp also returns an open descriptor; close it so only ffmpeg writes to the file
temp_path = Path(temp_name)
full_command = ['ffmpeg', '-i', "-",
'-vf', 'scale=\'min(1920,iw)\':-1:flags=lanczos',
'-c:v', 'libsvtav1', '-crf', '30', '-preset', '6', '-g', '240', '-pix_fmt', 'yuv420p10le',
'-c:a', 'libopus', '-b:a', '128k', '-ac', '2',
'-c:s', 'webvtt',
'-map_chapters', '-1', '-map_metadata', '-1',
'-f', 'webm',
'-y',
temp_path.resolve()]
try:
subprocess.run(full_command, check=True, stdin=media.open())
except BaseException as e:
temp_path.unlink(missing_ok=True)
raise e
if temp_path.stat().st_size > media.size():
temp_path.unlink()
logger.warning(f"Transcoding did not reduce file size for {file_path}, keeping original")
return
paths_by_inode: dict[int, List[Path]] = {}
first_output: Path | None = None
for original_path in media.paths:
media_directory = original_path.parent
move_to_bin(original_path, recycle_bin)
new_media = media_directory.joinpath(original_path.stem + ".webm")
if first_output is None:
# Move the transcoded temp file into place of the first original.
shutil.move(temp_path, new_media)
first_output = new_media
else:
# The remaining originals were hard links, so hard-link the transcoded file too;
# moving temp_path again would fail because it no longer exists.
os.link(first_output, new_media)
inode = new_media.stat().st_ino
paths_by_inode.setdefault(inode, []).append(new_media)
logger.info(f"Transcoded {media.paths} to {list(paths_by_inode.values())}")
return [MediaDTO(inode, paths, True) for inode, paths in paths_by_inode.items()]
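# Move an original file into the recycle bin when one is configured, otherwise delete it.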
def move_to_bin(file: Path, recycle_bin: Path | None):
if recycle_bin:
recycle_bin.mkdir(parents=True, exist_ok=True)
shutil.move(file, recycle_bin.joinpath(file.name))
else:
file.unlink()


@@ -0,0 +1,49 @@
import os
from pathlib import Path
import asyncio
from celery import Celery
from celery.signals import worker_init, worker_shutdown
from celery.utils.log import get_task_logger
from auto_transcoder.model import MediaDAO, RedisManager
from auto_transcoder.services import transcode_media
redis_manager: RedisManager | None = None
logger = get_task_logger(__name__)
celery_app = Celery('auto_transcoder', config_source='auto_transcoder.celeryconfig')
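# Each Celery worker process gets its own RedisManager, created on worker_init and closed on worker_shutdown.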
@worker_init.connect
def setup_worker(**kwargs):
global redis_manager
redis_manager = RedisManager(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))
@worker_shutdown.connect
def teardown_worker(**kwargs):
asyncio.run(__teardown_worker())
async def __teardown_worker():
global redis_manager
if redis_manager:
await redis_manager.close()
redis_manager = None
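# Celery task entry points are synchronous, so the async DAO calls run on a private event loop via asyncio.run.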
@celery_app.task(ignore_result=True)
def transcode_media_task(inode: int, recycle_bin_path: str | None = None):
async def process(inode: int, recycle_bin_path: str | None = None):
global redis_manager
if not redis_manager:
raise RuntimeError("RedisManager is not initialized")
async with redis_manager as manager:
media_dao = MediaDAO(manager)
media_dto = await media_dao.get_media_by_inode(inode)
if not media_dto.is_transcoded:
logger.info(f"Transcoding media with inode {inode}")
new_medias = await transcode_media(media_dto, Path.from_uri(recycle_bin_path) if recycle_bin_path else None)
if new_medias:
await asyncio.gather(
media_dao.batch_set_media(new_medias),
media_dao.delete_media(media_dto),
)
asyncio.run(process(inode, recycle_bin_path))


@@ -0,0 +1,28 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Auto Transcoder</title>
<script defer src="https://cdn.jsdelivr.net/npm/alpinejs@3.x.x/dist/cdn.min.js"></script>
<script>
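// Fetch the Celery worker stats exposed by the server at /api/workers.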
async function fetchWorkers() {
const response = await fetch('/api/workers');
const data = await response.json();
return data;
}
</script>
</head>
<body>
<h1>Welcome to Auto Transcoder</h1>
<h2>Workers</h2>
<ul x-data="{ workers: {} }" x-init="workers = await fetchWorkers()">
<template x-for="id in Object.keys(workers)" :key="id">
<li x-text="id"></li>
</template>
</ul>
</body>
</html>


@@ -0,0 +1,27 @@
from flask import Flask, jsonify, render_template
from celery.app.control import Inspect
from auto_transcoder.model import MediaDAO, RedisManager
from auto_transcoder.tasks import celery_app as celery_app
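# Flask app: serves the Alpine.js dashboard plus JSON endpoints for Celery worker stats and the inodes tracked in Redis.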
def run(redis_manager: RedisManager, *args, **kwargs):
app = Flask(__name__)
@app.route('/')
def index():
return render_template('index.html')
@app.route('/api/workers')
def workers():
i = inspect()
stats = i.stats()
return jsonify(stats)
@app.route('/api/inodes')
async def inodes():
return jsonify(await MediaDAO(redis_manager=redis_manager).get_all_inodes())
app.run(*args, **kwargs)
def inspect() -> Inspect:
return celery_app.control.inspect()