Function Signature
run_assistant(prompt, assistant_id)
Executes an OpenAI Assistant with a given prompt and returns the AI-generated response. This function creates a new thread, streams the assistant’s response, and handles completion events.
Parameters
prompt (str): The user prompt to send to the assistant. It should contain the question or request for the assistant to process.
assistant_id (str): The unique identifier of the OpenAI Assistant to run. Separate assistants are configured for separate tasks (summary, report, resources).
Return Value
str: The text of the last message the assistant completed. Returns an empty string if the run fails before any message completes, or if the stream ends without a completed message.
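Because an empty string is the only failure signal, callers may want to check the result before displaying it. The following is a minimal sketch, not part of the original code: `run_or_default` is a hypothetical wrapper, and `fake_failed_run` is a stand-in for `run_assistant` so the snippet runs without API access.

```python
def run_or_default(run_fn, prompt, assistant_id, default="No response available."):
    """Call run_fn and substitute a default when it returns an empty string."""
    text = run_fn(prompt, assistant_id)
    return text if text else default


# Stand-in for run_assistant that simulates a failed run (hypothetical).
def fake_failed_run(prompt, assistant_id):
    return ""


summary = run_or_default(fake_failed_run, "Summarize Example Corp", "asst_placeholder")
print(summary)
```

In the real application, `run_assistant` would be passed in place of `fake_failed_run`.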
Implementation Details
The function performs the following operations:
- Thread Creation: Creates a new OpenAI thread with the user’s prompt
- Assistant Execution: Starts a streaming run with the specified assistant
- Event Streaming: Processes streaming events to capture the response
- Error Handling: Detects and handles failed runs
- Result Extraction: Returns the completed message text
Code Example
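from openai import OpenAI
from openai.types.beta.assistant_stream_event import ThreadMessageCompleted, ThreadRunFailed

# `client` is a module-level OpenAI client; the usage example shows how it is created.

def run_assistant(prompt, assistant_id):
    # Create a new thread seeded with the user's prompt.
    thread = client.beta.threads.create(
        messages=[
            {
                "role": "user",
                "content": [{"type": "text", "text": prompt}],
            }
        ]
    )
    # Start a streaming run of the assistant on that thread.
    run = client.beta.threads.runs.create(
        thread_id=thread.id, assistant_id=assistant_id, stream=True
    )
    result_text = ""
    for event in run:
        if isinstance(event, ThreadMessageCompleted):
            # Keep the text of the most recently completed message.
            result_text = event.data.content[0].text.value
        if isinstance(event, ThreadRunFailed):
            # Log the failure and stop reading the stream.
            print(event)
            break
    return result_text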
Usage Example
import streamlit as st  # st.secrets holds the API key and assistant IDs
# Initialize OpenAI client
client = OpenAI(api_key=st.secrets["OPENAI_API_KEY"])
# Assistant IDs for different tasks
report_assistant_id = st.secrets["ASSISTANT_ID"]
summary_assistant_id = st.secrets["SUMMARY_ASSISTANT"]
resource_assistant_id = st.secrets["RESOURCE_ASSISTANT"]
# Company name extracted from domain
company_name = "Example Corp"
# Create prompts for different analysis types
summary_prompt = f"Provide a brief summary of the financial regulations relevant to the company: {company_name}"
report_prompt = f"Identify any financial compliance red flags in the company data: {company_name} that might affect their business compliance."
resource_prompt = f"List the relevant financial regulatory documents for the company: {company_name}"
# Execute assistants
summary = run_assistant(summary_prompt, summary_assistant_id)
report = run_assistant(report_prompt, report_assistant_id)
resources = run_assistant(resource_prompt, resource_assistant_id)
# Display results
print(f"Summary: {summary}")
print(f"Report: {report}")
print(f"Resources: {resources}")
Multi-Assistant Pattern
The application uses multiple specialized assistants for different analysis tasks:
assistants_ = {
    'resource': resource_assistant_id,
    'report': report_assistant_id,
    'summary': summary_assistant_id,
}
prompts_ = {
    'resource': resource_prompt,
    'report': report_prompt,
    'summary': summary_prompt,
}
# Execute all assistants
results = {}
for key in assistants_:
    results[key] = run_assistant(
        prompt=prompts_[key],
        assistant_id=assistants_[key]
    )
Event Stream Example
Stream Events
The function processes the following event types:
# ThreadMessageCompleted event
{
    "event": "thread.message.completed",
    "data": {
        "id": "msg_abc123",
        "object": "thread.message",
        "created_at": 1234567890,
        "thread_id": "thread_abc123",
        "role": "assistant",
        "content": [
            {
                "type": "text",
                "text": {
                    "value": "Based on the analysis of Example Corp, here are the key findings...",
                    "annotations": []
                }
            }
        ]
    }
}
# ThreadRunFailed event
{
    "event": "thread.run.failed",
    "data": {
        "id": "run_abc123",
        "status": "failed",
        "last_error": {
            "code": "server_error",
            "message": "An unexpected error occurred"
        }
    }
}
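The failed-run payload carries a `last_error` object, so a log line can report the error code and message instead of dumping the whole event. The following is a sketch under that assumption: `describe_failure` is a hypothetical helper, and the event is modeled with `SimpleNamespace` so the snippet runs without the OpenAI SDK.

```python
from types import SimpleNamespace


def describe_failure(event):
    """Return a short 'code: message' string from a failed-run event."""
    last_error = getattr(event.data, "last_error", None)
    if last_error is None:
        return "run failed (no error details)"
    return f"{last_error.code}: {last_error.message}"


# Stand-in object mirroring the ThreadRunFailed payload shown above.
failed_event = SimpleNamespace(
    event="thread.run.failed",
    data=SimpleNamespace(
        id="run_abc123",
        status="failed",
        last_error=SimpleNamespace(
            code="server_error", message="An unexpected error occurred"
        ),
    ),
)
print(describe_failure(failed_event))
```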
Error Handling
The function implements basic error handling for failed runs:
for event in run:
    if isinstance(event, ThreadMessageCompleted):
        # Extract the completed message
        result_text = event.data.content[0].text.value
    if isinstance(event, ThreadRunFailed):
        # Log the failure event
        print(event)
        break
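The loop reads `content[0].text.value`, which assumes the first content part of a completed message is text. A more defensive sketch could collect every text part instead. This assumes each part exposes a `type` attribute, as in the event payloads documented here; `extract_text` is a hypothetical helper, and the sample parts are stand-ins built with `SimpleNamespace`.

```python
from types import SimpleNamespace


def extract_text(content_parts):
    """Join the text of every text-type part, skipping images and other parts."""
    return "\n".join(
        part.text.value
        for part in content_parts
        if getattr(part, "type", None) == "text"
    )


# Stand-in content list: one image part followed by one text part (hypothetical data).
content = [
    SimpleNamespace(type="image_file", image_file=SimpleNamespace(file_id="file_placeholder")),
    SimpleNamespace(type="text", text=SimpleNamespace(value="Key findings...", annotations=[])),
]
print(extract_text(content))
```

With a helper like this, the loop body could assign `result_text = extract_text(event.data.content)`.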
Common Error Scenarios
- Invalid Assistant ID: The assistant doesn’t exist or is not accessible
- API Rate Limits: Requests exceed the OpenAI API rate limit for the account
- Thread Creation Failure: Issues creating a new thread
- Stream Interruption: Network issues during streaming
- Content Policy Violation: Prompt triggers content moderation
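Transient failures such as rate limits and network interruptions are often handled by retrying with exponential backoff. The following is a generic sketch, not taken from the application: `call_with_retry` is a hypothetical helper, and the exception type to retry on is supplied by the caller (with the OpenAI Python SDK, `openai.RateLimitError` and `openai.APIConnectionError` would be natural candidates). The demo uses a simulated flaky function and a no-op sleep so it finishes instantly.

```python
import time


def call_with_retry(fn, retry_on=(Exception,), attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call fn(); on a listed exception, wait and retry with exponential backoff."""
    for attempt in range(attempts):
        try:
            return fn()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * (2 ** attempt))


# Simulated operation that fails twice, then succeeds (hypothetical).
calls = {"count": 0}


def flaky():
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"


result = call_with_retry(flaky, retry_on=(ConnectionError,), sleep=lambda seconds: None)
print(result, calls["count"])
```

A call such as `call_with_retry(lambda: run_assistant(prompt, assistant_id), ...)` only helps for errors raised as exceptions; a failed run reported through the event stream still returns an empty string.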
Best Practices
Error Handling Enhancement
Consider adding more robust error handling:
def run_assistant(prompt, assistant_id):
    try:
        thread = client.beta.threads.create(
            messages=[
                {
                    "role": "user",
                    "content": [{"type": "text", "text": prompt}],
                }
            ]
        )
    except Exception as e:
        print(f"Failed to create thread: {e}")
        return ""
    try:
        run = client.beta.threads.runs.create(
            thread_id=thread.id, assistant_id=assistant_id, stream=True
        )
    except Exception as e:
        print(f"Failed to start assistant run: {e}")
        return ""
    result_text = ""
    try:
        for event in run:
            if isinstance(event, ThreadMessageCompleted):
                result_text = event.data.content[0].text.value
            if isinstance(event, ThreadRunFailed):
                print(f"Assistant run failed: {event}")
                break
    except Exception as e:
        print(f"Error during stream processing: {e}")
    return result_text
Assistant Configuration
Each assistant should be configured with:
- Specific instructions for its task (summary, report, or resources)
- Access to the vector storage containing business data
- Appropriate model selection (e.g., GPT-4)
- Temperature and other generation parameters
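The configuration items above can be gathered into one set of arguments per assistant. The following is a sketch, assuming the Assistants API's `file_search` tool with `tool_resources` is how the vector storage is attached. `build_assistant_kwargs` is a hypothetical helper, and the name, instructions, model, temperature, and vector store ID are placeholders rather than values from the application.

```python
def build_assistant_kwargs(task, instructions, vector_store_id, model, temperature=0.2):
    """Assemble keyword arguments for creating one task-specific assistant."""
    return {
        "name": f"{task}-assistant",
        "instructions": instructions,
        "model": model,
        "temperature": temperature,
        # Attach the shared vector storage through the file_search tool.
        "tools": [{"type": "file_search"}],
        "tool_resources": {"file_search": {"vector_store_ids": [vector_store_id]}},
    }


summary_kwargs = build_assistant_kwargs(
    task="summary",
    instructions="Summarize the financial regulations relevant to the company.",
    vector_store_id="vs_placeholder",  # placeholder ID
    model="gpt-4o",                    # placeholder model name
)
print(summary_kwargs["name"])
```

The resulting dictionary could then be passed as `client.beta.assistants.create(**summary_kwargs)`.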
Vector Storage Integration
The assistants use a shared vector storage for context:
vector_storage_id = st.secrets["VECTOR_STORAGE_ID"]
# The vector storage is populated with crawled business data
web_crawler.website_crawler(f"https://{domain}", my_bar=my_bar)
- Streaming: The function uses streaming for real-time response delivery
- Sequential Execution: Multiple assistants are run sequentially, not in parallel
- Thread Lifecycle: Each call creates a new thread (threads are not reused)
- Result Extraction: Only the final completed message is returned
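Since each call creates its own thread, the sequential runs are independent and could in principle be run concurrently. The following is a sketch of that alternative using a thread pool, not the application's current behavior. `run_all_parallel` is a hypothetical helper, `fake_run` stands in for `run_assistant`, and the sketch assumes the shared client can safely be used from several threads.

```python
from concurrent.futures import ThreadPoolExecutor


def run_all_parallel(run_fn, prompts, assistants, max_workers=3):
    """Run one assistant per key concurrently and return {key: response}."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {
            key: pool.submit(run_fn, prompts[key], assistants[key])
            for key in assistants
        }
        # result() blocks until each run finishes and re-raises any exception.
        return {key: future.result() for key, future in futures.items()}


# Stand-in for run_assistant so the sketch runs without API access (hypothetical).
def fake_run(prompt, assistant_id):
    return f"{assistant_id} answered: {prompt}"


demo_prompts = {"summary": "Summarize", "report": "Find red flags"}
demo_assistants = {"summary": "asst_summary", "report": "asst_report"}
results = run_all_parallel(fake_run, demo_prompts, demo_assistants)
print(results["summary"])
```

Concurrent runs reduce total wait time but issue several API requests at once, so they can hit rate limits sooner than sequential execution.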