Scalable trace upload is critical for production agent observability. This guide covers patterns for uploading traces efficiently, handling parent-child relationships, and managing large volumes.
Here’s a production-ready implementation for uploading traces to LangSmith:
upload_traces.py
"""Load a JSON trace export, shift timestamps to now, regenerate IDs, and upload via RunTree.

Usage: python upload_traces.py --input synthetic_traces.json --project default
"""

import argparse
import json
from collections import defaultdict
from datetime import datetime, timezone

from dotenv import load_dotenv

# NOTE(review): called before the langsmith import, presumably so credentials
# stored in .env are in the environment when the client is set up -- confirm.
load_dotenv()

from langsmith import Client, uuid7
from langsmith.run_trees import RunTree


def parse_dt(s: str | None) -> datetime | None:
    """Parse an ISO 8601 string into a naive datetime expressed in UTC.

    Args:
        s: ISO-format timestamp, or None.

    Returns:
        None when *s* is None; otherwise a naive datetime. Timezone-aware
        inputs are converted to UTC before the offset is dropped, so values
        with different offsets remain comparable. Naive inputs are returned
        unchanged.
    """
    if s is None:
        return None
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on.
    if s.endswith("Z"):
        s = s[:-1] + "+00:00"
    dt = datetime.fromisoformat(s)
    if dt.tzinfo is not None:
        # Convert first: dropping tzinfo directly would keep the local
        # wall-clock time and silently shift non-UTC timestamps.
        dt = dt.astimezone(timezone.utc).replace(tzinfo=None)
    return dt


def _depth(run: dict, by_id: dict) -> int:
    """Return how many ancestors of *run* are present in *by_id*."""
    depth = 0
    seen = {run["id"]}  # guards against cyclic parent references
    parent_id = run["parent_run_id"]
    while parent_id in by_id and parent_id not in seen:
        seen.add(parent_id)
        depth += 1
        parent_id = by_id[parent_id]["parent_run_id"]
    return depth


def upload_trace(trace_runs: list[dict], client: Client, project_name: str = "default") -> int:
    """Rebuild one trace as a RunTree hierarchy and post it.

    Args:
        trace_runs: Transformed run dicts belonging to a single trace. Each
            needs "id", "parent_run_id", "name", "run_type", "inputs",
            "start_time" and "end_time"; "outputs", "error", "extra" and
            "tags" are optional.
        client: LangSmith client used for the upload.
        project_name: Target project.

    Returns:
        The number of runs that could not be attached to the tree (their
        parent is missing from the trace) and were therefore not uploaded.
    """
    by_id = {run["id"]: run for run in trace_runs}
    # Root first, then shallower runs before deeper ones, so a parent always
    # exists before its children are created -- even when a child's recorded
    # start_time is earlier than its parent's.
    ordered = sorted(
        trace_runs,
        key=lambda r: (r["parent_run_id"] is not None, _depth(r, by_id), r["start_time"]),
    )

    tree_map = {}
    root_tree = None
    for run in ordered:
        if run["parent_run_id"] is None:
            # Root run
            root_tree = RunTree(
                id=run["id"],
                name=run["name"],
                run_type=run["run_type"],
                inputs=run["inputs"],
                start_time=run["start_time"],
                extra=run.get("extra"),
                tags=run.get("tags"),
                project_name=project_name,
                client=client,
            )
            tree_map[run["id"]] = root_tree
            continue

        # Child run
        parent = tree_map.get(run["parent_run_id"])
        if parent is None:
            continue  # orphan: counted in the return value below
        tree_map[run["id"]] = parent.create_child(
            name=run["name"],
            run_type=run["run_type"],
            run_id=run["id"],
            inputs=run["inputs"],
            start_time=run["start_time"],
            extra=run.get("extra"),
            tags=run.get("tags"),
        )

    # End all runs (children first)
    for run in reversed(ordered):
        tree = tree_map.get(run["id"])
        if tree is not None:
            tree.end(outputs=run.get("outputs"), error=run.get("error"), end_time=run["end_time"])

    if root_tree is not None:
        root_tree.post(exclude_child_runs=False)

    return len(ordered) - len(tree_map)


