Applying the RAG pattern to navigate your knowledge store
Build a knowledge chatbot powered by OpenSearch® and Amazon Bedrock
Retrieval-augmented generation (RAG) enriches large language models (LLMs) by incorporating an external knowledge source during predictions to provide more context, historical data, and relevant information. It can be applied to a wide variety of use cases.
RAG offers a significantly cheaper and faster alternative to retraining or fine-tuning an existing model. That's why RAG workloads are being adopted by customers across retail, financial services, and healthcare.
In this article we'll show you an example of such a scenario: how to create a chatbot that answers questions requiring specific technical knowledge.
To create the chatbot we'll use Amazon Bedrock foundation models, OpenSearch® will play the role of a vector database, and to create the pipeline we'll rely on the AWS Serverless Application Model (AWS SAM). The overall architecture can be split into two parts. First, we process each documentation entry to create an embedding and store it in OpenSearch. Next, we set up a Lambda function that accepts the user's question, enhances the prompt with additional context retrieved from OpenSearch, and passes it to an LLM to generate an answer:
To follow along you'll need an Aiven account (to create an Aiven for OpenSearch service), an AWS account with access to Amazon Bedrock, and the AWS CLI and AWS SAM CLI installed; we'll walk through the setup below.
You'll find all the necessary code in this GitHub repository. We recommend cloning it so you can run the commands described below.
Start by creating an Aiven for OpenSearch service; this is where we'll store the embeddings for the documents.
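If you prefer the terminal to the Aiven console, here is a sketch of the same step with the Aiven CLI (assuming you have avn installed and are logged in; the service name, cloud, and plan are placeholders):

avn service create demo-opensearch \
  --service-type opensearch \
  --cloud aws-us-east-1 \
  --plan startup-4
avn service get demo-opensearch --format '{service_uri}'

The second command prints the service URI, which we'll need later when deploying the SAM application.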
If you don't have the AWS CLI installed on your machine, follow these steps to set it up.
We'll also use the AWS Serverless Application Model (AWS SAM) to simplify the process of deploying resources on AWS. Follow these instructions to install the SAM CLI.
To start using the AWS CLI we need to configure it and provide authentication information:
Run aws configure and specify your AWS Access Key ID and AWS Secret Access Key. Note that these credentials are necessary to allow the SAM CLI to deploy the resources, while granular permissions for each resource are specified individually in the SAM template.
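For reference, the interactive prompts look roughly like this (the values are placeholders, not real credentials; us-east-1 matches the region used by the Bedrock calls in this project):

aws configure
AWS Access Key ID [None]: AKIAXXXXXXXXXXXXXXXX
AWS Secret Access Key [None]: xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
Default region name [None]: us-east-1
Default output format [None]: json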
Access to Amazon Bedrock foundation models isn't granted by default. To get it, go to Amazon Bedrock Model access, click on "Manage model access" and select two items:
Titan Embeddings G1, which we need to convert our text documents into embeddings.
Cohere Command, the LLM we'll use to generate the answers (the SAM template references it as cohere.command-text-v14).
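Once access is granted, you can optionally double-check it from the command line; a quick sketch with the AWS CLI (the model IDs are the ones used by this project):

aws bedrock list-foundation-models --region us-east-1 \
  --query "modelSummaries[?modelId=='amazon.titan-embed-text-v1' || modelId=='cohere.command-text-v14'].modelId"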
To create the AWS pipeline and deploy all related resources together as a single entity we use AWS SAM. With AWS SAM we can describe all resources and their requirements in a SAM template. You can find the complete template in the GitHub repo.
The most interesting part happens in the Resources section, where we specify which AWS services we want to deploy and with which settings. Let's go through them one by one and see what they do:
DocumentBucket creates a storage bucket on Amazon S3. This is where we'll add documents with external knowledge. Whenever a new document is added, upload notifications are sent to an SQS queue. Additionally, here we block public access to the bucket and specify the rules for cross-origin resource sharing for security reasons.
DocumentBucket:
  Type: "AWS::S3::Bucket"
  Properties:
    BucketName: !Sub "${AWS::StackName}-${AWS::Region}-${AWS::AccountId}"
    NotificationConfiguration:
      QueueConfigurations:
        - Event: 's3:ObjectCreated:*'
          Queue: !GetAtt EmbeddingQueue.Arn
    CorsConfiguration:
      CorsRules:
        - AllowedHeaders:
            - "*"
          AllowedMethods:
            - GET
            - PUT
            - HEAD
            - POST
            - DELETE
          AllowedOrigins:
            - "*"
    PublicAccessBlockConfiguration:
      BlockPublicAcls: true
      BlockPublicPolicy: true
      IgnorePublicAcls: true
      RestrictPublicBuckets: true
DocumentBucketPolicy sets a policy for the document bucket, ensuring that access over non-secure (HTTP) connections is denied.
DocumentBucketPolicy:
  Type: "AWS::S3::BucketPolicy"
  Properties:
    PolicyDocument:
      Id: EnforceHttpsPolicy
      Version: "2012-10-17"
      Statement:
        - Sid: EnforceHttpsSid
          Effect: Deny
          Principal: "*"
          Action: "s3:*"
          Resource:
            - !Sub "arn:aws:s3:::${DocumentBucket}/*"
            - !Sub "arn:aws:s3:::${DocumentBucket}"
          Condition:
            Bool:
              "aws:SecureTransport": "false"
    Bucket: !Ref DocumentBucket
EmbeddingQueue creates a queue in Amazon SQS. This queue will receive messages triggered by documents being uploaded to S3.
EmbeddingQueue:
  Type: AWS::SQS::Queue
  DeletionPolicy: Delete
  UpdateReplacePolicy: Delete
  Properties:
    VisibilityTimeout: 180
    MessageRetentionPeriod: 3600
EmbeddingQueuePolicy sets a policy allowing S3 to send messages to the EmbeddingQueue in SQS.
EmbeddingQueuePolicy:
  Type: AWS::SQS::QueuePolicy
  Properties:
    Queues:
      - !Ref EmbeddingQueue
    PolicyDocument:
      Version: "2012-10-17"
      Id: SecureTransportPolicy
      Statement:
        Effect: Allow
        Principal:
          Service: "s3.amazonaws.com"
        Action:
          - "sqs:SendMessage"
        Resource: !GetAtt EmbeddingQueue.Arn
GenerateEmbeddingsFunction creates a Lambda function responsible for generating embeddings. It's configured to have access to the necessary environment variables and is triggered by incoming messages in the EmbeddingQueue.
GenerateEmbeddingsFunction:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: src/generate_embeddings/
    Timeout: 180
    MemorySize: 2048
    Policies:
      - SQSPollerPolicy:
          QueueName: !GetAtt EmbeddingQueue.QueueName
      - S3CrudPolicy:
          BucketName: !Ref DocumentBucket
      - Statement:
          - Sid: "BedrockScopedAccess"
            Effect: "Allow"
            Action: "bedrock:InvokeModel"
            Resource: "arn:aws:bedrock:*::foundation-model/amazon.titan-embed-text-v1"
    Environment:
      Variables:
        OPENSEARCH_URL: !Ref OpensearchURL
        BUCKET: !Ref DocumentBucket
    Events:
      EmbeddingQueueEvent:
        Type: SQS
        Properties:
          Queue: !GetAtt EmbeddingQueue.Arn
          BatchSize: 1
The code of this function is taken from the main.py file in src/generate_embeddings:
# note: the imports, the s3 client, BUCKET, OPENSEARCH_URL and index_name are defined earlier in main.py
def lambda_handler(event, context):
    # identify filename
    event_body = json.loads(event["Records"][0]["body"])
    print(event_body)
    key = event_body["Records"][0]["s3"]["object"]["key"]
    file_name_full = key.split("/")[-1]

    # download data record
    s3.download_file(BUCKET, key, f"/tmp/{file_name_full}")
    loader = TextLoader(f'/tmp/{file_name_full}')
    document = loader.load()

    # setup Bedrock Embeddings for OpenSearch
    bedrock_runtime = boto3.client(
        service_name="bedrock-runtime",
        region_name="us-east-1",
    )

    # interface for embedding models
    model = BedrockEmbeddings(
        model_id="amazon.titan-embed-text-v1",
        client=bedrock_runtime,
        region_name="us-east-1",
    )

    vector_search = OpenSearchVectorSearch(OPENSEARCH_URL, index_name, model)

    # create embeddings and index the document into OpenSearch
    response = vector_search.from_documents(
        documents=document,
        embedding=model,
        opensearch_url=OPENSEARCH_URL,
        use_ssl=True,
        index_name=index_name,
        bulk_size=5000,
        vector_field="embedding"
    )
    print(response)
GenerateResponseStreaming creates another Lambda function. Its purpose is to handle the user's question and stream the response back. It's granted permissions to call Amazon Bedrock.
GenerateResponseStreaming:
  Type: AWS::Serverless::Function
  Properties:
    CodeUri: src/generate_response_streaming
    Handler: index.handler
    Runtime: nodejs18.x
    Timeout: 30
    MemorySize: 256
    Policies:
      - Statement:
          - Effect: Allow
            Action: 'bedrock:*'
            Resource: '*'
    Environment:
      Variables:
        OPENSEARCH_URL: !Ref OpensearchURL
The code is taken from src/generate_response_streaming and written in JavaScript, because at the moment Lambda supports response streaming only on Node.js managed runtimes. Here is the content of index.js:
const runChain = async (query, responseStream) => {
  try {
    // create OpenSearch client
    const openSearchClient = new Client({
      nodes: process.env.OPENSEARCH_URL
    });

    // setup Bedrock Embeddings for OpenSearch
    // interface for embedding models
    const model = new BedrockEmbeddings({region: awsRegion});

    const index_name = "knowledge-embeddings"
    const vectorStore = new OpenSearchVectorStore(model, {
      openSearchClient,
      indexName: index_name,
      vector_field: index_name
    });
    const retriever = vectorStore.asRetriever();

    // define prompt template
    const prompt = PromptTemplate.fromTemplate(
      `Answer the following question based on the following context:
      {context}
      Question: {question}`
    );

    const llmModel = new BedrockChat({
      model: 'cohere.command-text-v14',
      region: awsRegion,
      streaming: true,
      maxTokens: 1000,
    });

    // create streaming chain
    const chain = RunnableSequence.from([
      {
        context: retriever.pipe(formatDocumentsAsString),
        question: new RunnablePassthrough()
      },
      prompt,
      llmModel,
      new StringOutputParser()
    ]);

    const stream = await chain.stream(query);
    for await (const chunk of stream) {
      responseStream.write(chunk);
    }
    responseStream.end();
  } catch (error) {
    // output the error
    responseStream.write(`Error: ${error.message}`);
    responseStream.end();
  }
};

export const handler = awslambda.streamifyResponse(async (event, responseStream, _context) => {
  console.log(JSON.stringify(event));
  const query = event["queryStringParameters"]["query"]
  await runChain(query, responseStream);
  console.log(JSON.stringify({"status": "complete"}));
});
GenerateResponseStreamingInvocationURL provides a URL for invoking GenerateResponseStreaming in streaming mode. This URL requires AWS IAM authentication.
GenerateResponseStreamingInvocationURL:
  Type: AWS::Lambda::Url
  Properties:
    TargetFunctionArn: !Ref GenerateResponseStreaming
    AuthType: AWS_IAM
    InvokeMode: RESPONSE_STREAM
Time to bring our SAM template into action: build and deploy our application.
Make sure that your environment has Python 3.11 or later and run:
sam build
If you experience problems, try running a verbose version of the command with:
sam build --debug
Once the project is built, we can deploy it. Run
sam deploy --guided
and provide the service URI of the OpenSearch service that we created earlier.
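If you'd rather skip the interactive prompts, here is a non-interactive sketch of the same step (the stack name and service URI are placeholders; OpensearchURL is the parameter name the template expects):

sam deploy \
  --stack-name bedrock-opensearch-chatbot \
  --resolve-s3 \
  --capabilities CAPABILITY_IAM \
  --parameter-overrides OpensearchURL="https://<user>:<password>@<your-opensearch-host>:<port>"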
Wait for the resources to be deployed.
Now that our pipeline is running, it's time to flow some knowledge into it. You can use any text documents that you have; I used a set of documentation articles for the Aiven platform.
Go to the S3 bucket that was just created and upload the text files there.
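You can do this in the S3 console, or from the terminal with the AWS CLI (the bucket name and local folder are placeholders):

aws s3 cp ./docs/ s3://<your-document-bucket>/ --recursive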
Each time a new document is uploaded, a message is added to the embedding queue, the document is transformed into an embedding using the Amazon Bedrock Titan model, and the embedding is stored in OpenSearch.
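To verify that the embeddings actually landed in OpenSearch, you can query the index directly (knowledge-embeddings is the index name used in the code; replace the credentials, host, and port with those from your service URI):

curl -u avnadmin:<password> \
  "https://<your-opensearch-host>:<port>/knowledge-embeddings/_count"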
And finally, it is time to run a test and invoke our function! There are different ways to achieve this.
The way we've set up our Lambda function, its URL requires AWS IAM authentication by default. Alternatively, you can grant public access to your function URL:
If you choose auth type NONE, anyone with the URL can access your function.
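If you do want to open it up for a quick test, the same change can also be made with the AWS CLI; a sketch (the function name is a placeholder, and remember to revert the auth type afterwards):

aws lambda update-function-url-config \
  --function-name <streaming-function-name> \
  --auth-type NONE
aws lambda add-permission \
  --function-name <streaming-function-name> \
  --statement-id public-url-access \
  --action lambda:InvokeFunctionUrl \
  --principal "*" \
  --function-url-auth-type NONE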
You can run it directly from the terminal using curl:
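Here is a sketch of such a request, assuming the default AWS_IAM auth type, a curl version with SigV4 support, long-lived access keys exported in your environment, and a deployment in us-east-1 (the URL is a placeholder for the GenerateResponseStreamingInvocationURL output of sam deploy):

curl --no-buffer \
  --aws-sigv4 "aws:amz:us-east-1:lambda" \
  --user "$AWS_ACCESS_KEY_ID:$AWS_SECRET_ACCESS_KEY" \
  "https://<url-id>.lambda-url.us-east-1.on.aws/?query=How+do+I+set+up+Aiven+for+ClickHouse"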
In the example above we've asked how to set up ClickHouse. One of the provided documents contained instructions describing how to create an Aiven for ClickHouse service. Vector search returned those documents to the LLM and the information was used as part of the response. If you run this live you'll also see that the response is streamed back chunk by chunk.
Note that this Bedrock model doesn't like very short questions, so make sure to ask something sufficiently descriptive. If you have problems, check out the Lambda logs in CloudWatch.
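The SAM CLI can tail those logs for you; for example (GenerateResponseStreaming is the logical name from the template, and the stack name is whatever you chose during sam deploy):

sam logs --name GenerateResponseStreaming --stack-name <your-stack-name> --tail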
To clean up all resources created in this article:
Run sam delete to terminate and remove the created AWS resources, and delete the Aiven for OpenSearch service if you no longer need it. After following this tutorial you've learned how to build a knowledge chatbot powered by OpenSearch and Amazon Bedrock. If you're interested in related topics, check out these articles: