
Jimmy


Building My Smart 2nd Brain: Part 1 - Agentic AI with RAG

Overview

In my previous articles, I developed two LangGraph applications utilizing distinct personas for generating blog articles and images, with web search capabilities as tools. To complete this learning journey with LangGraph, I will now build a new application incorporating the following features:

  • Text embedding using a vector database for Retrieval-Augmented Generation (RAG).
  • Long-term memory using SQLite to store message history.
  • Human-in-the-Loop (HITL) functionality for user verification of generated results.

This new project, my “Smart 2nd Brain”, is a personal knowledge management system designed to make my life more organized and spark creativity. RAG, long-term memory and HITL aren’t groundbreaking, but combining them felt like building a custom toolbox for my brain. The vector DB made retrieving my scattered notes a breeze, SQLite gave the system a memory that rivals my own, and HITL kept me in control, letting me tweak outputs to feel just right.

This “Smart 2nd Brain” was a rewarding step in my LangGraph journey, blending AI smarts with personal flair. It’s more than a tool — it’s like an extension of my mind, helping me organize thoughts, rediscover ideas, and fuel curiosity.

The Knowledge State Object

Let's take a look at the master state object:

    from typing import Any, Dict, List, Optional

    from pydantic import BaseModel, Field


    class KnowledgeState(BaseModel):
        messages: List[Dict[str, str]] = Field(
            default_factory=list,
            description="Conversation-style messages exchanged between user, AI, and system."
        )
        user_input: Optional[str] = Field(
            None, description="Raw input from user (query or new document)."
        )
        query_type: Optional[str] = Field(
            None, description="Either 'ingest' for new document or 'query' for retrieval."
        )
        raw_document: Optional[str] = Field(
            None, description="Full raw text of the ingested document."
        )
        chunks: Optional[List[str]] = Field(
            None, description="Text chunks after splitting for embedding."
        )
        embeddings: Optional[List[List[float]]] = Field(
            None, description="Vector embeddings for the chunks."
        )
        source: Optional[str] = Field(
            None, description="Source of the document (e.g., filename, URL, transcript ID)."
        )
        categories: Optional[List[str]] = Field(
            None, description="One or more categories/tags for the document."
        )
        metadata: Optional[Dict[str, Any]] = Field(
            default_factory=dict,
            description="Additional metadata (author, date, page, etc.)."
        )
        retrieved_docs: Optional[List[Dict[str, Any]]] = Field(
            None, description="Relevant documents retrieved from vector DB."
        )
        retrieved_chunks: Optional[List[str]] = Field(
            None, description="Raw chunks retrieved from the vector DB."
        )
        generated_answer: Optional[str] = Field(
            None, description="AI-generated draft answer or summary."
        )
        final_answer: Optional[str] = Field(
            None, description="Final, human-approved answer or summary."
        )
        human_feedback: Optional[str] = Field(
            None, description="Feedback on AI output: 'approved' | 'rejected' | 'edited'."
        )
        edits: Optional[str] = Field(
            None, description="Manual corrections provided by the human."
        )
        edited_answer: Optional[str] = Field(
            None, description="The edited version of the answer provided by human feedback."
        )
        knowledge_type: Optional[str] = Field(
            None, description="Type of knowledge: 'conversational' | 'reusable' | 'verified'."
        )
        conversation_history: Optional[List[Dict[str, str]]] = Field(
            None, description="Running log of user/AI/system messages for context."
        )
        user_preferences: Optional[Dict[str, Any]] = Field(
            None, description="User customization, e.g. summary style, tone, etc."
        )
        status: Optional[str] = Field(
            None, description="Pipeline status: 'pending', 'processing', 'done', 'error'."
        )
        logs: Optional[List[str]] = Field(
            default_factory=list,
            description="Debug logs collected during pipeline execution."
        )

The KnowledgeState class holds everything that is passed between nodes during graph execution:

  • messages: Conversation-style messages for chat-like interactions
  • user_input: Raw user input (query or document content)
  • query_type: Workflow type identifier ('ingest' or 'query')
  • raw_document: Full document text for ingestion workflows
  • chunks: Text chunks after document splitting
  • embeddings: Vector representations of text chunks
  • source: Document source identifier
  • categories: Document classification tags
  • metadata: Additional structured metadata
  • retrieved_docs: Retrieved documents from vector database
  • retrieved_chunks: Raw text chunks from retrieval
  • generated_answer: AI-generated response draft
  • final_answer: Human-approved final response
  • human_feedback: Human feedback on AI outputs
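  • edits: Manual corrections and edits
  • edited_answer: Edited version of the answer after human feedback
  • knowledge_type: Type of knowledge ('conversational', 'reusable' or 'verified')
  • conversation_history: Complete conversation context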
  • user_preferences: User customization settings
  • status: Current workflow status
  • logs: Debug and execution logs

Note that some properties of the KnowledgeState object are cumulative (e.g. conversation_history), while others hold only the latest value (e.g. generated_answer, final_answer); I will go into the details in later sections. In short, this state object is what lets the Smart 2nd Brain pass all the required information from node to node throughout the graph.
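The difference between cumulative and latest-value properties can be sketched with a plain dataclass. This is a simplified stand-in, not the project's code: MiniState and run_turn are hypothetical names, and the real class is the Pydantic model shown earlier.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional


@dataclass
class MiniState:
    """Hypothetical, trimmed-down stand-in for KnowledgeState."""
    conversation_history: List[Dict[str, str]] = field(default_factory=list)
    generated_answer: Optional[str] = None


def run_turn(state: MiniState, question: str, answer: str) -> MiniState:
    """Simulate one pass through the graph for a single question."""
    # Cumulative property: build a new list that extends the previous one.
    state.conversation_history = state.conversation_history + [
        {"role": "user", "content": question},
        {"role": "assistant", "content": answer},
    ]
    # Latest-value property: simply overwrite whatever was there before.
    state.generated_answer = answer
    return state


state = MiniState()
run_turn(state, "What is RAG?", "Retrieval-Augmented Generation.")
run_turn(state, "Which vector DB?", "Chroma.")

print(len(state.conversation_history))  # 4
print(state.generated_answer)           # Chroma.
```

After two turns the history holds all four messages, while generated_answer only remembers the most recent draft.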

A Tale of Two Paths

The application graph supports two different paths: "ingest" and "query".

The Smart 2nd Brain supports ingesting PDF documents into a vector database as text embeddings; this is the so-called "ingest" path. Once the documents are stored in the vector database, they are available to enrich the LLM's answers to user questions. We call this the "query" path.

In the MasterGraphBuilder class, this graph is created by the build method:

    def build(self) -> CompiledStateGraph:
        """
        Build and compile the complete LangGraph workflow.

        This method constructs the workflow graph by defining nodes, setting up
        conditional routing, and establishing the flow between different
        processing stages.

        Returns:
            CompiledStateGraph: Compiled and checkpointed workflow graph

        Graph Structure:
            Entry Point: router
            Conditional Routing: Based on query_type
            Ingestion Branch: router -> chunk -> embed -> store -> END
            Query Branch: router -> retriever -> answer -> review -> validated_store -> END

        Features:
            - Conditional edges for workflow routing
            - Checkpointing for state persistence
            - Error handling and logging throughout
            - Human-in-the-loop integration
        """
        # Create the base state graph
        graph = StateGraph(KnowledgeState)

        # =============================================================================
        # NODE DEFINITIONS
        # =============================================================================
        # Add all workflow nodes
        graph.add_node("router", self.input_router)  # Entry point and routing
        graph.add_node("chunk", self.chunk_doc_node)  # Document chunking
        graph.add_node("embed", self.embed_node)  # Embedding generation
        graph.add_node("store", self.store_node)  # Document storage
        graph.add_node("retriever", self.retriever_node)  # Document retrieval
        graph.add_node("answer", self.answer_gen_node)  # Answer generation
        graph.add_node("review", self.human_review_node)  # Human review
        graph.add_node("validated_store", self.validated_store_node)  # Validated storage

        # Set the entry point
        graph.set_entry_point("router")

        # =============================================================================
        # CONDITIONAL ROUTING
        # =============================================================================
        # Define routing logic based on query_type
        def route_condition(state):
            if hasattr(state, 'query_type'):
                if state.query_type == "ingest":
                    return "ingest"
                elif state.query_type == "query":
                    return "query"
                else:
                    return "__end__"
            return "query"

        # Add conditional edges from router
        graph.add_conditional_edges("router", route_condition, {
            "ingest": "chunk",      # Route to document ingestion
            "query": "retriever",   # Route to knowledge query
            "__end__": END,         # End workflow for invalid types
        })

        # =============================================================================
        # INGESTION BRANCH
        # =============================================================================
        # Document processing pipeline
        graph.add_edge("chunk", "embed")  # Chunk -> Embed
        graph.add_edge("embed", "store")  # Embed -> Store
        graph.add_edge("store", END)      # Store -> End

        # =============================================================================
        # QUERY BRANCH
        # =============================================================================
        # Knowledge query pipeline
        graph.add_edge("retriever", "answer")        # Retrieve -> Generate
        graph.add_edge("answer", "review")           # Generate -> Review
        graph.add_edge("review", "validated_store")  # Review -> Store
        graph.add_edge("validated_store", END)       # Store -> End

        # Compile the graph with checkpointing
        return graph.compile(checkpointer=self.checkpointer)
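The routing rule can be exercised on its own, without building the graph. The sketch below copies the logic of route_condition from the build method and runs it against SimpleNamespace objects, which are used here only as lightweight stand-ins for KnowledgeState (an assumption made to keep the example self-contained).

```python
from types import SimpleNamespace


def route_condition(state) -> str:
    """Same decision logic as the nested function in build()."""
    if hasattr(state, "query_type"):
        if state.query_type == "ingest":
            return "ingest"
        elif state.query_type == "query":
            return "query"
        else:
            return "__end__"
    return "query"


# Stand-in states covering each branch of the router.
for label, state in [
    ("ingest request", SimpleNamespace(query_type="ingest")),
    ("query request", SimpleNamespace(query_type="query")),
    ("unset type", SimpleNamespace(query_type=None)),
    ("no attribute", SimpleNamespace()),
]:
    print(f"{label}: {route_condition(state)}")
```

One detail worth noticing: a state whose query_type is None is routed to the end of the workflow, because the attribute exists but matches neither branch. The final return "query" is only reached for objects that lack the attribute entirely.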

Feeding the Brain Chunk by Chunk

Let's look at the ingest branch first. When the graph is invoked for ingestion, the first node encountered is the chunk node.

    def chunk_doc_node(self, state: KnowledgeState):
        if state.raw_document:
            # Use RecursiveCharacterTextSplitter for intelligent text segmentation
            splitter = RecursiveCharacterTextSplitter(
                chunk_size=500,   # Maximum characters per chunk
                chunk_overlap=50  # Overlap between consecutive chunks for context
            )
            state.chunks = splitter.split_text(state.raw_document)
            state.logs = (state.logs or []) + [
                f"Chunked document into {len(state.chunks)} chunks"
            ]
        else:
            state.logs = (state.logs or []) + ["No raw_document found for chunking"]
        return state

To ingest lengthy text, it must first be divided into smaller chunks for efficient processing. The chunk_doc_node function takes the raw_document and splits it into manageable segments for downstream tasks.

LangChain's RecursiveCharacterTextSplitter handles this by working through a prioritized list of separators (by default paragraph breaks, then line breaks, then spaces, then individual characters), falling back to a finer separator only when a piece is still larger than chunk_size. Splitting on natural boundaries first tends to keep related text together, which suits NLP applications such as embedding generation or RAG.
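The hierarchical idea can be illustrated in a few lines of plain Python. This is a simplified sketch of the general technique, not LangChain's implementation: recursive_split is a hypothetical helper, it ignores chunk overlap, and it only handles three separators.

```python
from typing import List, Sequence


def recursive_split(
    text: str,
    chunk_size: int,
    separators: Sequence[str] = ("\n\n", "\n", " "),
) -> List[str]:
    """Split text on the coarsest separator first, recursing into finer
    separators only for pieces that are still longer than chunk_size."""
    if len(text) <= chunk_size:
        return [text] if text.strip() else []
    if not separators:
        # No separators left: fall back to a hard cut every chunk_size characters.
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    sep, finer = separators[0], separators[1:]
    chunks: List[str] = []
    current = ""
    for part in text.split(sep):
        candidate = part if not current else current + sep + part
        if len(candidate) <= chunk_size:
            current = candidate  # Keep merging small pieces into one chunk.
            continue
        if current:
            chunks.append(current)  # Flush what has been collected so far.
            current = ""
        if len(part) <= chunk_size:
            current = part
        else:
            chunks.extend(recursive_split(part, chunk_size, finer))
    if current:
        chunks.append(current)
    return chunks


sample = "alpha beta gamma\n\ndelta epsilon zeta eta theta"
chunks = recursive_split(sample, chunk_size=20)
print(chunks)  # ['alpha beta gamma', 'delta epsilon zeta', 'eta theta']
```

The first paragraph fits within the limit and stays whole, while the second is too long and is split again at word boundaries.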

In chunk_doc_node, chunk_size=500 and chunk_overlap=50 (both measured in characters) strike a balance between coherence and minimal redundancy. Once the node returns the state with its chunks, the next step is generating embeddings.
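As a brief aside before the embedding step, the effect of the overlap setting is easiest to see with a fixed-size sliding window. The sketch below is a deliberately naive stand-in (sliding_chunks is a hypothetical helper that ignores separators entirely), meant only to show how the tail of one chunk reappears at the head of the next.

```python
from typing import List


def sliding_chunks(text: str, chunk_size: int = 500, chunk_overlap: int = 50) -> List[str]:
    """Cut text into fixed-size windows that share chunk_overlap characters."""
    if not 0 <= chunk_overlap < chunk_size:
        raise ValueError("chunk_overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - chunk_overlap
    chunks: List[str] = []
    start = 0
    while start < len(text):
        chunks.append(text[start:start + chunk_size])
        if start + chunk_size >= len(text):
            break  # The window already reached the end of the text.
        start += step
    return chunks


text = "".join(str(i % 10) for i in range(1200))  # 1200 characters of dummy text
chunks = sliding_chunks(text)

print([len(c) for c in chunks])            # [500, 500, 300]
print(chunks[0][-50:] == chunks[1][:50])   # True
```

With these settings each new chunk starts 450 characters after the previous one, so a sentence that straddles a boundary still appears intact in at least one chunk as long as it is shorter than the overlap.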

    def embed_node(self, state: KnowledgeState):
        # Use embedding model if available, otherwise fallback to placeholder
        if state.chunks:
            if self.embedding_model:
                try:
                    logger.info(f"🔤 Generating embeddings for {len(state.chunks)} chunks")
                    state.embeddings = self.embedding_model.embed_documents(state.chunks)
                    logger.info(f"✅ Generated {len(state.embeddings)} embeddings")
                except Exception as e:
                    logger.error(f"❌ Embedding generation failed: {e}")
                    # Fallback to placeholder embeddings for continued processing
                    state.embeddings = [[0.1, 0.2]] * len(state.chunks)
                    state.logs = (state.logs or []) + [f"Embedding error: {str(e)}"]
            else:
                logger.warning("⚠️ No embedding model provided, using placeholder embeddings")
                state.embeddings = [[0.1, 0.2]] * len(state.chunks)
        return state

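This node transforms text chunks into high-dimensional vector embeddings that encode semantic meaning, enabling effective similarity searches. If no model is configured or the call fails, the node falls back to placeholder vectors so the pipeline can keep running, although such vectors carry no semantic meaning. The conversion itself is handled by the embedding model, which LangChain’s AzureOpenAIEmbeddings class wraps behind the embed_documents method used above.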

    from langchain_openai import AzureOpenAIEmbeddings

    # Initialize Azure embedding model for document vectorization
    # (azure_endpoint and api_key are assumed to be loaded elsewhere, e.g. from configuration)
    embedding_model = AzureOpenAIEmbeddings(
        azure_deployment="text-embedding-3-small",  # Your embedding deployment name
        openai_api_version="2024-12-01-preview",    # Azure OpenAI API version
        azure_endpoint=azure_endpoint,              # Azure service endpoint
        openai_api_key=api_key                      # API key for authentication
    )
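To see why embeddings make similarity search possible, here is a minimal sketch that ranks two toy vectors against a query vector using cosine similarity. The three-dimensional vectors and their labels are made up for illustration (real embeddings have hundreds or thousands of dimensions), and cosine similarity is only one common choice; the metric a vector store actually uses depends on its configuration.

```python
import math
from typing import Dict, List, Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine of the angle between two vectors (0.0 if either has zero length)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


# Hypothetical toy embeddings: one chunk about databases, one about baking.
toy_embeddings: Dict[str, List[float]] = {
    "notes on vector databases": [0.9, 0.1, 0.0],
    "recipe for banana bread": [0.0, 0.2, 0.9],
}
query_vector = [0.8, 0.2, 0.1]  # Pretend embedding of a question about databases

best_match = max(
    toy_embeddings,
    key=lambda text: cosine_similarity(query_vector, toy_embeddings[text]),
)
print(best_match)  # notes on vector databases
```

The same function also shows the limitation of the placeholder vectors in embed_node: because every chunk receives the identical vector, all chunks would score the same against any query.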

At the conclusion of the pipeline, the store_node function saves the embedded chunks into a vector database, utilizing Chroma, a widely adopted solution for vector storage.

The metadata attached to each chunk (its source, its categories joined into a comma-separated string, and a chunk index) is designed to support filtering during the query path, enabling more precise and context-aware retrieval.

    def store_node(self, state: KnowledgeState):
        if state.embeddings and state.chunks:
            try:
                # Initialize vectorstore if not provided
                if not self.vectorstore:
                    self.vectorstore = Chroma(
                        collection_name="knowledge_base",
                        embedding_function=self.embedding_model,
                        persist_directory=self.chromadb_dir
                    )

                # Prepare metadata for each chunk
                metadatas = [
                    {
                        "source": state.source or "unknown",
                        "categories": ", ".join(state.categories) if state.categories else "general",
                        "chunk_id": i
                    }
                    for i in range(len(state.chunks))
                ]

                # Store chunks with metadata in ChromaDB
                self.vectorstore.add_texts(
                    texts=state.chunks,
                    metadatas=metadatas
                )

                # Data is automatically persisted when using persist_directory
                state.status = "stored"
                state.logs = (state.logs or []) + [
                    f"Stored {len(state.chunks)} chunks in ChromaDB with categories {state.categories or ['general']}"
                ]
            except Exception as e:
                state.status = "error"
                state.logs = (state.logs or []) + [f"Storing failed: {e}"]
        else:
            state.logs = (state.logs or []) + ["No embeddings/chunks to store"]
        return state
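The shape of the per-chunk metadata, and one way it could be used for category filtering, can be sketched without a vector database. The helpers build_metadatas and has_category and the file name notes.pdf are hypothetical; how the query path really filters is a topic for a later article, so treat this purely as an illustration.

```python
from typing import Dict, List, Optional, Union

Metadata = Dict[str, Union[str, int]]


def build_metadatas(
    n_chunks: int,
    source: Optional[str],
    categories: Optional[List[str]],
) -> List[Metadata]:
    """Mirror the metadata layout produced in store_node."""
    return [
        {
            "source": source or "unknown",
            "categories": ", ".join(categories) if categories else "general",
            "chunk_id": i,
        }
        for i in range(n_chunks)
    ]


def has_category(metadata: Metadata, wanted: str) -> bool:
    """Check one category against the comma-separated 'categories' string."""
    stored = [c.strip() for c in str(metadata["categories"]).split(",")]
    return wanted in stored


metadatas = build_metadatas(2, "notes.pdf", ["ai", "langgraph"])
matching_ids = [m["chunk_id"] for m in metadatas if has_category(m, "langgraph")]
print(matching_ids)  # [0, 1]
```

Because the categories are stored as a single string such as "ai, langgraph", an exact-match comparison on that field would not find an individual category; the string has to be split first, as has_category does.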

More Exciting Features Await

Rather than overwhelming you with every detail now, I’ll explore the query path, long-term memory, and Human-in-the-Loop capabilities in upcoming articles. I’ll also showcase how to integrate a FastAPI suite with a sleek user interface to seamlessly connect all components. Stay tuned for what’s next!

(Source code will be provided at the end of this series.)
