One of the major use cases of Memobase is remembering user preferences from conversation history. In this tutorial, we will demonstrate how to build a memory feature around the OpenAI client. Memobase already ships a ready-made patch for the OpenAI client; this document is a detailed code breakdown of how that feature is implemented.

Setup

  1. Go to Memobase to get your Memobase API key, or launch a local server
  2. Set up the environment variables:
OPENAI_API_KEY=your_openai_api_key
MEMOBASE_URL=https://api.memobase.dev
MEMOBASE_API_KEY=your_memobase_api_key
  3. Install the dependencies:
pip install openai memobase
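
If you keep these variables in a local .env file, an optional snippet such as the following can load them before the clients are created (python-dotenv is an extra dependency, not required by this tutorial):

# Optional: load environment variables from a .env file
# (requires `pip install python-dotenv`; skip if you export
# the variables in your shell as shown above)
from dotenv import load_dotenv

load_dotenv()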

Code Breakdown

[Diagram: OpenAI API with Memory]

To implement the memory feature around OpenAI client, we need to:

  1. Add wrappers around the OpenAI client, so we can intercept the chat messages and modify the prompts to inject memory
  2. Integrate Memobase APIs in the wrappers to memorize the chat history and retrieve user memory
  3. Test if the memory feature works as expected

Full Code

Basic Setup

import os
from memobase import MemoBaseClient
from openai import OpenAI

client = OpenAI()
mb_client = MemoBaseClient(
    api_key=os.getenv("MEMOBASE_API_KEY"),
    project_url=os.getenv("MEMOBASE_URL"),
)

Add wrappers around the OpenAI client

We monkey-patch the OpenAI client, swapping its methods in place so that existing code keeps working unchanged.

def openai_memory(
    openai_client: OpenAI | AsyncOpenAI,
    mb_client: MemoBaseClient,
) -> OpenAI | AsyncOpenAI:
    # Don't patch the same client twice
    if hasattr(openai_client, "_memobase_patched"):
        return openai_client
    openai_client._memobase_patched = True
    # Replace the create method with a memory-aware wrapper
    openai_client.chat.completions.create = _sync_chat(
        openai_client, mb_client
    )
    return openai_client

The code above is a simplified version of the actual implementation:

  • We first check whether the OpenAI client is already patched; if so, we return it unchanged
  • We then replace the chat.completions.create method, the main entry point for generating chat completions, and return the patched client

We use a function called _sync_chat to replace the chat.completions.create method.

New chat.completions.create method

We want the new chat.completions.create method to:

  • accept a user_id argument, so we can memorize the conversation for that specific user
  • accept every argument the original create method accepts, so the new one won't break any existing code
  • return the same types as the original create method, including streaming responses
  • run in almost the same time as the original create method

We first make sure the original arguments can be passed through to the original create method:

from openai import OpenAI, Stream
from openai.types.chat import ChatCompletion, ChatCompletionChunk

def _sync_chat(
    client: OpenAI,
    mb_client: MemoBaseClient,
):
    # Save the original create method
    _create_chat = client.chat.completions.create
    def sync_chat(*args, **kwargs) -> ChatCompletion | Stream[ChatCompletionChunk]:
        is_streaming = kwargs.get("stream", False)
        # No user_id: fall through to the original behavior
        if kwargs.get("user_id", None) is None:
            kwargs.pop("user_id", None)  # default avoids a KeyError when the key is absent
            if not is_streaming:
                return _create_chat(*args, **kwargs)
            else:
                return (r for r in _create_chat(*args, **kwargs))

        user_id = string_to_uuid(kwargs.pop("user_id"))
        ...

    return sync_chat

As we can see, the new chat.completions.create method behaves just like the original one, except that it also accepts a user_id argument. Since Memobase uses UUIDs to identify users, we convert the user_id to a UUID; this lets you pass any string, such as a plain user name, as the identifier.
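
In the real package, string_to_uuid is imported from memobase; conceptually it just derives a deterministic UUID from a string. A minimal sketch of such a helper (the uuid5 namespace and salt below are illustrative assumptions, not the library's actual constants):

import uuid

def string_to_uuid(s: str, salt: str = "memobase_client") -> str:
    # uuid5 is deterministic: the same input string always yields the
    # same UUID, so one user name always maps to one Memobase user
    return str(uuid.uuid5(uuid.NAMESPACE_DNS, s + salt))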

If the user_id is provided, we need to:

  1. Get or create the user in Memobase
  2. Insert the user’s memory context into the messages
  3. Call the original create method
  4. Save the conversation to Memobase

Here’s how it’s implemented:

def _sync_chat(client: OpenAI, mb_client: MemoBaseClient):
    _create_chat = client.chat.completions.create
    
    def sync_chat(*args, **kwargs) -> ChatCompletion | Stream[ChatCompletionChunk]:
        # ... existing code ...
        user_query = kwargs["messages"][-1]
        if user_query["role"] != "user":
            LOG.warning(f"Last query is not user query: {user_query}")
            return _create_chat(*args, **kwargs)
            
        # Get or create user in Memobase
        u = mb_client.get_or_create_user(user_id)
        
        # Inject user context into messages
        kwargs["messages"] = user_context_insert(
            kwargs["messages"], u
        )
        
        # Call original create method
        response = _create_chat(*args, **kwargs)
        
        # Save conversation to Memobase
        # ... handle streaming and non-streaming cases
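
A plausible reconstruction of that elided tail, assembled from the snippets shown in the next two sections (the library's exact control flow may differ):

        # Non-streaming: save the exchange in the background, return the response unchanged
        if not is_streaming:
            r_string = response.choices[0].message.content
            messages = ChatBlob(
                messages=[
                    {"role": "user", "content": user_query["content"]},
                    {"role": "assistant", "content": r_string},
                ]
            )
            threading.Thread(target=add_message_to_user, args=(messages, u)).start()
            return response
        # Streaming: hand back a wrapped generator that saves after the last chunk
        return yield_response_and_log()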

Enhancing messages with user context

The user_context_insert function adds the user’s memory to the messages before sending to OpenAI:

PROMPT = """

--# ADDITIONAL INFO #--
{user_context}
{additional_memory_prompt}
--# DONE #--"""

def user_context_insert(
    messages, u: User, additional_memory_prompt: str="", max_context_size: int = 750
):
    # Retrieve user context
    context = u.context(max_token_size=max_context_size)
    if not context:
        return messages
        
    # Format the system prompt with user context
    sys_prompt = PROMPT.format(
        user_context=context, additional_memory_prompt=additional_memory_prompt
    )
    
    # Add to existing system message or insert new one
    if messages[0]["role"] == "system":
        messages[0]["content"] += sys_prompt
    else:
        messages.insert(0, {"role": "system", "content": sys_prompt.strip()})
    return messages
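
For example, given a conversation without a system message, the function prepends one built from the PROMPT template above (the profile lines shown are illustrative, not real Memobase output):

msgs = [{"role": "user", "content": "Any dinner suggestions?"}]
msgs = user_context_insert(msgs, u)
# msgs[0] is now a system message along the lines of:
#   --# ADDITIONAL INFO #--
#   basic_info: user_name - John Doe
#   interest: food - enjoys Italian cuisine
#   --# DONE #--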

Saving conversations

After getting a response, we save the conversation to Memobase:

def add_message_to_user(messages: ChatBlob, user: User):
    try:
        r = user.insert(messages)
        LOG.debug(f"Insert {messages}")
    except ServerError as e:
        LOG.error(f"Failed to insert message: {e}")

For non-streaming responses, it’s straightforward:

# Non-streaming case
r_string = response.choices[0].message.content
messages = ChatBlob(
    messages=[
        {"role": "user", "content": user_query["content"]},
        {"role": "assistant", "content": r_string},
    ]
)
threading.Thread(target=add_message_to_user, args=(messages, u)).start()
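
Note that the insert runs in a background thread: the wrapper returns the response as soon as OpenAI does, which is what keeps its running time close to the original create method, as required earlier.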

For streaming responses, we accumulate the chunks and save after all chunks are received:

# Streaming case
def yield_response_and_log():
    total_response = ""
    r_role = None

    for r in response:
        yield r
        try:
            r_string = r.choices[0].delta.content
            r_role = r_role or r.choices[0].delta.role
            total_response += r_string or ""
        except Exception:
            continue
            
    # Save the complete conversation after streaming finishes
    messages = ChatBlob(
        messages=[
            {"role": "user", "content": user_query["content"]},
            {"role": "assistant", "content": total_response},
        ]
    )
    threading.Thread(target=add_message_to_user, args=(messages, u)).start()

Utility Functions

The wrapper also includes helper functions:

# Get user profile
def _get_profile(mb_client: MemoBaseClient):
    def get_profile(u_string) -> list[UserProfile]:
        uid = string_to_uuid(u_string)
        return mb_client.get_user(uid, no_get=True).profile()
    return get_profile

# Get memory prompt
def _get_memory_prompt(mb_client: MemoBaseClient, max_context_size: int = 1000, additional_memory_prompt: str = ""):
    def get_memory(u_string) -> str:
        uid = string_to_uuid(u_string)
        u = mb_client.get_user(uid, no_get=True)
        context = u.context(max_token_size=max_context_size)
        sys_prompt = PROMPT.format(
            user_context=context, additional_memory_prompt=additional_memory_prompt
        )
        return sys_prompt
    return get_memory

# Clear user memory
def _flush(mb_client: MemoBaseClient):
    def flush(u_string):
        uid = string_to_uuid(u_string)
        return mb_client.get_user(uid, no_get=True).flush()
    return flush
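
These factories are presumably attached to the patched client inside openai_memory so callers can reach them directly. A sketch of what that wiring might look like (the attribute names here are assumptions for illustration, not confirmed library API):

# Inside openai_memory, after chat.completions.create is patched
# (attribute names are illustrative assumptions)
openai_client.get_profile = _get_profile(mb_client)
openai_client.get_memory_prompt = _get_memory_prompt(mb_client)
openai_client.flush = _flush(mb_client)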

Usage Example

Now that we understand how the client works, let’s use it:

import os
from openai import OpenAI
from memobase import MemoBaseClient
from memobase.patch import openai_memory

# Initialize clients
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
mb_client = MemoBaseClient(
    api_key=os.getenv("MEMOBASE_API_KEY"),
    project_url=os.getenv("MEMOBASE_URL"),
)

# Patch the OpenAI client with memory capabilities
memory_enabled_client = openai_memory(openai_client, mb_client)

# Use the patched client with a user_id
response = memory_enabled_client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's my name?"}],
    user_id="john_doe"  # Can be any string identifier
)

print(response.choices[0].message.content)

The first time, the AI might not know the user's name. But after you tell it:

response = memory_enabled_client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "My name is John Doe"}],
    user_id="john_doe"
)

# Later on, in a new conversation
response = memory_enabled_client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What's my name?"}],
    user_id="john_doe"
)

# Now the AI will remember the user's name
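
Memobase buffers inserted conversations and distills them into the user profile asynchronously, so if you need the new facts reflected immediately, you can flush the buffer manually (assuming the flush helper is exposed on the patched client, as sketched in the utility functions section above):

# Force Memobase to process the buffered conversation right away
# (helper exposure on the patched client is an assumption)
memory_enabled_client.flush("john_doe")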

Conclusion

This implementation demonstrates a powerful way to add user memory to the OpenAI client. The patched client:

  1. Works identically to the original OpenAI client
  2. Adds memory capabilities when user_id is provided
  3. Handles both streaming and non-streaming responses
  4. Automatically saves conversations to Memobase
  5. Retrieves and injects user context into prompts

For applications requiring personalized AI interactions, this approach offers a clean, non-intrusive way to add memory on top of your existing OpenAI-based code.