Documentation Index
Fetch the complete documentation index at: https://mintlify.com/karilaa-dev/tt-bot/llms.txt
Use this file to discover all available pages before exploring further.
Overview
The database module provides SQLAlchemy-based data persistence with async support. It uses a repository pattern through db_service.py to abstract database operations.
Stack:
- SQLAlchemy 2.0 (async)
- PostgreSQL (via asyncpg)
- Declarative models
Architecture
Module Structure
data/
├── database.py # Engine, session, initialization
├── db_service.py # Repository functions
└── models/
├── __init__.py # Model exports
├── users.py # Users model
├── video.py # Video model
└── music.py # Music model
Initialization Flow
from data.database import initialize_database_components, init_db
# 1. Initialize engine and session factory (once at startup, before anything else)
initialize_database_components(db_url)
# 2. Create tables (init_db is a coroutine, so this must run inside async code)
await init_db()
# 3. Use db_service functions for all operations
from data.db_service import get_user, create_user
user = await get_user(user_id)
database.py
Location: data/database.py
Core database engine and session management.
Global Variables
engine = None # SQLAlchemy async engine; assigned by initialize_database_components()
async_session = None # Session factory; assigned by initialize_database_components()
Base = declarative_base() # Declarative base class that every model subclasses
initialize_database_components()
Location: data/database.py:17
CRITICAL: Must be called before any database operations.
def initialize_database_components(db_url: str):
    """
    Initialize the database engine and session factory.

    Call this function once during application startup, before any database
    operations. It must run before init_db(), get_db(), or get_session().

    Args:
        db_url: Database connection URL string.

    Note:
        No exception is raised when this is called more than once; a repeated
        call simply replaces the module-level ``engine`` and ``async_session``
        globals with newly created objects.
    """
    global engine, async_session
    # Convert the configured URL into the form passed to the async engine.
    engine_url = get_async_db_url(db_url)
    engine = create_async_engine(engine_url, echo=False)
    # expire_on_commit=False keeps loaded attributes readable after commit,
    # which db_service relies on when it returns objects from closed sessions.
    async_session = sessionmaker(
        engine, class_=AsyncSession, expire_on_commit=False
    )
Example:
from data.database import initialize_database_components
from data.config import config
# In main.py or startup: read the connection URL from config and initialize once
db_url = config["database"]["url"]
initialize_database_components(db_url)
init_db()
Location: data/database.py:51
Creates all database tables.
async def init_db():
    """
    Create all tables registered on ``Base.metadata``.

    Raises:
        RuntimeError: If the engine has not been set up by
            initialize_database_components().
    """
    if engine is not None:
        # run_sync bridges the synchronous create_all into the async connection.
        async with engine.begin() as connection:
            await connection.run_sync(Base.metadata.create_all)
        return
    raise RuntimeError(
        "Database engine not initialized. Call initialize_database_components first."
    )
Raises: RuntimeError if initialize_database_components() has not been called first
get_session()
Location: data/database.py:71
Creates a database session (used internally by db_service).
async def get_session() -> AsyncSession:
    """
    Create a new database session (used internally by db_service).

    The session is returned un-entered so the caller owns its lifetime,
    typically via ``async with await get_session() as db:``. Returning it from
    inside an ``async with`` block here would close it before the caller ever
    received it.

    Returns:
        A fresh AsyncSession produced by the session factory.

    Raises:
        RuntimeError: If initialize_database_components() has not been called.
    """
    if async_session is None:
        raise RuntimeError(
            "Database session factory not initialized. Call initialize_database_components first."
        )
    return async_session()
