Developing memory-rich AI systems with Valkey™, OpenSearch® and RAG
Adding long-term memory to agent communication with OpenSearch®, Amazon Bedrock, RAG and LangChain
In this step-by-step tutorial, we'll explore how to build long-term memory into agent-based systems. Essentially, we'll create an agent system that can remember and learn from every interaction. Previously, we established a communication process using Valkey™ for pub/sub messaging to keep agents independent. Our initial architecture looked like this:
In this tutorial, we'll expand on that by introducing several new components: an Apache Kafka® topic that stores the conversation stream, OpenSearch® as a vector store for long-term memories, Amazon Bedrock models for text generation and embeddings, and LangChain to wire up the retrieval flow.
This setup is particularly useful for scenarios like automating customer support, managing IoT devices, or any situation where systems need to operate independently while maintaining a shared state in real-time.
By the end of this tutorial, we'll achieve the following target architecture:
While it might seem complex, don't worry: we'll break it down module by module.
The complete code is available in this GitHub repository. However, you can follow the steps below to build the project from scratch.
This tutorial builds on concepts from Integrating Valkey Pub/Sub with Amazon Bedrock for AI Agent Communication, which you can also find in this GitHub repository.
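The agent code below reuses a few helpers from that tutorial: subscribe and sendToChannel for Valkey pub/sub, plus a small delay utility. If you're not starting from that repository, here is a minimal sketch of what they could look like, assuming the ioredis client and a VALKEY_SERVICE_URI environment variable (both are assumptions; adapt them to your setup):

```js
// channels.js: a hypothetical sketch of the pub/sub helpers used in this tutorial,
// assuming ioredis and a VALKEY_SERVICE_URI environment variable
import Redis from 'ioredis';
import dotenv from 'dotenv';

dotenv.config();

// Valkey speaks the Redis protocol, so a Redis client works as-is
export const subscribe = (channelName) => {
  const subscriber = new Redis(process.env.VALKEY_SERVICE_URI);
  subscriber.subscribe(channelName);
  return subscriber; // the caller attaches a 'message' listener
};

const publisher = new Redis(process.env.VALKEY_SERVICE_URI);

export const sendToChannel = async (channelName, message) => {
  await publisher.publish(channelName, message);
};

export const delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
```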
To follow along, you'll need:
- An Aiven account, to run Aiven for Apache Kafka®, Aiven for Valkey™ and Aiven for OpenSearch®
- An AWS account with access to Amazon Bedrock
- Node.js
- Terraform (installation covered below)
We'll use Terraform to set up Aiven services, which will allow us to automate the deployment of several services, making it easier to create, deploy, and manage the necessary credentials.
If you don't have Terraform installed, follow the instructions on the Terraform documentation page.
The Terraform files for this project are located in the ./terraform folder.
We'll need an Aiven token to run the Terraform script and access your Aiven account.
To get the Aiven token, generate a personal token in the Aiven Console. Then:

1. Add the token to ./terraform/terraform.tfvars as aiven_api_token, and your Aiven project name as project_name.
2. Navigate to the terraform folder.
3. Run export PROVIDER_AIVEN_ENABLE_BETA=true in your terminal (the Terraform Valkey resource is still in beta).
4. Run terraform init.
5. Run terraform plan.
6. Run terraform apply.

Terraform will create four resources:
Once deployment is complete, Terraform will generate a .env file with the necessary credentials and a certificates folder with Apache Kafka certificates.
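The exact values depend on your services, but judging by the variables the code reads later, the generated .env will contain entries along these lines (all values below are placeholders; the AWS keys come from your own AWS account rather than from Terraform):

```
KAFKA_SERVICE_URI=kafka-xxxx.aivencloud.com:12345
ssl.key.location=certificates/service.key
ssl.certificate.location=certificates/service.cert
ssl.ca.location=certificates/ca.pem
OPENSEARCH_SERVICE_URI=https://user:password@opensearch-xxxx.aivencloud.com:443
AWS_ACCESS_KEY_ID=your-access-key-id
AWS_SECRET_ACCESS_KEY=your-secret-access-key
```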
To manage environment variables and credentials, we'll use dotenv. Install it with:
```bash
npm install dotenv
```
To work with agents and their memories, we'll need two Amazon Bedrock models: Anthropic Claude 3 Haiku (anthropic.claude-3-haiku-20240307-v1:0) to generate the agents' messages and summaries, and Amazon Titan Text Embeddings (amazon.titan-embed-text-v1) to vectorize the memories.
Make sure you have enabled access to these models by following the steps in the AWS documentation for model access.
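The agent code below also calls an invokeModel helper that was introduced in the previous tutorial. If you don't have it handy, a minimal sketch using the AWS SDK's Bedrock runtime client could look like this (the model ID matches the Claude model used later in this tutorial; the file name and max_tokens value are assumptions):

```js
// bedrock.js: a hypothetical sketch of the invokeModel helper,
// using the Bedrock Messages API format for Anthropic Claude models
import { BedrockRuntimeClient, InvokeModelCommand } from '@aws-sdk/client-bedrock-runtime';
import dotenv from 'dotenv';

dotenv.config();

const client = new BedrockRuntimeClient({
  region: 'us-east-1',
  credentials: {
    accessKeyId: process.env.AWS_ACCESS_KEY_ID,
    secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
  },
});

export const invokeModel = async (prompt) => {
  const response = await client.send(new InvokeModelCommand({
    modelId: 'anthropic.claude-3-haiku-20240307-v1:0',
    contentType: 'application/json',
    body: JSON.stringify({
      anthropic_version: 'bedrock-2023-05-31',
      max_tokens: 512,
      messages: [{ role: 'user', content: prompt }],
    }),
  }));
  // Claude returns a JSON body with an array of content blocks
  return JSON.parse(new TextDecoder().decode(response.body)).content[0].text;
};
```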
We'll store the ongoing conversation stream in an Apache Kafka topic. This approach has several benefits: it allows us to replay the conversation later, which is useful for processing short-term memories and, potentially, for recovering from failures if we decide to implement such functionality in the future.
To interact with the Apache Kafka service from a NodeJS project, we'll use node-rdkafka. Install it with:
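```bash
npm install node-rdkafka
```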
To send data to an Apache Kafka topic, we first need to set up a producer. Create a file named producer.js and add this code for a simple Kafka producer:
```js
import Kafka from 'node-rdkafka';
import dotenv from 'dotenv';

dotenv.config();

// Create a producer
export const producer = new Kafka.Producer({
  'metadata.broker.list': process.env["KAFKA_SERVICE_URI"],
  'security.protocol': 'ssl',
  'ssl.key.location': process.env["ssl.key.location"],
  'ssl.certificate.location': process.env["ssl.certificate.location"],
  'ssl.ca.location': process.env["ssl.ca.location"],
  'dr_cb': true
});

producer.on('event.log', function (log) {
  console.log(log);
});

// Logging all errors
producer.on('event.error', function (err) {
  console.error(err);
});

producer.on('connection.failure', function (err) {
  console.error(err);
});

producer.on('delivery-report', function (err, report) {
  console.log('Message was delivered: ' + JSON.stringify(report));
});

producer.on('disconnected', function (arg) {
  console.log('Producer disconnected: ' + JSON.stringify(arg));
});

producer.connect({}, (err) => {
  if (err) {
    console.error(err);
  }
});
```
Next, we integrate this producer into the Agent class by adding a new function, storeInKafka:
```js
storeInKafka(topic, message) {
  producer.produce(
    topic,                 // topic to write to
    null,                  // partition (null lets librdkafka decide)
    Buffer.from(message),  // message payload
    null,                  // key
    Date.now()             // timestamp
  );
  producer.flush();
}
```
With this new function, every time an agent receives a message, it will send it to Apache Kafka. Update the startToListenToOthers function to invoke storeInKafka:
```js
startToListenToOthers() {
  const subscriber = subscribe(this.agentName);
  subscriber.on('message', async (channel, message) => {
    const parsedMessage = JSON.parse(message);
    // persist every incoming message in the shared conversation topic
    this.storeInKafka(this.conversationTopic, message);
    await delay(1000);
    await this.replyToMessage(parsedMessage.message, parsedMessage.agent);
  });
}
```
Since agents need to share a common topic for ongoing conversations, it makes sense to pass the topic name when creating agents. Update the Agent constructor to include a conversationTopic:
```js
constructor(agentName, anotherAgent, starts, conversationTopic) {
  this.conversationTopic = conversationTopic;
  ....
```
To ensure each conversation has a unique topic, we'll use the timestamp of the conversation's start. This approach guarantees that all independent conversations have distinct topic names. Modify run.js to calculate the topic name and provide it to the agents.
```js
const kafkaTopic = Date.now().toString();

const nick = new Agent('Nick', 'Judy', false, kafkaTopic);
nick.start();

const judy = new Agent('Judy', 'Nick', true, kafkaTopic);
judy.start();
```
In the next step, we'll generate reflections based on the data stored in each unique conversation topic.
To generate conversation summaries (reflections) for each agent, we'll take these steps:

1. Consume all messages from the conversation topic.
2. Ask the model to summarize the conversation from the agent's perspective.
3. Store the summary in a per-agent reflections topic.
4. Trigger the reflection over a dedicated Valkey channel once the conversation ends.
To consume all records from a given topic, we first need to know the latest offset so we can stop once we reach it. Create a new file consumer.js and add the code to export a consumeAll function:
```js
import Kafka from 'node-rdkafka';
import dotenv from 'dotenv';

dotenv.config();

export const consumeAll = async (topic, groupId) => {
  return new Promise((resolve, reject) => {
    console.log('Initializing Kafka Consumer...');
    const consumer = new Kafka.KafkaConsumer({
      'group.id': groupId,
      'metadata.broker.list': process.env["KAFKA_SERVICE_URI"],
      'security.protocol': 'ssl',
      'ssl.key.location': process.env["ssl.key.location"],
      'ssl.certificate.location': process.env["ssl.certificate.location"],
      'ssl.ca.location': process.env["ssl.ca.location"],
      'enable.auto.commit': false
    }, {
      'auto.offset.reset': 'earliest'
    });

    const messages = [];
    let latestOffset;

    consumer.on('ready', () => {
      console.log('Consumer is ready, querying watermark offsets...');
      consumer.queryWatermarkOffsets(topic, 0, 1000, (err, offsets) => {
        if (err) {
          console.error('Error querying watermark offsets:', err);
          return reject(err);
        }
        latestOffset = offsets.highOffset;
        console.log(`Latest offset for topic ${topic} is ${latestOffset}`);
        consumer.subscribe([topic]);
        console.log(`Subscribed to topic ${topic}, starting consumption...`);
        consumer.consume();
      });
    });

    consumer.on('data', (data) => {
      console.log('Received data:', data);
      const messageOffset = data.offset;
      console.log(`Message offset: ${messageOffset}, Latest offset: ${latestOffset}`);
      messages.push(data.value.toString());
      console.log('Message added to the list.', data.value.toString());
      if (messageOffset === latestOffset - 1) {
        console.log('Reached the latest offset, disconnecting...');
        consumer.disconnect();
      }
    });

    consumer.on('disconnected', () => {
      console.log('Consumer disconnected');
      resolve(messages);
    });

    consumer.on('event.error', (err) => {
      console.error('Error event:', err);
      reject(err);
    });

    consumer.on('event.log', (log) => {
      console.log('Log event:', log);
    });

    consumer.on('connection.failure', (err) => {
      console.error('Connection failure:', err);
    });

    console.log('Connecting to Kafka...');
    consumer.connect();
  });
};
```
Update prompts.js with a new prompt for generating a conversation summary:
```js
export const getConversationSummaryPrompt = (agentName, content) => `You're an inhabitant of a planet Hipola, a very small and cosy planet. Your name is ${agentName}. You met another citizen and had this conversation: ${content}. Reflect on this conversation and summarize in one most important thought that is worth remembering about the person you met. Output only the thought. Remember, you're ${agentName}.`;
```
In the Agent class, add a reflect method to read all messages from the conversation topic, request a summary from the model, and store the summary in a new topic. Each agent will have its own topic for reflections:
```js
async reflect() {
  const messages = await consumeAll(this.conversationTopic, `${this.conversationTopic}-${this.agentName}`);
  const summary = await this.getConversationSummary(messages.join("; "));
  this.storeInKafka(`${this.agentName}-reflections`, summary);
}

async getConversationSummary(content) {
  const prompt = getConversationSummaryPrompt(this.agentName, content);
  return await invokeModel(prompt);
}
```
Finally, we'll connect everything using an additional Valkey channel that triggers the conversation reflection once a conversation ends:
```js
async triggerReflection(recipient) {
  await sendToChannel(`${recipient}-internalize`, "Reflect on the conversation");
  await sendToChannel(`${this.agentName}-internalize`, "Reflect on the conversation");
}

async replyToMessage(message, recipient) {
  // the other agent indicated that it no longer wants to continue the conversation
  if (message && message.includes("END")) {
    return await this.triggerReflection(recipient);
  }
  const prompt = await this.getPrompt(message);
  console.log(`### ${this.agentName.toUpperCase()} PROMPT: ###`);
  console.log("prompt: " + this.agentName, prompt);
  const response = await invokeModel(prompt);
  console.log(`=== ${this.agentName.toUpperCase()} SAYS: ===`);
  console.log(`${response}`);
  if (message) {
    this.shortMemory.push(`${recipient} said: ${message}`);
  }
  this.shortMemory.push(`You replied: ${response}`);
  sendToChannel(recipient, JSON.stringify({ agent: this.agentName, message: response }));
}

waitToConversationEnd() {
  const subscriber = subscribe(`${this.agentName}-internalize`);
  subscriber.on('message', async (channel) => {
    if (channel !== `${this.agentName}-internalize`) return;
    await this.reflect();
  });
}

async start() {
  // listen to what the other agent tells you
  this.startToListenToOthers();
  // get ready to process the conversation once it ends
  this.waitToConversationEnd();
  if (this.starts) {
    await this.replyToMessage(null, this.anotherAgent);
  }
}
```
Now that reflections are sent to Apache Kafka whenever agents converse, we need to add these reflections as long-term memories for each agent. We'll use the Retrieval Augmented Generation (RAG) pattern and the LangChain framework for this purpose:
To install the LangChain packages that work with OpenSearch and Amazon Bedrock, run:
```bash
npm install @langchain/aws @langchain/community @langchain/core langchain
```
We also need to install the OpenSearch NodeJS client:
```bash
npm install @opensearch-project/opensearch
```
Once you have the necessary libraries, create a new file named vectorize.js. This file will consume data from the reflections topic and send it to the OpenSearch index:
```js
import Kafka from 'node-rdkafka';
import { Client } from "@opensearch-project/opensearch";
import { Document } from "langchain/document";
import { BedrockEmbeddings } from "@langchain/aws";
import { OpenSearchVectorStore } from "@langchain/community/vectorstores/opensearch";
import dotenv from 'dotenv';

dotenv.config();

const client = new Client({
  nodes: [process.env.OPENSEARCH_SERVICE_URI],
});

const consumeAndIndex = (topicName) => {
  // Kafka consumer setup
  const consumer = new Kafka.KafkaConsumer({
    'group.id': 'kafka-group',
    'metadata.broker.list': process.env["KAFKA_SERVICE_URI"],
    'security.protocol': 'ssl',
    'ssl.key.location': process.env["ssl.key.location"],
    'ssl.certificate.location': process.env["ssl.certificate.location"],
    'ssl.ca.location': process.env["ssl.ca.location"],
  }, {});

  consumer.connect();

  consumer.on('ready', () => {
    console.log('Consumer ready');
    consumer.subscribe([topicName]);
    consumer.consume();
  }).on('data', async (data) => {
    const messageValue = data.value.toString();
    // Process the message and create a Document
    const doc = new Document({
      metadata: { source: 'kafka' },
      pageContent: messageValue,
    });
    // Create embeddings and send to OpenSearch
    try {
      const embeddings = new BedrockEmbeddings({
        region: 'us-east-1',
        credentials: {
          secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
          accessKeyId: process.env.AWS_ACCESS_KEY_ID
        },
        model: "amazon.titan-embed-text-v1",
      });
      await OpenSearchVectorStore.fromDocuments([doc], embeddings, {
        client,
        indexName: topicName.toLowerCase(),
      });
      console.log('Document indexed successfully:', doc);
    } catch (error) {
      console.error('Error indexing document:', error);
    }
  });

  consumer.on('event.error', (err) => {
    console.error('Error from consumer:', err);
  });
};

export default consumeAndIndex;
```
Although the consumeAndIndex method could be run as a separate process (since it’s independent of the agent), we'll keep all calls within run.js for simplicity:
```js
import Agent from './src/agent.js';
import consumeAndIndex from "./src/vectorize.js";

const kafkaTopic = Date.now().toString();

const nick = new Agent('Nick', 'Judy', false, kafkaTopic);
consumeAndIndex("Nick-reflections");
nick.start();

const judy = new Agent('Judy', 'Nick', true, kafkaTopic);
judy.start();
consumeAndIndex("Judy-reflections");
```
To enhance the agent's responses, we'll integrate short-term memory with the long-term memories stored in OpenSearch. For this, we'll add the following:
- LongMemoryService: a service that provides a method to retrieve relevant long-term memory based on the current conversation.
- Updates to the Agent class and prompts, so that retrieved memories are woven into each reply.
Create a file called longTermMemory.js that utilizes LangChain libraries to interface with both the Claude model from Bedrock and the OpenSearch vector store:
```js
import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import dotenv from 'dotenv';
import { Client } from "@opensearch-project/opensearch";
import { OpenSearchVectorStore } from "@langchain/community/vectorstores/opensearch";
import { BedrockEmbeddings } from "@langchain/aws";
import { VectorDBQAChain } from "langchain/chains";

dotenv.config();

export class LongMemoryService {
  constructor(indexName) {
    this.indexName = indexName;
    this.model = new BedrockChat({
      model: "anthropic.claude-3-haiku-20240307-v1:0",
      region: "us-east-1",
      credentials: {
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        accessKeyId: process.env.AWS_ACCESS_KEY_ID
      },
    });
    this.client = new Client({
      nodes: [process.env.OPENSEARCH_SERVICE_URI],
    });
    this.vectorStore = new OpenSearchVectorStore(new BedrockEmbeddings({
      region: 'us-east-1',
      credentials: {
        secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
        accessKeyId: process.env.AWS_ACCESS_KEY_ID
      },
      model: "amazon.titan-embed-text-v1"
    }), {
      client: this.client,
      indexName: indexName,
    });
    this.chain = VectorDBQAChain.fromLLM(this.model, this.vectorStore, {
      k: 1,
      returnSourceDocuments: true,
    });
  }

  async indexExists() {
    try {
      const response = await this.client.indices.exists({ index: this.indexName });
      return response.body;
    } catch (error) {
      console.error('Error checking if index exists:', error);
      return false;
    }
  }

  async getLongMemory(query) {
    const indexExists = await this.indexExists();
    if (!indexExists) {
      return '';
    }
    const response = await this.chain.call({ query });
    return response.text;
  }
}
```
Next, update the Agent class to incorporate the long-term memory service and add a method to query it:
```js
constructor(agentName, anotherAgent, starts, conversationTopic) {
  console.log({ conversationTopic });
  this.agentName = agentName;
  this.anotherAgent = anotherAgent;
  this.shortMemory = [];
  this.starts = starts;
  this.conversationTopic = conversationTopic;
  this.longMemoryService = new LongMemoryService(`${this.agentName.toLowerCase()}-reflections`);
}

async queryLongTermMemory(message) {
  const longMemory = await this.longMemoryService.getLongMemory(`\n\nHuman: ${message} \n\nAssistant:`);
  console.log("******* " + this.agentName.toUpperCase() + " LONG MEMORY: " + longMemory);
  console.log("************************************************************************************");
  return longMemory;
}
```
Revise the prompts in prompts.js to factor in long-term memories:
```js
export const getPromptStart = (agentName) => `You're an inhabitant of a planet Hipola, a very small and cosy planet. Your name is ${agentName}.`;

export const instructions = `Always follow these instructions:
- if it is the first time you meet this inhabitant, introduce yourself and learn their name;
- if you met this person before or already know something about them - do not introduce yourself, but relate to the previous conversation
- if it's ongoing conversation, don't introduce yourself, just continue the conversation, reply or ask question, be natural;
- after a couple of exchanged messages politely say goodbye
- answer the questions of the other inhabitant;
- try to finish the topic and when you're done with the conversation for today respond with "[END]";
`;

export const getMemoryPrompt = (agentName, anotherAgent) => `The context are memories of ${agentName}. Are there any memories or thoughts about ${anotherAgent}? If yes, respond with "You remember meeting ${anotherAgent}, what you remember is that .... [continue based on the additional context]". If there is no info about ${anotherAgent} in the context respond with "You haven't met ${anotherAgent} before". Don't provide any other judgement or additional information.`;

export const getContinuationMemoryPrompt = (agentName, anotherAgent, message) => `The context are memories of ${agentName}. Are there any memories or thoughts about ${anotherAgent} relevant to the message "${message}"? If yes return "Something that I remember from past conversations with ${anotherAgent} is that .... [continue with a concise list of notes]". Otherwise, if there is no relevant context return "nothing relevant that I remember" and be very very very short and don't provide any other judgement or additional information!`;

export const getStartConversationPrompt = (agentName, memoriesOfOtherAgent) => `${getPromptStart(agentName)} ${memoriesOfOtherAgent}.\n\n${instructions}`;

export const getContinueConversationPrompt = (agentName, memoryString, longTermMemory, message) => `
${getPromptStart(agentName)}
You're meeting another inhabitant. This is the conversation so far:\n${memoryString}\n\n\n\n
This is what you remember about them from previous interactions that is relevant to their phrase:\n${longTermMemory} Reply to this message from another inhabitant from the planet Hipola: "${message}" and ask a relevant question to continue the conversation. If you already had several messages exchanged, politely say goodbye and end conversation. Be concise. Remember, you're ${agentName}.
${instructions}`;
```
Finally, adjust the getPrompt method in the Agent class to integrate long-term memories into the prompt:
```js
async getPrompt(message) {
  // start of the conversation:
  if (!message) {
    const memoriesOfOtherAgent = await this.queryLongTermMemory(getMemoryPrompt(this.agentName, this.anotherAgent));
    return getStartConversationPrompt(this.agentName, memoriesOfOtherAgent);
  }
  // continuation of the conversation:
  let memoryString = this.shortMemory.join('\n');
  let longTermMemory = await this.queryLongTermMemory(getContinuationMemoryPrompt(this.agentName, this.anotherAgent, message));
  return getContinueConversationPrompt(this.agentName, memoryString, longTermMemory, message);
}
```
It's time to run our agents!
```bash
node run.js
```
Observe that, at the end of a conversation, the system enters reflection mode. You can also monitor the data stored in the Apache Kafka topics.
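For example, one option is to reuse the consumeAll helper from earlier as a throwaway topic inspector. A quick sketch (the file name, path, and group id are assumptions, and top-level await assumes your project runs as ES modules):

```js
// inspect.js: a hypothetical one-off script to dump the contents of a topic,
// e.g. node inspect.js Nick-reflections
import { consumeAll } from './src/consumer.js';

const topic = process.argv[2];
const messages = await consumeAll(topic, `inspector-${Date.now()}`);
messages.forEach((message, index) => console.log(`${index}: ${message}`));
```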
Run multiple conversations to verify that the agents recognize each other.
In this tutorial, we’ve built a system that enables agents to retain and learn from interactions through long-term memory.
If you're curious about more things you can do with Aiven and AI, take a look at: