import io
import os
import re
import subprocess
from concurrent.futures import ThreadPoolExecutor, as_completed

import markdown
import pandas as pd
from bs4 import BeautifulSoup
# Function to convert time format (MM:SS) to seconds
def time_to_seconds(time_str):
    """Convert a ``MM:SS`` or ``HH:MM:SS`` timestamp into whole seconds.

    Args:
        time_str: Colon-separated timestamp such as ``"03:25"`` or
            ``"1:03:25"``. Surrounding whitespace is ignored.

    Returns:
        The timestamp expressed as an integer number of seconds.

    Raises:
        ValueError: If the string does not have two or three
            colon-separated integer fields.
    """
    fields = [int(field) for field in time_str.strip().split(':')]
    if len(fields) not in (2, 3):
        raise ValueError(
            f"Expected MM:SS or HH:MM:SS, got {time_str!r}"
        )

    # Fold left to right: each step promotes the running total by one
    # base-60 unit, so the same loop covers both accepted formats.
    total = 0
    for value in fields:
        total = total * 60 + value
    return total
# Parse the markdown file and extract the table associated with the video file
def parse_markdown_for_video_table(markdown_file, video_file_name):
    """Return the table listed under the video's heading in a markdown file.

    The markdown is rendered to HTML with the ``tables`` extension. The
    heading whose text equals the video's base name (file name without
    directory or extension) is located, and the next ``<table>`` sibling
    after that heading is converted to a DataFrame.

    Args:
        markdown_file: Path to the markdown document.
        video_file_name: Path or file name of the video; only its base
            name is used for matching.

    Returns:
        A pandas DataFrame built from the matching HTML table.

    Raises:
        ValueError: If no heading matches the video base name, or if no
            table follows the matching heading.
    """
    # Explicit encoding so the result does not depend on the platform's
    # default code page.
    with open(markdown_file, 'r', encoding='utf-8') as f:
        markdown_content = f.read()

    # Render markdown to HTML, then use BeautifulSoup to walk the result.
    html = markdown.markdown(markdown_content, extensions=['tables'])
    soup = BeautifulSoup(html, 'html.parser')

    headings = soup.find_all(re.compile('^h[1-6]$'))
    print("Headings found in the markdown file:")
    for heading in headings:
        print(heading.get_text())

    # Find the heading that matches the video base name.
    video_base_name = os.path.splitext(os.path.basename(video_file_name))[0]
    target_heading = next(
        (
            heading
            for heading in headings
            if heading.get_text().strip() == video_base_name
        ),
        None,
    )
    if target_heading is None:
        raise ValueError(f"No heading matching '{video_base_name}' found in the markdown file.")

    # The table should be the next sibling table after the heading.
    table = target_heading.find_next_sibling('table')
    if table is None:
        raise ValueError(f"No table found under heading '{video_base_name}' in the markdown file.")

    # Convert the HTML table to a pandas DataFrame. The markup is wrapped
    # in StringIO because passing literal HTML strings to read_html is
    # deprecated in recent pandas releases.
    df = pd.read_html(io.StringIO(str(table)))[0]
    return df
# Function to sanitize file names
def sanitize_filename(name):
    """Replace every run of non-word characters in *name* with ``_``.

    Args:
        name: Text to turn into a file-name fragment. Non-string values
            (for example numbers read from a table cell) are converted
            with ``str()`` first instead of raising ``TypeError``.

    Returns:
        The text with each run of characters outside ``[A-Za-z0-9_]``
        (Unicode-aware) collapsed into a single underscore.
    """
    return re.sub(r'\W+', '_', str(name))
# Function to split a single clip using FFmpeg
def split_clip_ffmpeg(video_file, start_time, end_time, output_filepath):
    """Cut one clip out of *video_file* by running FFmpeg.

    Success or failure is reported on stdout; a failing FFmpeg run is
    printed rather than raised.

    Args:
        video_file: Path of the source video.
        start_time: Clip start, in seconds from the beginning of the video.
        end_time: Clip end, in seconds from the beginning of the video.
        output_filepath: Path of the clip file to create.
    """
    # NOTE(review): only the '-y' flag survived in the original source;
    # the remaining arguments are a reconstruction. Confirm the intended
    # seek and codec options ('-c copy' cuts without re-encoding).
    cmd = [
        'ffmpeg',
        '-y',  # Overwrite output files without asking
        '-i', video_file,
        '-ss', str(start_time),
        '-to', str(end_time),
        '-c', 'copy',
        output_filepath,
    ]
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        print(f"Created {output_filepath}")
    except subprocess.CalledProcessError as e:
        # errors='replace' keeps the handler from raising if FFmpeg emits
        # bytes that are not valid UTF-8.
        print(f"Error creating {output_filepath}: {e.stderr.decode(errors='replace')}")
# Function to split the video based on the extracted table
def split_video(video_file, markdown_file, output_dir, max_workers=4):
    """Split *video_file* into the clips listed in its markdown table.

    The table is looked up with ``parse_markdown_for_video_table``. Each
    row must provide the columns ``Id``, ``Part Name``, ``Start`` and
    ``End``. One clip per row is written to *output_dir* as
    ``<video base name>_Part<Id>_<sanitized part name>.mp4``.

    Args:
        video_file: Path of the source video.
        markdown_file: Path of the markdown document holding the table.
        output_dir: Directory for the clips; created if missing.
        max_workers: Maximum number of clips processed concurrently.
    """
    # Parse the markdown file to extract the correct table based on the
    # video file name.
    table = parse_markdown_for_video_table(markdown_file, video_file)

    # Create output directory if it doesn't exist.
    os.makedirs(output_dir, exist_ok=True)

    original_filename = os.path.splitext(os.path.basename(video_file))[0]

    # Collect every clip description first so the pool can then be fed
    # in one go.
    split_tasks = []
    for _, row in table.iterrows():
        part_no = str(row['Id']).zfill(2)  # Pad part number with leading zero
        part_name = sanitize_filename(row['Part Name'])  # Sanitize file name
        start_time = time_to_seconds(row['Start'])
        end_time = time_to_seconds(row['End'])
        output_filename = f"{original_filename}_Part{part_no}_{part_name}.mp4"
        output_filepath = os.path.join(output_dir, output_filename)
        split_tasks.append((start_time, end_time, output_filepath))

    # Use ThreadPoolExecutor to split clips in parallel.
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(split_clip_ffmpeg, video_file, start, end, filepath)
            for start, end, filepath in split_tasks
        ]
        for future in as_completed(futures):
            # result() re-raises any exception that escaped the worker.
            future.result()
# Script configuration: adjust these paths before running.
# os.path.expanduser only expands a leading "~", so the paths below are
# passed through unchanged.
markdown_file = "video_database.md"  # Path to your markdown file
video_file = os.path.expanduser("//wsl.localhost/Ubuntu/home/ulc/monolith/act_03/completed/ACT_03-WIL_02_LEFT-EDIT.mp4")  # Path to your video file
output_dir = os.path.expanduser("//wsl.localhost/Ubuntu/home/ulc/monolith/act_03/ACT_03-WIL_02_LEFT")  # Directory to save the split videos

# Run the optimized script only when executed directly, not on import.
if __name__ == "__main__":
    split_video(video_file, markdown_file, output_dir, max_workers=8)