Building a RAG Pipeline with AWS: A Complete Guide
RAG Conceptual Overview
What is RAG
Retrieval-Augmented Generation (RAG) is an architectural pattern that enhances Large Language Model (LLM) responses by grounding them in external, domain-specific knowledge retrieved at query time. Rather than relying solely on the parametric knowledge baked into a model during pre-training, RAG injects relevant context from a curated knowledge base directly into the prompt, enabling the model to produce answers that are accurate, current, and traceable to source documents.
A standalone LLM generates responses entirely from its training data. This creates three fundamental limitations for enterprise use cases. First, the model’s knowledge has a training cutoff date, so it cannot answer questions about events or documents created after that date. Second, the model has no awareness of proprietary internal data such as company policies, product documentation, or customer records. Third, the model may hallucinate plausible-sounding but factually incorrect information because it has no mechanism to verify its outputs against authoritative sources.
RAG addresses all three limitations by decoupling knowledge storage from the language model itself. Source documents are ingested into a vector database, where they are chunked, embedded into high-dimensional vector representations, and indexed for similarity search. At query time, the user’s question is embedded using the same model, a similarity search retrieves the most relevant document chunks, and those chunks are passed as context to the LLM alongside the original question. The model then generates a response grounded in the retrieved evidence.
For enterprise applications, RAG delivers several distinct advantages. It enables organizations to leverage proprietary data without fine-tuning expensive foundation models. It provides citation and attribution capabilities since every response can reference the source documents that informed it. It supports real-time knowledge updates because new documents can be ingested into the vector store without retraining the model. And it reduces hallucination risk by constraining the model’s generation to information present in the retrieved context. These properties make RAG the preferred pattern for building production-grade question-answering systems, internal knowledge assistants, and customer-facing support tools on AWS.
RAG Workflow Stages
A RAG pipeline operates in two distinct phases: an offline ingestion phase that prepares the knowledge base, and an online retrieval phase that answers user queries. Each stage maps to a specific AWS service responsible for executing that operation.
Stage 1: Document Ingestion (AWS S3) — Source documents (PDFs, text files, HTML pages, CSVs, Markdown files) are uploaded to an S3 bucket that serves as the data source for the pipeline. S3 provides durable, versioned storage and integrates natively with downstream AWS services through IAM trust relationships. A data synchronization process transfers documents from an upstream datalake into the designated S3 bucket, organizing them by category or ingestion batch.
Stage 2: Embedding Generation (AWS Bedrock) — AWS Bedrock’s embedding models (such as Amazon Titan Embeddings V2) convert document chunks into dense vector representations. Each chunk is transformed into a fixed-dimensional numerical array that captures its semantic meaning. Bedrock Knowledge Bases orchestrate this process automatically, handling document parsing, chunking, and embedding generation as a managed workflow.
Stage 3: Vector Storage (Amazon OpenSearch Serverless) — The generated embeddings are indexed in an OpenSearch Serverless collection configured with the VECTORSEARCH collection type. OpenSearch Serverless provides a fully managed vector database that supports approximate nearest neighbor (ANN) search using algorithms like HNSW (Hierarchical Navigable Small World). Each vector is stored alongside metadata fields that enable filtered retrieval.
Stage 4: Query Processing (API Gateway + Lambda) — When a user submits a question, the request arrives through an API Gateway REST endpoint. A Lambda function receives the request, validates the input parameters, and prepares the query for the retrieval stage. The API layer handles authentication, request throttling, and input validation before forwarding the query downstream.
Stage 5: Context Retrieval (OpenSearch Serverless + Bedrock) — The user’s query is embedded using the same Bedrock embedding model used during ingestion, producing a query vector. This vector is used to perform a similarity search against the OpenSearch Serverless index, returning the top-k most semantically similar document chunks. The retrieval configuration controls the number of results, similarity threshold, and optional metadata filters.
Stage 6: Response Generation (AWS Bedrock LLM) — The retrieved document chunks are assembled into a context window and passed to a Bedrock foundation model (such as Anthropic Claude or Amazon Titan Text) along with the original user query. The LLM generates a natural language response grounded in the provided context. Bedrock’s retrieve-and-generate API orchestrates this final stage, combining retrieval results with LLM inference in a single API call.
Architecture Diagram
The following diagram illustrates the end-to-end RAG pipeline architecture on AWS, showing both the ingestion path (offline document processing) and the retrieval path (online query handling). Each service is labeled with its role in the pipeline, and directional arrows indicate the flow of data between components.
AWS Bedrock Integration
Foundation Models
AWS Bedrock is a fully managed service that provides access to foundation models (FMs) from leading AI providers through a unified API. In the context of a RAG pipeline, Bedrock serves two distinct roles: generating vector embeddings from text and performing large language model (LLM) inference for response generation. Understanding the distinction between these two model types is critical to building an effective pipeline.
Embedding models transform text into dense numerical vectors that capture semantic meaning. These vectors are stored in a vector database and used for similarity search during retrieval. The key characteristic of an embedding model is that it produces a fixed-dimension output vector regardless of input length. For example, amazon.titan-embed-text-v1 produces 1536-dimensional vectors, meaning every input text—whether a single sentence or a full paragraph—is represented as a point in 1536-dimensional space.
Text generation models (LLMs), on the other hand, accept a prompt and produce natural language output. In a RAG pipeline, the LLM receives the user’s query along with retrieved context passages and generates a coherent answer grounded in that context. Models like anthropic.claude-v2 and amazon.titan-text-express-v1 are well-suited for this task because they support large context windows and can follow instructions to cite retrieved sources.
The separation of concerns is important: embedding models are optimized for producing meaningful vector representations with low latency, while LLMs are optimized for reasoning over context and generating fluent responses. Bedrock allows you to select different models for each role, giving you flexibility to optimize cost, latency, and quality independently.
Embedding Generation
For embedding generation in a RAG pipeline, amazon.titan-embed-text-v1 is a strong default choice. It produces 1536-dimensional vectors, supports input text up to 8,192 tokens, and is optimized for semantic similarity tasks. The model accepts a simple JSON payload with the input text and returns the embedding vector directly.
Key configuration parameters for the Titan embedding model include:
- modelId:
amazon.titan-embed-text-v1— identifies the embedding model - inputText: The text string to embed (max 8,192 tokens)
- Output dimensions: 1536 (fixed for this model — your vector index must match)
The following code demonstrates how to call the Bedrock Runtime API to generate an embedding vector using boto3. This is the core operation that runs during both ingestion (embedding documents) and retrieval (embedding the user query).
import boto3
import json
# Initialize the Bedrock Runtime client
bedrock_runtime = boto3.client(
service_name='bedrock-runtime',
region_name='us-east-1'
)
def generate_embedding(text: str) -> list:
"""
Generate a 1536-dimensional embedding vector for the given text
using Amazon Titan Embed Text v1.
"""
request_body = json.dumps({
"inputText": text
})
response = bedrock_runtime.invoke_model(
modelId='amazon.titan-embed-text-v1',
contentType='application/json',
accept='application/json',
body=request_body
)
response_body = json.loads(response['body'].read())
# The response contains the embedding vector and input token count
embedding = response_body['embedding'] # List of 1536 floats
input_token_count = response_body['inputTextTokenCount']
print(f"Generated embedding with {len(embedding)} dimensions")
print(f"Input consumed {input_token_count} tokens")
return embedding
# Example usage: embed a document chunk during ingestion
document_chunk = (
"AWS Lambda is a serverless compute service that runs your code "
"in response to events and automatically manages the underlying "
"compute resources for you."
)
embedding_vector = generate_embedding(document_chunk)
# embedding_vector is now a list of 1536 float values
# ready to be indexed in OpenSearch Serverless
The response structure from the Titan embedding model is straightforward: it returns the embedding field (a list of floating-point numbers) and inputTextTokenCount indicating how many tokens were consumed. During ingestion, you call this function for each document chunk before storing the vector in OpenSearch Serverless. During retrieval, you call it once for the user’s query to produce a query vector for similarity search.
LLM Inference
Once relevant context passages are retrieved from the vector store, the next step is to pass them along with the user’s query to an LLM for response generation. AWS Bedrock supports several text generation models suitable for RAG, including anthropic.claude-v2 (100K token context window, strong instruction following) and amazon.titan-text-express-v1 (8K token context, lower cost).
Key invocation parameters for LLM inference include:
- modelId:
anthropic.claude-v2oramazon.titan-text-express-v1 - maxTokens: Maximum number of tokens to generate in the response (e.g., 512–2048)
- temperature: Controls randomness in generation (0.0 for deterministic, 0.7 for creative; use 0.1–0.3 for factual RAG responses)
- topP: Nucleus sampling parameter (0.9 is a common default)
- stopSequences: Tokens that signal the model to stop generating
The following code demonstrates how to invoke Claude v2 through Bedrock for RAG response generation. The prompt is constructed by combining the retrieved context with the user’s question, instructing the model to answer based only on the provided context.
import boto3
import json
bedrock_runtime = boto3.client(
service_name='bedrock-runtime',
region_name='us-east-1'
)
def generate_rag_response(query: str, context_passages: list) -> str:
"""
Generate a response using Claude v2 with retrieved context passages.
The model is instructed to answer based only on the provided context.
"""
# Format retrieved passages into a context block
context_block = "\n\n".join(
f"[Source {i+1}]: {passage}"
for i, passage in enumerate(context_passages)
)
# Construct the prompt with context and query
prompt = f"""\n\nHuman: You are a helpful assistant that answers questions
based on the provided context. Only use information from the context below.
If the context does not contain enough information to answer, say so.
Context:
{context_block}
Question: {query}
\n\nAssistant:"""
request_body = json.dumps({
"prompt": prompt,
"max_tokens_to_sample": 1024,
"temperature": 0.2,
"top_p": 0.9,
"stop_sequences": ["\n\nHuman:"]
})
response = bedrock_runtime.invoke_model(
modelId='anthropic.claude-v2',
contentType='application/json',
accept='application/json',
body=request_body
)
response_body = json.loads(response['body'].read())
generated_text = response_body['completion']
stop_reason = response_body['stop_reason']
print(f"Response generated ({stop_reason})")
return generated_text.strip()
# Example usage: generate a response from retrieved context
retrieved_passages = [
"AWS Lambda supports Python, Node.js, Java, Go, and .NET runtimes. "
"Functions can be triggered by over 200 AWS services and SaaS applications.",
"Lambda functions have a maximum execution timeout of 15 minutes "
"and can be allocated between 128 MB and 10,240 MB of memory."
]
user_query = "What programming languages does AWS Lambda support?"
answer = generate_rag_response(user_query, retrieved_passages)
print(f"Answer: {answer}")
For RAG workloads, a low temperature (0.1–0.3) is recommended to keep responses factual and grounded in the retrieved context. Higher temperatures introduce more variation, which can lead to hallucination—exactly what RAG is designed to prevent. The max_tokens_to_sample parameter should be set based on your expected response length; for concise answers, 512 tokens is often sufficient, while detailed explanations may need 1024–2048 tokens.
If you choose amazon.titan-text-express-v1 instead of Claude, the request body structure differs slightly. Titan uses inputText for the prompt and textGenerationConfig for parameters like maxTokenCount, temperature, and topP. The trade-off is a smaller context window (8K tokens) at lower per-token cost, which may be acceptable if your retrieved passages are short.
Knowledge Bases
Bedrock Knowledge Bases is a managed orchestration layer that automates the ingestion pipeline from raw documents to indexed vectors. Rather than writing custom code to chunk documents, generate embeddings, and index them in a vector store, Knowledge Bases handles this entire workflow as a managed service.
When you create a Knowledge Base, you configure three core components:
- Data source: An S3 bucket (and optional prefix) containing your source documents. Knowledge Bases supports PDF, TXT, CSV, HTML, MD, and DOCX formats.
- Embedding model: The foundation model used to generate vectors from document chunks. You specify the model ID (e.g.,
amazon.titan-embed-text-v1) and the Knowledge Base uses it automatically during ingestion. - Vector store: The target index where embeddings are stored. Knowledge Bases integrates natively with OpenSearch Serverless, Amazon Aurora PostgreSQL (pgvector), Pinecone, and Redis Enterprise Cloud.
During an ingestion sync job, Knowledge Bases performs the following steps automatically: it reads documents from the configured S3 data source, splits them into chunks based on your chunking strategy (fixed-size, semantic, or no chunking), generates embedding vectors for each chunk using the configured embedding model, and indexes the vectors along with metadata in the configured vector store.
The chunking strategy is configurable at Knowledge Base creation time. Fixed-size chunking splits documents into segments of a specified token count (e.g., 300 tokens with 20% overlap). This overlap ensures that context is not lost at chunk boundaries. Semantic chunking uses the embedding model to identify natural breakpoints in the text, producing more coherent chunks at the cost of variable sizes.
Knowledge Bases also provides a managed retrieval API (retrieve and retrieve_and_generate) that handles query embedding, vector search, and optional LLM response generation in a single API call. This simplifies the retrieval side of the pipeline significantly, though you can also query the vector store directly if you need more control over the search parameters.
Model Access & Quotas
Before you can invoke any foundation model through Bedrock, you must explicitly enable access to that model in your AWS account. This is a one-time prerequisite step performed in the Bedrock console under “Model access.” You select the models you want to use (e.g., Amazon Titan Embed Text v1, Anthropic Claude v2) and submit an access request. Amazon models are typically granted instantly, while third-party models (Anthropic, Cohere, Meta) may require acceptance of an end-user license agreement (EULA).
Model access is region-specific. If you enable Claude v2 in us-east-1, it is not automatically available in eu-west-1. Plan your region strategy early, especially if you need to comply with data residency requirements.
Service quotas are a critical consideration for RAG workloads, which can generate high volumes of API calls during both ingestion and retrieval. Key quotas to monitor include:
- Invocations per minute (IPM): Each model has a default rate limit. For example, Titan Embed Text v1 defaults to 1,000 IPM. During bulk ingestion of thousands of document chunks, you may hit this limit quickly.
- Tokens per minute (TPM): Limits the total token throughput. Large documents with many chunks can exhaust this quota during ingestion.
- Concurrent invocations: The number of simultaneous API calls allowed. Parallel ingestion workers must respect this limit.
- Knowledge Base sync jobs: Limited to a certain number of concurrent sync jobs per account.
To handle throttling gracefully, implement exponential backoff with jitter in your ingestion Lambda functions. You can also request quota increases through the AWS Service Quotas console if your workload requires higher throughput. For production RAG pipelines processing large document corpora, consider provisioned throughput (available for select models) which guarantees a fixed number of invocations per minute at a predictable cost.
Monitoring is equally important: use Amazon CloudWatch metrics for Bedrock (InvocationCount, InvocationLatency, ThrottledCount) to track usage patterns and set alarms before you hit quota ceilings. This visibility helps you right-size your quotas and identify bottlenecks in the ingestion pipeline.
OpenSearch Serverless Vector Database
Vector Search Collections
AWS OpenSearch Serverless provides a fully managed vector database capability through its VECTORSEARCH collection type. Unlike traditional OpenSearch collections designed for log analytics or time-series data, vector search collections are purpose-built for storing high-dimensional embeddings and performing approximate nearest neighbor (ANN) searches at scale. The underlying vector search engine uses the NMSLIB and Faiss libraries with the HNSW (Hierarchical Navigable Small World) algorithm, which provides sub-linear query time complexity even across millions of vectors.
OpenSearch Serverless supports three similarity metrics for vector search: cosine similarity (measuring the angle between vectors, ideal for normalized embeddings), Euclidean distance (L2 norm, measuring straight-line distance between vector endpoints), and dot product (inner product, optimized for vectors that are already normalized). For RAG workloads using Bedrock embedding models like Amazon Titan Embeddings, cosine similarity is the recommended metric because Titan produces normalized vectors where cosine similarity and dot product yield equivalent results.
A key advantage of the serverless model is that you do not manage cluster capacity, shard allocation, or replica counts. OpenSearch Serverless automatically scales compute (OCUs — OpenSearch Compute Units) based on indexing and search workload. Collections are created with encryption and network access policies that control who can reach the endpoints, making them well-suited for integration with other AWS services like Bedrock Knowledge Bases.
The following code demonstrates how to create an OpenSearch Serverless vector search collection using boto3, including the required encryption and network policies:
import boto3
import json
import time
aoss_client = boto3.client('opensearchserverless')
collection_name = 'rag-vectors'
# Step 1: Create encryption policy (required before collection creation)
encryption_policy = json.dumps({
"Rules": [
{
"Resource": [f"collection/{collection_name}"],
"ResourceType": "collection"
}
],
"AWSOwnedKey": True
})
aoss_client.create_security_policy(
name=f"{collection_name}-enc-policy",
type="encryption",
policy=encryption_policy,
description="Encryption policy for RAG vector collection"
)
# Step 2: Create network policy (controls endpoint access)
network_policy = json.dumps([
{
"Rules": [
{
"Resource": [f"collection/{collection_name}"],
"ResourceType": "collection"
}
],
"AllowFromPublic": True # Use VPC endpoint in production
}
])
aoss_client.create_security_policy(
name=f"{collection_name}-net-policy",
type="network",
policy=network_policy,
description="Network policy for RAG vector collection"
)
# Step 3: Create the vector search collection
response = aoss_client.create_collection(
name=collection_name,
type="VECTORSEARCH",
description="Vector store for RAG document embeddings"
)
collection_id = response['createCollectionDetail']['id']
print(f"Collection created: {collection_id}")
# Step 4: Wait for collection to become active
while True:
status = aoss_client.batch_get_collection(ids=[collection_id])
state = status['collectionDetails'][0]['status']
if state == 'ACTIVE':
endpoint = status['collectionDetails'][0]['collectionEndpoint']
print(f"Collection active at: {endpoint}")
break
time.sleep(10)
The collection creation process requires encryption and network policies to be in place before the collection itself can be created. The VECTORSEARCH type instructs OpenSearch Serverless to provision the collection with vector-optimized storage and indexing capabilities. Once active, the collection exposes an HTTPS endpoint that accepts standard OpenSearch API calls for index management and document operations.
Index Configuration
After the collection is active, you create an index that defines the vector field schema. The index configuration specifies the vector dimensions (which must match your embedding model output), the similarity engine, the space type (similarity metric), and any metadata fields you want to store alongside the vectors for filtering during search.
For a RAG pipeline using Amazon Titan Embeddings V1, the vector dimension is 1536. If you use Titan Embeddings V2, you can choose between 256, 512, or 1024 dimensions depending on your latency and accuracy requirements. The index mapping also includes metadata fields such as the source document path, chunk identifier, and any categorical fields used for filtered search.
The following code creates an index with vector field configuration using the opensearch-py client connected to the serverless endpoint:
from opensearchpy import OpenSearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
# Authenticate using SigV4 for OpenSearch Serverless
region = 'us-east-1'
service = 'aoss'
credentials = boto3.Session().get_credentials()
aws_auth = AWS4Auth(
credentials.access_key,
credentials.secret_key,
region,
service,
session_token=credentials.token
)
# Connect to the OpenSearch Serverless endpoint
host = 'your-collection-id.us-east-1.aoss.amazonaws.com'
client = OpenSearch(
hosts=[{'host': host, 'port': 443}],
http_auth=aws_auth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection,
timeout=60
)
# Define the index with vector field configuration
index_name = 'rag-index'
index_body = {
"settings": {
"index": {
"knn": True # Enable k-NN plugin for vector search
}
},
"mappings": {
"properties": {
"embedding": {
"type": "knn_vector",
"dimension": 1536, # Matches Titan Embeddings V1 output
"method": {
"name": "hnsw",
"engine": "faiss",
"space_type": "cosinesimil", # Cosine similarity
"parameters": {
"ef_construction": 256, # Index-time accuracy
"m": 48 # Graph connectivity
}
}
},
"text": {
"type": "text" # Original chunk text for retrieval
},
"source_document": {
"type": "keyword" # S3 key of source document
},
"chunk_id": {
"type": "keyword" # Unique chunk identifier
},
"category": {
"type": "keyword" # Document category for filtering
},
"ingestion_timestamp": {
"type": "date" # When the chunk was indexed
}
}
}
}
response = client.indices.create(index=index_name, body=index_body)
print(f"Index created: {response}")
The HNSW parameters ef_construction and m control the trade-off between index build time and search accuracy. Higher values of ef_construction produce a more accurate graph at the cost of slower indexing. The m parameter controls how many bidirectional links each node maintains — higher values improve recall but increase memory usage. For RAG workloads with moderate corpus sizes (under 1 million chunks), the values shown above provide a good balance between accuracy and performance.
The space_type field must align with how your embedding model produces vectors. Amazon Titan Embeddings outputs normalized vectors, so cosinesimil (cosine similarity) is the appropriate choice. If you were using a model that produces unnormalized vectors and you cared about magnitude, l2 (Euclidean distance) would be more appropriate.
Similarity Search
Once documents are indexed, you perform similarity search by converting a user query into an embedding vector (using the same embedding model used during ingestion) and then executing a k-NN search against the index. The search returns the top-k most similar document chunks ranked by the configured similarity metric, along with their metadata fields and similarity scores.
The query supports additional parameters like k (number of results), pre-filtering on metadata fields, and a minimum score threshold to exclude low-relevance results. You can combine vector search with traditional keyword filters to implement hybrid search strategies.
The following code demonstrates a similarity search query that retrieves the top 5 most relevant document chunks for a given query embedding, with optional metadata filtering:
import boto3
import json
# Step 1: Generate query embedding using Bedrock Titan Embeddings
bedrock_runtime = boto3.client('bedrock-runtime', region_name='us-east-1')
query_text = "How does the ingestion pipeline handle document chunking?"
embed_response = bedrock_runtime.invoke_model(
modelId='amazon.titan-embed-text-v1',
contentType='application/json',
accept='application/json',
body=json.dumps({"inputText": query_text})
)
query_embedding = json.loads(embed_response['body'].read())['embedding']
# Step 2: Execute k-NN similarity search against OpenSearch Serverless
search_body = {
"size": 5, # Return top 5 results
"query": {
"knn": {
"embedding": {
"vector": query_embedding,
"k": 5
}
}
},
"_source": ["text", "source_document", "chunk_id", "category"]
}
# Optional: Add metadata filter to restrict search to a specific category
search_with_filter = {
"size": 5,
"query": {
"bool": {
"must": [
{
"knn": {
"embedding": {
"vector": query_embedding,
"k": 5
}
}
}
],
"filter": [
{"term": {"category": "architecture"}}
]
}
},
"_source": ["text", "source_document", "chunk_id", "category"]
}
results = client.search(index='rag-index', body=search_body)
# Step 3: Process results — extract text chunks for LLM context
retrieved_chunks = []
for hit in results['hits']['hits']:
retrieved_chunks.append({
"text": hit['_source']['text'],
"source": hit['_source']['source_document'],
"score": hit['_score'],
"chunk_id": hit['_source']['chunk_id']
})
print(f"Score: {hit['_score']:.4f} | Source: {hit['_source']['source_document']}")
# The retrieved_chunks list is passed as context to the LLM for generation
The similarity scores returned by OpenSearch range from 0 to 1 for cosine similarity, where 1 indicates identical vectors. In practice, scores above 0.7 typically indicate strong semantic relevance for Titan Embeddings. You can set a minimum score threshold in your application logic to filter out low-confidence results before passing context to the LLM, reducing the chance of hallucination from irrelevant chunks.
Bedrock Integration
AWS Bedrock Knowledge Bases integrate directly with OpenSearch Serverless as a managed vector store backend. When you create a Knowledge Base in Bedrock, you specify an OpenSearch Serverless collection as the storage configuration. Bedrock then handles the entire ingestion pipeline automatically: it reads documents from your S3 data source, chunks them according to your configured strategy, generates embeddings using your selected model, and indexes the vectors into the OpenSearch collection — all without custom code.
The integration requires an IAM role that Bedrock assumes to access both the S3 data source and the OpenSearch Serverless collection. This role needs a trust policy allowing the bedrock.amazonaws.com service principal to assume it, plus permissions for aoss:APIAccessAll on the collection. Additionally, you must create a data access policy on the OpenSearch Serverless collection that grants the Bedrock IAM role permission to create indexes, write documents, and perform searches.
The data access policy is a critical component that many implementations miss. Without it, Bedrock can authenticate to the collection endpoint but cannot perform any operations. The policy grants index-level and collection-level permissions to the IAM role ARN:
import boto3
import json
aoss_client = boto3.client('opensearchserverless')
# Data access policy granting Bedrock role permissions on the collection
bedrock_role_arn = "arn:aws:iam::123456789012:role/BedrockKnowledgeBaseRole"
collection_name = "rag-vectors"
data_access_policy = json.dumps([
{
"Rules": [
{
"Resource": [f"collection/{collection_name}"],
"Permission": [
"aoss:CreateCollectionItems",
"aoss:DeleteCollectionItems",
"aoss:UpdateCollectionItems",
"aoss:DescribeCollectionItems"
],
"ResourceType": "collection"
},
{
"Resource": [f"index/{collection_name}/*"],
"Permission": [
"aoss:CreateIndex",
"aoss:DeleteIndex",
"aoss:UpdateIndex",
"aoss:DescribeIndex",
"aoss:ReadDocument",
"aoss:WriteDocument"
],
"ResourceType": "index"
}
],
"Principal": [bedrock_role_arn],
"Description": "Grant Bedrock Knowledge Base access to vector collection"
}
])
aoss_client.create_access_policy(
name=f"{collection_name}-bedrock-access",
type="data",
policy=data_access_policy,
description="Data access policy for Bedrock Knowledge Base integration"
)
print("Data access policy created for Bedrock integration")
During ingestion, Bedrock Knowledge Bases automatically creates the index in OpenSearch Serverless if it does not already exist, using the vector dimensions from the configured embedding model. Each ingestion sync job processes new or modified documents from S3, generates embeddings, and upserts vectors into the index. Deleted source documents are also removed from the vector store during sync, keeping the index consistent with the data source.
Dimension Alignment
One of the most common configuration errors in RAG pipelines is a mismatch between the embedding model’s output dimensions and the OpenSearch index vector field dimensions. If the index is configured with "dimension": 1024 but the embedding model produces 1536-dimensional vectors, indexing operations will fail with a dimension mismatch error. This alignment must be verified at configuration time and maintained whenever the embedding model is changed.
Each Bedrock embedding model produces vectors of a specific size. Amazon Titan Embeddings V1 always outputs 1536 dimensions. Amazon Titan Embeddings V2 supports configurable dimensions (256, 512, or 1024) specified at inference time via the dimensions parameter. Cohere Embed models on Bedrock produce 1024 dimensions. When configuring your OpenSearch index, the dimension field in the knn_vector mapping must exactly match the output size of your chosen model.
The alignment requirement extends beyond initial setup. If you later switch embedding models (for example, migrating from Titan V1 at 1536 dimensions to Titan V2 at 1024 dimensions for lower latency), you must re-create the index with the new dimension value and re-ingest all documents to generate new embeddings. Vectors from different models or different dimension configurations are not compatible and cannot coexist in the same index field.
When using Bedrock Knowledge Bases, the service handles this alignment automatically — it reads the embedding model configuration and creates the index with matching dimensions. However, if you pre-create the index manually (for example, to customize HNSW parameters), you must ensure the dimension value matches. The table below summarizes common Bedrock embedding models and their output dimensions:
| Model | Model ID | Output Dimensions | Notes |
|---|---|---|---|
| Amazon Titan Embeddings V1 | amazon.titan-embed-text-v1 | 1536 | Fixed dimension, normalized output |
| Amazon Titan Embeddings V2 | amazon.titan-embed-text-v2:0 | 256 / 512 / 1024 | Configurable via dimensions parameter |
| Cohere Embed English | cohere.embed-english-v3 | 1024 | Fixed dimension |
| Cohere Embed Multilingual | cohere.embed-multilingual-v3 | 1024 | Fixed dimension, multilingual support |
To validate alignment programmatically, you can query the index mapping and compare it against the embedding model’s output. This check should be part of your deployment validation or health check routine to catch configuration drift early, especially in environments where infrastructure is provisioned separately from application configuration.
S3 Data Source Storage
Bucket Configuration
AWS S3 serves as the primary data source bucket for storing documents that feed into the RAG ingestion pipeline. The bucket acts as the origin point for all content that Bedrock Knowledge Bases will process, chunk, embed, and index into OpenSearch Serverless. A dedicated S3 bucket isolates RAG source documents from other application data and provides fine-grained access control through bucket policies and IAM roles.
The core requirement for the S3 data source is an IAM trust relationship that grants Bedrock Knowledge Bases read access to the bucket contents. Bedrock assumes a service role with permissions to list and retrieve objects from the bucket during ingestion jobs. Without this trust relationship, the Knowledge Base cannot access documents for processing, and ingestion jobs will fail with access denied errors.
When creating the bucket, enable versioning to track document changes over time. Versioning allows the ingestion pipeline to identify updated documents and re-process them during subsequent sync operations. Server-side encryption with AWS-managed keys (SSE-S3) or customer-managed KMS keys protects documents at rest. Block all public access to ensure documents remain private and accessible only through authorized IAM roles.
import boto3
import json
s3_client = boto3.client('s3', region_name='us-east-1')
# Create the S3 bucket for RAG data source documents
bucket_name = 'rag-knowledge-base-datasource-prod'
s3_client.create_bucket(
Bucket=bucket_name,
CreateBucketConfiguration={
'LocationConstraint': 'us-east-1'
}
)
# Enable versioning to track document changes
s3_client.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={
'Status': 'Enabled'
}
)
# Enable server-side encryption with AWS-managed keys
s3_client.put_bucket_encryption(
Bucket=bucket_name,
ServerSideEncryptionConfiguration={
'Rules': [
{
'ApplyServerSideEncryptionByDefault': {
'SSEAlgorithm': 'aws:kms',
'KMSMasterKeyID': 'alias/rag-datasource-key'
},
'BucketKeyEnabled': True
}
]
}
)
# Block all public access
s3_client.put_public_access_block(
Bucket=bucket_name,
PublicAccessBlockConfiguration={
'BlockPublicAcls': True,
'IgnorePublicAcls': True,
'BlockPublicPolicy': True,
'RestrictPublicBuckets': True
}
)
print(f"Bucket '{bucket_name}' created with versioning and encryption enabled")The bucket configuration above establishes a secure foundation for the RAG data source. Versioning ensures that every document modification creates a new version, enabling the ingestion pipeline to detect changes. KMS encryption with a dedicated key provides audit trails through CloudTrail for compliance requirements. The public access block prevents accidental exposure of sensitive documents.
Document Organization
Organizing documents within the S3 bucket follows a folder structure that groups content by category or ingestion batch. While S3 uses a flat object namespace with key prefixes rather than true directories, the prefix-based structure provides logical separation that simplifies management, access control, and selective ingestion. Bedrock Knowledge Bases can be configured to ingest from specific prefixes, allowing you to control which document categories are included in the knowledge base.
A recommended folder structure separates documents by content type and processing status:
import boto3
from datetime import datetime
s3_client = boto3.client('s3', region_name='us-east-1')
bucket_name = 'rag-knowledge-base-datasource-prod'
# Define the folder structure for document organization
folder_structure = [
'documents/technical-guides/', # Technical documentation and guides
'documents/api-references/', # API reference documentation
'documents/tutorials/', # Step-by-step tutorials
'documents/whitepapers/', # Research and whitepapers
'documents/release-notes/', # Product release notes
'staging/', # Staging area for new documents before ingestion
'archive/', # Archived documents removed from active ingestion
]
# Create folder markers (zero-byte objects) for organization
for folder in folder_structure:
s3_client.put_object(
Bucket=bucket_name,
Key=folder,
Body=b''
)
print(f"Created folder: {folder}")
# Upload a sample document to the technical guides folder
document_content = open('/tmp/rag-architecture-guide.pdf', 'rb').read()
s3_client.put_object(
Bucket=bucket_name,
Key='documents/technical-guides/rag-architecture-guide.pdf',
Body=document_content,
ContentType='application/pdf',
Metadata={
'ingestion-batch': '2024-01-15',
'document-category': 'technical-guides',
'source-system': 'datalake-export'
}
)
# Upload a markdown document
markdown_content = "# Vector Search Configuration\n\nThis guide covers..."
s3_client.put_object(
Bucket=bucket_name,
Key='documents/tutorials/vector-search-setup.md',
Body=markdown_content.encode('utf-8'),
ContentType='text/markdown',
Metadata={
'ingestion-batch': '2024-01-15',
'document-category': 'tutorials',
'source-system': 'content-management'
}
)
print("Documents uploaded with metadata tags for tracking")Bedrock Knowledge Bases supports the following file formats for ingestion: PDF, TXT, CSV, HTML, MD (Markdown), DOC, and DOCX. Each format is parsed differently during the chunking phase. PDF documents are extracted with text and structural information preserved. Markdown and HTML files retain their heading hierarchy, which can inform chunking boundaries. CSV files are processed row-by-row, with each row potentially becoming a separate chunk depending on the chunking strategy configured in the Knowledge Base.
Object metadata tags on each uploaded document provide additional context for tracking ingestion batches, source systems, and document categories. These tags do not affect the ingestion process directly but are valuable for operational monitoring, cost allocation, and debugging when specific documents fail to process correctly.
Versioning considerations are important for document updates. When a document is overwritten with a new version, the next ingestion sync detects the change and re-processes the document. The previous version remains accessible in S3 for rollback purposes. For documents that should be removed from the knowledge base, delete the object from the active prefix and trigger a new ingestion sync — Bedrock will remove the corresponding vectors from OpenSearch Serverless.
IAM Policies
The IAM policy configuration for the S3 data source establishes the trust relationship between Bedrock Knowledge Bases and the S3 bucket. Bedrock requires an IAM role that it can assume to read objects from the bucket during ingestion. This role must have a trust policy allowing the Bedrock service principal to assume it, and a permissions policy granting read access to the specific bucket and prefixes containing documents.
The following IAM policy grants Bedrock Knowledge Bases the minimum permissions required to read documents from the S3 data source bucket:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockToAccessS3DataSource",
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:ListBucket"
],
"Resource": [
"arn:aws:s3:::rag-knowledge-base-datasource-prod",
"arn:aws:s3:::rag-knowledge-base-datasource-prod/documents/*"
],
"Condition": {
"StringEquals": {
"aws:PrincipalAccount": "123456789012"
}
}
},
{
"Sid": "AllowKMSDecryptForEncryptedObjects",
"Effect": "Allow",
"Action": [
"kms:Decrypt",
"kms:GenerateDataKey"
],
"Resource": "arn:aws:kms:us-east-1:123456789012:key/rag-datasource-key-id"
}
]
}The trust policy on the IAM role allows the Bedrock service to assume the role. This trust relationship is what enables Bedrock Knowledge Bases to authenticate and access the S3 bucket on behalf of the ingestion process:
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "AllowBedrockToAssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "bedrock.amazonaws.com"
},
"Action": "sts:AssumeRole",
"Condition": {
"StringEquals": {
"aws:SourceAccount": "123456789012"
},
"ArnLike": {
"aws:SourceArn": "arn:aws:bedrock:us-east-1:123456789012:knowledge-base/*"
}
}
}
]
}The condition keys in the trust policy restrict role assumption to Bedrock Knowledge Base resources within your specific account, preventing cross-account confused deputy attacks. The aws:SourceAccount condition ensures only your account’s Bedrock service can assume the role, while aws:SourceArn further limits it to Knowledge Base resources specifically.
If the bucket uses KMS encryption, the IAM role also needs kms:Decrypt permissions on the KMS key used for server-side encryption. Without this permission, Bedrock can list objects but cannot read their contents, resulting in ingestion failures for individual documents while the overall job continues processing other accessible files.
Data Sync Process
The data synchronization process transfers documents from a centralized datalake to the S3 data source bucket, ensuring the RAG knowledge base stays current with the latest content. This sync operates as a scheduled or event-driven pipeline that identifies new and updated documents in the source datalake, copies them to the appropriate S3 prefix, and optionally triggers a Bedrock Knowledge Base ingestion job to process the changes.
The sync mechanism uses S3 object metadata and version IDs to determine which documents require transfer. A sync manifest stored in the staging prefix tracks the last successful sync timestamp and the set of objects processed. On each sync execution, the pipeline compares the source datalake contents against the manifest to identify additions, modifications, and deletions.
The folder mapping between the source datalake and the S3 data source bucket follows a convention-based approach. Source paths in the datalake map to corresponding prefixes in the destination bucket:
import boto3
import json
from datetime import datetime, timezone
s3_client = boto3.client('s3', region_name='us-east-1')
SOURCE_BUCKET = 'enterprise-datalake-prod'
DEST_BUCKET = 'rag-knowledge-base-datasource-prod'
# Folder mapping: datalake prefix -> RAG data source prefix
FOLDER_MAPPING = {
'exports/technical-docs/': 'documents/technical-guides/',
'exports/api-docs/': 'documents/api-references/',
'exports/tutorials/': 'documents/tutorials/',
'exports/whitepapers/': 'documents/whitepapers/',
'exports/changelogs/': 'documents/release-notes/',
}
SUPPORTED_FORMATS = ['.pdf', '.txt', '.csv', '.html', '.md', '.doc', '.docx']
def get_last_sync_timestamp():
"""Retrieve the last successful sync timestamp from the manifest."""
try:
response = s3_client.get_object(
Bucket=DEST_BUCKET,
Key='staging/sync-manifest.json'
)
manifest = json.loads(response['Body'].read())
return datetime.fromisoformat(manifest['last_sync_timestamp'])
except s3_client.exceptions.NoSuchKey:
return datetime(2000, 1, 1, tzinfo=timezone.utc)
def sync_documents_from_datalake():
"""Sync new and updated documents from datalake to RAG data source bucket."""
last_sync = get_last_sync_timestamp()
synced_objects = []
for source_prefix, dest_prefix in FOLDER_MAPPING.items():
# List objects in the source datalake prefix
paginator = s3_client.get_paginator('list_objects_v2')
pages = paginator.paginate(Bucket=SOURCE_BUCKET, Prefix=source_prefix)
for page in pages:
for obj in page.get('Contents', []):
source_key = obj['Key']
last_modified = obj['LastModified']
# Skip files modified before last sync
if last_modified <= last_sync:
continue
# Check if file format is supported
if not any(source_key.lower().endswith(fmt) for fmt in SUPPORTED_FORMATS):
continue
# Build destination key by replacing source prefix with dest prefix
relative_path = source_key[len(source_prefix):]
dest_key = f"{dest_prefix}{relative_path}"
# Copy object from datalake to RAG data source bucket
s3_client.copy_object(
CopySource={'Bucket': SOURCE_BUCKET, 'Key': source_key},
Bucket=DEST_BUCKET,
Key=dest_key,
MetadataDirective='REPLACE',
Metadata={
'sync-timestamp': datetime.now(timezone.utc).isoformat(),
'source-key': source_key,
'source-bucket': SOURCE_BUCKET
}
)
synced_objects.append(dest_key)
# Update sync manifest with current timestamp
manifest = {
'last_sync_timestamp': datetime.now(timezone.utc).isoformat(),
'objects_synced': len(synced_objects),
'synced_keys': synced_objects
}
s3_client.put_object(
Bucket=DEST_BUCKET,
Key='staging/sync-manifest.json',
Body=json.dumps(manifest, indent=2),
ContentType='application/json'
)
return manifest
# Execute sync
result = sync_documents_from_datalake()
print(f"Sync complete: {result['objects_synced']} documents transferred")The sync process identifies new or updated documents by comparing the LastModified timestamp of each source object against the last successful sync timestamp stored in the manifest. Only documents modified after the last sync are transferred, minimizing unnecessary data movement and reducing costs. Deleted documents in the source datalake are handled separately — a reconciliation step compares the full set of source keys against destination keys and removes orphaned objects from the data source bucket.
After the sync completes, the pipeline can optionally trigger a Bedrock Knowledge Base ingestion job via the StartIngestionJob API. This ensures that newly synced documents are processed, chunked, embedded, and indexed in OpenSearch Serverless without manual intervention. The ingestion job detects which objects have changed since the last ingestion and only re-processes those documents, making incremental updates efficient even for large document collections.
Service Catalog Infrastructure
Provisioning Workflow
AWS Service Catalog provides a managed framework for organizing and distributing approved infrastructure templates to development teams. In the context of a RAG pipeline, Service Catalog acts as the orchestration layer that connects CloudFormation templates to a self-service provisioning experience. Rather than requiring each team to write and maintain their own infrastructure code, a central platform team publishes pre-approved CloudFormation templates as Service Catalog products. Developers then provision complete RAG infrastructure stacks by launching these products through the Service Catalog console or API, without needing direct access to CloudFormation or the underlying AWS services.
The end-to-end provisioning workflow follows a structured sequence from template authoring through operational infrastructure. The workflow begins with the platform team creating a portfolio — a logical container that groups related products and controls access. Within the portfolio, each product represents a single CloudFormation template that provisions one or more AWS resources. Products are versioned, allowing the platform team to iterate on templates while maintaining rollback capability.
The provisioning sequence operates as follows:
- Portfolio creation: The platform team creates a Service Catalog portfolio named for the RAG pipeline (e.g., "RAG Infrastructure Portfolio") and assigns IAM principals (users, groups, or roles) who are authorized to browse and launch products from it.
- Product definition: Each infrastructure component (S3 bucket, OpenSearch collection, Bedrock Knowledge Base, API Gateway) is defined as a separate product within the portfolio. Each product references a CloudFormation template stored in S3 or provided inline.
- Constraint configuration: Launch constraints attach an IAM role to each product, defining the permissions used during provisioning. This allows end users to provision resources they could not create directly, because the launch role carries the necessary permissions.
- Product launch: An authorized user selects a product, provides parameter values (environment name, resource tags, configuration options), and initiates provisioning. Service Catalog calls CloudFormation with the specified template and the launch constraint role.
- Stack creation: CloudFormation creates the resources defined in the template. Service Catalog tracks the provisioned product, linking it to the user who launched it and the underlying CloudFormation stack.
- Operational handoff: Once provisioning completes, the resources are live and connected. Output values from each stack (ARNs, endpoints, bucket names) are available for cross-referencing between components.
This workflow ensures that every RAG infrastructure deployment follows the same approved pattern, uses consistent naming conventions, applies mandatory tags, and operates under controlled IAM permissions. The platform team maintains the templates centrally, and updates propagate to all future provisioning operations through product versioning.
Portfolio & Products
A Service Catalog portfolio is the top-level organizational unit that groups related products and controls who can access them. For the RAG pipeline, a single portfolio contains all infrastructure products needed to stand up the complete system. The portfolio is shared with specific IAM principals — typically a developer role or a team group — who gain permission to browse products and launch provisioned instances.
Portfolio creation establishes the governance boundary. Only users granted access to the portfolio can see and launch its products. This access is managed through portfolio-principal associations, which link IAM users, groups, or roles to the portfolio. The platform team retains ownership of the portfolio and its products, while consumers interact only through the provisioning interface.
Each product within the portfolio represents a discrete infrastructure component. For the RAG pipeline, the portfolio contains four products:
- RAG S3 Data Source: Provisions the S3 bucket with versioning, encryption, public access blocks, and the IAM role trust policy for Bedrock access.
- RAG OpenSearch Collection: Provisions the OpenSearch Serverless vector search collection with encryption policies, network policies, and data access policies for Bedrock integration.
- RAG Bedrock Knowledge Base: Provisions the Bedrock Knowledge Base resource with data source configuration, embedding model selection, and vector store integration pointing to the OpenSearch collection.
- RAG API Gateway: Provisions the API Gateway REST API with resource definitions, Lambda integrations, request validators, and stage deployments for ingestion and retrieval endpoints.
Products are versioned using provisioning artifacts. Each artifact points to a specific CloudFormation template version stored in S3. When the platform team updates a template (for example, adding a new parameter or changing a default value), they create a new provisioning artifact version on the product. Existing provisioned instances remain on their original version until explicitly updated, providing stability for running infrastructure while allowing new deployments to use the latest template.
The sequence for configuring the portfolio and products follows a dependency order: create the portfolio first, then create each product with its CloudFormation template reference, then associate the portfolio with authorized principals, and finally attach launch constraints to each product. This ordering ensures that all components are in place before users attempt to provision infrastructure.
CloudFormation Templates
Each Service Catalog product references a CloudFormation template that defines the AWS resources to provision. The templates below demonstrate the infrastructure-as-code definitions for the four core RAG pipeline components. Each template accepts parameters for environment-specific configuration and outputs resource identifiers needed by downstream components.
The S3 data source template provisions a versioned, encrypted bucket with the IAM role and trust policy required for Bedrock Knowledge Base access:
AWSTemplateFormatVersion: '2010-09-09'
Description: 'RAG Pipeline - S3 Data Source Bucket with Bedrock Access Role'
Parameters:
EnvironmentName:
Type: String
Default: prod
AllowedValues: [dev, staging, prod]
ProjectTag:
Type: String
Default: rag-pipeline
Resources:
DataSourceBucket:
Type: AWS::S3::Bucket
Properties:
BucketName: !Sub 'rag-datasource-${EnvironmentName}-${AWS::AccountId}'
VersioningConfiguration:
Status: Enabled
BucketEncryption:
ServerSideEncryptionConfiguration:
- ServerSideEncryptionByDefault:
SSEAlgorithm: aws:kms
KMSMasterKeyID: !Ref DataSourceKMSKey
BucketKeyEnabled: true
PublicAccessBlockConfiguration:
BlockPublicAcls: true
BlockPublicPolicy: true
IgnorePublicAcls: true
RestrictPublicBuckets: true
Tags:
- Key: Project
Value: !Ref ProjectTag
- Key: Environment
Value: !Ref EnvironmentName
- Key: ManagedBy
Value: ServiceCatalog
DataSourceKMSKey:
Type: AWS::KMS::Key
Properties:
Description: KMS key for RAG data source bucket encryption
KeyPolicy:
Version: '2012-10-17'
Statement:
- Sid: AllowRootAccountAccess
Effect: Allow
Principal:
AWS: !Sub 'arn:aws:iam::${AWS::AccountId}:root'
Action: 'kms:*'
Resource: '*'
- Sid: AllowBedrockDecrypt
Effect: Allow
Principal:
AWS: !GetAtt BedrockAccessRole.Arn
Action:
- kms:Decrypt
- kms:GenerateDataKey
Resource: '*'
BedrockAccessRole:
Type: AWS::IAM::Role
Properties:
RoleName: !Sub 'BedrockKBRole-${EnvironmentName}'
AssumeRolePolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Principal:
Service: bedrock.amazonaws.com
Action: sts:AssumeRole
Condition:
StringEquals:
aws:SourceAccount: !Ref AWS::AccountId
ArnLike:
aws:SourceArn: !Sub 'arn:aws:bedrock:${AWS::Region}:${AWS::AccountId}:knowledge-base/*'
Policies:
- PolicyName: S3DataSourceReadAccess
PolicyDocument:
Version: '2012-10-17'
Statement:
- Effect: Allow
Action:
- s3:GetObject
- s3:ListBucket
Resource:
- !GetAtt DataSourceBucket.Arn
- !Sub '${DataSourceBucket.Arn}/*'
Outputs:
BucketName:
Value: !Ref DataSourceBucket
Description: S3 bucket name for RAG data source
BucketArn:
Value: !GetAtt DataSourceBucket.Arn
Description: S3 bucket ARN
BedrockRoleArn:
Value: !GetAtt BedrockAccessRole.Arn
Description: IAM role ARN for Bedrock Knowledge Base access
The OpenSearch Serverless template provisions a vector search collection with the required security policies and a data access policy granting the Bedrock role permissions to manage indexes and documents:
AWSTemplateFormatVersion: '2010-09-09'
Description: 'RAG Pipeline - OpenSearch Serverless Vector Search Collection'
Parameters:
EnvironmentName:
Type: String
Default: prod
CollectionName:
Type: String
Default: rag-vectors
BedrockRoleArn:
Type: String
Description: ARN of the Bedrock Knowledge Base IAM role
Resources:
EncryptionPolicy:
Type: AWS::OpenSearchServerless::SecurityPolicy
Properties:
Name: !Sub '${CollectionName}-enc'
Type: encryption
Policy: !Sub |
{
"Rules": [
{
"Resource": ["collection/${CollectionName}"],
"ResourceType": "collection"
}
],
"AWSOwnedKey": true
}
NetworkPolicy:
Type: AWS::OpenSearchServerless::SecurityPolicy
Properties:
Name: !Sub '${CollectionName}-net'
Type: network
Policy: !Sub |
[
{
"Rules": [
{
"Resource": ["collection/${CollectionName}"],
"ResourceType": "collection"
},
{
"Resource": ["collection/${CollectionName}"],
"ResourceType": "dashboard"
}
],
"AllowFromPublic": true
}
]
DataAccessPolicy:
Type: AWS::OpenSearchServerless::AccessPolicy
Properties:
Name: !Sub '${CollectionName}-access'
Type: data
Policy: !Sub |
[
{
"Rules": [
{
"Resource": ["collection/${CollectionName}"],
"Permission": [
"aoss:CreateCollectionItems",
"aoss:DeleteCollectionItems",
"aoss:UpdateCollectionItems",
"aoss:DescribeCollectionItems"
],
"ResourceType": "collection"
},
{
"Resource": ["index/${CollectionName}/*"],
"Permission": [
"aoss:CreateIndex",
"aoss:DeleteIndex",
"aoss:UpdateIndex",
"aoss:DescribeIndex",
"aoss:ReadDocument",
"aoss:WriteDocument"
],
"ResourceType": "index"
}
],
"Principal": ["${BedrockRoleArn}"],
"Description": "Bedrock KB access to vector collection"
}
]
VectorCollection:
Type: AWS::OpenSearchServerless::Collection
DependsOn:
- EncryptionPolicy
- NetworkPolicy
- DataAccessPolicy
Properties:
Name: !Ref CollectionName
Type: VECTORSEARCH
Description: !Sub 'Vector store for RAG pipeline - ${EnvironmentName}'
Tags:
- Key: Project
Value: rag-pipeline
- Key: Environment
Value: !Ref EnvironmentName
- Key: ManagedBy
Value: ServiceCatalog
Outputs:
CollectionId:
Value: !Ref VectorCollection
Description: OpenSearch Serverless collection ID
CollectionArn:
Value: !GetAtt VectorCollection.Arn
Description: Collection ARN for IAM policies
CollectionEndpoint:
Value: !GetAtt VectorCollection.CollectionEndpoint
Description: HTTPS endpoint for OpenSearch API calls
The Bedrock Knowledge Base template provisions the Knowledge Base resource, connecting the S3 data source to the OpenSearch vector store through the embedding model configuration:
AWSTemplateFormatVersion: '2010-09-09'
Description: 'RAG Pipeline - Bedrock Knowledge Base with S3 and OpenSearch Integration'
Parameters:
EnvironmentName:
Type: String
Default: prod
KnowledgeBaseName:
Type: String
Default: rag-knowledge-base
BedrockRoleArn:
Type: String
Description: ARN of the IAM role for Bedrock KB
DataSourceBucketArn:
Type: String
Description: ARN of the S3 data source bucket
OpenSearchCollectionArn:
Type: String
Description: ARN of the OpenSearch Serverless collection
EmbeddingModelId:
Type: String
Default: amazon.titan-embed-text-v1
AllowedValues:
- amazon.titan-embed-text-v1
- amazon.titan-embed-text-v2:0
- cohere.embed-english-v3
VectorIndexName:
Type: String
Default: rag-index
Resources:
KnowledgeBase:
Type: AWS::Bedrock::KnowledgeBase
Properties:
Name: !Sub '${KnowledgeBaseName}-${EnvironmentName}'
Description: !Sub 'RAG Knowledge Base for ${EnvironmentName} environment'
RoleArn: !Ref BedrockRoleArn
KnowledgeBaseConfiguration:
Type: VECTOR
VectorKnowledgeBaseConfiguration:
EmbeddingModelArn: !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/${EmbeddingModelId}'
StorageConfiguration:
Type: OPENSEARCH_SERVERLESS
OpensearchServerlessConfiguration:
CollectionArn: !Ref OpenSearchCollectionArn
VectorIndexName: !Ref VectorIndexName
FieldMapping:
VectorField: embedding
TextField: text
MetadataField: metadata
Tags:
Project: rag-pipeline
Environment: !Ref EnvironmentName
ManagedBy: ServiceCatalog
S3DataSource:
Type: AWS::Bedrock::DataSource
Properties:
KnowledgeBaseId: !Ref KnowledgeBase
Name: !Sub 's3-datasource-${EnvironmentName}'
Description: S3 bucket data source for document ingestion
DataSourceConfiguration:
Type: S3
S3Configuration:
BucketArn: !Ref DataSourceBucketArn
InclusionPrefixes:
- documents/
VectorIngestionConfiguration:
ChunkingConfiguration:
ChunkingStrategy: FIXED_SIZE
FixedSizeChunkingConfiguration:
MaxTokens: 300
OverlapPercentage: 20
Outputs:
KnowledgeBaseId:
Value: !Ref KnowledgeBase
Description: Bedrock Knowledge Base ID for API calls
KnowledgeBaseArn:
Value: !GetAtt KnowledgeBase.KnowledgeBaseArn
Description: Knowledge Base ARN
DataSourceId:
Value: !Ref S3DataSource
Description: Data source ID for ingestion job triggers
The API Gateway template provisions the REST API with resource paths, method integrations, request validators, and a deployment stage for the ingestion and retrieval endpoints:
AWSTemplateFormatVersion: '2010-09-09'
Description: 'RAG Pipeline - API Gateway REST API for Ingestion and Retrieval'
Parameters:
EnvironmentName:
Type: String
Default: prod
IngestionLambdaArn:
Type: String
Description: ARN of the ingestion Lambda function
RetrievalLambdaArn:
Type: String
Description: ARN of the retrieval Lambda function
StageName:
Type: String
Default: v1
Resources:
RagApi:
Type: AWS::ApiGateway::RestApi
Properties:
Name: !Sub 'rag-pipeline-api-${EnvironmentName}'
Description: REST API for RAG pipeline ingestion and retrieval
EndpointConfiguration:
Types:
- REGIONAL
Tags:
- Key: Project
Value: rag-pipeline
- Key: Environment
Value: !Ref EnvironmentName
- Key: ManagedBy
Value: ServiceCatalog
RequestValidator:
Type: AWS::ApiGateway::RequestValidator
Properties:
RestApiId: !Ref RagApi
Name: body-and-params-validator
ValidateRequestBody: true
ValidateRequestParameters: true
IngestionResource:
Type: AWS::ApiGateway::Resource
Properties:
RestApiId: !Ref RagApi
ParentId: !GetAtt RagApi.RootResourceId
PathPart: ingest
IngestionMethod:
Type: AWS::ApiGateway::Method
Properties:
RestApiId: !Ref RagApi
ResourceId: !Ref IngestionResource
HttpMethod: POST
AuthorizationType: AWS_IAM
RequestValidatorId: !Ref RequestValidator
Integration:
Type: AWS_PROXY
IntegrationHttpMethod: POST
Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${IngestionLambdaArn}/invocations'
RetrievalResource:
Type: AWS::ApiGateway::Resource
Properties:
RestApiId: !Ref RagApi
ParentId: !GetAtt RagApi.RootResourceId
PathPart: retrieve
RetrievalMethod:
Type: AWS::ApiGateway::Method
Properties:
RestApiId: !Ref RagApi
ResourceId: !Ref RetrievalResource
HttpMethod: POST
AuthorizationType: AWS_IAM
RequestValidatorId: !Ref RequestValidator
Integration:
Type: AWS_PROXY
IntegrationHttpMethod: POST
Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${RetrievalLambdaArn}/invocations'
ApiDeployment:
Type: AWS::ApiGateway::Deployment
DependsOn:
- IngestionMethod
- RetrievalMethod
Properties:
RestApiId: !Ref RagApi
ApiStage:
Type: AWS::ApiGateway::Stage
Properties:
RestApiId: !Ref RagApi
DeploymentId: !Ref ApiDeployment
StageName: !Ref StageName
MethodSettings:
- ResourcePath: '/*'
HttpMethod: '*'
ThrottlingBurstLimit: 100
ThrottlingRateLimit: 50
IngestionLambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref IngestionLambdaArn
Action: lambda:InvokeFunction
Principal: apigateway.amazonaws.com
SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${RagApi}/*/POST/ingest'
RetrievalLambdaPermission:
Type: AWS::Lambda::Permission
Properties:
FunctionName: !Ref RetrievalLambdaArn
Action: lambda:InvokeFunction
Principal: apigateway.amazonaws.com
SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${RagApi}/*/POST/retrieve'
Outputs:
ApiId:
Value: !Ref RagApi
Description: API Gateway REST API ID
ApiEndpoint:
Value: !Sub 'https://${RagApi}.execute-api.${AWS::Region}.amazonaws.com/${StageName}'
Description: Base URL for the RAG API
IngestionUrl:
Value: !Sub 'https://${RagApi}.execute-api.${AWS::Region}.amazonaws.com/${StageName}/ingest'
Description: Ingestion endpoint URL
RetrievalUrl:
Value: !Sub 'https://${RagApi}.execute-api.${AWS::Region}.amazonaws.com/${StageName}/retrieve'
Description: Retrieval endpoint URL
Each template follows a consistent structure: parameters for environment-specific values, resources defining the AWS infrastructure, and outputs exposing identifiers that downstream templates consume. The templates are designed to be provisioned in sequence — S3 first (produces the bucket ARN and Bedrock role ARN), then OpenSearch (produces the collection ARN and endpoint), then Bedrock Knowledge Base (consumes outputs from both), and finally API Gateway (consumes Lambda ARNs deployed separately). This dependency chain ensures each component has the references it needs at provisioning time.
Governance Controls
Service Catalog governance controls ensure that self-service provisioning operates within organizational guardrails. These controls prevent configuration drift, enforce security baselines, and maintain operational consistency across all RAG pipeline deployments — regardless of which team member initiates the provisioning. Three primary governance mechanisms work together: IAM launch constraints, approved template versioning, and resource tagging policies.
IAM Launch Constraints are the most critical governance mechanism. A launch constraint attaches a specific IAM role to a Service Catalog product, and that role is used to execute the CloudFormation stack creation — not the end user's own IAM permissions. This separation means the provisioning user does not need direct permissions to create S3 buckets, OpenSearch collections, or IAM roles. They only need permission to launch Service Catalog products. The launch constraint role carries the elevated permissions required to create infrastructure, while the user's access is limited to the Service Catalog API.
This pattern implements the principle of least privilege at the provisioning layer. A developer can deploy a complete RAG pipeline without having s3:CreateBucket, aoss:CreateCollection, or iam:CreateRole permissions in their own IAM policy. The launch constraint role is managed by the platform team and includes only the permissions needed for the specific product's CloudFormation template. If a template is updated to include a new resource type, the platform team updates the launch role accordingly.
Approved Template Versioning provides change control over infrastructure definitions. Each product in the portfolio has one or more provisioning artifact versions. The platform team controls which versions are available and can designate a specific version as the default. When a template needs modification — for example, adding a new encryption configuration or updating a default parameter — the platform team creates a new artifact version rather than modifying the existing one. This ensures that:
- Existing provisioned products remain stable on their deployed version
- New provisioning operations use the latest approved template
- Rollback is possible by reverting to a previous artifact version
- An audit trail exists showing which template version was used for each deployment
Version history is maintained indefinitely, and the platform team can retire old versions to prevent new deployments from using outdated templates while leaving existing stacks unaffected.
Resource Tagging Policies enforce consistent metadata across all provisioned resources. Service Catalog supports TagOptions — predefined tag key-value pairs associated with portfolios or products. When a user provisions a product, TagOptions are automatically applied to all resources created by the CloudFormation stack. For the RAG pipeline, mandatory tags include:
Project: rag-pipeline— identifies resources belonging to the RAG system for cost allocationEnvironment: dev|staging|prod— distinguishes deployment environmentsManagedBy: ServiceCatalog— indicates the resource was provisioned through the governed workflowOwner: {provisioning-user}— tracks who initiated the deployment for accountabilityCostCenter: {team-cost-center}— enables chargeback to the consuming team
These tags propagate to all resources in the stack, enabling consistent cost reporting, access control via tag-based IAM policies, and automated compliance scanning. Resources missing mandatory tags are flagged by AWS Config rules, providing a feedback loop that catches any drift from the tagging standard.
Together, these three governance mechanisms create a provisioning environment where developers have the autonomy to deploy infrastructure on demand while the platform team maintains control over what gets deployed, how it is configured, and who is accountable for each resource.
Pipeline Assembly
After all four Service Catalog products are provisioned, the individual infrastructure components must connect to form a functioning RAG pipeline. Each CloudFormation stack produces output values — ARNs, endpoints, bucket names, and resource IDs — that serve as the integration points between components. The pipeline assembly process wires these outputs together so that data flows from S3 through Bedrock into OpenSearch and back out through API Gateway.
The connection topology follows the data flow architecture of the RAG pipeline:
- S3 → Bedrock Knowledge Base: The Knowledge Base's data source configuration references the S3 bucket ARN (output from the S3 stack). The Bedrock IAM role (also output from the S3 stack) has a trust policy allowing Bedrock to assume it and read permissions on the bucket. This connection enables Bedrock to pull documents from S3 during ingestion sync jobs.
- Bedrock Knowledge Base → OpenSearch Serverless: The Knowledge Base's storage configuration references the OpenSearch collection ARN and endpoint (outputs from the OpenSearch stack). The data access policy on the collection grants the Bedrock role permission to create indexes, write documents, and perform searches. This connection enables Bedrock to index embeddings and execute retrieval queries against the vector store.
- API Gateway → Lambda → Bedrock: The API Gateway integration URIs reference Lambda function ARNs. The Lambda functions use the Bedrock Knowledge Base ID (output from the Knowledge Base stack) to call the
retrieveandretrieve_and_generateAPIs. Lambda execution roles include permissions forbedrock:Retrieveandbedrock:RetrieveAndGenerateactions on the Knowledge Base resource. - API Gateway → Lambda → S3: The ingestion Lambda function uses the S3 bucket name (output from the S3 stack) and the Knowledge Base data source ID (output from the Knowledge Base stack) to trigger sync operations. The Lambda execution role includes
s3:PutObjectpermissions on the bucket andbedrock:StartIngestionJobon the Knowledge Base.
In practice, these cross-stack references are resolved either through CloudFormation exports (where one stack exports a value and another imports it), through SSM Parameter Store (where stack outputs are written to parameters that other stacks or applications read), or through the Service Catalog provisioned product outputs API. The SSM Parameter Store approach is often preferred because it decouples the stacks — each stack writes its outputs to well-known parameter paths, and consuming stacks or Lambda functions read from those paths at runtime.
Once all connections are established, the pipeline operates as an integrated system. Documents uploaded to the S3 bucket are ingested by Bedrock Knowledge Base into the OpenSearch vector index. Queries submitted through the API Gateway endpoint trigger Lambda functions that call Bedrock's retrieval APIs, which search the OpenSearch index and generate LLM responses. The entire flow — from document upload to query response — traverses all four provisioned components in sequence, with each component performing its designated role in the pipeline.
Operational validation after assembly involves running a test ingestion (uploading a sample document to S3, triggering a sync job, and verifying the vector appears in OpenSearch) followed by a test retrieval (submitting a query through the API endpoint and confirming a relevant response is returned). This end-to-end smoke test confirms that all cross-component connections are correctly configured and that data flows through the complete pipeline as designed.
API Gateway for Ingestion & Retrieval
REST API Design
AWS API Gateway provides the HTTP interface layer for the RAG pipeline, exposing both ingestion and retrieval operations as RESTful endpoints. The API follows a resource-oriented design where each pipeline operation maps to a distinct resource path with appropriate HTTP methods. API Gateway handles request routing, input validation, throttling, and authorization before forwarding requests to backend Lambda functions that execute the actual pipeline logic.
The REST API is organized around two primary resource paths: /ingest for triggering data synchronization and ingestion jobs, and /retrieve for submitting queries and receiving generated responses. Each resource uses POST methods because both operations require request bodies containing configuration parameters. The API is deployed to a stage (e.g., prod) that provides the base URL for all endpoints.
The API design separates concerns cleanly: API Gateway manages the HTTP protocol layer (TLS termination, request validation, CORS, throttling), while Lambda functions contain the business logic for interacting with Bedrock, S3, and OpenSearch Serverless. This separation allows you to modify authorization policies, rate limits, or request schemas without changing the underlying Lambda code, and vice versa.
Key design decisions for the REST API include:
- API type: REST API (not HTTP API) — provides request validation models, usage plans, and API keys for consumer management
- Integration type: Lambda proxy integration — passes the full request context (headers, path parameters, body) to Lambda and returns the Lambda response directly to the client
- Authorization: IAM authorization — callers must sign requests with SigV4, restricting access to authenticated AWS principals with the appropriate IAM policy
- Request validation: Model-based validation — API Gateway validates request bodies against JSON Schema models before invoking Lambda, rejecting malformed requests at the gateway level
The following CloudFormation template defines the API Gateway REST API with its resource hierarchy, method configurations, and Lambda integrations for both the ingestion and retrieval endpoints:
AWSTemplateFormatVersion: '2010-09-09'
Description: API Gateway REST API for RAG Pipeline - Ingestion and Retrieval
Resources:
RagPipelineApi:
Type: AWS::ApiGateway::RestApi
Properties:
Name: rag-pipeline-api
Description: REST API for RAG ingestion sync and retrieval query operations
EndpointConfiguration:
Types:
- REGIONAL
# --- Ingestion Resource: /ingest ---
IngestResource:
Type: AWS::ApiGateway::Resource
Properties:
RestApiId: !Ref RagPipelineApi
ParentId: !GetAtt RagPipelineApi.RootResourceId
PathPart: ingest
IngestMethod:
Type: AWS::ApiGateway::Method
Properties:
RestApiId: !Ref RagPipelineApi
ResourceId: !Ref IngestResource
HttpMethod: POST
AuthorizationType: AWS_IAM
RequestValidatorId: !Ref RequestBodyValidator
RequestModels:
application/json: !Ref IngestRequestModel
Integration:
Type: AWS_PROXY
IntegrationHttpMethod: POST
Uri: !Sub
- arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaArn}/invocations
- LambdaArn: !GetAtt IngestLambdaFunction.Arn
IntegrationResponses:
- StatusCode: '200'
# --- Retrieval Resource: /retrieve ---
RetrieveResource:
Type: AWS::ApiGateway::Resource
Properties:
RestApiId: !Ref RagPipelineApi
ParentId: !GetAtt RagPipelineApi.RootResourceId
PathPart: retrieve
RetrieveMethod:
Type: AWS::ApiGateway::Method
Properties:
RestApiId: !Ref RagPipelineApi
ResourceId: !Ref RetrieveResource
HttpMethod: POST
AuthorizationType: AWS_IAM
RequestValidatorId: !Ref RequestBodyValidator
RequestModels:
application/json: !Ref RetrieveRequestModel
Integration:
Type: AWS_PROXY
IntegrationHttpMethod: POST
Uri: !Sub
- arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaArn}/invocations
- LambdaArn: !GetAtt RetrieveLambdaFunction.Arn
IntegrationResponses:
- StatusCode: '200'
# --- Request Validator ---
RequestBodyValidator:
Type: AWS::ApiGateway::RequestValidator
Properties:
RestApiId: !Ref RagPipelineApi
Name: validate-request-body
ValidateRequestBody: true
ValidateRequestParameters: false
# --- Request Models for Validation ---
IngestRequestModel:
Type: AWS::ApiGateway::Model
Properties:
RestApiId: !Ref RagPipelineApi
ContentType: application/json
Name: IngestRequest
Schema:
$schema: 'http://json-schema.org/draft-04/schema#'
type: object
required:
- knowledgeBaseId
- dataSourceId
properties:
knowledgeBaseId:
type: string
pattern: '^[A-Z0-9]{10}$'
dataSourceId:
type: string
pattern: '^[A-Z0-9]{10}$'
description:
type: string
maxLength: 256
RetrieveRequestModel:
Type: AWS::ApiGateway::Model
Properties:
RestApiId: !Ref RagPipelineApi
ContentType: application/json
Name: RetrieveRequest
Schema:
$schema: 'http://json-schema.org/draft-04/schema#'
type: object
required:
- query
- knowledgeBaseId
properties:
query:
type: string
minLength: 1
maxLength: 1000
knowledgeBaseId:
type: string
numberOfResults:
type: integer
minimum: 1
maximum: 25
searchType:
type: string
enum:
- SEMANTIC
- HYBRID
# --- API Deployment ---
ApiDeployment:
Type: AWS::ApiGateway::Deployment
DependsOn:
- IngestMethod
- RetrieveMethod
Properties:
RestApiId: !Ref RagPipelineApi
ApiStage:
Type: AWS::ApiGateway::Stage
Properties:
RestApiId: !Ref RagPipelineApi
DeploymentId: !Ref ApiDeployment
StageName: prod
ThrottlingBurstLimit: 100
ThrottlingRateLimit: 50
This template establishes the complete API structure: a REST API with two resource paths (/ingest and /retrieve), each with a POST method configured for IAM authorization and request body validation. The request models define JSON Schema constraints that API Gateway enforces before the request reaches Lambda — invalid payloads are rejected with a 400 status code at the gateway level, reducing unnecessary Lambda invocations and protecting downstream services from malformed input.
Lambda Integrations
API Gateway uses Lambda proxy integration to connect each API route to a dedicated Lambda function. With proxy integration, API Gateway passes the entire HTTP request (method, headers, path, query parameters, and body) as a structured event object to the Lambda handler. The Lambda function processes the request and returns a response object containing the status code, headers, and body — API Gateway forwards this directly to the client without transformation.
Each endpoint has its own Lambda function to maintain separation of concerns. The ingestion Lambda handles sync job orchestration with S3 and Bedrock Knowledge Bases, while the retrieval Lambda handles query processing with Bedrock's retrieve-and-generate API. Separate functions allow independent scaling, different memory/timeout configurations, and isolated IAM permissions following the principle of least privilege.
The Lambda functions require specific IAM permissions to interact with downstream AWS services. The ingestion function needs bedrock:StartIngestionJob and s3:ListBucket permissions, while the retrieval function needs bedrock:RetrieveAndGenerate and bedrock:Retrieve permissions. Both functions also need CloudWatch Logs permissions for operational monitoring. API Gateway itself requires lambda:InvokeFunction permission on each Lambda function, granted through a resource-based policy on the Lambda.
Authorization is handled at the API Gateway level using AWS IAM authorization. Every request must include a valid SigV4 signature, which means callers must have an IAM identity (user or role) with an attached policy granting execute-api:Invoke on the specific API resource ARN. This approach integrates naturally with other AWS services — for example, a Step Functions state machine or another Lambda function can call the API using its execution role without managing separate API keys or tokens.
Request validation is configured at the method level using API Gateway request models. Each model defines a JSON Schema that the request body must conform to. When a request fails validation, API Gateway returns a 400 Bad Request response immediately without invoking the Lambda function. This provides a first line of defense against malformed input and reduces costs by preventing unnecessary Lambda executions for invalid requests.
Ingestion Endpoint
The ingestion endpoint (POST /ingest) triggers a Bedrock Knowledge Base ingestion sync job. When called, it initiates the process of scanning the configured S3 data source for new or modified documents, chunking them, generating embeddings, and indexing the vectors in OpenSearch Serverless. The endpoint accepts the Knowledge Base ID and Data Source ID as required parameters, with an optional description field for tracking the sync job.
The ingestion sync is an asynchronous operation — the Lambda function starts the job and returns immediately with the job ID and status. Callers can poll the job status separately or rely on EventBridge notifications for completion. This design prevents API Gateway's 29-second timeout from being a constraint, since ingestion jobs can run for minutes or hours depending on the volume of documents to process.
The following Lambda handler implements the ingestion endpoint. It validates the request parameters, calls the Bedrock Agent API to start an ingestion job, and returns the job details to the caller:
import boto3
import json
import logging
from datetime import datetime
logger = logging.getLogger()
logger.setLevel(logging.INFO)
bedrock_agent = boto3.client('bedrock-agent', region_name='us-east-1')
def lambda_handler(event, context):
"""
Lambda handler for POST /ingest endpoint.
Triggers a Bedrock Knowledge Base ingestion sync job.
"""
try:
# Parse and validate request body
body = json.loads(event.get('body', '{}'))
knowledge_base_id = body.get('knowledgeBaseId')
data_source_id = body.get('dataSourceId')
description = body.get('description', f'Sync triggered at {datetime.utcnow().isoformat()}')
if not knowledge_base_id or not data_source_id:
return {
'statusCode': 400,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'ValidationError',
'message': 'knowledgeBaseId and dataSourceId are required fields'
})
}
# Start the ingestion job via Bedrock Agent API
logger.info(f"Starting ingestion job for KB: {knowledge_base_id}, DS: {data_source_id}")
response = bedrock_agent.start_ingestion_job(
knowledgeBaseId=knowledge_base_id,
dataSourceId=data_source_id,
description=description
)
ingestion_job = response['ingestionJob']
return {
'statusCode': 202,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'ingestionJobId': ingestion_job['ingestionJobId'],
'knowledgeBaseId': ingestion_job['knowledgeBaseId'],
'dataSourceId': ingestion_job['dataSourceId'],
'status': ingestion_job['status'],
'startedAt': ingestion_job['startedAt'].isoformat(),
'description': description
})
}
except bedrock_agent.exceptions.ConflictException as e:
logger.warning(f"Ingestion job conflict: {str(e)}")
return {
'statusCode': 409,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'ConflictError',
'message': 'An ingestion job is already running for this data source'
})
}
except bedrock_agent.exceptions.ValidationException as e:
logger.error(f"Bedrock validation error: {str(e)}")
return {
'statusCode': 400,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'ValidationError',
'message': str(e)
})
}
except Exception as e:
logger.error(f"Unexpected error starting ingestion job: {str(e)}")
return {
'statusCode': 500,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'InternalError',
'message': 'Failed to start ingestion job. Check CloudWatch logs for details.'
})
}
The handler returns a 202 Accepted status code on success, indicating that the ingestion job has been accepted for processing but has not yet completed. The response includes the job ID that callers can use to check status. Error handling covers three scenarios: a 409 Conflict when another job is already running for the same data source, a 400 Bad Request for invalid parameters, and a 500 Internal Server Error for unexpected failures.
A typical request and response exchange for the ingestion endpoint looks like this:
// POST https://api-id.execute-api.us-east-1.amazonaws.com/prod/ingest
// Headers: Content-Type: application/json, Authorization: AWS4-HMAC-SHA256 ...
// Request Body:
{
"knowledgeBaseId": "ABCDE12345",
"dataSourceId": "FGHIJ67890",
"description": "Weekly document sync - 2024-01-15"
}
// Response (202 Accepted):
{
"ingestionJobId": "ing-job-abc123def456",
"knowledgeBaseId": "ABCDE12345",
"dataSourceId": "FGHIJ67890",
"status": "STARTING",
"startedAt": "2024-01-15T10:30:00.000Z",
"description": "Weekly document sync - 2024-01-15"
}
Retrieval Endpoint
The retrieval endpoint (POST /retrieve) accepts a user query and returns a generated response grounded in the knowledge base content. This endpoint orchestrates the full retrieval-augmented generation flow: it embeds the query, performs a similarity search against the vector store, retrieves relevant document chunks, and passes them to an LLM for response generation — all through Bedrock's retrieve_and_generate API.
The endpoint accepts configurable search parameters that allow callers to tune retrieval behavior per request. The numberOfResults parameter controls how many document chunks are retrieved (default 5, maximum 25). The searchType parameter selects between semantic search (vector similarity only) and hybrid search (combining vector similarity with keyword matching for improved recall). These parameters give consumers flexibility to balance response quality against latency for their specific use case.
The following Lambda handler implements the retrieval endpoint. It extracts query parameters from the request, calls Bedrock's retrieve-and-generate API, and formats the response with both the generated answer and source citations:
import boto3
import json
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
bedrock_agent_runtime = boto3.client('bedrock-agent-runtime', region_name='us-east-1')
def lambda_handler(event, context):
"""
Lambda handler for POST /retrieve endpoint.
Performs retrieve-and-generate using Bedrock Knowledge Base.
"""
try:
# Parse request body
body = json.loads(event.get('body', '{}'))
query = body.get('query')
knowledge_base_id = body.get('knowledgeBaseId')
num_results = body.get('numberOfResults', 5)
search_type = body.get('searchType', 'SEMANTIC')
if not query or not knowledge_base_id:
return {
'statusCode': 400,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'ValidationError',
'message': 'query and knowledgeBaseId are required fields'
})
}
# Configure retrieval parameters
retrieval_config = {
'vectorSearchConfiguration': {
'numberOfResults': num_results,
'overrideSearchType': search_type
}
}
# Call Bedrock retrieve-and-generate API
logger.info(f"Retrieval query: '{query[:100]}...' against KB: {knowledge_base_id}")
response = bedrock_agent_runtime.retrieve_and_generate(
input={'text': query},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': knowledge_base_id,
'modelArn': 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2',
'retrievalConfiguration': retrieval_config
}
}
)
# Extract generated response and citations
generated_text = response['output']['text']
citations = []
for citation in response.get('citations', []):
for reference in citation.get('retrievedReferences', []):
citations.append({
'content': reference['content']['text'][:200],
'source': reference.get('location', {}).get('s3Location', {}).get('uri', 'unknown'),
'score': reference.get('metadata', {}).get('score', None)
})
return {
'statusCode': 200,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'query': query,
'generatedResponse': generated_text,
'citations': citations,
'searchType': search_type,
'numberOfResults': num_results
})
}
except bedrock_agent_runtime.exceptions.ResourceNotFoundException as e:
logger.error(f"Knowledge base not found: {str(e)}")
return {
'statusCode': 404,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'NotFoundError',
'message': 'The specified knowledge base does not exist'
})
}
except bedrock_agent_runtime.exceptions.ThrottlingException as e:
logger.warning(f"Bedrock throttling: {str(e)}")
return {
'statusCode': 429,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'ThrottlingError',
'message': 'Request rate exceeded. Please retry after a brief delay.'
})
}
except Exception as e:
logger.error(f"Unexpected error during retrieval: {str(e)}")
return {
'statusCode': 500,
'headers': {'Content-Type': 'application/json'},
'body': json.dumps({
'error': 'InternalError',
'message': 'Failed to process retrieval query. Check CloudWatch logs for details.'
})
}
The retrieval handler returns a 200 OK response containing the generated answer along with citations that trace back to the source documents in S3. Each citation includes a snippet of the retrieved content and the S3 URI of the source document, enabling callers to provide attribution and allow users to verify the answer against original sources.
A typical request and response exchange for the retrieval endpoint demonstrates the full query-to-answer flow:
// POST https://api-id.execute-api.us-east-1.amazonaws.com/prod/retrieve
// Headers: Content-Type: application/json, Authorization: AWS4-HMAC-SHA256 ...
// Request Body:
{
"query": "How does the ingestion pipeline handle document chunking?",
"knowledgeBaseId": "ABCDE12345",
"numberOfResults": 5,
"searchType": "SEMANTIC"
}
// Response (200 OK):
{
"query": "How does the ingestion pipeline handle document chunking?",
"generatedResponse": "The ingestion pipeline handles document chunking through Bedrock Knowledge Bases, which supports two primary strategies. Fixed-size chunking splits documents into segments of a specified token count (typically 300 tokens) with a configurable overlap percentage (usually 20%) to preserve context at chunk boundaries. Semantic chunking uses the embedding model to identify natural breakpoints in the text, producing more coherent chunks at the cost of variable sizes. The chunking strategy is configured at Knowledge Base creation time and applies uniformly to all documents processed during ingestion sync jobs.",
"citations": [
{
"content": "Fixed-size chunking splits documents into segments of a specified token count with overlap to ensure context is not lost at chunk boundaries...",
"source": "s3://rag-knowledge-base-datasource-prod/documents/technical-guides/chunking-strategies.pdf",
"score": 0.89
},
{
"content": "Semantic chunking uses the embedding model to identify natural breakpoints in the text, producing more coherent chunks...",
"source": "s3://rag-knowledge-base-datasource-prod/documents/technical-guides/bedrock-knowledge-bases.md",
"score": 0.84
}
],
"searchType": "SEMANTIC",
"numberOfResults": 5
}
Error Handling
Error handling in the API Gateway layer operates at two levels: gateway-level validation errors that are caught before Lambda is invoked, and application-level errors returned by the Lambda functions when downstream service calls fail. This two-tier approach ensures that malformed requests are rejected cheaply at the gateway while legitimate requests that encounter runtime failures receive meaningful error responses with actionable information.
Request Validation Failures — When a request body fails to conform to the JSON Schema model defined on the API method, API Gateway returns a 400 Bad Request response without invoking the Lambda function. This catches issues like missing required fields, incorrect data types, values outside allowed ranges, and strings that do not match required patterns. The validation response includes a generic message indicating which constraint was violated.
Common validation failure scenarios include: submitting a retrieval query with an empty query string (violates minLength: 1), providing a numberOfResults value greater than 25 (violates maximum: 25), omitting the required knowledgeBaseId field, or sending a searchType value that is not in the allowed enum (SEMANTIC or HYBRID). In all these cases, the Lambda function is never invoked, saving compute costs and protecting downstream services from invalid input.
Lambda Invocation Failures — When the Lambda function itself fails (unhandled exception, timeout, or out-of-memory), API Gateway returns a 502 Bad Gateway or 503 Service Unavailable response to the client. The Lambda functions in this pipeline implement structured error handling to prevent unhandled exceptions and return appropriate HTTP status codes for different failure modes:
- 409 Conflict (ingestion): Returned when an ingestion job is already running for the specified data source. The caller should wait for the current job to complete before retrying.
- 404 Not Found (retrieval): Returned when the specified Knowledge Base ID does not exist or is not accessible to the Lambda's execution role.
- 429 Too Many Requests (retrieval): Returned when Bedrock throttles the request due to rate limits. The caller should implement exponential backoff and retry.
- 500 Internal Server Error (both): Returned for unexpected failures such as network timeouts to downstream services, SDK errors, or unhandled exceptions. The error response includes a generic message while detailed diagnostics are logged to CloudWatch.
For production deployments, API Gateway also provides built-in throttling at the stage and method level. The stage-level throttle (configured at 50 requests per second with a burst of 100 in the CloudFormation template above) prevents any single consumer from overwhelming the pipeline. When throttled, API Gateway returns a 429 response with a Retry-After header. Usage plans and API keys can further segment rate limits by consumer, allowing different clients to have different throughput allocations based on their service tier.
Monitoring and alerting for API errors should be configured through CloudWatch. Key metrics to track include 4XXError and 5XXError counts at the API stage level, IntegrationLatency for detecting slow Lambda responses, and Count for overall traffic patterns. Set CloudWatch alarms on elevated 5XX rates to detect systemic failures in the Lambda functions or downstream services early, before they impact a large number of callers.
Ingestion Workflow
End-to-End Flow
The ingestion workflow is the offline pipeline that transforms raw source documents into searchable vector embeddings stored in OpenSearch Serverless. This process runs asynchronously—typically triggered by a schedule or an API call—and prepares the knowledge base that the retrieval workflow queries at runtime. Understanding the full ingestion sequence is essential because each stage depends on the successful completion of the previous one, and failures at any point require specific recovery strategies.
The end-to-end ingestion sequence proceeds through five distinct stages:
- Datalake Sync to S3 — A Lambda function copies new or updated documents from the upstream datalake into the designated S3 data source bucket. This sync operation identifies changed files using object metadata (last modified timestamps or ETags) and transfers only the delta, minimizing data transfer costs and processing time.
- S3 Event Notification — Once documents land in the S3 bucket, an S3 event notification (or the completion of the sync Lambda) signals that new content is available for processing. This acts as the trigger for the next stage.
- Bedrock Knowledge Base Ingestion Job — A second Lambda function calls the Bedrock Agent Runtime API to start an ingestion job on the configured Knowledge Base. The ingestion job reads all documents from the S3 data source, identifies new or modified files since the last sync, and processes them through the chunking and embedding pipeline.
- Document Chunking and Embedding Generation — Bedrock Knowledge Bases splits each document into chunks based on the configured chunking strategy (fixed-size with overlap or semantic boundaries). Each chunk is then passed to the configured embedding model (e.g., Amazon Titan Embeddings V2) to produce a dense vector representation.
- OpenSearch Serverless Indexing — The generated embedding vectors, along with chunk text and metadata (source document URI, chunk position, document title), are indexed into the OpenSearch Serverless vector search collection. Once indexed, these vectors are immediately available for similarity search queries.
The following diagram illustrates the complete ingestion data flow, showing each component as a distinct stage with directional arrows indicating how data moves through the pipeline from the upstream datalake to the final vector index.
Each stage in this pipeline is idempotent by design. The S3 sync only transfers changed files, the Bedrock ingestion job only processes documents modified since the last successful sync, and OpenSearch Serverless upserts vectors by document chunk ID. This means you can safely re-run the entire pipeline without creating duplicate entries or corrupting the index.
S3 Sync
The first stage of the ingestion workflow synchronizes documents from an upstream datalake into the S3 bucket that serves as the Bedrock Knowledge Base data source. This Lambda function is typically invoked on a schedule (via EventBridge) or triggered by an API Gateway endpoint that allows administrators to initiate a manual sync.
The sync process follows these steps:
- List source objects — Query the datalake S3 bucket (or data catalog) to enumerate available documents, filtering by prefix, file extension, or last modified date.
- Compare with destination — For each source object, check whether it already exists in the destination bucket with the same ETag or content hash. Skip objects that have not changed since the last sync.
- Copy changed objects — Use the S3
copy_objectAPI to transfer new or modified documents from the source to the destination bucket, preserving the folder structure defined by the datalake organization (e.g.,documents/policies/,documents/technical/). - Record sync metadata — Write a sync manifest (timestamp, object count, any errors) to a metadata prefix in the destination bucket or to a DynamoDB table for audit and recovery purposes.
- Signal completion — Return a success response with the count of synced objects, which the orchestrating workflow uses to decide whether to trigger the Bedrock ingestion job.
The following Lambda function implements the datalake-to-S3 sync operation. It compares source and destination objects by ETag, copies only changed files, and returns a summary of the sync operation.
import boto3
import json
import logging
from datetime import datetime, timezone
logger = logging.getLogger()
logger.setLevel(logging.INFO)
s3_client = boto3.client('s3')
# Configuration
SOURCE_BUCKET = 'company-datalake-documents'
SOURCE_PREFIX = 'published/'
DEST_BUCKET = 'rag-knowledge-base-source'
DEST_PREFIX = 'documents/'
SYNC_MANIFEST_KEY = 'metadata/last-sync-manifest.json'
def lambda_handler(event, context):
"""
Sync documents from the upstream datalake bucket to the RAG
Knowledge Base S3 data source bucket. Only copies new or
modified objects based on ETag comparison.
"""
synced_count = 0
skipped_count = 0
errors = []
try:
# Step 1: List all objects in the source prefix
source_objects = list_objects(SOURCE_BUCKET, SOURCE_PREFIX)
logger.info(f"Found {len(source_objects)} objects in source")
# Step 2: List existing objects in destination for comparison
dest_objects = list_objects(DEST_BUCKET, DEST_PREFIX)
dest_etag_map = {obj['Key']: obj['ETag'] for obj in dest_objects}
# Step 3: Copy new or modified objects
for source_obj in source_objects:
source_key = source_obj['Key']
# Map source key to destination key
relative_path = source_key[len(SOURCE_PREFIX):]
dest_key = f"{DEST_PREFIX}{relative_path}"
# Skip if destination has same ETag (unchanged)
if dest_key in dest_etag_map:
if dest_etag_map[dest_key] == source_obj['ETag']:
skipped_count += 1
continue
# Copy object from source to destination
try:
s3_client.copy_object(
Bucket=DEST_BUCKET,
Key=dest_key,
CopySource={
'Bucket': SOURCE_BUCKET,
'Key': source_key
}
)
synced_count += 1
logger.info(f"Synced: {source_key} -> {dest_key}")
except Exception as e:
errors.append({
'key': source_key,
'error': str(e)
})
logger.error(f"Failed to sync {source_key}: {e}")
# Step 4: Write sync manifest for audit trail
manifest = {
'timestamp': datetime.now(timezone.utc).isoformat(),
'source_bucket': SOURCE_BUCKET,
'source_prefix': SOURCE_PREFIX,
'total_source_objects': len(source_objects),
'synced_count': synced_count,
'skipped_count': skipped_count,
'error_count': len(errors),
'errors': errors[:10] # Cap stored errors
}
s3_client.put_object(
Bucket=DEST_BUCKET,
Key=SYNC_MANIFEST_KEY,
Body=json.dumps(manifest, indent=2),
ContentType='application/json'
)
logger.info(
f"Sync complete: {synced_count} synced, "
f"{skipped_count} skipped, {len(errors)} errors"
)
return {
'statusCode': 200,
'body': json.dumps({
'message': 'Sync completed successfully',
'synced': synced_count,
'skipped': skipped_count,
'errors': len(errors),
'trigger_ingestion': synced_count > 0
})
}
except Exception as e:
logger.error(f"Sync failed: {e}")
return {
'statusCode': 500,
'body': json.dumps({
'message': 'Sync failed',
'error': str(e)
})
}
def list_objects(bucket, prefix):
"""List all objects under a given prefix using pagination."""
objects = []
paginator = s3_client.get_paginator('list_objects_v2')
for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
if 'Contents' in page:
objects.extend(page['Contents'])
return objects
The function uses ETag comparison as a lightweight change detection mechanism. ETags for single-part uploads are MD5 hashes of the object content, making them reliable indicators of whether a file has changed. For multipart uploads, ETags include a part count suffix (e.g., etag-3), so if your datalake uses multipart uploads, consider storing a separate content hash in object metadata for more reliable comparison.
The trigger_ingestion flag in the response tells the orchestrating workflow whether any new documents were synced. If no documents changed, there is no need to run the Bedrock ingestion job, saving both time and API costs.
Bedrock Ingestion
Once documents are synced to the S3 data source bucket, the next stage initiates a Bedrock Knowledge Base ingestion job. This job instructs Bedrock to scan the configured S3 data source, identify new or modified documents, and process them through the chunking-embedding-indexing pipeline. The ingestion job is an asynchronous operation—you start it and then poll for completion or rely on EventBridge notifications.
The ingestion process follows these steps:
- Start ingestion job — Call the
start_ingestion_jobAPI on the Bedrock Agent client, specifying the Knowledge Base ID and the Data Source ID. This returns an ingestion job ID that you use to track progress. - Bedrock scans S3 — The Knowledge Base service reads all objects from the configured S3 prefix. It compares document checksums against its internal metadata store to identify which documents are new or modified since the last successful ingestion.
- Document processing — For each new or modified document, Bedrock parses the content (handling PDF extraction, HTML stripping, etc.), applies the configured chunking strategy, generates embeddings for each chunk, and indexes the vectors in the configured OpenSearch Serverless collection.
- Job completion — The ingestion job transitions to a
COMPLETEstatus when all documents have been processed. If any documents fail, the job may complete with aCOMPLETEstatus but include failure statistics in the job details.
The following Lambda function starts a Bedrock Knowledge Base ingestion job and polls for its completion. In production, you would typically use a Step Functions state machine with a wait loop instead of polling within a single Lambda invocation, but this example demonstrates the core API interactions.
import boto3
import json
import logging
import time
logger = logging.getLogger()
logger.setLevel(logging.INFO)
bedrock_agent = boto3.client('bedrock-agent')
# Configuration — set these from environment variables in production
KNOWLEDGE_BASE_ID = 'XXXXXXXXXX' # Your Knowledge Base ID
DATA_SOURCE_ID = 'YYYYYYYYYY' # Your Data Source ID
MAX_POLL_ATTEMPTS = 60 # Max polling iterations
POLL_INTERVAL_SECONDS = 30 # Seconds between status checks
def lambda_handler(event, context):
"""
Start a Bedrock Knowledge Base ingestion job and monitor
its progress. Returns the final job status and statistics.
"""
try:
# Step 1: Start the ingestion job
logger.info(
f"Starting ingestion job for KB: {KNOWLEDGE_BASE_ID}, "
f"DS: {DATA_SOURCE_ID}"
)
start_response = bedrock_agent.start_ingestion_job(
knowledgeBaseId=KNOWLEDGE_BASE_ID,
dataSourceId=DATA_SOURCE_ID,
description='Scheduled ingestion after datalake sync'
)
ingestion_job = start_response['ingestionJob']
job_id = ingestion_job['ingestionJobId']
logger.info(f"Ingestion job started: {job_id}")
# Step 2: Poll for job completion
for attempt in range(MAX_POLL_ATTEMPTS):
time.sleep(POLL_INTERVAL_SECONDS)
status_response = bedrock_agent.get_ingestion_job(
knowledgeBaseId=KNOWLEDGE_BASE_ID,
dataSourceId=DATA_SOURCE_ID,
ingestionJobId=job_id
)
job_status = status_response['ingestionJob']
current_status = job_status['status']
logger.info(
f"Poll {attempt + 1}: status={current_status}"
)
if current_status in ('COMPLETE', 'FAILED', 'STOPPED'):
break
else:
logger.warning(
f"Job {job_id} did not complete within polling window"
)
return {
'statusCode': 202,
'body': json.dumps({
'message': 'Ingestion job still in progress',
'jobId': job_id,
'status': current_status
})
}
# Step 3: Extract job statistics
statistics = job_status.get('statistics', {})
result = {
'jobId': job_id,
'status': current_status,
'statistics': {
'numberOfDocumentsScanned': statistics.get(
'numberOfDocumentsScanned', 0
),
'numberOfDocumentsIndexed': statistics.get(
'numberOfNewDocumentsIndexed', 0
),
'numberOfDocumentsFailed': statistics.get(
'numberOfDocumentsFailed', 0
),
'numberOfModifiedDocumentsIndexed': statistics.get(
'numberOfModifiedDocumentsIndexed', 0
)
}
}
if current_status == 'FAILED':
failure_reasons = job_status.get('failureReasons', [])
result['failureReasons'] = failure_reasons
logger.error(
f"Ingestion job failed: {failure_reasons}"
)
return {
'statusCode': 500,
'body': json.dumps(result)
}
logger.info(f"Ingestion job completed: {json.dumps(result)}")
return {
'statusCode': 200,
'body': json.dumps(result)
}
except Exception as e:
logger.error(f"Error managing ingestion job: {e}")
return {
'statusCode': 500,
'body': json.dumps({
'message': 'Failed to manage ingestion job',
'error': str(e)
})
}
The start_ingestion_job API is idempotent in the sense that starting a new job while a previous one is still running will return an error. Your orchestration logic should check for in-progress jobs before starting a new one, or use a Step Functions workflow with a choice state that skips ingestion if a job is already active.
The job statistics returned on completion provide visibility into what happened during ingestion: how many documents were scanned, how many were newly indexed, how many were updated, and how many failed. These metrics are critical for monitoring pipeline health and should be published to CloudWatch custom metrics for alerting.
Chunking & Embedding
Document chunking and embedding generation are handled automatically by Bedrock Knowledge Bases during the ingestion job, but understanding the configuration options is essential for optimizing retrieval quality. The chunking strategy determines how source documents are split into segments, and the embedding model converts those segments into vectors that capture semantic meaning.
Chunking Strategy Configuration
Bedrock Knowledge Bases supports three chunking strategies, configured at Knowledge Base creation time:
- Fixed-size chunking — Splits documents into segments of a specified token count (e.g., 300 tokens) with a configurable overlap percentage (e.g., 20%). The overlap ensures that context spanning a chunk boundary is captured in both adjacent chunks, preventing information loss at split points. This is the most predictable strategy and works well for homogeneous document collections.
- Semantic chunking — Uses the embedding model to identify natural breakpoints in the text where the topic or meaning shifts. This produces variable-length chunks that align with logical content boundaries (paragraphs, sections, topic transitions). Semantic chunking typically produces higher-quality retrieval results but is more computationally expensive during ingestion.
- No chunking — Treats each document as a single chunk. This is only appropriate for very short documents (under the embedding model's token limit) and is rarely used in production RAG pipelines.
For most RAG workloads, fixed-size chunking with 300 tokens and 20% overlap provides a good balance between retrieval precision and ingestion cost. Smaller chunks (100–200 tokens) improve precision but increase the total number of vectors and may fragment important context. Larger chunks (500–1000 tokens) preserve more context per result but may include irrelevant information that dilutes the retrieval signal.
Embedding Generation
After chunking, each text segment is passed to the configured embedding model. For Amazon Titan Embeddings V2, the model accepts text up to 8,192 tokens and produces vectors of configurable dimension (256, 512, or 1024). The embedding captures the semantic meaning of the chunk in a dense numerical representation that enables similarity comparison via cosine distance or dot product.
During the embedding phase, Bedrock Knowledge Bases automatically handles:
- Batching — Groups multiple chunks into batch requests to maximize throughput within the model's rate limits.
- Retry logic — Implements exponential backoff for throttled requests, ensuring all chunks are eventually processed even under heavy load.
- Metadata attachment — Associates each embedding with metadata fields including the source document S3 URI, chunk position within the document, and any custom metadata attributes defined in the Knowledge Base configuration.
Vector Indexing in OpenSearch Serverless
Once embeddings are generated, Bedrock Knowledge Bases writes them to the configured OpenSearch Serverless vector index. Each document in the index contains the embedding vector, the original chunk text (for retrieval display), and metadata fields. The index uses the HNSW algorithm for approximate nearest neighbor search, which builds a navigable graph structure over the vectors during indexing. This graph enables sub-millisecond query times even across millions of vectors.
The indexing operation is an upsert: if a chunk from a previously ingested document has been modified, its vector is updated in place rather than creating a duplicate entry. Bedrock tracks chunk identity using a combination of the source document URI and chunk position, ensuring that re-ingesting an updated document correctly replaces the old vectors without leaving orphaned entries in the index.
Failure & Recovery
Production ingestion pipelines must handle failures gracefully at every stage. Because the pipeline is sequential—each stage depends on the previous one—a failure at any point halts downstream processing. The recovery strategy differs depending on which stage failed and whether the failure is transient (retryable) or permanent (requires intervention).
S3 Sync Failure Scenarios
The datalake-to-S3 sync can fail for several reasons:
- Access denied (403) — The sync Lambda's IAM role lacks
s3:GetObjectpermission on the source bucket ors3:PutObjecton the destination bucket. Recovery: fix the IAM policy and re-run the sync. No data corruption occurs because the sync is additive. - Source bucket unavailable — The datalake bucket is temporarily unavailable or the objects have been moved. Recovery: the sync function logs the error and returns a failure response. The orchestrator retries on the next scheduled run. No partial state is left in the destination bucket because each object copy is atomic.
- Partial sync (some objects fail) — Network issues or throttling cause some
copy_objectcalls to fail while others succeed. Recovery: the sync manifest records which objects failed. On the next run, those objects are detected as "not yet synced" (missing or different ETag in destination) and are retried automatically. The pipeline is self-healing for partial failures. - Lambda timeout — If the document corpus is very large, the sync may exceed the Lambda 15-minute timeout. Recovery: implement pagination with a continuation token stored in DynamoDB, allowing the next invocation to resume where the previous one stopped. Alternatively, use Step Functions with a Map state to parallelize the sync across multiple Lambda invocations.
Bedrock Ingestion Job Failure Scenarios
The Bedrock ingestion job can fail at the job level or at the individual document level:
- Job-level failure (status: FAILED) — The entire ingestion job fails, typically due to misconfigured IAM roles (Knowledge Base cannot access S3 or OpenSearch), an invalid Knowledge Base configuration, or a service outage. Recovery: check the
failureReasonsfield in the job response, fix the underlying configuration issue, and restart the job. No partial vectors are written to the index when the job fails at this level. - Document-level failure — The job completes (status: COMPLETE) but some individual documents fail to process. Common causes include unsupported file formats, documents exceeding the embedding model's token limit, or corrupted file content. Recovery: check the
numberOfDocumentsFailedstatistic and review CloudWatch logs for the specific document URIs that failed. Fix or remove the problematic documents from S3 and re-run ingestion. Successfully processed documents are not affected. - Throttling during embedding — High-volume ingestion may trigger Bedrock model throttling (HTTP 429). Recovery: Bedrock Knowledge Bases handles retry logic internally with exponential backoff. If throttling is persistent, request a quota increase for the embedding model's invocations-per-minute limit through the AWS Service Quotas console.
- OpenSearch indexing failure — The vector store rejects writes due to capacity limits or index mapping conflicts (e.g., dimension mismatch between the embedding model output and the index configuration). Recovery: verify that the OpenSearch Serverless index vector dimension matches the embedding model output dimension. For capacity issues, OpenSearch Serverless auto-scales OCUs, but there may be a brief delay during scale-up events.
Recovery Best Practices
To build a resilient ingestion pipeline, implement these patterns:
- Idempotent operations — Design every stage to be safely re-runnable. The S3 sync uses ETag comparison, and Bedrock ingestion uses document checksums, so re-running the pipeline after a failure does not create duplicates.
- Dead letter queues — If the sync or ingestion Lambda fails after exhausting retries, route the failed event to an SQS dead letter queue for manual investigation.
- CloudWatch alarms — Set alarms on the sync Lambda error rate, ingestion job failure count, and the
numberOfDocumentsFailedmetric. Alert the operations team when failures exceed a threshold. - Sync manifests — Persist a manifest after each sync run recording what was transferred and what failed. This provides an audit trail and enables targeted re-processing of failed objects without re-syncing the entire corpus.
- Step Functions orchestration — Use AWS Step Functions to coordinate the sync → ingestion sequence with built-in retry policies, error handling states, and timeout management. This is more robust than chaining Lambda invocations directly.
Retrieval Workflow
Query Processing
The retrieval workflow is the online, user-facing path of the RAG pipeline. It transforms a natural language question into a grounded, context-aware response by orchestrating five sequential stages: query submission, search configuration, vector similarity search, context retrieval, and LLM response generation. Each stage builds on the output of the previous one, and the entire flow executes within a single API request-response cycle.
Stage 1: User Query Submission — A user submits a natural language question through the retrieval API endpoint exposed by API Gateway. The request arrives as an HTTPS POST with a JSON body containing the query text and optional search configuration parameters. The API Gateway validates the request structure against a defined model schema before forwarding it to the retrieval Lambda function.
Stage 2: Search Configuration — The retrieval Lambda function parses the incoming request and assembles the search configuration. This includes the search type (semantic vector search, hybrid, or keyword), the number of results to retrieve (top-k), similarity score thresholds, and optional metadata filters. These parameters are either provided explicitly in the request or fall back to default values configured in the Lambda environment.
Stage 3: Vector Similarity Search — The user's query text is embedded into a vector using the same Bedrock embedding model (Amazon Titan Embeddings V1) used during ingestion. This query vector is then submitted to OpenSearch Serverless for approximate nearest neighbor (ANN) search against the indexed document embeddings. The search engine returns the top-k document chunks ranked by cosine similarity score.
Stage 4: Context Retrieval — The similarity search results are filtered by the configured score threshold to exclude low-relevance chunks. The remaining chunks, along with their metadata (source document, chunk ID, category), form the context window that will be passed to the LLM. If metadata filters were specified in the request, only chunks matching those filters are included in the results.
Stage 5: LLM Response Generation — The retrieved context passages and the original user query are assembled into a structured prompt and sent to a Bedrock foundation model (such as Anthropic Claude or Amazon Titan Text). The LLM generates a natural language response grounded in the retrieved evidence. The Bedrock retrieve-and-generate API can orchestrate stages 3 through 5 in a single managed call, or you can implement each stage independently for finer control over the retrieval parameters.
Search Configuration
The retrieval API accepts a set of configuration parameters that control how the vector search is performed and how results are filtered. These parameters allow users to customize retrieval behavior per request, tuning the trade-off between precision and recall based on their specific use case. All configuration options are passed in the request body alongside the query text.
The following retrieval configuration options are available:
- query (required): The natural language question to search for. This text is embedded into a vector for similarity search.
- searchType: Controls the search strategy. Options include
SEMANTIC(pure vector similarity search),HYBRID(combines vector search with keyword matching for improved recall), andKEYWORD(traditional text-based search without embeddings). Defaults toSEMANTIC. - numberOfResults: The number of document chunks to retrieve (top-k). Accepts values between 1 and 100. Higher values provide more context but increase LLM token consumption and latency. Defaults to 5.
- similarityThreshold: Minimum cosine similarity score (0.0 to 1.0) for a chunk to be included in results. Chunks scoring below this threshold are discarded. Defaults to 0.5. For high-precision use cases, set to 0.7 or higher.
- metadataFilter: An optional filter object that restricts search results to chunks matching specific metadata field values. Supports equality, range, and list-based filters on indexed metadata fields like
category,source_document, oringestion_timestamp. - overrideSearchConfiguration: A nested object for advanced search tuning, including
vectorSearchConfigurationwith parameters likenumberOfResultsand filter expressions compatible with Bedrock Knowledge Base retrieval APIs.
The following code block shows the structure of a retrieval API request with all configurable search parameters. This request is sent as an HTTPS POST to the retrieval endpoint exposed by API Gateway:
{
"query": "How does the ingestion pipeline handle document chunking?",
"searchType": "SEMANTIC",
"numberOfResults": 5,
"similarityThreshold": 0.7,
"metadataFilter": {
"andAll": [
{
"equals": {
"key": "category",
"value": "technical-guides"
}
},
{
"greaterThan": {
"key": "ingestion_timestamp",
"value": "2024-01-01T00:00:00Z"
}
}
]
},
"overrideSearchConfiguration": {
"vectorSearchConfiguration": {
"numberOfResults": 10,
"overrideSearchType": "HYBRID"
}
}
}The metadataFilter object supports boolean combinators (andAll, orAll) and comparison operators (equals, notEquals, greaterThan, lessThan, in, notIn, startsWith). These filters are applied during the vector search phase, meaning only chunks whose metadata matches the filter criteria are considered as candidates. This is more efficient than post-filtering because it reduces the search space before computing similarity scores.
The searchType parameter significantly affects retrieval quality. Semantic search excels when the user's query uses different terminology than the source documents (because embeddings capture meaning, not exact words). Hybrid search combines the strengths of both approaches — it retrieves results that are semantically similar and also contain keyword matches, improving recall for queries where specific technical terms matter. Keyword search is useful as a fallback when vector search returns no results or when the query contains exact identifiers like error codes or resource ARNs.
Retrieve-and-Generate
AWS Bedrock provides a managed retrieve_and_generate API that orchestrates the entire retrieval-to-response pipeline in a single call. This API accepts a user query, performs vector search against a configured Knowledge Base, retrieves relevant document chunks, and passes them as context to a foundation model for response generation. Using this managed API eliminates the need to implement each retrieval stage independently, reducing code complexity while maintaining full configurability through API parameters.
The retrieve_and_generate call requires three key inputs: the user's query text, the Knowledge Base ID (which identifies the vector store and embedding model configuration), and the model ARN for the foundation model that will generate the response. You can also pass retrieval configuration overrides to control the number of results, search type, and metadata filters on a per-request basis.
The following code demonstrates how to call the Bedrock Agent Runtime retrieve_and_generate API from a Lambda function, including search configuration overrides and response extraction:
import boto3
import json
bedrock_agent_runtime = boto3.client(
service_name='bedrock-agent-runtime',
region_name='us-east-1'
)
def retrieve_and_generate(query: str, config: dict) -> dict:
"""
Execute the Bedrock retrieve-and-generate API to perform
vector search and LLM response generation in a single call.
Args:
query: The user's natural language question
config: Search configuration parameters from the API request
Returns:
Dictionary containing the generated response and source citations
"""
knowledge_base_id = 'KB-RAG-PROD-001'
model_arn = 'arn:aws:bedrock:us-east-1::foundation-model/anthropic.claude-v2'
# Build retrieval configuration from request parameters
retrieval_config = {
'vectorSearchConfiguration': {
'numberOfResults': config.get('numberOfResults', 5),
'overrideSearchType': config.get('searchType', 'SEMANTIC')
}
}
# Add metadata filter if provided in the request
if 'metadataFilter' in config:
retrieval_config['vectorSearchConfiguration']['filter'] = config['metadataFilter']
# Call the retrieve-and-generate API
response = bedrock_agent_runtime.retrieve_and_generate(
input={
'text': query
},
retrieveAndGenerateConfiguration={
'type': 'KNOWLEDGE_BASE',
'knowledgeBaseConfiguration': {
'knowledgeBaseId': knowledge_base_id,
'modelArn': model_arn,
'retrievalConfiguration': {
'vectorSearchConfiguration': retrieval_config['vectorSearchConfiguration']
},
'generationConfiguration': {
'inferenceConfig': {
'textInferenceConfig': {
'maxTokens': 1024,
'temperature': 0.2,
'topP': 0.9
}
}
}
}
}
)
# Extract the generated response and source citations
generated_text = response['output']['text']
citations = []
for citation in response.get('citations', []):
for reference in citation.get('retrievedReferences', []):
citations.append({
'text': reference['content']['text'],
'source': reference['location']['s3Location']['uri'],
'score': reference.get('metadata', {}).get('score', None)
})
return {
'answer': generated_text,
'citations': citations,
'sessionId': response.get('sessionId')
}
# Example invocation from the retrieval Lambda handler
search_config = {
'numberOfResults': 5,
'searchType': 'SEMANTIC',
'metadataFilter': {
'equals': {
'key': 'category',
'value': 'technical-guides'
}
}
}
result = retrieve_and_generate(
query="How does the ingestion pipeline handle document chunking?",
config=search_config
)
print(f"Answer: {result['answer']}")
print(f"Sources: {len(result['citations'])} citations")
for cite in result['citations']:
print(f" - {cite['source']}")
The retrieve_and_generate API returns a structured response containing the generated text, a list of citations with source references, and an optional session ID for multi-turn conversations. Each citation includes the retrieved text chunk, the S3 URI of the source document, and metadata about the retrieval. This citation structure enables your application to display source attribution alongside the generated answer, giving users transparency into which documents informed the response.
The generationConfiguration block controls LLM inference parameters. A low temperature (0.2) keeps responses factual and grounded in the retrieved context. The maxTokens parameter limits response length — set this based on your expected answer complexity. For concise factual answers, 512 tokens is sufficient; for detailed explanations with multiple source references, 1024–2048 tokens provides adequate space.
If you need more control over the retrieval stage (for example, to implement custom re-ranking, apply post-retrieval filtering, or use a different LLM), you can use the separate retrieve API instead. This API performs only the vector search and returns raw chunks without LLM generation, allowing you to implement the generation stage independently with custom prompt engineering.
Response Formatting
The retrieval Lambda function formats the raw output from the Bedrock retrieve-and-generate API into a structured JSON response that the client application can consume. The response includes the generated answer, source citations with document references, metadata about the search execution, and status information. This formatting layer normalizes the Bedrock API output into a consistent contract that frontend applications can rely on regardless of changes to the underlying retrieval implementation.
The response structure is designed to support common UI patterns: displaying the answer with inline citations, showing a list of source documents, and providing confidence indicators based on retrieval scores. The Lambda function also adds request metadata (query echo, execution time, result count) to support debugging and observability.
The following code shows how the retrieval Lambda handler formats the response before returning it through API Gateway:
import json
import time
from datetime import datetime, timezone
def format_retrieval_response(query: str, rag_result: dict, start_time: float) -> dict:
"""
Format the RAG retrieval result into a structured API response.
Args:
query: The original user query
rag_result: Output from the retrieve_and_generate function
start_time: Request start timestamp for latency calculation
Returns:
Formatted API Gateway response with headers and JSON body
"""
execution_time_ms = int((time.time() - start_time) * 1000)
# Build the response body with answer, citations, and metadata
response_body = {
"status": "success",
"query": query,
"answer": rag_result['answer'],
"citations": [
{
"sourceDocument": cite['source'],
"relevantText": cite['text'][:500], # Truncate long passages
"relevanceScore": cite.get('score')
}
for cite in rag_result.get('citations', [])
],
"metadata": {
"resultCount": len(rag_result.get('citations', [])),
"executionTimeMs": execution_time_ms,
"timestamp": datetime.now(timezone.utc).isoformat(),
"sessionId": rag_result.get('sessionId'),
"model": "anthropic.claude-v2",
"knowledgeBaseId": "KB-RAG-PROD-001"
}
}
# Return API Gateway proxy integration response format
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"X-Request-Id": rag_result.get('sessionId', 'unknown'),
"X-Execution-Time-Ms": str(execution_time_ms)
},
"body": json.dumps(response_body, indent=2)
}
def lambda_handler(event, context):
"""
Retrieval Lambda handler — processes the query and returns
a formatted response through API Gateway.
"""
start_time = time.time()
# Parse the incoming request body
body = json.loads(event.get('body', '{}'))
query = body.get('query', '')
search_config = {
'numberOfResults': body.get('numberOfResults', 5),
'searchType': body.get('searchType', 'SEMANTIC'),
'metadataFilter': body.get('metadataFilter')
}
# Execute retrieval and generation
rag_result = retrieve_and_generate(query, search_config)
# Format and return the response
return format_retrieval_response(query, rag_result, start_time)
The formatted response provides a clear contract for client applications. The status field indicates whether the request succeeded, the answer field contains the LLM-generated response, and the citations array provides source attribution with relevance scores. The metadata block includes operational information useful for monitoring and debugging — execution time helps identify slow queries, result count indicates retrieval quality, and the session ID enables multi-turn conversation tracking.
Response headers include CORS configuration for browser-based clients, a request ID for tracing, and execution time for client-side performance monitoring. The relevantText field in each citation is truncated to 500 characters to keep response payloads manageable while still providing enough context for the client to display meaningful source previews.
No-Results Fallback
When a retrieval query returns no document chunks above the configured similarity threshold, the system must handle this gracefully rather than passing an empty context to the LLM (which would produce a hallucinated response) or returning a raw error to the user. The no-results fallback behavior ensures that users receive a clear, informative response explaining that no relevant information was found, along with suggestions for refining their query.
The fallback logic is triggered in two scenarios. First, when the vector search returns zero results entirely — this can happen if the query is completely unrelated to any indexed content or if restrictive metadata filters exclude all candidates. Second, when results are returned but all similarity scores fall below the configured threshold — this indicates that the indexed content is only tangentially related to the query and not relevant enough to produce a reliable answer.
When the fallback is triggered, the system skips the LLM generation stage entirely. Sending an empty or irrelevant context to the LLM would likely produce a response that either admits it cannot answer (wasting latency and cost) or worse, generates a plausible-sounding but unsupported answer. Instead, the Lambda function returns a structured fallback response that communicates the situation clearly to the client application:
import json
import time
from datetime import datetime, timezone
def handle_no_results_fallback(query: str, search_config: dict, start_time: float) -> dict:
"""
Generate a fallback response when retrieval returns no results
above the similarity threshold.
Args:
query: The original user query
search_config: The search configuration that was used
start_time: Request start timestamp for latency calculation
Returns:
API Gateway response with fallback message and suggestions
"""
execution_time_ms = int((time.time() - start_time) * 1000)
threshold = search_config.get('similarityThreshold', 0.5)
fallback_body = {
"status": "no_results",
"query": query,
"answer": None,
"message": (
"No relevant information was found in the knowledge base "
"for your query. The search did not return any documents "
f"with a relevance score above the threshold ({threshold})."
),
"suggestions": [
"Try rephrasing your question using different terminology",
"Broaden your query by removing specific technical terms",
"Remove metadata filters to search across all document categories",
"Lower the similarity threshold if you want less precise but broader results"
],
"citations": [],
"metadata": {
"resultCount": 0,
"executionTimeMs": execution_time_ms,
"timestamp": datetime.now(timezone.utc).isoformat(),
"similarityThreshold": threshold,
"searchType": search_config.get('searchType', 'SEMANTIC'),
"metadataFilterApplied": 'metadataFilter' in search_config,
"fallbackTriggered": True
}
}
return {
"statusCode": 200,
"headers": {
"Content-Type": "application/json",
"Access-Control-Allow-Origin": "*",
"X-Execution-Time-Ms": str(execution_time_ms),
"X-Fallback-Triggered": "true"
},
"body": json.dumps(fallback_body, indent=2)
}
def lambda_handler(event, context):
"""
Retrieval Lambda handler with no-results fallback logic.
"""
start_time = time.time()
body = json.loads(event.get('body', '{}'))
query = body.get('query', '')
search_config = {
'numberOfResults': body.get('numberOfResults', 5),
'searchType': body.get('searchType', 'SEMANTIC'),
'similarityThreshold': body.get('similarityThreshold', 0.5),
'metadataFilter': body.get('metadataFilter')
}
# Execute retrieval
rag_result = retrieve_and_generate(query, search_config)
# Check if results meet the similarity threshold
valid_citations = [
cite for cite in rag_result.get('citations', [])
if cite.get('score', 0) >= search_config['similarityThreshold']
]
# Trigger fallback if no valid results
if not valid_citations and not rag_result.get('answer'):
return handle_no_results_fallback(query, search_config, start_time)
# Normal response path
return format_retrieval_response(query, rag_result, start_time)
The fallback response uses HTTP status code 200 (not 404 or 500) because the request was processed successfully — the absence of results is a valid outcome, not an error. The status field is set to "no_results" to distinguish it from successful responses, and the answer field is explicitly null to signal that no LLM generation occurred. The X-Fallback-Triggered response header provides a machine-readable signal for client applications to detect the fallback without parsing the response body.
The suggestions array provides actionable guidance that client applications can display to help users refine their query. These suggestions are context-aware: if a metadata filter was applied, the system suggests removing it; if the threshold is high, it suggests lowering it. The metadata block includes fallbackTriggered: true and the applied threshold value, enabling monitoring dashboards to track how often queries fail to find relevant content — a signal that the knowledge base may need additional documents or that the similarity threshold needs tuning.
Client applications should handle the "no_results" status gracefully by displaying the message and suggestions to the user rather than showing a generic error. This transparency builds user trust — they understand that the system did not find relevant information rather than assuming it failed or produced an unreliable answer. For production deployments, tracking the no-results rate over time helps identify gaps in the knowledge base content that should be addressed through additional document ingestion.