Returns: AsyncSession instance
Models
Location: data/models/
Users Model
Location: data/models/users.py
Stores user/chat information and settings.
from sqlalchemy import Column, String, BigInteger, Boolean, Integer
from data.database import Base
class Users(Base):
    """ORM model for the ``users`` table: one row per Telegram user/chat."""

    __tablename__ = "users"

    # Telegram user/chat ID.
    user_id = Column(BigInteger, nullable=False, unique=True, primary_key=True)
    # Registration time as a Unix timestamp.
    registered_at = Column(BigInteger, nullable=True)
    # Language code; defaults to English.
    lang = Column(String, nullable=False, default='en')
    # Referral link parameter, if any.
    link = Column(String, nullable=True)
    # Whether the user has file mode enabled.
    file_mode = Column(Boolean, nullable=False, default=False)
    # Ad counter, incremented by increase_ad_count().
    ad_count = Column(Integer, nullable=False, default=0)
    # Ad cooldown as a Unix timestamp.
    ad_cooldown = Column(BigInteger, nullable=False, default=0)
Fields:
user_id - Telegram user/chat ID (primary key)
registered_at - Registration timestamp
lang - Language code (en, ru, es, etc.)
link - Referral link parameter
file_mode - Send as file (True) or media (False)
ad_count - Number of downloads since last ad
ad_cooldown - Timestamp when ad cooldown expires
Video Model
Location: data/models/video.py
Records video/slideshow downloads for analytics.
from sqlalchemy import Column, String, BigInteger, Boolean, ForeignKey
from data.database import Base
class Video(Base):
    """ORM model for the ``videos`` table: one row per recorded download."""

    __tablename__ = "videos"

    # Auto-incrementing surrogate key.
    pk_id = Column(BigInteger, nullable=False, autoincrement=True, primary_key=True)
    # Owner of the download; references users.user_id.
    user_id = Column(BigInteger, ForeignKey("users.user_id"), nullable=False)
    # Download time as a Unix timestamp.
    downloaded_at = Column(BigInteger, nullable=True)
    # Link of the downloaded video.
    video_link = Column(String, nullable=False)
    # Slideshow flag.
    is_images = Column(Boolean, nullable=False, default=False)
    # Image-converted flag.
    is_processed = Column(Boolean, nullable=False, default=False)
    # Inline-query flag.
    is_inline = Column(Boolean, nullable=False, default=False)
Fields:
pk_id - Auto-incrementing primary key
user_id - Foreign key to Users.user_id
downloaded_at - Download timestamp
video_link - TikTok/Instagram URL
is_images - True for slideshows, False for videos
is_processed - True if images were converted/optimized
is_inline - True if downloaded via inline query
Music Model
Location: data/models/music.py
Records music/audio extractions.
from sqlalchemy import Column, BigInteger, ForeignKey
from data.database import Base
class Music(Base):
    """ORM model for the ``music`` table: one row per recorded music extraction."""

    __tablename__ = "music"

    # Auto-incrementing surrogate key.
    pk_id = Column(BigInteger, nullable=False, autoincrement=True, primary_key=True)
    # Requesting user; references users.user_id.
    user_id = Column(BigInteger, ForeignKey("users.user_id"), nullable=False)
    # Extraction time as a Unix timestamp.
    downloaded_at = Column(BigInteger, nullable=True)
    # TikTok video ID.
    video_id = Column(BigInteger, nullable=False)
Fields:
pk_id - Auto-incrementing primary key
user_id - Foreign key to Users.user_id
downloaded_at - Download timestamp
video_id - TikTok video ID
db_service.py
Location: data/db_service.py
Repository pattern implementation - all database operations go through these functions.
User Management
get_user()
Location: data/db_service.py:11
Retrieves user by ID.
async def get_user(user_id: int) -> Optional[Users]:
    """Return the Users row whose ``user_id`` matches, or None when absent."""
    query = select(Users).where(Users.user_id == user_id)
    async with await get_session() as session:
        rows = await session.execute(query)
        return rows.scalar_one_or_none()
Returns: Users object or None
create_user()
Location: data/db_service.py:18
Creates new user record.
async def create_user(user_id: int, lang: str, link: Optional[str] = None) -> Users:
    """
    Insert a new user row and return it.

    Args:
        user_id: Telegram user/chat ID.
        lang: Language code to store for the user.
        link: Optional referral link parameter.

    Returns:
        The Users object that was added and committed.
    """
    async with await get_session() as session:
        # Registration time is stored as whole seconds since the epoch.
        registered_at = int(datetime.now().timestamp())
        new_user = Users(
            user_id=user_id,
            registered_at=registered_at,
            lang=lang,
            link=link,
            ad_count=0,
            ad_cooldown=0,
        )
        session.add(new_user)
        await session.commit()
        return new_user
Parameters:
user_id - Telegram user/chat ID
lang - Language code
link - Optional referral link parameter
Returns: Created Users object
update_user_mode()
Location: data/db_service.py:34
Sets the user's file mode setting to the given value.
async def update_user_mode(user_id: int, file_mode: bool) -> None:
    """Store ``file_mode`` on the user identified by ``user_id`` and commit."""
    async with await get_session() as session:
        await session.execute(
            update(Users)
            .where(Users.user_id == user_id)
            .values(file_mode=file_mode)
        )
        await session.commit()
update_user_lang()
Location: data/db_service.py:159
Updates user language.
async def update_user_lang(user_id: int, lang: str) -> None:
    """Store ``lang`` on the user identified by ``user_id`` and commit."""
    matches_user = Users.user_id == user_id
    set_language = update(Users).where(matches_user).values(lang=lang)
    async with await get_session() as session:
        await session.execute(set_language)
        await session.commit()
get_user_settings()
Location: data/db_service.py:133
Retrieves user language and file mode.
async def get_user_settings(user_id: int) -> Optional[Tuple[str, bool]]:
    """
    Look up a user's language and file mode.

    Returns:
        A ``(lang, file_mode)`` tuple, or None when no row matches ``user_id``.
    """
    query = select(Users.lang, Users.file_mode).where(Users.user_id == user_id)
    async with await get_session() as session:
        row = (await session.execute(query)).first()
        if not row:
            return None
        lang, file_mode = row
        # file_mode is coerced to a plain bool before being handed back.
        return lang, bool(file_mode)
Returns: (lang, file_mode) tuple or None
Example:
from data.db_service import get_user_settings
settings = await get_user_settings(message.chat.id)
if settings:
lang, file_mode = settings
else:
# New user - register them
lang = await lang_func(chat_id, message.from_user.language_code)
file_mode = False
Video/Download Tracking
add_video()
Location: data/db_service.py:143
Records video/slideshow download.
async def add_video(
    user_id: int,
    video_link: str,
    is_images: bool,
    is_processed: bool = False,
    is_inline: bool = False
) -> None:
    """
    Record one video/slideshow download.

    Args:
        user_id: Telegram user/chat ID.
        video_link: Link of the downloaded video.
        is_images: True for slideshows.
        is_processed: True if images were converted.
        is_inline: True if the download came from an inline query.
    """
    async with await get_session() as session:
        # Download time is stored as whole seconds since the epoch.
        downloaded_at = int(datetime.now().timestamp())
        record = Video(
            user_id=user_id,
            downloaded_at=downloaded_at,
            video_link=video_link,
            is_images=is_images,
            is_processed=is_processed,
            is_inline=is_inline,
        )
        session.add(record)
        await session.commit()
Parameters:
user_id - Telegram user/chat ID
video_link - TikTok/Instagram URL
is_images - True for slideshows
is_processed - True if images were converted
is_inline - True if inline query
add_music()
Location: data/db_service.py:152
Records music extraction.
async def add_music(user_id: int, video_id: int) -> None:
    """Record one music extraction for ``user_id`` from video ``video_id``."""
    async with await get_session() as session:
        # Extraction time is stored as whole seconds since the epoch.
        downloaded_at = int(datetime.now().timestamp())
        record = Music(
            user_id=user_id,
            downloaded_at=downloaded_at,
            video_id=video_id,
        )
        session.add(record)
        await session.commit()
