ImageWatcher uses the watchdog library to monitor a directory for new FITS files using OS-native filesystem events: inotify on Linux, FSEvents on macOS, and kqueue on BSD/macOS. Because it registers with the operating system's event subsystem rather than polling the directory on a timer, it incurs essentially zero CPU overhead while idle and detects new files with sub-second latency. The callback itself fires only after a configurable debounce window, which gives partially written files time to finish.

Constructor

from image_watcher import ImageWatcher

watcher = ImageWatcher(
    watch_path: str,
    callback: Callable[[dict], None],
    debounce_delay: float = 2.0,
)
Creates an ImageWatcher instance but does not start file monitoring. Call start() to activate the OS observer thread.
watch_path (str, required)
Absolute or relative path to the directory to monitor. It must be an existing directory when start() is called (checked with os.path.isdir). Monitoring is non-recursive: subdirectories are not watched.
callback (Callable[[dict], None], required)
Callable invoked once per stable new FITS file. It receives a single dict argument; see Callback Event Dict for the shape. Exceptions raised inside the callback are caught and logged at ERROR level, so they do not stop the watcher.
debounce_delay (float, default: 2.0)
Time in seconds to wait after the last filesystem event for a given path before invoking the callback. Any new event for the same path during the debounce window resets the timer. Increase this value for slow storage (e.g. NFS mounts or USB drives) where large FITS files take longer to finish writing.
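
The exception handling described for callback can be pictured with a small wrapper. This is only a sketch of the documented behavior (catch, log at ERROR level, keep running); the helper name invoke_safely and the logger name are hypothetical and are not part of ImageWatcher.

```python
import logging
from typing import Callable

# Hypothetical logger name, chosen for this sketch only.
logger = logging.getLogger("image_watcher_sketch")


def invoke_safely(callback: Callable[[dict], None], event: dict) -> bool:
    """Call callback(event); log and swallow any exception it raises.

    Returns True if the callback completed, False if it raised.
    """
    try:
        callback(event)
    except Exception:
        # logger.exception logs at ERROR level and appends the traceback.
        logger.exception("Callback failed for %s", event.get("path"))
        return False
    return True
```

Because a failing callback is only logged, a pipeline that must not lose images should record failures itself, for example by appending the path to a retry list inside the callback.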

start()

watcher.start() -> None
Starts the OS observer thread and begins monitoring watch_path for filesystem events. If watch_path does not exist or is not a directory at call time, the error is logged and the method returns without raising, leaving the watcher in a non-running state. The internal watchdog.observers.Observer thread is set as a daemon thread so it does not block process exit if stop() is not called explicitly.
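
Since start() logs a bad path instead of raising, a caller that prefers to fail fast can validate the directory before constructing the watcher. The helper below is a hypothetical convenience for that purpose, not something ImageWatcher provides; it applies the same os.path.isdir check that start() is documented to use.

```python
import os


def require_directory(path: str) -> str:
    """Return the absolute form of path, or raise if it is not a directory."""
    if not os.path.isdir(path):
        raise NotADirectoryError(
            f"watch_path is not an existing directory: {path!r}"
        )
    return os.path.abspath(path)
```

The return value can be passed straight through as watch_path, so a typo in the mount point surfaces as an exception at startup rather than as a single log line.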

stop()

watcher.stop() -> None
Performs an ordered shutdown:
  1. Sets _running = False so any pending timer callbacks that fire after this point are no-ops.
  2. Cancels all pending debounce threading.Timer objects and clears the timers dict.
  3. Calls observer.stop() to signal the watchdog thread to exit.
  4. Calls observer.join() to wait for the thread to finish cleanly.
After stop() returns, no further callbacks will be invoked. It is safe to call stop() more than once.
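
The four shutdown steps can be sketched as follows. This is an illustration under stated assumptions, not the actual source: the class name ShutdownSketch, the lock, the fired list, and the early return that makes repeated calls harmless are all hypothetical. The observer argument only needs stop() and join() methods, as watchdog's Observer has.

```python
import threading
from typing import Dict, List


class ShutdownSketch:
    """Illustrates the documented stop() ordering with a stand-in observer."""

    def __init__(self, observer) -> None:
        self._observer = observer  # any object with stop() and join()
        self._timers: Dict[str, threading.Timer] = {}
        self._lock = threading.Lock()
        self._running = True
        self.fired: List[str] = []  # stand-in for invoking the callback

    def _fire(self, path: str) -> None:
        if not self._running:
            return  # a timer that slipped past cancel() does nothing
        self.fired.append(path)

    def stop(self) -> None:
        if not self._running:
            return  # assumption: this is how repeated calls stay safe
        self._running = False              # step 1
        with self._lock:
            for timer in self._timers.values():
                timer.cancel()             # step 2
            self._timers.clear()
        self._observer.stop()              # step 3
        self._observer.join()              # step 4
```

Setting the flag before cancelling matters: Timer.cancel() cannot stop a timer whose function has already begun running, so the flag check in _fire covers that gap.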

Callback Event Dict

The single argument passed to callback on each stable FITS detection.
path (str)
Absolute path to the detected FITS file, e.g. "/mnt/seestar/image_20250601_023412.fits".
header (dict)
FITS primary HDU header as a plain Python dict, with COMMENT and HISTORY keywords stripped and string values whitespace-stripped. The value is {} if the header could not be read (e.g. corrupt file or permission error). Typical keys include OBJECT, EXPTIME, DATE-OBS, RA, DEC, IMAGETYP, FILTER.
size_kb (float)
File size in kilobytes at the moment the callback fires, from os.path.getsize(path) / 1024.0.
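
To make the shape of the event dict concrete, the sketch below assembles one from a file path and an already-parsed header mapping. It is an approximation of the rules listed above, not the watcher's code: the function names clean_header and build_event are hypothetical, and reading the header from the FITS file is left out because this page does not say which FITS reader is used.

```python
import os
from typing import Any, Dict, Mapping

# Keywords the documentation says are removed from the header dict.
SKIPPED_KEYWORDS = {"COMMENT", "HISTORY"}


def clean_header(raw: Mapping[str, Any]) -> Dict[str, Any]:
    """Drop COMMENT/HISTORY entries and strip whitespace from string values."""
    cleaned: Dict[str, Any] = {}
    for key, value in raw.items():
        if key.upper() in SKIPPED_KEYWORDS:
            continue
        cleaned[key] = value.strip() if isinstance(value, str) else value
    return cleaned


def build_event(path: str, raw_header: Mapping[str, Any]) -> Dict[str, Any]:
    """Assemble a dict with the three documented keys."""
    return {
        "path": os.path.abspath(path),
        "header": clean_header(raw_header),
        "size_kb": os.path.getsize(path) / 1024.0,
    }
```

A callback can rely on all three keys being present, but should still use header.get(...) for individual keywords, since header may be empty.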

File Types

Only files with extensions .fits or .fit (case-insensitive) trigger debounce timers and ultimately the callback. All other files created or moved into watch_path are silently ignored by the _schedule() filter.
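
A case-insensitive extension check of this kind might look like the sketch below. The function name is_fits_path is hypothetical; the real filter lives inside _schedule() and may be written differently.

```python
import os

FITS_EXTENSIONS = {".fits", ".fit"}


def is_fits_path(path: str) -> bool:
    """True when the final extension is .fits or .fit, ignoring case."""
    _, ext = os.path.splitext(path)
    return ext.lower() in FITS_EXTENSIONS
```

Only the final extension counts, so a temporary name such as image.fits.tmp is ignored until it is renamed to image.fits.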

Debounce Behavior

The watcher handles two watchdog event types:
  • on_created — fires when a new file is created in the watched directory.
  • on_moved — fires when a file is renamed or moved into the watched directory (e.g. an atomic rename(tmp, final.fits) completion).
Both event types call _schedule(path), which:
  1. Cancels any existing threading.Timer for that path.
  2. Starts a new timer for debounce_delay seconds.
  3. When the timer expires without being reset, _fire(path) is called.
_fire() verifies the file still exists (os.path.isfile), reads the FITS header, measures the file size, and invokes the callback. Because only creation and move events reset the timer, the callback receives a complete file only if the camera finishes writing within debounce_delay seconds of the last such event; increase debounce_delay if large files arrive truncated.
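
The cancel-and-restart pattern described above can be sketched with threading.Timer. This is a minimal illustration, not the watcher's source: the class name Debouncer, the lock, and the identity check inside expire are assumptions. The event attributes src_path, dest_path, and is_directory are the ones watchdog's event objects expose; the existence check, header read, and size measurement done by _fire() are omitted.

```python
import threading
from typing import Callable, Dict


class Debouncer:
    """Per-path debounce: each new event restarts that path's timer."""

    def __init__(self, delay: float, fire: Callable[[str], None]) -> None:
        self._delay = delay
        self._fire = fire
        self._timers: Dict[str, threading.Timer] = {}
        self._lock = threading.Lock()

    def schedule(self, path: str) -> None:
        def expire() -> None:
            with self._lock:
                if self._timers.get(path) is not timer:
                    return  # superseded by a newer event for this path
                del self._timers[path]
            self._fire(path)

        with self._lock:
            old = self._timers.get(path)
            if old is not None:
                old.cancel()  # step 1: cancel the existing timer
            timer = threading.Timer(self._delay, expire)  # step 2
            timer.daemon = True
            self._timers[path] = timer
            timer.start()

    # watchdog-style entry points
    def on_created(self, event) -> None:
        if not event.is_directory:
            self.schedule(event.src_path)

    def on_moved(self, event) -> None:
        if not event.is_directory:
            self.schedule(event.dest_path)  # the new name after the move
```

The identity check in expire guards against a timer that has already started running when a newer event replaces it; without it, the same path could fire twice.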

Usage Example

import logging
import time

import yaml

from image_watcher import ImageWatcher
from photometry import run_pipeline
from aavso_submission import submit

logging.basicConfig(level=logging.INFO)

# Load your config (e.g. from config.yaml)
with open("config.yaml") as f:
    config = yaml.safe_load(f)


def on_new_image(event: dict) -> None:
    path   = event["path"]
    header = event["header"]
    size   = event["size_kb"]

    # header may be {} if the file could not be read, so use .get()
    print(f"New FITS: {path}  ({size:.1f} KB)  OBJECT={header.get('OBJECT', '?')}")

    result = run_pipeline(path, config)
    if result is None:
        print("  Pipeline returned None; skipping submission")
        return

    print(f"  mag={result['magnitude']:.3f} ± {result['uncertainty']:.3f}  "
          f"quality={result['quality_flag']}")

    sub = submit(result, config)
    print(f"  Submission status: {sub['status']}  message: {sub['message']}")


watcher = ImageWatcher(
    watch_path="/mnt/seestar/output",
    callback=on_new_image,
    debounce_delay=3.0,   # wait 3 s after the last event before firing
)

watcher.start()
print("Watching for new FITS files. Press Ctrl+C to stop.")

# Keep the main thread alive; the observer runs in a daemon thread.
try:
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    pass
finally:
    watcher.stop()
    print("Watcher stopped.")