AsyncMemory gives you a non-blocking interface to Mem0’s storage layer so Python applications can add, search, and manage memories directly from async code. Use it when you embed Mem0 inside FastAPI services, background workers, or any workflow that relies on asyncio.
You’ll use this when…
Your agent already runs in an async framework and you need memory calls to await cleanly.
You want to embed Mem0’s storage locally without sending requests through the synchronous client.
You plan to mix memory operations with other async APIs (OpenAI, HTTP calls, databases).
AsyncMemory expects a running event loop. Always await its methods inside async def functions, and enter async code through helpers like asyncio.run(), to avoid runtime errors.
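The entry-point pattern looks like this; the stand-in coroutine below is a placeholder so the sketch runs on its own, and in real code you would create an AsyncMemory instance and await its methods inside main():

```python
import asyncio

async def main():
    # In real code: memory = AsyncMemory(); await memory.add(...)
    # Stand-in await so this sketch runs without mem0 installed:
    await asyncio.sleep(0)
    return "done"

# asyncio.run() creates the event loop, runs main() to completion,
# and closes the loop -- the safe entry point from synchronous code.
result = asyncio.run(main())
print(result)
```

Calling an async method without a running loop (for example, from top-level synchronous code) raises a runtime error, which is why the asyncio.run() wrapper matters.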
Working in TypeScript? The Node SDK still uses synchronous calls—use Memory there and rely on Python’s AsyncMemory when you need awaited operations.
Feature anatomy
Direct storage access: AsyncMemory talks to the same backends as the synchronous client but keeps everything in-process for lower latency.
Method parity: Each memory operation (add, search, get_all, delete, etc.) mirrors the synchronous API, letting you reuse payload shapes.
Concurrent execution: Non-blocking I/O lets you schedule multiple memory tasks with asyncio.gather.
Scoped organization: Continue using user_id, agent_id, and run_id to separate memories across sessions and agents.
| Operation | Async signature | Notes |
| --- | --- | --- |
| Create memories | `await memory.add(...)` | Same arguments as synchronous `Memory.add`. |
| Search memories | `await memory.search(...)` | Returns dict with `results`, identical shape. |
| List memories | `await memory.get_all(...)` | Filter by `user_id`, `agent_id`, `run_id`. |
| Retrieve memory | `await memory.get(memory_id=...)` | Raises `ValueError` if ID is invalid. |
| Update memory | `await memory.update(memory_id=..., data=...)` | Accepts partial updates. |
| Delete memory | `await memory.delete(memory_id=...)` | Returns confirmation payload. |
| Delete in bulk | `await memory.delete_all(...)` | Requires at least one scope filter. |
| History | `await memory.history(memory_id=...)` | Fetches change log for auditing. |
Initialize the client
```python
import asyncio
from mem0 import AsyncMemory

# Default configuration
memory = AsyncMemory()

# Custom configuration
from mem0.configs.base import MemoryConfig

custom_config = MemoryConfig(
    # Your custom configuration here
)
memory = AsyncMemory(config=custom_config)
```
Run await memory.search(...) once right after initialization. If it returns memories without errors, your configuration works.
Keep configuration objects close to the async client so you can reuse them across workers without recreating vector store connections.
Manage lifecycle and concurrency
```python
from contextlib import asynccontextmanager

from mem0 import AsyncMemory

@asynccontextmanager
async def get_memory():
    memory = AsyncMemory()
    try:
        yield memory
    finally:
        # Clean up resources if needed
        pass

async def safe_memory_usage():
    async with get_memory() as memory:
        return await memory.search("test query", user_id="alice")
```
Wrap the client in an async context manager when you need a clean shutdown (for example, inside FastAPI startup/shutdown hooks).
```python
import asyncio

async def batch_operations():
    memory = AsyncMemory()
    tasks = [
        memory.add(
            messages=[{"role": "user", "content": f"Message {i}"}],
            user_id=f"user_{i}",
        )
        for i in range(5)
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"Task {i} failed: {result}")
        else:
            print(f"Task {i} completed successfully")
```
When concurrency works correctly, successful tasks return memory IDs while failures surface as exceptions in the results list.
Add resilience with retries
```python
import asyncio

from mem0 import AsyncMemory

async def with_timeout_and_retry(operation, max_retries=3, timeout=10.0):
    for attempt in range(max_retries):
        try:
            return await asyncio.wait_for(operation(), timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Timeout on attempt {attempt + 1}")
        except Exception as exc:
            print(f"Error on attempt {attempt + 1}: {exc}")
        if attempt < max_retries - 1:
            await asyncio.sleep(2 ** attempt)  # Exponential backoff
    raise Exception(f"Operation failed after {max_retries} attempts")

async def robust_memory_search():
    memory = AsyncMemory()

    async def search_operation():
        return await memory.search("test query", user_id="alice")

    return await with_timeout_and_retry(search_operation)
```
Always cap retries—runaway loops can keep the event loop busy and block other tasks.
See it in action
Core operations
```python
# Create memories
result = await memory.add(
    messages=[
        {"role": "user", "content": "I'm travelling to SF"},
        {"role": "assistant", "content": "That's great to hear!"},
    ],
    user_id="alice",
)

# Search memories
results = await memory.search(
    query="Where am I travelling?",
    user_id="alice",
)

# List memories
all_memories = await memory.get_all(user_id="alice")

# Get a specific memory
specific_memory = await memory.get(memory_id="memory-id-here")

# Update a memory
updated_memory = await memory.update(
    memory_id="memory-id-here",
    data="I'm travelling to Seattle",
)

# Delete a memory
await memory.delete(memory_id="memory-id-here")

# Delete scoped memories
await memory.delete_all(user_id="alice")
```
Confirm each call returns the same response fields as the synchronous client (IDs, results, or confirmation objects). Missing keys usually mean the coroutine wasn’t awaited.
delete_all requires at least one of user_id, agent_id, or run_id. Provide all three to narrow deletion to a single session.
Scoped organization
```python
await memory.add(
    messages=[{"role": "user", "content": "I prefer vegetarian food"}],
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001",
)

all_user_memories = await memory.get_all(user_id="alice")
agent_memories = await memory.get_all(user_id="alice", agent_id="diet-assistant")
session_memories = await memory.get_all(user_id="alice", run_id="consultation-001")
specific_memories = await memory.get_all(
    user_id="alice",
    agent_id="diet-assistant",
    run_id="consultation-001",
)

history = await memory.history(memory_id="memory-id-here")
```
Use history when you need audit trails for compliance or debugging update logic.
Blend with other async APIs
```python
from openai import AsyncOpenAI
from mem0 import AsyncMemory

async_openai_client = AsyncOpenAI()
async_memory = AsyncMemory()

async def chat_with_memories(message: str, user_id: str = "default_user") -> str:
    search_result = await async_memory.search(query=message, user_id=user_id, limit=3)
    relevant_memories = search_result["results"]
    memories_str = "\n".join(f"- {entry['memory']}" for entry in relevant_memories)

    system_prompt = (
        "You are a helpful AI. Answer the question based on query and memories.\n"
        f"User Memories:\n{memories_str}"
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": message},
    ]
    response = await async_openai_client.chat.completions.create(
        model="gpt-4.1-nano-2025-04-14",
        messages=messages,
    )
    assistant_response = response.choices[0].message.content

    messages.append({"role": "assistant", "content": assistant_response})
    await async_memory.add(messages, user_id=user_id)
    return assistant_response
```
When everything is wired correctly, the OpenAI response should incorporate recent memories and the follow-up add call should persist the new assistant turn.
Handle errors gracefully
```python
from mem0 import AsyncMemory
from mem0.configs.base import MemoryConfig

async def handle_initialization_errors():
    try:
        config = MemoryConfig(
            vector_store={"provider": "chroma", "config": {"path": "./chroma_db"}},
            llm={"provider": "openai", "config": {"model": "gpt-4.1-nano-2025-04-14"}},
        )
        AsyncMemory(config=config)
        print("AsyncMemory initialized successfully")
    except ValueError as err:
        print(f"Configuration error: {err}")
    except ConnectionError as err:
        print(f"Connection error: {err}")

async def handle_memory_operation_errors():
    memory = AsyncMemory()
    try:
        await memory.get(memory_id="non-existent-id")
    except ValueError as err:
        print(f"Invalid memory ID: {err}")
    try:
        await memory.search(query="", user_id="alice")
    except ValueError as err:
        print(f"Invalid search query: {err}")
```
Catch and log ValueError exceptions from invalid inputs—async stack traces can otherwise disappear inside background tasks.
Serve through FastAPI
```python
from fastapi import FastAPI, HTTPException
from mem0 import AsyncMemory

app = FastAPI()
memory = AsyncMemory()

@app.post("/memories/")
async def add_memory(messages: list, user_id: str):
    try:
        result = await memory.add(messages=messages, user_id=user_id)
        return {"status": "success", "data": result}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))

@app.get("/memories/search")
async def search_memories(query: str, user_id: str, limit: int = 10):
    try:
        result = await memory.search(query=query, user_id=user_id, limit=limit)
        return {"status": "success", "data": result}
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))
```
Create one AsyncMemory instance per process when using FastAPI—startup hooks are a good place to configure and reuse it.
Instrument logging
```python
import logging
import time
from functools import wraps

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_async_operation(operation_name):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            start_time = time.time()
            logger.info(f"Starting {operation_name}")
            try:
                result = await func(*args, **kwargs)
                duration = time.time() - start_time
                logger.info(f"{operation_name} completed in {duration:.2f}s")
                return result
            except Exception as exc:
                duration = time.time() - start_time
                logger.error(f"{operation_name} failed after {duration:.2f}s: {exc}")
                raise
        return wrapper
    return decorator

@log_async_operation("Memory Add")
async def logged_memory_add(memory, messages, user_id):
    return await memory.add(messages=messages, user_id=user_id)
```
Logged durations give you the baseline needed to spot regressions once AsyncMemory is in production.
Verify the feature is working
Run a quick add/search cycle and confirm the returned memory content matches your input.
Inspect application logs to ensure async tasks complete without blocking the event loop.
In FastAPI or other frameworks, hit health endpoints to verify the shared client handles concurrent requests.
Monitor retry counters—unexpected spikes indicate configuration or connectivity issues.
Best practices
Keep operations awaited: Forgetting await is the fastest way to miss writes—lint for it or add helper wrappers.
Scope deletions carefully: Always supply user_id, agent_id, or run_id to avoid purging too much data.
Batch writes thoughtfully: Use asyncio.gather for throughput but cap concurrency based on backend capacity.
Log errors with context: Capture user and agent scopes to triage failures quickly.
Reuse clients: Instantiate AsyncMemory once per worker to avoid repeated backend handshakes.
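The "batch writes thoughtfully" advice can be sketched with a semaphore that caps in-flight tasks; `bounded_gather` and `fake_write` are hypothetical helpers, and in real code each factory would wrap a `memory.add(...)` call:

```python
import asyncio

async def bounded_gather(coro_factories, limit=4):
    # Cap in-flight tasks with a semaphore so bursts of memory.add()
    # calls don't overwhelm the backend.
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    return await asyncio.gather(
        *(run(f) for f in coro_factories), return_exceptions=True
    )

# Stand-in workload; replace with factories wrapping memory.add(...):
async def fake_write(i):
    await asyncio.sleep(0)
    return f"memory-{i}"

results = asyncio.run(
    bounded_gather([lambda i=i: fake_write(i) for i in range(10)], limit=3)
)
print(results)
```

Factories (rather than pre-created coroutines) matter here: the semaphore must be acquired before the coroutine starts, otherwise all ten writes would be in flight at once.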
Troubleshooting
| Issue | Possible causes | Fix |
| --- | --- | --- |
| Initialization fails | Missing dependencies, invalid config | Validate `MemoryConfig` settings and environment variables. |
| Slow operations | Large datasets, network latency | Cache heavy queries and tune vector store parameters. |
| Memory not found | Invalid ID or deleted record | Check ID source and handle soft-deleted states. |
| Connection timeouts | Network issues, overloaded backend | Apply retries/backoff and inspect infrastructure health. |
| Out-of-memory errors | Oversized batches | Reduce concurrency or chunk operations into smaller sets. |