Statistics
get_user_stats()
Location: data/db_service.py:41
Retrieves user statistics.
async def get_user_stats(user_id: int) -> Tuple[Optional[Users], int, int]:
    """
    Collect a user's record together with their download counts.

    Returns:
        ``(user, videos_count, images_count)``; ``(None, 0, 0)`` when the user
        does not exist. ``videos_count`` covers every Video row of the user,
        ``images_count`` only those flagged ``is_images``.
    """
    async with await get_session() as session:
        user_query = select(Users).where(Users.user_id == user_id)
        user = (await session.execute(user_query)).scalar_one_or_none()
        if not user:
            return None, 0, 0

        # All downloads recorded for this user.
        total_query = select(func.count(Video.video_link)).where(
            Video.user_id == user_id
        )
        videos_count = (await session.execute(total_query)).scalar()

        # Same count narrowed to slideshows; the extra where() is AND-ed on.
        slideshow_query = total_query.where(Video.is_images == True)  # noqa: E712
        images_count = (await session.execute(slideshow_query)).scalar()

        return user, videos_count, images_count
Returns: (user, videos_count, images_count) tuple
get_stats_by_period()
Location: data/db_service.py:112
Retrieves downloads within time period.
async def get_stats_by_period(
period: int = 0,
chat_type: str = 'all'
) -> List[Tuple[int, str]]:
# Returns [(timestamp, video_link), ...]
# period: seconds (0 = all time)
# chat_type: 'all', 'users' (>0), 'groups' (<0)
Parameters:
period - Time period in seconds (0 for all time)
chat_type - Filter by chat type: 'all', 'users' (positive IDs), or 'groups' (negative IDs)
Returns: List of (downloaded_at, video_link) tuples
get_referral_stats()
Location: data/db_service.py:70
Top 10 referral links.
async def get_referral_stats() -> List[Tuple[str, int]]:
# Returns [(link, count), ...]
Returns: List of (link, count) tuples (top 10)
get_other_stats()
Location: data/db_service.py:83
Miscellaneous statistics.
async def get_other_stats() -> Tuple[int, List[Tuple[str, int]], List[Tuple[int, int]]]:
# Returns (file_mode_count, top_langs, top_users)
Returns: (file_mode_count, top_langs, top_users) tuple
file_mode_count - Users with file mode enabled
top_langs - [(lang, count), ...]
top_users - [(user_id, download_count), ...] (top 10)
Advertisement Tracking
should_show_ad()
Location: data/db_service.py:197
Checks if ad should be shown to user.
async def should_show_ad(user_id: int) -> bool:
    """
    Decide whether an ad should be shown to the user.

    Args:
        user_id: Telegram user/chat ID.

    Returns:
        True when the user's ad_count is 3 or more, or when the stored
        ad_cooldown timestamp is already in the past. False when no user row
        exists for ``user_id`` (previously this case raised TypeError while
        unpacking the missing row).
    """
    async with await get_session() as db:
        stmt = select(Users.ad_count, Users.ad_cooldown).where(Users.user_id == user_id)
        result = await db.execute(stmt)
        row = result.first()
        if row is None:
            # Unknown user: there is no counter or cooldown to evaluate.
            return False
        ad_count, ad_cooldown = row
        # Show ad if 3+ downloads OR cooldown expired
        return ad_count >= 3 or ad_cooldown < int(datetime.now().timestamp())
Returns: True if ad should be shown
record_ad_show()
Location: data/db_service.py:175
Records ad display and resets counter.
async def record_ad_show(user_id: int) -> None:
    """Mark an ad as shown: set ad_count to 1 and start a new cooldown."""
    async with await get_session() as session:
        # Cooldown ends 86400 seconds (24 hours) from now.
        cooldown_until = int(datetime.now().timestamp()) + 86400
        mark_shown = (
            update(Users)
            .where(Users.user_id == user_id)
            .values(ad_count=1, ad_cooldown=cooldown_until)
        )
        await session.execute(mark_shown)
        await session.commit()
