Talk to Videos

Developing an interactive AI application for video-based learning in education and business

Large language models (LLMs) are becoming more efficient and can now understand different data formats, opening up a wide range of applications across domains. Initially, LLMs could process only text. Image understanding was first added by coupling an LLM with a separate image-encoding model. gpt-4o, however, was trained on both text and images and is one of the first natively multimodal LLMs that can understand both. Other modalities, such as audio, are integrated into modern LLMs through additional AI models, e.g., OpenAI’s Whisper models.

LLMs are increasingly used as information processors that can handle data in different formats. Integrating multiple modalities into LLMs opens up numerous applications in education, business, and other sectors. One such application is processing educational videos, documentaries, webinars, presentations, business meetings, lectures, and other content with LLMs and interacting with this content more naturally. The audio track of these videos carries rich information that can power a number of applications. In educational settings, it can be used for personalized learning, improving accessibility for students with special needs, creating study aids, supporting remote learning without requiring a teacher’s presence to explain content, and assessing students’ knowledge of a topic. In business settings, it can be used for training new employees with onboarding videos, extracting and generating knowledge from recorded meetings and presentations, creating customized learning materials from product demonstration videos, and extracting insights from recorded industry conferences without watching hours of video, to name a few.

This article discusses the development of an application to interact with videos in a natural way and create learning content from them. The application has the following features:

  • Takes an input video either through a URL or from a local path and extracts its audio
  • Transcribes the audio using OpenAI’s state-of-the-art model gpt-4o-transcribe, which has demonstrated improved Word Error Rate (WER) performance over existing Whisper models across multiple established benchmarks
  • Creates a vector store from the transcript and builds a retrieval augmented generation (RAG) pipeline to establish a conversation with the video transcript
  • Responds to users’ questions in text and speech, using different voices selectable from the application’s UI
  • Creates learning content such as:
    • A hierarchical representation of the video contents to provide users with quick insights into the main concepts and supporting details
    • Quizzes that transform passive video watching into active learning by challenging users to recall and apply information presented in the video
    • Flashcards from the video content that support active recall and spaced repetition learning techniques

The entire workflow of the application is shown in the following figure.

Application workflow (image by author)

The whole codebase, along with detailed instructions for installation and usage, is available on GitHub.

Here is the structure of the GitHub repository. The main Streamlit application implements the GUI and calls functions from several feature and helper modules (.py files).

GitHub code structure (image by author)

In addition, you can visualize the codebase by opening the “codebase visualization” HTML file in a browser, which describes the structure of each module.

Codebase visualization (image by author)

Let’s delve into the step-by-step development of this application. I will not discuss the entire code, only its major parts. The code in the GitHub repository is adequately commented.

Video Input and Processing

Video input and processing logic are implemented in transcriber.py. When the application loads, it verifies whether FFmpeg is present (verify_ffmpeg) in the application’s root directory. FFmpeg is required for downloading a video (if the input is a URL) and extracting audio from it, which is then used to create the transcript.

def verify_ffmpeg():
    """Verify that FFmpeg is available and print its location."""
    # Add FFmpeg to PATH
    os.environ['PATH'] = FFMPEG_LOCATION + os.pathsep + os.environ['PATH']
    # Check if FFmpeg binaries exist
    ffmpeg_path = os.path.join(FFMPEG_LOCATION, 'ffmpeg.exe')
    ffprobe_path = os.path.join(FFMPEG_LOCATION, 'ffprobe.exe')
    if not os.path.exists(ffmpeg_path):
        raise FileNotFoundError(f"FFmpeg executable not found at: {ffmpeg_path}")
    if not os.path.exists(ffprobe_path):
        raise FileNotFoundError(f"FFprobe executable not found at: {ffprobe_path}")
    print(f"FFmpeg found at: {ffmpeg_path}")
    print(f"FFprobe found at: {ffprobe_path}")
    # Try to execute FFmpeg to make sure it works
    try:
        # Add shell=True for Windows and capture errors properly
        result = subprocess.run([ffmpeg_path, '-version'], 
                               stdout=subprocess.PIPE, 
                               stderr=subprocess.PIPE,
                               shell=True,  # This can help with permission issues on Windows
                               check=False)
        if result.returncode == 0:
            print(f"FFmpeg version: {result.stdout.decode().splitlines()[0]}")
        else:
            error_msg = result.stderr.decode()
            print(f"FFmpeg error: {error_msg}")
            # Check for specific permission errors
            if "Access is denied" in error_msg:
                print("Permission error detected. Trying alternative approach...")
                # Try an alternative approach - just check file existence without execution
                if os.path.exists(ffmpeg_path) and os.path.exists(ffprobe_path):
                    print("FFmpeg files exist but execution test failed due to permissions.")
                    print("WARNING: The app may fail when trying to process videos.")
                    # Return paths anyway and hope for the best when actually used
                    return ffmpeg_path, ffprobe_path
                
            raise RuntimeError(f"FFmpeg execution failed: {error_msg}")
    except Exception as e:
        print(f"Error checking FFmpeg: {e}")
        # Fallback option if verification fails but files exist
        if os.path.exists(ffmpeg_path) and os.path.exists(ffprobe_path):
            print("WARNING: FFmpeg files exist but verification failed.")
            print("Attempting to continue anyway, but video processing may fail.")
            return ffmpeg_path, ffprobe_path 
        raise
    return ffmpeg_path, ffprobe_path

The video input is either a URL (for instance, a YouTube URL) or a local file path. The process_video function determines the input type and routes it accordingly. If the input is a URL, the helper functions get_video_info and get_video_id extract video metadata (title, description, duration) without downloading it, using the yt_dlp package.

#Function to determine the input type and route it appropriately
def process_video(youtube_url, output_dir, api_key, model="gpt-4o-transcribe"):
    """
    Process a YouTube video to generate a transcript
    Wrapper function that combines download and transcription
    Args:
        youtube_url: URL of the YouTube video
        output_dir: Directory to save the output
        api_key: OpenAI API key
        model: The model to use for transcription (default: gpt-4o-transcribe)
    Returns:
        dict: Dictionary containing transcript and file paths
    """
    # First download the audio
    print("Downloading video...")
    audio_path = process_video_download(youtube_url, output_dir)
    
    print("Transcribing video...")
    # Then transcribe the audio
    transcript, transcript_path = process_video_transcribe(audio_path, output_dir, api_key, model=model)
    
    # Return the combined results
    return {
        'transcript': transcript,
        'transcript_path': transcript_path,
        'audio_path': audio_path
    }

def get_video_info(youtube_url):
    """Get video information without downloading."""
    # Check local cache first
    global _video_info_cache
    if youtube_url in _video_info_cache:
        return _video_info_cache[youtube_url]
        
    # Extract info if not cached
    with yt_dlp.YoutubeDL() as ydl:
        info = ydl.extract_info(youtube_url, download=False)
        # Cache the result
        _video_info_cache[youtube_url] = info
        # Also cache the video ID separately
        _video_id_cache[youtube_url] = info.get('id', 'video')
        return info

def get_video_id(youtube_url):
    """Get just the video ID without re-extracting if already known."""
    global _video_id_cache
    if youtube_url in _video_id_cache:
        return _video_id_cache[youtube_url]
    
    # If not in cache, extract from URL directly if possible
    if "v=" in youtube_url:
        video_id = youtube_url.split("v=")[1].split("&")[0]
        _video_id_cache[youtube_url] = video_id
        return video_id
    elif "youtu.be/" in youtube_url:
        video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
        _video_id_cache[youtube_url] = video_id
        return video_id
    
    # If we can't extract directly, fall back to full info extraction
    info = get_video_info(youtube_url)
    video_id = info.get('id', 'video')
    return video_id
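
For reference, app.py calls this wrapper roughly as follows (the URL and output directory below are placeholders):

result = process_video(
    "https://www.youtube.com/watch?v=...",  # placeholder URL
    output_dir="output",
    api_key=api_key,
    model="gpt-4o-transcribe"
)
print(result["transcript_path"])  # path of the saved transcript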

After the video input is given, the code in app.py checks whether a transcript for the input video already exists (in the case of URL input). This is done by calling the following two helper functions from transcriber.py.

def get_transcript_path(youtube_url, output_dir):
    """Get the expected transcript path for a given YouTube URL."""
    # Get video ID with caching
    video_id = get_video_id(youtube_url)
    # Return expected transcript path
    return os.path.join(output_dir, f"{video_id}_transcript.txt")

def transcript_exists(youtube_url, output_dir):
    """Check if a transcript already exists for this video."""
    transcript_path = get_transcript_path(youtube_url, output_dir)
    return os.path.exists(transcript_path)

If transcript_exists finds an existing transcript, the next step is to create the vector store for the RAG pipeline. If no transcript is found, the audio is downloaded from the URL and converted to a standard format. The function process_video_download downloads the audio using yt_dlp, with FFmpeg as a post-processor, and converts it to .mp3 format. If the input is a local video file, app.py converts it to an .mp3 file directly (a sketch of this path follows the listing below).

def process_video_download(youtube_url, output_dir):
    """
    Download audio from a YouTube video
    Args:
        youtube_url: URL of the YouTube video
        output_dir: Directory to save the output
        
    Returns:
        str: Path to the downloaded audio file
    """
    # Create output directory if it doesn't exist
    os.makedirs(output_dir, exist_ok=True)
    
    # Extract video ID from URL
    video_id = None
    if "v=" in youtube_url:
        video_id = youtube_url.split("v=")[1].split("&")[0]
    elif "youtu.be/" in youtube_url:
        video_id = youtube_url.split("youtu.be/")[1].split("?")[0]
    else:
        raise ValueError("Could not extract video ID from URL")
    # Set output paths
    audio_path = os.path.join(output_dir, f"{video_id}.mp3")
    
    # Configure yt-dlp options
    ydl_opts = {
        'format': 'bestaudio/best',
        'postprocessors': [{
            'key': 'FFmpegExtractAudio',
            'preferredcodec': 'mp3',
            'preferredquality': '192',
        }],
        'outtmpl': os.path.join(output_dir, f"{video_id}"),
        'quiet': True
    }
    
    # Download audio
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([youtube_url])
    
    # Verify audio file exists
    if not os.path.exists(audio_path):
        # Try with an extension that yt-dlp might have used
        potential_paths = [
            os.path.join(output_dir, f"{video_id}.mp3"),
            os.path.join(output_dir, f"{video_id}.m4a"),
            os.path.join(output_dir, f"{video_id}.webm")
        ]
        
        for path in potential_paths:
            if os.path.exists(path):
                # Convert to mp3 if it's not already
                if not path.endswith('.mp3'):
                    ffmpeg_path = verify_ffmpeg()[0]
                    output_mp3 = os.path.join(output_dir, f"{video_id}.mp3")
                    subprocess.run([
                        ffmpeg_path, '-i', path, '-c:a', 'libmp3lame', 
                        '-q:a', '2', output_mp3, '-y'
                    ], check=True, capture_output=True)
                    os.remove(path)  # Remove the original file
                    audio_path = output_mp3
                else:
                    audio_path = path
                break
        else:
            raise FileNotFoundError(f"Could not find downloaded audio file for video {video_id}")
    return audio_path
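
The corresponding local-file path lives in app.py and is not reproduced here; a minimal sketch of how a local video could be converted to .mp3 with FFmpeg might look like the following (the helper name extract_audio_from_local_video and the output naming are illustrative):

def extract_audio_from_local_video(video_path, output_dir):
    """Sketch: convert a local video file to .mp3 using FFmpeg."""
    os.makedirs(output_dir, exist_ok=True)
    # Reuse the FFmpeg check from transcriber.py to locate the binary
    ffmpeg_path = verify_ffmpeg()[0]
    base_name = os.path.splitext(os.path.basename(video_path))[0]
    audio_path = os.path.join(output_dir, f"{base_name}.mp3")
    # -vn drops the video stream; libmp3lame encodes the audio track to mp3
    subprocess.run([
        ffmpeg_path, '-i', video_path, '-vn',
        '-c:a', 'libmp3lame', '-q:a', '2', audio_path, '-y'
    ], check=True, capture_output=True)
    return audio_path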

Audio Transcription Using OpenAI’s gpt-4o-transcribe Model

After extracting the audio and converting it to a standard format, the next step is to transcribe it to text. For this purpose, I used OpenAI’s newly launched gpt-4o-transcribe speech-to-text model, accessible through the speech-to-text API. This model outperforms OpenAI’s Whisper models in both transcription accuracy and language coverage.

The function process_video_transcribe in transcriber.py receives the converted audio file and interfaces with the gpt-4o-transcribe model via OpenAI’s speech-to-text API. The gpt-4o-transcribe model currently has a limit of 25 MB file size and 1,500 seconds of audio duration. To overcome this limitation, I split longer files into multiple chunks and transcribe these chunks separately. The process_video_transcribe function checks whether the input file exceeds the size and/or duration limit. If either threshold is exceeded, it calls the split_and_transcribe function, which first calculates the number of chunks needed based on both size and duration and takes the maximum of the two as the final number of chunks. It then computes the start and end times for each chunk, extracts the chunks from the audio file, transcribes each chunk using the gpt-4o-transcribe model, and finally combines the transcripts of all chunks into the final transcript.

def process_video_transcribe(audio_path, output_dir, api_key, progress_callback=None, model="gpt-4o-transcribe"):
    """
    Transcribe an audio file using OpenAI API, with automatic chunking for large files
    Always uses the selected model, with no fallback
    
    Args:
        audio_path: Path to the audio file
        output_dir: Directory to save the transcript
        api_key: OpenAI API key
        progress_callback: Function to call with progress updates (0-100)
        model: The model to use for transcription (default: gpt-4o-transcribe)
        
    Returns:
        tuple: (transcript text, transcript path)
    """
    # Extract video ID from audio path
    video_id = os.path.basename(audio_path).split('.')[0]
    transcript_path = os.path.join(output_dir, f"{video_id}_transcript.txt")
    
    # Setup OpenAI client
    client = OpenAI(api_key=api_key)
    
    # Update progress
    if progress_callback:
        progress_callback(10)
    
    # Get file size in MB
    file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
    
    # Universal chunking thresholds - apply to both models
    max_size_mb = 25  # 25MB chunk size for both models
    max_duration_seconds = 1500  # 1500 seconds chunk duration for both models
    
    # Load the audio file to get its duration
    try:
        audio = AudioSegment.from_file(audio_path)
        duration_seconds = len(audio) / 1000  # pydub uses milliseconds
    except Exception as e:
        print(f"Error loading audio to check duration: {e}")
        audio = None
        duration_seconds = 0
    
    # Determine if chunking is needed
    needs_chunking = False
    chunking_reason = []
    
    if file_size_mb > max_size_mb:
        needs_chunking = True
        chunking_reason.append(f"size ({file_size_mb:.2f}MB exceeds {max_size_mb}MB)")
    
    if duration_seconds > max_duration_seconds:
        needs_chunking = True
        chunking_reason.append(f"duration ({duration_seconds:.2f}s exceeds {max_duration_seconds}s)")
    
    # Log the decision
    if needs_chunking:
        reason_str = " and ".join(chunking_reason)
        print(f"Audio needs chunking due to {reason_str}. Using {model} for transcription.")
    else:
        print(f"Audio file is within limits. Using {model} for direct transcription.")
    
    # Check if file needs chunking
    if needs_chunking:
        if progress_callback:
            progress_callback(15)
        
        # Split the audio file into chunks and transcribe each chunk using the selected model only
        full_transcript = split_and_transcribe(
            audio_path, client, model, progress_callback, 
            max_size_mb, max_duration_seconds, audio
        )
    else:
        # File is small enough, transcribe directly with the selected model
        with open(audio_path, "rb") as audio_file:
            if progress_callback:
                progress_callback(30)
                
            transcript_response = client.audio.transcriptions.create(
                model=model, 
                file=audio_file
            )
            
            if progress_callback:
                progress_callback(80)
            
            full_transcript = transcript_response.text
    
    # Save transcript to file
    with open(transcript_path, "w", encoding="utf-8") as f:
        f.write(full_transcript)
    
    # Update progress
    if progress_callback:
        progress_callback(100)
    
    return full_transcript, transcript_path

def split_and_transcribe(audio_path, client, model, progress_callback=None, 
                         max_size_mb=25, max_duration_seconds=1500, audio=None):
    """
    Split an audio file into chunks and transcribe each chunk 
    
    Args:
        audio_path: Path to the audio file
        client: OpenAI client
        model: Model to use for transcription (will not fall back to other models)
        progress_callback: Function to call with progress updates
        max_size_mb: Maximum file size in MB
        max_duration_seconds: Maximum duration in seconds
        audio: Pre-loaded AudioSegment (optional)
        
    Returns:
        str: Combined transcript from all chunks
    """
    # Load the audio file if not provided
    if audio is None:
        audio = AudioSegment.from_file(audio_path)
    
    # Get audio duration in seconds
    duration_seconds = len(audio) / 1000
    
    # Calculate the number of chunks needed based on both size and duration
    file_size_mb = os.path.getsize(audio_path) / (1024 * 1024)
    
    chunks_by_size = math.ceil(file_size_mb / (max_size_mb * 0.9))  # Use 90% of max to be safe
    chunks_by_duration = math.ceil(duration_seconds / (max_duration_seconds * 0.95))  # Use 95% of max to be safe
    num_chunks = max(chunks_by_size, chunks_by_duration)
    
    print(f"Splitting audio into {num_chunks} chunks based on size ({chunks_by_size}) and duration ({chunks_by_duration})")
    
    # Calculate chunk duration in milliseconds
    chunk_length_ms = len(audio) // num_chunks
    
    # Create temp directory for chunks if it doesn't exist
    temp_dir = os.path.join(os.path.dirname(audio_path), "temp_chunks")
    os.makedirs(temp_dir, exist_ok=True)
    
    # Split the audio into chunks and transcribe each chunk
    transcripts = []
    
    for i in range(num_chunks):
        if progress_callback:
            # Update progress: 20% for splitting, 60% for transcribing
            progress_percent = 20 + int((i / num_chunks) * 60)
            progress_callback(progress_percent)
        
        # Calculate start and end times for this chunk
        start_ms = i * chunk_length_ms
        end_ms = min((i + 1) * chunk_length_ms, len(audio))
        
        # Extract the chunk
        chunk = audio[start_ms:end_ms]
        
        # Save the chunk to a temporary file
        chunk_path = os.path.join(temp_dir, f"chunk_{i}.mp3")
        chunk.export(chunk_path, format="mp3")
        
        # Log chunk information
        chunk_size_mb = os.path.getsize(chunk_path) / (1024 * 1024)
        chunk_duration = len(chunk) / 1000
        print(f"Chunk {i+1}/{num_chunks}: {chunk_size_mb:.2f}MB, {chunk_duration:.2f}s")
        
        # Transcribe the chunk 
        try:
            with open(chunk_path, "rb") as chunk_file:
                transcript_response = client.audio.transcriptions.create(
                    model=model,
                    file=chunk_file
                )
                
                # Add to our list of transcripts
                transcripts.append(transcript_response.text)
        except Exception as e:
            print(f"Error transcribing chunk {i+1} with {model}: {e}")
            # Add a placeholder for the failed chunk
            transcripts.append(f"[Transcription failed for segment {i+1}]")
        
        # Clean up the temporary chunk file
        os.remove(chunk_path)
    
    # Clean up the temporary directory
    try:
        os.rmdir(temp_dir)
    except:
        print(f"Note: Could not remove temporary directory {temp_dir}")
    
    # Combine all transcripts with proper spacing
    full_transcript = " ".join(transcripts)
    
    return full_transcript
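
To make the chunking arithmetic concrete, here is a small worked example with a hypothetical one-hour, 60 MB recording (the 0.9 and 0.95 safety factors come from split_and_transcribe above):

import math

file_size_mb = 60          # hypothetical 60 MB mp3
duration_seconds = 3600    # hypothetical one-hour recording

chunks_by_size = math.ceil(file_size_mb / (25 * 0.9))              # ceil(60 / 22.5) = 3
chunks_by_duration = math.ceil(duration_seconds / (1500 * 0.95))   # ceil(3600 / 1425) = 3
num_chunks = max(chunks_by_size, chunks_by_duration)               # 3 chunks of ~20 minutes each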

The following screenshot of the Streamlit app shows the video processing and transcription workflow for one of my webinars, “Integrating LLMs into Business,” available on my YouTube channel.

Snapshot of the Streamlit app showing the process of extracting audio and transcribing (image by author)

Retrieval Augmented Generation (RAG) for Interactive Conversations

After generating the video transcript, the application builds a RAG pipeline to facilitate both text- and speech-based interactions. The conversational intelligence is implemented through the VideoRAG class in rag_system.py, which initializes the chunk size and overlap, OpenAI embeddings, a ChatOpenAI instance that generates responses with the gpt-4o model, and a ConversationBufferMemory to maintain chat history for contextual continuity.

The create_vector_store method splits the transcript into chunks and creates a vector store using the FAISS vector database. The handle_question_submission method processes text questions and appends each new question and its answer to the conversation history. The handle_speech_input function implements the complete voice-to-text-to-voice pipeline: it records the question audio, transcribes the question, processes the query through the RAG system, and synthesizes speech for the response (a sketch of this pipeline follows the VideoRAG listing below).

class VideoRAG:
    def __init__(self, api_key=None, chunk_size=1000, chunk_overlap=200):
        """Initialize the RAG system with OpenAI API key."""
        # Use provided API key or fall back to Streamlit secrets
        self.api_key = api_key if api_key else st.secrets["OPENAI_API_KEY"]
        if not self.api_key:
            raise ValueError("OpenAI API key is required either as a parameter or via Streamlit secrets")
            
        self.embeddings = OpenAIEmbeddings(openai_api_key=self.api_key)
        self.llm = ChatOpenAI(
            openai_api_key=self.api_key,
            model="gpt-4o",
            temperature=0
        )
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.vector_store = None
        self.chain = None
        self.memory = ConversationBufferMemory(
            memory_key="chat_history",
            return_messages=True
        )
    
    def create_vector_store(self, transcript):
        """Create a vector store from the transcript."""
        # Split the text into chunks
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=self.chunk_size,
            chunk_overlap=self.chunk_overlap,
            separators=["\n\n", "\n", " ", ""]
        )
        chunks = text_splitter.split_text(transcript)
        
        # Create vector store
        self.vector_store = FAISS.from_texts(chunks, self.embeddings)
        
        # Create prompt template for the RAG system
        system_template = """You are a specialized AI assistant that answers questions about a specific video. 
        
        You have access to snippets from the video transcript, and your role is to provide accurate information ONLY based on these snippets.
        
        Guidelines:
        1. Only answer questions based on the information provided in the context from the video transcript, otherwise say that "I don't know. The video doesn't cover that information."
        2. The question may ask you to summarize the video or tell what the video is about. In that case, present a summary of the context. 
        3. Don't make up information or use knowledge from outside the provided context
        4. Keep your answers concise and directly related to the question
        5. If asked about your capabilities or identity, explain that you're an AI assistant that specializes in answering questions about this specific video
        
        Context from the video transcript:
        {context}
        
        Chat History:
        {chat_history}
        """
        user_template = "{question}"
        
        # Create the messages for the chat prompt
        messages = [
            SystemMessagePromptTemplate.from_template(system_template),
            HumanMessagePromptTemplate.from_template(user_template)
        ]
        
        # Create the chat prompt
        qa_prompt = ChatPromptTemplate.from_messages(messages)
        
        # Initialize the RAG chain with the custom prompt
        self.chain = ConversationalRetrievalChain.from_llm(
            llm=self.llm,
            retriever=self.vector_store.as_retriever(
                search_kwargs={"k": 5}
            ),
            memory=self.memory,
            combine_docs_chain_kwargs={"prompt": qa_prompt},
            verbose=True
        )
        
        return len(chunks)
    
    def set_chat_history(self, chat_history):
        """Set chat history from external session state."""
        if not self.memory:
            return
            
        # Clear existing memory
        self.memory.clear()
        
        # Convert standard chat history format to LangChain message format
        for message in chat_history:
            if message["role"] == "user":
                self.memory.chat_memory.add_user_message(message["content"])
            elif message["role"] == "assistant":
                self.memory.chat_memory.add_ai_message(message["content"])
    
    def ask(self, question, chat_history=None):
        """Ask a question to the RAG system."""
        if not self.chain:
            raise ValueError("Vector store not initialized. Call create_vector_store first.")
        
        # If chat history is provided, update the memory
        if chat_history:
            self.set_chat_history(chat_history)
        
        # Get response
        response = self.chain.invoke({"question": question})
        return response["answer"]
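
The handle_speech_input pipeline mentioned above lives in app.py and is not reproduced here; a rough sketch of the voice-to-text-to-voice round trip using OpenAI’s transcription and text-to-speech endpoints might look like the following (function and variable names are illustrative, and the actual app lets the user pick the voice from the UI):

from openai import OpenAI

def handle_speech_question(audio_file_path, video_rag, api_key, voice="alloy"):
    """Sketch: transcribe a spoken question, answer it via RAG, and synthesize speech."""
    client = OpenAI(api_key=api_key)

    # 1. Transcribe the recorded question
    with open(audio_file_path, "rb") as audio_file:
        question = client.audio.transcriptions.create(
            model="gpt-4o-transcribe",
            file=audio_file
        ).text

    # 2. Answer the question with the RAG system (vector store must already exist)
    answer = video_rag.ask(question)

    # 3. Synthesize the answer as speech with the selected voice
    speech = client.audio.speech.create(model="tts-1", voice=voice, input=answer)
    answer_audio_path = "answer.mp3"
    speech.write_to_file(answer_audio_path)

    return question, answer, answer_audio_path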

See the following snapshot of the Streamlit app, showing the interactive conversation interface with the video.

Snapshot showing conversational interface and interactive learning content (image by author)

The following snapshot shows a conversation with the video using speech input and text+speech output.

Conversation with video (image by author)

Feature Generation

The application provides three learning features: a hierarchical summary, quizzes, and flashcards. Please refer to their respective commented code in the GitHub repo.

The SummaryGenerator class in summary.py provides structured content summarization by creating a hierarchical representation of the video content, giving users quick insight into the main concepts and supporting details. The system retrieves key contextual segments from the transcript using RAG. Using a prompt (see generate_summary), it creates a hierarchical summary with three levels: main points, sub-points, and additional details. The create_summary_popup_html method transforms the generated summary into an interactive visual representation using CSS and JavaScript.

# summary.py
class SummaryGenerator:
    def __init__(self):
        pass
    
    def generate_summary(self, rag_system, api_key, model="gpt-4o", temperature=0.2):
        """
        Generate a hierarchical bullet-point summary from the video transcript
        
        Args:
            rag_system: The RAG system with vector store
            api_key: OpenAI API key
            model: Model to use for summary generation
            temperature: Creativity level (0.0-1.0)
            
        Returns:
            str: Hierarchical bullet-point summary text
        """
        if not rag_system:
            st.error("Please transcribe the video first before creating a summary!")
            return ""
        
        with st.spinner("Generating hierarchical summary..."):
            # Create LLM for summary generation
            summary_llm = ChatOpenAI(
                openai_api_key=api_key,
                model=model,
                temperature=temperature  # Lower temperature for more factual summaries
            )
            
            # Use the RAG system to get relevant context
            try:
                # Get broader context since we're summarizing the whole video
                relevant_docs = rag_system.vector_store.similarity_search(
                    "summarize the main points of this video", k=10
                )
                context = "\n\n".join([doc.page_content for doc in relevant_docs])
                
                prompt = """Based on the video transcript, create a hierarchical bullet-point summary of the content.
                Structure your summary with exactly these levels:
                
                • Main points (use • or * at the start of the line for these top-level points)
                  - Sub-points (use - at the start of the line for these second-level details)
                    * Additional details (use spaces followed by * for third-level points)
                
                For example:
                • First main point
                  - Important detail about the first point
                  - Another important detail
                    * A specific example
                    * Another specific example
                • Second main point
                  - Detail about second point
                
                Be consistent with the exact formatting shown above. Each bullet level must start with the exact character shown (• or *, -, and spaces+*).
                Create 3-5 main points with 2-4 sub-points each, and add third-level details where appropriate.
                Focus on the most important information from the video.
                """
                
                # Use the LLM with context to generate the summary
                messages = [
                    {"role": "system", "content": f"You are given the following context from a video transcript:\n\n{context}\n\nUse this context to create a hierarchical summary according to the instructions."},
                    {"role": "user", "content": prompt}
                ]
                
                response = summary_llm.invoke(messages)
                return response.content
            except Exception as e:
                # Fallback to the regular RAG system if there's an error
                st.warning(f"Using standard summary generation due to error: {str(e)}")
                return rag_system.ask(prompt)
    
    def create_summary_popup_html(self, summary_content):
        """
        Create HTML for the summary popup with properly formatted hierarchical bullets
        
        Args:
            summary_content: Raw summary text with markdown bullet formatting
            
        Returns:
            str: HTML for the popup with properly formatted bullets
        """
        # Instead of relying on markdown conversion, let's manually parse and format the bullet points
        lines = summary_content.strip().split('\n')
        formatted_html = []
        
        in_list = False
        list_level = 0
        
        for raw_line in lines:
            line = raw_line.rstrip()

            # Skip empty lines
            if not line.strip():
                continue

            stripped = line.strip()

            # Markdown header: close any open lists and emit a heading
            if stripped.startswith('# '):
                while list_level > 0:
                    formatted_html.append('</ul>')
                    list_level -= 1
                in_list = False
                formatted_html.append(f'<h3>{stripped[2:].strip()}</h3>')
                continue

            # Determine the bullet level of the line (1 = top, 2 = second, 3 = third, 0 = paragraph)
            if stripped.startswith('• ') or (stripped.startswith('* ') and not line.startswith((' ', '\t'))):
                level, content = 1, stripped[2:].strip()
            elif stripped.startswith('- '):
                level, content = 2, stripped[2:].strip()
            elif stripped.startswith('* '):
                # Indented "*" lines are third-level details
                level, content = 3, stripped[2:].strip()
            else:
                level, content = 0, stripped

            if level == 0:
                # Regular paragraph: close any open lists first
                while list_level > 0:
                    formatted_html.append('</ul>')
                    list_level -= 1
                in_list = False
                formatted_html.append(f'<p>{content}</p>')
                continue

            # Open or close nested lists until we reach the required depth
            while list_level < level:
                formatted_html.append('<ul>')
                list_level += 1
            while list_level > level:
                formatted_html.append('</ul>')
                list_level -= 1
            in_list = True
            formatted_html.append(f'<li>{content}</li>')

        # Close any lists still open at the end of the summary
        while list_level > 0:
            formatted_html.append('</ul>')
            list_level -= 1

        summary_html = '\n'.join(formatted_html)

        # Wrap the bullets in the popup container; the version in the repository also
        # injects the CSS styling and JavaScript used to open and close the popup
        html = f"""
        <div class="summary-popup">
            <div class="summary-content">
                {summary_html}
            </div>
        </div>
        """
        return html
Hierarchical summary (image by author)
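
In app.py (not reproduced here), the summary generation and popup rendering might be wired together roughly as follows; this is a sketch, and names such as st.session_state.rag_system are illustrative:

import streamlit.components.v1 as components

# Sketch: generate the summary and render the popup HTML inside the Streamlit app
summary_generator = SummaryGenerator()
summary_text = summary_generator.generate_summary(st.session_state.rag_system, api_key, model="gpt-4o")
popup_html = summary_generator.create_summary_popup_html(summary_text)
components.html(popup_html, height=500, scrolling=True)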

The Talk-to-Videos app generates quizzes from the video through the QuizGenerator class in quiz.py. The quiz generator creates multiple-choice questions targeting specific facts and concepts presented in the video. Unlike the RAG system, where I use a temperature of zero, I increased the LLM temperature to 0.4 to encourage some creativity in quiz generation. A structured prompt guides the quiz generation process. The parse_quiz_response method extracts and validates the generated quiz elements to make sure that each question has all the required components. To prevent users from recognizing a pattern and to promote real understanding, the quiz generator shuffles the answer options. Questions are presented one at a time, followed by immediate feedback on each answer (a simplified UI sketch follows the class listing below). After completing all questions, the calculate_quiz_results method assesses the user’s answers, and the user is presented with an overall score, a visual breakdown of correct versus incorrect answers, and feedback on their performance level. In this way, the quiz functionality transforms passive video watching into active learning by challenging users to recall and apply information presented in the video.

    # quiz.py
    class QuizGenerator:
        def __init__(self):
            pass
        
        def generate_quiz(self, rag_system, api_key, transcript=None, model="gpt-4o", temperature=0.4):
            """
            Generate quiz questions based on the video transcript
            
            Args:
                rag_system: The RAG system with vector store
                api_key: OpenAI API key
                transcript: The full transcript text (optional)
                model: Model to use for question generation
                temperature: Creativity level (0.0-1.0)
                
            Returns:
                list: List of question objects
            """
            if not rag_system:
                st.error("Please transcribe the video first before creating a quiz!")
                return []
            
            # Create a temporary LLM with slightly higher temperature for more creative questions
            creative_llm = ChatOpenAI(
                openai_api_key=api_key,
                model=model,
                temperature=temperature
            )
    
            num_questions = 10
            
            # Prompt to generate quiz
            prompt = f"""Based on the video transcript, generate {num_questions} multiple-choice questions to test understanding of the content.
            For each question:
            1. The question should be specific to information mentioned in the video
            2. Include 4 options (A, B, C, D)
            3. Clearly indicate the correct answer
            
            Format your response exactly as follows for each question:
            QUESTION: [question text]
            A: [option A]
            B: [option B]
            C: [option C]
            D: [option D]
            CORRECT: [letter of correct answer]
           
            Make sure all questions are based on facts from the video."""
            
            try:
                if transcript:
                    # If we have the full transcript, use it
                    messages = [
                        {"role": "system", "content": f"You are given the following transcript from a video:\n\n{transcript}\n\nUse this transcript to create quiz questions according to the instructions."},
                        {"role": "user", "content": prompt}
                    ]
                    
                    response = creative_llm.invoke(messages)
                    response_text = response.content
                else:
                    # Fallback to RAG approach if no transcript is provided
                    relevant_docs = rag_system.vector_store.similarity_search(
                        "what are the main topics covered in this video?", k=5
                    )
                    context = "\n\n".join([doc.page_content for doc in relevant_docs])
                    
                    # Use the creative LLM with context to generate questions
                    messages = [
                        {"role": "system", "content": f"You are given the following context from a video transcript:\n\n{context}\n\nUse this context to create quiz questions according to the instructions."},
                        {"role": "user", "content": prompt}
                    ]
                    
                    response = creative_llm.invoke(messages)
                    response_text = response.content
            except Exception as e:
                # Fallback to the regular RAG system if there's an error
                st.warning(f"Using standard question generation due to error: {str(e)}")
                response_text = rag_system.ask(prompt)
            
            return self.parse_quiz_response(response_text)
    
        # The rest of the class remains unchanged
        def parse_quiz_response(self, response_text):
            """
            Parse the LLM response to extract questions, options, and correct answers
            
            Args:
                response_text: Raw text response from LLM
                
            Returns:
                list: List of parsed question objects
            """
            quiz_questions = []
            current_question = {}
            
            for line in response_text.strip().split('\n'):
                line = line.strip()
                if line.startswith('QUESTION:'):
                    if current_question and 'question' in current_question and 'options' in current_question and 'correct' in current_question:
                        quiz_questions.append(current_question)
                    current_question = {
                        'question': line[len('QUESTION:'):].strip(),
                        'options': [],
                        'correct': None
                    }
                elif line.startswith(('A:', 'B:', 'C:', 'D:')):
                    option_letter = line[0]
                    option_text = line[2:].strip()
                    current_question.setdefault('options', []).append((option_letter, option_text))
                elif line.startswith('CORRECT:'):
                    current_question['correct'] = line[len('CORRECT:'):].strip()
            
            # Add the last question
            if current_question and 'question' in current_question and 'options' in current_question and 'correct' in current_question:
                quiz_questions.append(current_question)
            
            # Randomize options for each question
            randomized_questions = []
            for q in quiz_questions:
                # Get the original correct answer
                correct_letter = q['correct']
                correct_option = None
                
                # Find the correct option text
                for letter, text in q['options']:
                    if letter == correct_letter:
                        correct_option = text
                        break
                
                if correct_option is None:
                    # If we can't find the correct answer, keep the question as is
                    randomized_questions.append(q)
                    continue
                    
                # Create a list of options texts and shuffle them
                option_texts = [text for _, text in q['options']]
                
                # Create a copy of the original letters
                option_letters = [letter for letter, _ in q['options']]
                
                # Create a list of (letter, text) pairs
                options_pairs = list(zip(option_letters, option_texts))
                
                # Shuffle the pairs
                random.shuffle(options_pairs)
                
                # Find the new position of the correct answer
                new_correct_letter = None
                for letter, text in options_pairs:
                    if text == correct_option:
                        new_correct_letter = letter
                        break
                
                # Create a new question with randomized options
                new_q = {
                    'question': q['question'],
                    'options': options_pairs,
                    'correct': new_correct_letter
                }
                
                randomized_questions.append(new_q)
            
            return randomized_questions
        
        def calculate_quiz_results(self, questions, user_answers):
            """
            Calculate quiz results based on user answers
            
            Args:
                questions: List of question objects
                user_answers: Dictionary of user answers keyed by question_key
                
            Returns:
                tuple: (results dict, correct count)
            """
            correct_count = 0
            results = {}
            
            for i, question in enumerate(questions):
                question_key = f"quiz_q_{i}"
                user_answer = user_answers.get(question_key)
                correct_answer = question['correct']
                
                # Only count as correct if user selected an answer and it matches
                is_correct = user_answer is not None and user_answer == correct_answer
                if is_correct:
                    correct_count += 1
                
                results[question_key] = {
                    'user_answer': user_answer,
                    'correct_answer': correct_answer,
                    'is_correct': is_correct
                }
            
            return results, correct_count

Quiz result (image by author)
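
The quiz UI itself is rendered in app.py and is not shown above; a simplified Streamlit sketch of the question loop with immediate feedback is given below (the actual app shows one question at a time; widget keys and session-state names are illustrative, and index=None requires a recent Streamlit version):

import streamlit as st

# Sketch: assumes st.session_state.quiz_questions holds the output of generate_quiz
questions = st.session_state.quiz_questions
user_answers = st.session_state.setdefault("quiz_answers", {})

for i, q in enumerate(questions):
    st.markdown(f"**Q{i + 1}. {q['question']}**")
    choice = st.radio(
        "Select an answer:",
        options=[f"{letter}: {text}" for letter, text in q['options']],
        key=f"quiz_q_{i}",
        index=None,
    )
    if choice is not None:
        user_answers[f"quiz_q_{i}"] = choice.split(":")[0]
        # Immediate feedback on the selected option
        if user_answers[f"quiz_q_{i}"] == q['correct']:
            st.success("Correct!")
        else:
            st.error(f"Incorrect. The correct answer is {q['correct']}.")

results, correct_count = QuizGenerator().calculate_quiz_results(questions, user_answers)
st.write(f"Score: {correct_count}/{len(questions)}")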

Talk-to-Videos also generates flashcards from the video content, which support active recall and spaced repetition learning techniques. This is done through the FlashcardGenerator class in flashcards.py, which creates a mix of flashcard types focusing on key term definitions, conceptual questions, fill-in-the-blank statements, and True/False questions with explanations. A prompt guides the LLM to output flashcards in a structured JSON format, with each card containing distinct “front” and “back” elements. The shuffle_flashcards method produces a randomized presentation, and each flashcard is validated to ensure that it contains both front and back components before being presented to the user. The answer to each flashcard is initially hidden and is revealed on the user’s input, using the classic flashcard reveal interaction (a minimal sketch of this pattern follows the listing below). Users can generate a new set of flashcards for more practice. The flashcard and quiz systems are interconnected, so users can switch between them as needed.

    # flashcards.py
    class FlashcardGenerator:
        """Class to generate flashcards from video content using the RAG system."""
        
        def __init__(self):
            """Initialize the flashcard generator."""
            pass
        
        def generate_flashcards(self, rag_system, api_key, transcript=None, num_cards=10, model="gpt-4o") -> List[Dict[str, str]]:
            """
            Generate flashcards based on the video content.
            
            Args:
                rag_system: The initialized RAG system with video content
                api_key: OpenAI API key
                transcript: The full transcript text (optional)
                num_cards: Number of flashcards to generate (default: 10)
                model: The OpenAI model to use
                
            Returns:
                List of flashcard dictionaries with 'front' and 'back' keys
            """
            # Import here to avoid circular imports
            from langchain_openai import ChatOpenAI
            
            # Initialize language model
            llm = ChatOpenAI(
                openai_api_key=api_key,
                model=model,
                temperature=0.4
            )
            
            # Create the prompt for flashcard generation
            prompt = f"""
            Create {num_cards} educational flashcards based on the video content.
            
            Each flashcard should have:
            1. A front side with a question, term, or concept
            2. A back side with the answer, definition, or explanation
            
            Focus on the most important and educational content from the video. 
            Create a mix of different types of flashcards:
            - Key term definitions
            - Conceptual questions
            - Fill-in-the-blank statements
            - True/False questions with explanations
            
            Format your response as a JSON array of objects with 'front' and 'back' properties.
            Example:
            [
                {{"front": "What is photosynthesis?", "back": "The process by which plants convert light energy into chemical energy."}},
                {{"front": "The three branches of government are: Executive, Legislative, and _____", "back": "Judicial"}}
            ]
            
            Make sure your output is valid JSON format with exactly {num_cards} flashcards.
            """
            
            try:
                # Determine the context to use
                if transcript:
                    # Use the full transcript if provided
                    # Create messages for the language model
                    messages = [
                        {"role": "system", "content": f"You are an educational content creator specializing in creating effective flashcards. Use the following transcript from a video to create educational flashcards:\n\n{transcript}"},
                        {"role": "user", "content": prompt}
                    ]
                else:
                    # Fallback to RAG system if no transcript is provided
                    relevant_docs = rag_system.vector_store.similarity_search(
                        "key points and educational concepts in the video", k=15
                    )
                    context = "\n\n".join([doc.page_content for doc in relevant_docs])
                    
                    # Create messages for the language model
                    messages = [
                        {"role": "system", "content": f"You are an educational content creator specializing in creating effective flashcards. Use the following context from a video to create educational flashcards:\n\n{context}"},
                        {"role": "user", "content": prompt}
                    ]
                
                # Generate flashcards
                response = llm.invoke(messages)
                content = response.content
                
                # Extract JSON content in case there's text around it
                json_start = content.find('[')
                json_end = content.rfind(']') + 1
                
                if json_start >= 0 and json_end > json_start:
                    json_content = content[json_start:json_end]
                    flashcards = json.loads(json_content)
                else:
                    # Fallback in case of improper JSON formatting
                    raise ValueError("Failed to extract valid JSON from response")
                
                # Verify we have the expected number of cards (or adjust as needed)
                actual_cards = min(len(flashcards), num_cards)
                flashcards = flashcards[:actual_cards]
                
                # Validate each flashcard has required fields
                validated_cards = []
                for card in flashcards:
                    if 'front' in card and 'back' in card:
                        validated_cards.append({
                            'front': card['front'],
                            'back': card['back']
                        })
                
                return validated_cards
            
            except Exception as e:
                # Handle errors gracefully
                print(f"Error generating flashcards: {str(e)}")
                # Return a few basic flashcards in case of error
                return [
                    {"front": "Error generating flashcards", "back": f"Please try again. Error: {str(e)}"},
                    {"front": "Tip", "back": "Try regenerating flashcards or using a different video"}
                ]
        
        def shuffle_flashcards(self, flashcards: List[Dict[str, str]]) -> List[Dict[str, str]]:
            """Shuffle the order of flashcards"""
            shuffled = flashcards.copy()
            random.shuffle(shuffled)
            return shuffled

Flashcards (image by author)
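
The hide/reveal interaction also lives in app.py; a minimal sketch of this pattern with Streamlit session state is shown below (names such as show_back_{i} are illustrative, and rag_system, api_key, and transcript are assumed to be available in scope):

import streamlit as st

# Sketch: assumes st.session_state.flashcards holds the validated cards from generate_flashcards
for i, card in enumerate(st.session_state.flashcards):
    st.markdown(f"**Card {i + 1}:** {card['front']}")
    # The back of the card stays hidden until the user asks to reveal it
    if st.button("Reveal answer", key=f"reveal_{i}"):
        st.session_state[f"show_back_{i}"] = True
    if st.session_state.get(f"show_back_{i}", False):
        st.info(card['back'])

if st.button("Generate new flashcards"):
    # Re-run generation and shuffle the deck for another practice round
    generator = FlashcardGenerator()
    new_cards = generator.generate_flashcards(rag_system, api_key, transcript=transcript)
    st.session_state.flashcards = generator.shuffle_flashcards(new_cards)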

Potential Extensions and Improvements

This application can be extended and improved in a number of ways. For instance:

  • Visual features of the video (such as keyframes) could be combined with the audio to extract more meaningful information.
  • Team-based learning experiences could be enabled, where colleagues or classmates share notes, quiz scores, and summaries.
  • Navigable transcripts could let users click on a specific section to jump to that point in the video.
  • Step-by-step action plans could be created for implementing concepts from the video in real business settings.
  • The RAG prompt could be modified to elaborate on answers and provide simpler explanations of difficult concepts.
  • Questions could be generated that build learners’ metacognitive skills by prompting them to reflect on their own thinking and learning strategies while engaging with the video content.

That’s all folks! If you liked the article, please follow me on Medium and LinkedIn.
