Function Signature

run_assistant(prompt, assistant_id)
Executes an OpenAI Assistant with a given prompt and returns the AI-generated response. The function creates a new thread, streams the assistant’s response, and handles completion and failure events.

Parameters

prompt
string
required
The user prompt to send to the assistant. This should contain the question or request for the AI to process.
assistant_id
string
required
The unique identifier of the OpenAI Assistant to use. Different assistants are configured for different tasks (summary, report, resources).

Return Value

result_text
string
The text response generated by the assistant. Returns an empty string if the run fails or if no message is completed.

Implementation Details

The function performs the following operations:
  1. Thread Creation: Creates a new OpenAI thread with the user’s prompt
  2. Assistant Execution: Starts a streaming run with the specified assistant
  3. Event Streaming: Processes streaming events to capture the response
  4. Error Handling: Detects and handles failed runs
  5. Result Extraction: Returns the completed message text

Code Example

from openai import OpenAI
from openai.types.beta.assistant_stream_event import ThreadMessageCompleted, ThreadRunFailed

# Assumes `client` is an initialized OpenAI client (see Usage Example below)

def run_assistant(prompt, assistant_id):
    """Create a thread with the prompt, stream the run, and return the final text."""
    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt}],
            }
        ]
    )
    run = client.beta.threads.runs.create(
        thread_id=thread.id, assistant_id=assistant_id, stream=True
    )

    result_text = ""
    for event in run:
        if isinstance(event, ThreadMessageCompleted):
            # Capture the text of the completed assistant message
            result_text = event.data.content[0].text.value
        elif isinstance(event, ThreadRunFailed):
            # Log the failure and stop consuming the stream
            print(event)
            break
    return result_text

Usage Example

import streamlit as st

# Initialize the OpenAI client from the Streamlit secrets store
client = OpenAI(api_key=st.secrets["OPENAI_API_KEY"])

# Assistant IDs for different tasks
report_assistant_id = st.secrets["ASSISTANT_ID"]
summary_assistant_id = st.secrets["SUMMARY_ASSISTANT"]
resource_assistant_id = st.secrets["RESOURCE_ASSISTANT"]

# Company name extracted from domain
company_name = "Example Corp"

# Create prompts for different analysis types
summary_prompt = f"Provide a brief summary of the financial regulations relevant to the company: {company_name}"
report_prompt = f"Identify any financial compliance red flags in the company data for {company_name} that might affect its business compliance."
resource_prompt = f"List the relevant financial regulatory documents for the company: {company_name}"

# Execute assistants
summary = run_assistant(summary_prompt, summary_assistant_id)
report = run_assistant(report_prompt, report_assistant_id)
resources = run_assistant(resource_prompt, resource_assistant_id)

# Display results
print(f"Summary: {summary}")
print(f"Report: {report}")
print(f"Resources: {resources}")

Multi-Assistant Pattern

The application uses multiple specialized assistants for different analysis tasks:
assistants_ = {
    'resource': resource_assistant_id,
    'report': report_assistant_id,
    'summary': summary_assistant_id,
}
prompts_ = {
    'resource': resource_prompt,
    'report': report_prompt,
    'summary': summary_prompt,
}

# Execute all assistants
results = {}
for key, assistant_id in assistants_.items():
    results[key] = run_assistant(
        prompt=prompts_[key],
        assistant_id=assistant_id
    )

Event Stream Example

Stream Events

The function processes the following event types:
# ThreadMessageCompleted event
{
  "event": "thread.message.completed",
  "data": {
    "id": "msg_abc123",
    "object": "thread.message",
    "created_at": 1234567890,
    "thread_id": "thread_abc123",
    "role": "assistant",
    "content": [
      {
        "type": "text",
        "text": {
          "value": "Based on the analysis of Example Corp, here are the key findings...",
          "annotations": []
        }
      }
    ]
  }
}

# ThreadRunFailed event
{
  "event": "thread.run.failed",
  "data": {
    "id": "run_abc123",
    "status": "failed",
    "last_error": {
      "code": "server_error",
      "message": "An unexpected error occurred"
    }
  }
}
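The extraction path used in `run_assistant`, `event.data.content[0].text.value`, mirrors the nesting of the `thread.message.completed` payload above. A minimal sketch with a plain dict standing in for the SDK's typed event object:

```python
# Illustrative payload shaped like the thread.message.completed event above
event = {
    "event": "thread.message.completed",
    "data": {
        "role": "assistant",
        "content": [
            {"type": "text", "text": {"value": "Key findings...", "annotations": []}}
        ],
    },
}

# Same path the function walks on the typed object: data -> content[0] -> text -> value
result_text = event["data"]["content"][0]["text"]["value"]
print(result_text)  # Key findings...
```

In the real SDK these are attribute accesses on pydantic models rather than dict lookups, but the structure is identical.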

Error Handling

The function implements basic error handling for failed runs:
for event in run:
    if isinstance(event, ThreadMessageCompleted):
        # Extract the completed message
        result_text = event.data.content[0].text.value
    elif isinstance(event, ThreadRunFailed):
        # Log the failure event and stop consuming the stream
        print(event)
        break

Common Error Scenarios

  1. Invalid Assistant ID: The assistant doesn’t exist or is not accessible
  2. API Rate Limits: OpenAI API rate limiting
  3. Thread Creation Failure: Issues creating a new thread
  4. Stream Interruption: Network issues during streaming
  5. Content Policy Violation: Prompt triggers content moderation
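Scenarios 2 and 4 above are transient and often succeed on retry. A generic retry wrapper with exponential backoff is one way to handle them; the sketch below is not part of the application, and `flaky_run` merely simulates a call that fails twice (e.g. due to rate limiting) before succeeding.

```python
import time

def retry_with_backoff(fn, retries=3, base_delay=1.0):
    # Call fn() up to `retries` times, doubling the delay after each failure
    for attempt in range(retries):
        try:
            return fn()
        except Exception as exc:
            if attempt == retries - 1:
                raise  # out of attempts; propagate the error
            delay = base_delay * (2 ** attempt)
            print(f"Attempt {attempt + 1} failed ({exc}); retrying in {delay:.2f}s")
            time.sleep(delay)

calls = {"n": 0}

def flaky_run():
    # Simulates two transient failures followed by a successful run
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("rate limited")
    return "assistant response"

print(retry_with_backoff(flaky_run, base_delay=0.01))  # assistant response
```

A real integration would wrap the `run_assistant` call itself, e.g. `retry_with_backoff(lambda: run_assistant(prompt, assistant_id))`.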

Best Practices

Error Handling Enhancement

Consider adding more robust error handling:
def run_assistant(prompt, assistant_id):
    try:
        thread = client.beta.threads.create(
            messages=[
                {
                    "role": "user",
                    "content": [{"type": "text", "text": prompt}],
                }
            ]
        )
    except Exception as e:
        print(f"Failed to create thread: {e}")
        return ""
    
    try:
        run = client.beta.threads.runs.create(
            thread_id=thread.id, assistant_id=assistant_id, stream=True
        )
    except Exception as e:
        print(f"Failed to start assistant run: {e}")
        return ""

    result_text = ""
    try:
        for event in run:
            if isinstance(event, ThreadMessageCompleted):
                result_text = event.data.content[0].text.value
            elif isinstance(event, ThreadRunFailed):
                print(f"Assistant run failed: {event}")
                break
    except Exception as e:
        print(f"Error during stream processing: {e}")
    
    return result_text

Assistant Configuration

Each assistant should be configured with:
  • Specific instructions for its task (summary, report, or resources)
  • Access to the vector storage containing business data
  • Appropriate model selection (e.g., GPT-4)
  • Temperature and other generation parameters
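These settings correspond to parameters of the Assistants API's create-assistant call. The dictionary below is an illustrative configuration only; the model name, instructions, and vector store ID are placeholders, not values from this application.

```python
# Hypothetical configuration for one specialized assistant (summary)
summary_assistant_config = {
    "name": "Summary Assistant",
    "instructions": (
        "Provide brief summaries of financial regulations relevant to a company."
    ),
    "model": "gpt-4o",                   # model selection
    "temperature": 0.2,                  # low temperature for factual output
    "tools": [{"type": "file_search"}],  # enables vector store retrieval
    "tool_resources": {
        "file_search": {"vector_store_ids": ["vs_placeholder"]}  # shared vector storage
    },
}

# A real client would pass these as keyword arguments, e.g.:
# client.beta.assistants.create(**summary_assistant_config)
```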

Vector Storage Integration

The assistants use a shared vector storage for context:
vector_storage_id = st.secrets["VECTOR_STORAGE_ID"]

# The vector storage is populated with crawled business data
# (web_crawler, domain, and my_bar are defined elsewhere in the application)
web_crawler.website_crawler(f"https://{domain}", my_bar=my_bar)

Performance Considerations

  • Streaming: The function uses streaming for real-time response delivery
  • Sequential Execution: Multiple assistants are run sequentially, not in parallel
  • Thread Lifecycle: Each call creates a new thread (threads are not reused)
  • Result Extraction: Only the final completed message is returned