Installation
Install via pip:pip install jasonisnthappy
The package includes pre-built dynamic libraries for all supported platforms. The library is automatically loaded from the package directory.
Quick start
from jasonisnthappy import Database
# Open database
db = Database.open("./my_database.db")
try:
# Begin transaction
tx = db.begin_transaction()
try:
# Insert document
doc_id = tx.insert("users", {
"name": "Alice",
"age": 30,
"email": "alice@example.com"
})
print(f"Inserted document with ID: {doc_id}")
# Find by ID
user = tx.find_by_id("users", doc_id)
print(f"Found: {user}")
# Update
tx.update_by_id("users", doc_id, {
"name": "Alice",
"age": 31,
"email": "alice@example.com"
})
# Find all
all_users = tx.find_all("users")
print(f"All users: {all_users}")
# Commit transaction
tx.commit()
except Exception as e:
# Rollback on error
tx.rollback()
raise
finally:
db.close()
Context managers
The library supports Python context managers for automatic resource cleanup:with Database.open("./my_database.db") as db:
collections = db.list_collections()
print(f"Collections: {collections}")
Type hints
The library includes full type hints for better IDE support:from typing import Dict, Any, List, Optional
from jasonisnthappy import Database, Transaction, Collection
db: Database = Database.open("./my_database.db")
tx: Transaction = db.begin_transaction()
doc: Dict[str, Any] = {"name": "Alice", "age": 30}
doc_id: str = tx.insert("users", doc)
result: Optional[Dict[str, Any]] = tx.find_by_id("users", doc_id)
results: List[Dict[str, Any]] = tx.find_all("users")
API reference
Database
Opening databases
@staticmethod
def open(path: str) -> Database
@staticmethod
def open_with_options(path: str, options: CDatabaseOptions) -> Database
@staticmethod
def default_database_options() -> CDatabaseOptions
- Simple open
- With context manager
- With options
db = Database.open("./my_database.db")
try:
# Use database
pass
finally:
db.close()
with Database.open("./my_database.db") as db:
# Use database
pass
opts = Database.default_database_options()
opts.cache_size = 1024 * 1024 * 100 # 100MB cache
opts.read_only = False
db = Database.open_with_options("./my_database.db", opts)
Configuration
def set_transaction_config(config: CTransactionConfig) -> None
def get_transaction_config() -> CTransactionConfig
def set_auto_checkpoint_threshold(threshold: int) -> None
@staticmethod
def default_transaction_config() -> CTransactionConfig
Database info
def get_path() -> str
def is_read_only() -> bool
def max_bulk_operations() -> int
def max_document_size() -> int
def max_request_body_size() -> int
def list_collections() -> List[str]
def collection_stats(collection_name: str) -> Dict[str, Any]
def database_info() -> Dict[str, Any]
Transaction
Creating transactions
def begin_transaction() -> Transaction
CRUD operations
def insert(collection_name: str, doc: Dict[str, Any]) -> str
def find_by_id(collection_name: str, doc_id: str) -> Optional[Dict[str, Any]]
def update_by_id(collection_name: str, doc_id: str, doc: Dict[str, Any]) -> None
def delete_by_id(collection_name: str, doc_id: str) -> None
def find_all(collection_name: str) -> List[Dict[str, Any]]
def count(collection_name: str) -> int
Insert a document
doc_id = tx.insert("users", {
"name": "Alice",
"age": 30,
"email": "alice@example.com"
})
Find by ID
user = tx.find_by_id("users", doc_id)
if user:
print(f"Name: {user['name']}, Age: {user['age']}")
Transaction control
def commit() -> None
def rollback() -> None
def is_active() -> bool
Always commit or rollback transactions. Using the context manager is recommended to ensure proper cleanup.
Collection management
def create_collection(collection_name: str) -> None
def drop_collection(collection_name: str) -> None
def rename_collection(old_name: str, new_name: str) -> None
Collection
For non-transactional operations with auto-commit:def get_collection(name: str) -> Collection
Basic CRUD
def name() -> str
def insert(doc: Dict[str, Any]) -> str
def find_by_id(doc_id: str) -> Optional[Dict[str, Any]]
def update_by_id(doc_id: str, doc: Dict[str, Any]) -> None
def delete_by_id(doc_id: str) -> None
def find_all() -> List[Dict[str, Any]]
def count() -> int
Querying
def find(filter_str: str) -> List[Dict[str, Any]]
def find_one(filter_str: str) -> Optional[Dict[str, Any]]
def update(filter_str: str, update: Dict[str, Any]) -> int
def update_one(filter_str: str, update: Dict[str, Any]) -> bool
def delete(filter_str: str) -> int
def delete_one(filter_str: str) -> bool
adults = coll.find("age >= 18")
for user in adults:
print(f"{user['name']} is {user['age']} years old")
Bulk operations
def insert_many(docs: List[Dict[str, Any]]) -> List[str]
def upsert_by_id(doc_id: str, doc: Dict[str, Any]) -> UpsertResult
def upsert(filter_str: str, doc: Dict[str, Any]) -> UpsertResult
def bulk_write(operations: List[Dict[str, Any]], ordered: bool = True) -> Dict[str, Any]
Advanced operations
def distinct(field: str) -> List[Any]
def count_distinct(field: str) -> int
def search(query: str) -> List[Dict[str, Any]]
def count_with_query(filter_str: str) -> int
def aggregate(pipeline: List[Dict[str, Any]]) -> List[Dict[str, Any]]
Indexing
def create_index(
collection_name: str,
index_name: str,
field: str,
unique: bool = False
) -> None
def create_compound_index(
collection_name: str,
index_name: str,
fields: List[str],
unique: bool = False
) -> None
def create_text_index(
collection_name: str,
index_name: str,
field: str
) -> None
def drop_index(collection_name: str, index_name: str) -> None
def list_indexes(collection_name: str) -> List[Dict[str, Any]]
Schema validation
def set_schema(collection_name: str, schema: Dict[str, Any]) -> None
def get_schema(collection_name: str) -> Optional[Dict[str, Any]]
def remove_schema(collection_name: str) -> None
schema = {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "number"},
"email": {"type": "string"}
},
"required": ["name", "email"]
}
db.set_schema("users", schema)
Maintenance
def checkpoint() -> None
def backup(dest_path: str) -> None
def verify_backup(backup_path: str) -> Dict[str, Any]
def garbage_collect() -> Dict[str, Any]
def metrics() -> Dict[str, Any]
def frame_count() -> int
Web UI
def start_web_ui(addr: str) -> WebServer
class WebServer:
def stop() -> None
server = db.start_web_ui("127.0.0.1:8080")
print("Web UI available at http://127.0.0.1:8080")
# Later, stop the server
server.stop()
Complete example
from jasonisnthappy import Database
def main():
# Use context managers for automatic cleanup
with Database.open("./my_app.db") as db:
# Create an index for faster lookups
db.create_index("users", "idx_email", "email", unique=True)
# Set a schema for validation
db.set_schema("users", {
"type": "object",
"properties": {
"name": {"type": "string"},
"age": {"type": "number"},
"email": {"type": "string"}
},
"required": ["name", "email"]
})
# Use collections for auto-commit operations
users = db.get_collection("users")
try:
# Insert multiple documents
ids = users.insert_many([
{"name": "Alice", "age": 30, "email": "alice@example.com"},
{"name": "Bob", "age": 25, "email": "bob@example.com"},
{"name": "Charlie", "age": 35, "email": "charlie@example.com"},
])
print(f"Inserted {len(ids)} documents")
# Query with filter
adults = users.find("age >= 30")
print(f"Found {len(adults)} adults")
# Update with filter
count = users.update(
"name == 'Alice'",
{"age": 31}
)
print(f"Updated {count} documents")
# Get distinct values
ages = users.distinct("age")
print(f"Distinct ages: {ages}")
# Run aggregation
pipeline = [
{"match": "age >= 25"},
{"group_by": "age"},
{"count": "total"},
{"sort": {"field": "total", "asc": False}}
]
results = users.aggregate(pipeline)
print(f"Aggregation results: {results}")
finally:
users.close()
# Get database info
info = db.database_info()
print(f"Total documents: {info['total_documents']}")
print(f"Collections: {info['collections']}")
if __name__ == "__main__":
main()
Error handling
All operations raiseRuntimeError on failure:
try:
db = Database.open("./my_database.db")
tx = db.begin_transaction()
doc_id = tx.insert("users", {"name": "Alice"})
tx.commit()
except RuntimeError as e:
print(f"Database error: {e}")
if tx:
tx.rollback()
finally:
if db:
db.close()
Platform support
Pre-built binaries are provided for:- macOS (Intel and Apple Silicon)
- Linux (x86_64, ARM64)
- Windows (x86_64)
Data classes
from dataclasses import dataclass
@dataclass
class UpsertResult:
id: str
inserted: bool # True if inserted, False if updated