Effect: Resets ad_count to 1, sets 24-hour cooldown
increase_ad_count()
Location: data/db_service.py:186
Increments ad counter.
async def increase_ad_count(user_id: int) -> None:
    """
    Increment the user's ad counter by one.

    The increment is done in a single UPDATE evaluated by the database, so
    concurrent calls for the same user cannot overwrite each other's result
    the way a separate SELECT followed by UPDATE could.

    Args:
        user_id: Telegram user/chat ID.
    """
    async with await get_session() as db:
        await db.execute(
            update(Users)
            .where(Users.user_id == user_id)
            # COALESCE keeps the earlier "treat a missing count as 0" behavior.
            .values(ad_count=func.coalesce(Users.ad_count, 0) + 1)
        )
        await db.commit()
Admin Functions
get_user_ids()
Location: data/db_service.py:166
Retrieves user IDs (for broadcasts). By default only positive IDs are returned, i.e. users but not groups.
async def get_user_ids(only_positive: bool = True) -> List[int]:
    """
    Return the IDs stored in the users table.

    Args:
        only_positive: If True, keep only IDs greater than zero (users);
            group chats have negative IDs and are excluded.

    Returns:
        List of user IDs.
    """
    async with await get_session() as db:
        stmt = select(Users.user_id)
        if only_positive:
            stmt = stmt.where(Users.user_id > 0)  # Only users, not groups
        result = await db.execute(stmt)
        # scalars() yields the single selected column directly, which avoids
        # indexing each row by hand and shadowing the builtin ``id``.
        return list(result.scalars().all())
Parameters:
only_positive - If True, exclude groups (negative IDs)
Returns: List of user IDs
get_user_videos()
Location: data/db_service.py:63
Retrieves all videos for a user.
async def get_user_videos(user_id: int) -> List[Tuple[int, str]]:
    """Return ``(downloaded_at, video_link)`` rows for every video of the user."""
    query = select(Video.downloaded_at, Video.video_link).where(
        Video.user_id == user_id
    )
    async with await get_session() as session:
        rows = await session.execute(query)
        return rows.all()
Returns: List of (downloaded_at, video_link) tuples
Usage Examples
User Registration Flow
from data.db_service import get_user_settings, create_user
from misc.utils import lang_func, start_manager
settings = await get_user_settings(message.chat.id)
if not settings:
# New user - register
lang = await lang_func(message.chat.id, message.from_user.language_code, True)
file_mode = False
await start_manager(message.chat.id, message, lang)
else:
# Existing user
lang, file_mode = settings
Download Tracking
from data.db_service import add_video
# After successful download
try:
await add_video(
user_id=message.chat.id,
video_link=video_link,
is_images=video_info.is_slideshow,
is_processed=was_processed,
is_inline=False,
)
logger.info(f"Video Download: CHAT {message.chat.id} - VIDEO {video_link}")
except Exception as e:
logger.error(f"Can't write into database: {e}")
Advertisement Logic
from data.db_service import should_show_ad, record_ad_show, increase_ad_count
if not group_chat:
if await should_show_ad(message.chat.id):
# Show ad
await record_ad_show(message.chat.id)
await message.answer(
locale[lang]["ad_support"],
reply_markup=ad_button,
)
else:
# Increment counter
await increase_ad_count(message.chat.id)
Best Practices
- Always call initialize_database_components() before any DB operations
- Use db_service functions - never query models directly
- Handle None returns - users may not exist
- Use transactions - each db_service write function commits its own changes
- Log database errors - don’t let DB issues crash handlers
- Use Unix timestamps - consistent timezone handling
- Validate user_id - check for existence before operations
- Clean up sessions - async with handles this automatically