Browser Use supports running multiple browser instances in parallel, enabling concurrent automation tasks for faster execution.
Basic Parallel Execution
Run multiple agents in parallel using asyncio.gather:
import asyncio
from browser_use import Agent, Browser, ChatOpenAI
async def main():
# Create 3 separate browser instances
browsers = [
Browser(
user_data_dir=f'./temp-profile-{i}',
headless=False,
)
for i in range(3)
]
# Create 3 agents with different tasks
agents = [
Agent(
task='Search for "browser automation" on Google',
browser=browsers[0],
llm=ChatOpenAI(model='gpt-4.1-mini'),
),
Agent(
task='Search for "AI agents" on DuckDuckGo',
browser=browsers[1],
llm=ChatOpenAI(model='gpt-4.1-mini'),
),
Agent(
task='Visit Wikipedia and search for "web scraping"',
browser=browsers[2],
llm=ChatOpenAI(model='gpt-4.1-mini'),
),
]
# Run all agents in parallel
tasks = [agent.run() for agent in agents]
results = await asyncio.gather(*tasks, return_exceptions=True)
print('🎉 All agents completed!')
# Process results
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f'Agent {i} failed: {result}')
else:
print(f'Agent {i} completed successfully')
if __name__ == '__main__':
asyncio.run(main())
How It Works
Create Multiple Browser Instances
Each browser uses a separate user data directory to avoid conflicts
Define Independent Tasks
Create agents with different tasks that can run concurrently
Run in Parallel
Use asyncio.gather() to execute all agent tasks simultaneously
Collect Results
Gather all results once tasks complete, with error handling
Experimental Feature: Parallel execution is still experimental. Agents might conflict with each other in some scenarios.
Parallel Data Collection
Collect data from multiple sources simultaneously:
import asyncio
from pydantic import BaseModel, Field
from browser_use import Agent, Browser, ChatBrowserUse
class ProductData(BaseModel):
name: str
price: float
url: str
source: str
async def scrape_website(url: str, search_term: str):
"""Scrape a single website for product information."""
browser = Browser(
user_data_dir=f'./temp-{url.split("//")[1].split(".")[0]}',
headless=True,
)
task = f"""
Go to {url} and search for "{search_term}".
Find the top 3 products and extract:
- Product name
- Price
- URL
Return as structured data.
"""
agent = Agent(
task=task,
browser=browser,
llm=ChatBrowserUse(),
output_model_schema=ProductData
)
return await agent.run()
async def compare_prices(product: str):
"""Compare prices across multiple websites."""
websites = [
'https://www.amazon.com',
'https://www.ebay.com',
'https://www.walmart.com',
]
# Run all scraping tasks in parallel
tasks = [scrape_website(url, product) for url in websites]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f'Price comparison for "{product}":')
for result in results:
if not isinstance(result, Exception):
print(f' Found on {result.source}: ${result.price}')
if __name__ == '__main__':
asyncio.run(compare_prices('wireless headphones'))
Submit forms to multiple sites simultaneously:
import asyncio
from browser_use import Agent, Browser, ChatBrowserUse
async def submit_form(site_url: str, form_data: dict):
"""Submit a form to a single website."""
browser = Browser(
user_data_dir=f'./temp-{site_url.split("//")[1][:10]}',
headless=True,
)
task = f"""
Go to {site_url} and fill out the contact form:
- Name: {form_data['name']}
- Email: {form_data['email']}
- Message: {form_data['message']}
Submit the form and confirm submission was successful.
"""
agent = Agent(task=task, browser=browser, llm=ChatBrowserUse())
return await agent.run()
async def bulk_contact_submissions():
"""Submit contact forms to multiple sites in parallel."""
sites = [
'https://example1.com/contact',
'https://example2.com/contact',
'https://example3.com/contact',
]
form_data = {
'name': 'John Doe',
'email': 'john@example.com',
'message': 'I am interested in your services',
}
# Submit to all sites in parallel
tasks = [submit_form(url, form_data) for url in sites]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Check results
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f'Submission to {sites[i]} failed: {result}')
else:
print(f'Successfully submitted to {sites[i]}')
if __name__ == '__main__':
asyncio.run(bulk_contact_submissions())
Parallel Research Tasks
Gather information from multiple sources concurrently:
import asyncio
from browser_use import Agent, Browser, ChatBrowserUse
async def research_topic(source: str, topic: str):
"""Research a topic on a single source."""
browser = Browser(
user_data_dir=f'./temp-{source.replace(" ", "-")}',
headless=True,
)
task = f"""
Research "{topic}" on {source}.
Find the top 3 articles and extract:
- Title
- Summary
- Publication date
- URL
"""
agent = Agent(
task=task,
browser=browser,
llm=ChatBrowserUse(),
max_steps=30
)
return await agent.run()
async def comprehensive_research(topic: str):
"""Research a topic across multiple sources in parallel."""
sources = [
'TechCrunch',
'The Verge',
'Hacker News',
'Ars Technica',
]
print(f'Researching "{topic}" across {len(sources)} sources...')
# Research all sources in parallel
tasks = [research_topic(source, topic) for source in sources]
results = await asyncio.gather(*tasks, return_exceptions=True)
print(f'\n✅ Research complete!\n')
# Compile findings
for i, result in enumerate(results):
if not isinstance(result, Exception):
print(f'Findings from {sources[i]}:')
if result.final_result():
print(result.final_result())
print()
if __name__ == '__main__':
asyncio.run(comprehensive_research('quantum computing'))
Error Handling in Parallel Execution
Handle errors gracefully when running tasks in parallel:
import asyncio
from browser_use import Agent, Browser, ChatBrowserUse
async def safe_task_execution(task_name: str, task_description: str):
"""Execute a task with error handling."""
try:
browser = Browser(
user_data_dir=f'./temp-{task_name}',
headless=True,
)
agent = Agent(
task=task_description,
browser=browser,
llm=ChatBrowserUse(),
max_failures=2 # Retry twice on failure
)
result = await agent.run()
return {'task': task_name, 'status': 'success', 'result': result}
except Exception as e:
return {'task': task_name, 'status': 'failed', 'error': str(e)}
async def main():
"""Run multiple tasks with comprehensive error handling."""
tasks = [
('task1', 'Search for "AI news" on Google'),
('task2', 'Search for "machine learning" on DuckDuckGo'),
('task3', 'Visit https://invalid-url.com'), # This will fail
]
# Execute all tasks in parallel
results = await asyncio.gather(
*[safe_task_execution(name, desc) for name, desc in tasks],
return_exceptions=True
)
# Process results
successful = [r for r in results if r.get('status') == 'success']
failed = [r for r in results if r.get('status') == 'failed']
print(f'\n✅ Successful: {len(successful)}')
print(f'❌ Failed: {len(failed)}')
if failed:
print('\nFailed tasks:')
for task in failed:
print(f" - {task['task']}: {task['error']}")
if __name__ == '__main__':
asyncio.run(main())
Resource Usage: Each browser instance consumes memory and CPU. Monitor system resources when running many parallel tasks.
Optimizing Parallel Execution
# Use headless mode for better performance
browser = Browser(headless=True)
# Limit concurrent tasks based on system resources
MAX_CONCURRENT = 5
async def run_with_concurrency_limit(tasks, max_concurrent):
"""Run tasks with a concurrency limit."""
semaphore = asyncio.Semaphore(max_concurrent)
async def limited_task(task):
async with semaphore:
return await task
return await asyncio.gather(*[limited_task(t) for t in tasks])
Best Practices
Separate User Data Directories
Give each browser instance its own user data directory to prevent conflicts
Use Headless Mode
Enable headless mode for better performance when running multiple browsers
Implement Error Handling
Use return_exceptions=True in asyncio.gather() to prevent one failure from stopping all tasks
Limit Concurrency
Use asyncio.Semaphore to limit concurrent tasks based on system resources
Clean Up Resources
Ensure browser instances are properly closed after task completion
Concurrency Limits
import asyncio
from browser_use import Agent, Browser, ChatBrowserUse
async def rate_limited_execution(tasks, max_concurrent=3):
"""Execute tasks with a maximum concurrency limit."""
semaphore = asyncio.Semaphore(max_concurrent)
async def execute_task(task_func):
async with semaphore:
return await task_func()
return await asyncio.gather(*[execute_task(t) for t in tasks])
async def main():
"""Run 10 tasks with max 3 concurrent executions."""
async def create_task(i):
browser = Browser(user_data_dir=f'./temp-{i}', headless=True)
agent = Agent(
task=f'Search for topic {i}',
browser=browser,
llm=ChatBrowserUse()
)
return await agent.run()
tasks = [lambda i=i: create_task(i) for i in range(10)]
results = await rate_limited_execution(tasks, max_concurrent=3)
print(f'Completed {len(results)} tasks with max 3 concurrent executions')
if __name__ == '__main__':
asyncio.run(main())
- Data Scraping - Extract data from multiple sites in parallel
- Research - Gather information from multiple sources
- Shopping - Compare prices across multiple stores