The sardis-adk package provides Google Agent Development Kit (ADK) integration for Sardis, enabling Gemini-powered agents to make policy-enforced payments autonomously.
import osfrom google.adk import Agent, Runnerfrom google.adk.sessions import InMemorySessionServicefrom sardis_adk import SardisToolkit# Create toolkittoolkit = SardisToolkit( api_key=os.environ["SARDIS_API_KEY"], wallet_id=os.environ["SARDIS_WALLET_ID"],)# Get ADK toolssardis_tools = toolkit.get_tools()# Create agentagent = Agent( name="procurement-agent", model="gemini-2.0-flash", description="A procurement agent that purchases software and API credits.", instruction=( "You are a procurement agent with a Sardis wallet for making payments. " "Before making any payment:\n" "1. Check the wallet balance\n" "2. Check the policy to verify the payment would be allowed\n" "3. Execute the payment with a clear purpose\n" "4. Report the result including transaction ID" ), tools=sardis_tools,)# Run agentasync def main(): session_service = InMemorySessionService() runner = Runner( agent=agent, app_name="sardis-demo", session_service=session_service, ) session = await session_service.create_session( app_name="sardis-demo", user_id="demo-user", ) from google.genai import types user_message = types.Content( role="user", parts=[types.Part(text="Pay $25 to Anthropic for API credits")], ) async for event in runner.run_async( user_id="demo-user", session_id=session.id, new_message=user_message, ): if event.is_final_response(): for part in event.content.parts: if part.text: print(f"Agent: {part.text}")import asyncioasyncio.run(main())
import osfrom google.adk import Agent, Runnerfrom google.adk.sessions import InMemorySessionServicefrom sardis_adk import SardisToolkit# Create the toolkittoolkit = SardisToolkit( api_key=os.environ.get("SARDIS_API_KEY", "sk_test_demo"), wallet_id=os.environ.get("SARDIS_WALLET_ID", "wallet_demo"),)# Get ADK toolssardis_tools = toolkit.get_tools()# Create agentagent = Agent( name="procurement-agent", model="gemini-2.0-flash", description="A procurement agent that purchases software and API credits.", instruction=( "You are a procurement agent with a Sardis wallet for making payments. " "Before making any payment:\n" "1. Check the wallet balance\n" "2. Check the policy to verify payment is allowed\n" "3. Execute the payment with a clear purpose\n" "4. Report the transaction ID\n\n" "Always explain your reasoning before taking action." ), tools=sardis_tools,)async def main(): session_service = InMemorySessionService() runner = Runner( agent=agent, app_name="sardis-demo", session_service=session_service, ) session = await session_service.create_session( app_name="sardis-demo", user_id="demo-user", ) task = ( "I need to purchase $25 of Anthropic API credits for our research " "project. Check the balance first and make sure our policy allows it." ) from google.genai import types user_message = types.Content( role="user", parts=[types.Part(text=task)], ) async for event in runner.run_async( user_id="demo-user", session_id=session.id, new_message=user_message, ): if event.is_final_response(): for part in event.content.parts: if part.text: print(f"\nAgent: {part.text}")if __name__ == "__main__": import asyncio asyncio.run(main())
try: async for event in runner.run_async( user_id="user", session_id=session.id, new_message=user_message, ): if event.is_final_response(): for part in event.content.parts: if part.text: print(part.text)except Exception as e: print(f"Error: {e}")