Agent mode requires running `skyvern init` first to set up your local Skyvern environment.
import asyncio

from skyvern_langchain.agent import RunTask

# Agent-mode tool (imported from skyvern_langchain.agent); built with no arguments.
run_task = RunTask()


async def main():
    """Run a single Skyvern task from a natural-language prompt and print the result."""
    result = await run_task.ainvoke(
        "Navigate to the Hacker News homepage and get the top 3 posts."
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
import asyncio

from skyvern_langchain.client import RunTask

# Client-mode tool (imported from skyvern_langchain.client), built with an explicit API key.
run_task = RunTask(
    api_key="<your_skyvern_api_key>"
)
# Or load from SKYVERN_API_KEY environment variable:
# run_task = RunTask()


async def main():
    """Run a single Skyvern task from a natural-language prompt and print the result."""
    result = await run_task.ainvoke(
        "Navigate to the Hacker News homepage and get the top 3 posts."
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
This agent dispatches a task, waits for completion, and returns results.
import asyncio

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.agent import DispatchTask, GetTask
from langchain_community.tools.sleep.tool import SleepTool

# Load environment variables from a .env file before any client is constructed.
load_dotenv()

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Agent-mode tools (imported from skyvern_langchain.agent).
dispatch_task = DispatchTask()
get_task = GetTask()

# The agent is given the dispatch tool, the get tool and a sleep tool, which
# together let it start a task and re-check it at an interval, as the prompt
# in main() asks.
agent = initialize_agent(
    llm=llm,
    tools=[
        dispatch_task,
        get_task,
        SleepTool(),
    ],
    verbose=True,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)


async def main():
    """Ask the agent to dispatch a Skyvern task, poll it until done, and print the result."""
    prompt = (
        "Run a task with Skyvern. The task is about 'Navigate to the Hacker News "
        "homepage and get the top 3 posts.' Then, get this task information until "
        "it's completed. The task information re-get interval should be 60s."
    )
    result = await agent.ainvoke(prompt)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
import asyncio

from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain.agents import initialize_agent, AgentType
from skyvern_langchain.client import DispatchTask, GetTask
from langchain_community.tools.sleep.tool import SleepTool

# Load environment variables from a .env file before any client is constructed.
load_dotenv()

llm = ChatOpenAI(model="gpt-4o", temperature=0)

# Initialize with API key
dispatch_task = DispatchTask(
    api_key="<your_skyvern_api_key>"
)
get_task = GetTask(
    api_key="<your_skyvern_api_key>"
)
# Or load from environment: DispatchTask() and GetTask()

# The agent is given the dispatch tool, the get tool and a sleep tool, which
# together let it start a task and re-check it at an interval, as the prompt
# in main() asks.
agent = initialize_agent(
    llm=llm,
    tools=[
        dispatch_task,
        get_task,
        SleepTool(),
    ],
    verbose=True,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)


async def main():
    """Ask the agent to dispatch a Skyvern task, poll it until done, and print the result."""
    prompt = (
        "Run a task with Skyvern. The task is about 'Navigate to the Hacker News "
        "homepage and get the top 3 posts.' Then, get this task information until "
        "it's completed. The task information re-get interval should be 60s."
    )
    result = await agent.ainvoke(prompt)
    print(result)


if __name__ == "__main__":
    asyncio.run(main())
# NOTE(review): PromptTemplate and LLMChain are imported but not used in this
# snippet; kept in case surrounding material relies on them.
from langchain.prompts import PromptTemplate
from langchain.chains import LLMChain
from skyvern_langchain.client import RunTask

run_task = RunTask()


async def submit_application(user_data):
    """Build a form-filling task description from ``user_data`` and run it.

    Args:
        user_data: Mapping that must provide the keys ``'name'``, ``'email'``,
            ``'phone'`` and ``'resume_path'``; each value is interpolated
            into the task description.

    Returns:
        Whatever ``run_task.ainvoke`` returns for the generated description.

    Raises:
        KeyError: If ``user_data`` is a dict missing one of the keys above.
    """
    task_description = f"""
    Navigate to the job application form and fill it out with:
    - Name: {user_data['name']}
    - Email: {user_data['email']}
    - Phone: {user_data['phone']}
    - Resume: Upload from {user_data['resume_path']}

    Then submit the form.
    """
    result = await run_task.ainvoke(task_description)
    return result
# AgentType, ChatOpenAI and SleepTool are used below, so they are imported
# here to keep the snippet runnable on its own.
from langchain.agents import AgentType, initialize_agent
from langchain_community.tools.sleep.tool import SleepTool
from langchain_openai import ChatOpenAI
from skyvern_langchain.client import DispatchTask, GetTask

dispatch = DispatchTask(api_key="your_key")
get = GetTask(api_key="your_key")

# The agent is given the dispatch tool, the get tool and a sleep tool.
agent = initialize_agent(
    llm=ChatOpenAI(model="gpt-4o"),
    tools=[dispatch, get, SleepTool()],
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
)

# "[urls]" is a placeholder to be replaced with the actual site URLs.
# NOTE(review): recent LangChain releases appear to deprecate `run` in favour
# of `invoke` — confirm against the installed version.
result = agent.run(
    "Extract product data from these 3 e-commerce sites: [urls]. "
    "Wait for all tasks to complete and compile the results."
)