def main():
    """Read runs from --input, remap IDs and timestamps, and upload them to --project."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--project", default="default", help="Target project name")
    parser.add_argument("--input", default="synthetic_traces.json", help="Input file path")
    args = parser.parse_args()

    with open(args.input, encoding="utf-8") as f:
        runs = json.load(f)
    print(f"Loaded {len(runs)} runs from {args.input}")

    # Calculate time shift so traces appear recent
    start_times = [parse_dt(r["start_time"]) for r in runs if r.get("start_time")]
    if not start_times:
        raise SystemExit(f"No runs with a start_time found in {args.input}; nothing to upload.")
    time_delta = datetime.now(timezone.utc).replace(tzinfo=None) - max(start_times)
    print(f"Shifting timestamps by: {time_delta}")

    # Build ID map (uuid7 for time-ordering)
    id_map = {}
    for run in runs:
        for field in ("id", "trace_id", "parent_run_id"):
            old_id = run.get(field)
            if old_id and old_id not in id_map:
                id_map[old_id] = str(uuid7())

    # Group runs by trace and transform
    traces = defaultdict(list)
    skipped = 0
    for run in runs:
        if not run.get("start_time"):
            # Without a start_time the run can be neither shifted nor ordered.
            skipped += 1
            continue
        traces[run["trace_id"]].append({
            "id": id_map[run["id"]],
            "parent_run_id": id_map.get(run.get("parent_run_id")),
            "name": run["name"],
            "run_type": run["run_type"],
            "inputs": run["inputs"],
            "outputs": run.get("outputs"),
            "error": run.get("error"),
            "extra": run.get("extra"),
            "tags": run.get("tags"),
            "start_time": parse_dt(run["start_time"]) + time_delta,
            "end_time": parse_dt(run["end_time"]) + time_delta if run.get("end_time") else None,
        })

    client = Client()
    print(f"Uploading {len(traces)} traces to project '{args.project}'...")
    try:
        for i, trace_runs in enumerate(traces.values(), start=1):
            skipped += upload_trace(trace_runs, client, args.project)
            if i % 10 == 0:
                print(f" Uploaded {i}/{len(traces)} traces")
    finally:
        # Wait for all background operations to complete, even if a trace
        # failed part-way, so already-posted runs are not lost.
        print("Flushing...")
        client.flush()

    if skipped:
        print(f"Skipped {skipped} runs (missing start_time or parent run)")
    print("Done!")


if __name__ == "__main__":
    main()
When uploading historical or synthetic traces, shift timestamps to make them appear recent:
# Find the latest timestamp in your dataset
latest = max(parse_dt(r["start_time"]) for r in runs if r["start_time"])

# Calculate offset to current time (naive UTC, matching what parse_dt returns)
time_delta = datetime.now(timezone.utc).replace(tzinfo=None) - latest

# Apply to all timestamps; runs that never finished keep end_time=None
start_time = parse_dt(run["start_time"]) + time_delta
end_time = parse_dt(run["end_time"]) + time_delta if run.get("end_time") else None
Ensure your datetime objects are either all timezone-aware or all naive. Mixing the two raises a TypeError as soon as they are compared or subtracted.
Preserve time-ordering while generating fresh IDs:
from langsmith import uuid7

# Build a one-way map from each original ID to a fresh one
id_map = {}
for run in runs:
    for field in ("id", "trace_id", "parent_run_id"):
        old_id = run.get(field)
        if old_id and old_id not in id_map:
            id_map[old_id] = str(uuid7())  # Time-ordered UUIDs

# Remap all IDs (mutates the run dicts in place)
for run in runs:
    run["id"] = id_map[run["id"]]
    run["trace_id"] = id_map[run["trace_id"]]
    if run.get("parent_run_id"):
        run["parent_run_id"] = id_map[run["parent_run_id"]]
Why uuid7? Unlike uuid4, uuid7 preserves temporal ordering, making trace analysis and debugging easier.
Group runs by trace and sort to ensure parent runs are processed before children:
from collections import defaultdict

# Group by trace_id
traces = defaultdict(list)
for run in runs:
    traces[run["trace_id"]].append(run)

# Sort each trace: root first, then by start_time
for trace_runs in traces.values():
    trace_runs.sort(key=lambda r: (
        r["parent_run_id"] is not None,  # False (root) comes before True
        r["start_time"],
    ))
# Process in reverse to end children first
for run in reversed(trace_runs):
    tree = tree_map.get(run["id"])
    if tree:
        tree.end(
            outputs=run.get("outputs"),
            error=run.get("error"),
            end_time=run["end_time"],
        )
Post the complete trace tree and flush at the end:
if root_tree:
    root_tree.post(exclude_child_runs=False)  # Include all children

# After all traces uploaded
client.flush()  # Critical: ensures all data is sent
Always call client.flush() before your script exits. Otherwise, traces may be lost due to background operations not completing.
import json
import logging

failed_traces = []
for trace_runs in traces.values():
    try:
        upload_trace(trace_runs, client)
    except Exception:
        # Boundary handler: log the full traceback and carry on with the
        # remaining traces instead of aborting the whole upload.
        logging.exception("Failed to upload trace")
        failed_traces.append(trace_runs)

# Write failed traces to disk for manual inspection
if failed_traces:
    with open("failed_traces.json", "w", encoding="utf-8") as f:
        # default=str is deliberate: run dicts may hold datetime values,
        # which json cannot encode natively.
        json.dump(failed_traces, f, default=str)