TT-Bot uses a queue manager to cap concurrent operations per user and prevent resource exhaustion.
Overview
The QueueManager singleton provides per-user concurrency limiting for video info extraction:
# From misc/queue_manager.py:15
class QueueManager:
    """
    Singleton queue manager for controlling concurrent operations.

    Features:
    - Per-user tracking for info queue (limits concurrent requests per user)
    - Bypass option for inline downloads
    """
Architecture
Singleton pattern
The queue manager uses a singleton to ensure only one instance exists:
# From misc/queue_manager.py:54
@classmethod
def get_instance(cls) -> QueueManager:
    """Get or create the singleton instance."""
    if cls._instance is None:
        queue_config = config["queue"]
        cls._instance = cls(
            max_user_queue=queue_config["max_user_queue_size"],
        )
    return cls._instance
Per-user tracking
The manager tracks how many requests each user has in the queue:
# From misc/queue_manager.py:40
def __init__(self, max_user_queue: int):
    """
    Initialize the queue manager.

    Args:
        max_user_queue: Maximum videos per user in info queue
    """
    self.max_user_queue = max_user_queue
    self._user_info_counts: dict[int, int] = {}
    self._lock = asyncio.Lock()
Configuration
Queue limits are configured in .env through the setting that maps to max_user_queue_size. A value of 3 limits each user to 3 concurrent video info extractions; set it to 0 for unlimited.
# From misc/queue_manager.py:91
max_user_queue_size: int
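As an illustration of how such a value might be read and validated, here is a minimal sketch. The environment variable name `MAX_USER_QUEUE_SIZE`, the fallback of 3, and the helper `load_max_user_queue_size` are assumptions made for this example; they are not names confirmed by the TT-Bot source.

```python
import os


def load_max_user_queue_size(env=None, default=3):
    """Read the per-user limit from an environment mapping.

    MAX_USER_QUEUE_SIZE is a hypothetical variable name used for illustration.
    """
    env = os.environ if env is None else env
    value = int(env.get("MAX_USER_QUEUE_SIZE", str(default)))
    if value < 0:
        # 0 means "unlimited"; negative values have no defined meaning.
        raise ValueError("max_user_queue_size must be >= 0")
    return value
```

Passing a plain dict instead of `os.environ` keeps the helper easy to exercise in tests.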
Usage
Basic usage with context manager
# From handlers/get_video.py:153
queue = QueueManager.get_instance()
# Acquire info queue slot with per-user limit
async with queue.info_queue(message.chat.id) as acquired:
    if not acquired:
        # User limit exceeded
        if status_message:
            await status_message.delete()
        if not group_chat:
            await message.reply(
                locale[lang]["error_queue_full"].format(
                    queue.get_user_queue_count(message.chat.id)
                ),
                reply_markup=try_again_button(lang),
            )
        return
    # Fetch video info
    video_info = await api.video(video_link)
Pre-check before acquiring
The handler checks queue limits before starting processing:
# From handlers/get_video.py:131
# Check per-user queue limit before proceeding (0 = no limit)
max_queue = queue_config["max_user_queue_size"]
if max_queue > 0:
    user_queue_count = queue.get_user_queue_count(message.chat.id)
    if user_queue_count >= max_queue:
        if not group_chat:
            await message.reply(
                locale[lang]["error_queue_full"].format(user_queue_count),
                reply_markup=try_again_button(lang),
            )
        return
This provides instant feedback before showing processing status.
Bypass for inline mode
Inline downloads can bypass per-user limits:
# From misc/queue_manager.py:121
@asynccontextmanager
async def info_queue(
    self, user_id: int, bypass_user_limit: bool = False
) -> AsyncGenerator[bool, None]:
    """
    Context manager for info queue with per-user limiting.

    Args:
        user_id: Telegram user/chat ID
        bypass_user_limit: If True, skip per-user limit check (for inline)
    """
Usage:
async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
    # Always acquires for inline mode
    video_info = await api.video_with_retry(...)
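The excerpt above shows only the signature of info_queue. One plausible way to build such a context manager on top of the acquire and release methods is sketched below. The body of `info_queue` and the stand-in class name `SketchQueueManager` are assumptions for illustration, not the project's actual code.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class SketchQueueManager:
    """Stand-in for QueueManager; the info_queue body here is an assumption."""

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    async def acquire_info_for_user(
        self, user_id: int, bypass_user_limit: bool = False
    ) -> bool:
        async with self._lock:
            if not bypass_user_limit and self.max_user_queue > 0:
                if self._user_info_counts.get(user_id, 0) >= self.max_user_queue:
                    return False
            self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
            return True

    async def release_info_for_user(self, user_id: int) -> None:
        async with self._lock:
            if user_id in self._user_info_counts:
                self._user_info_counts[user_id] -= 1
                if self._user_info_counts[user_id] <= 0:
                    del self._user_info_counts[user_id]

    @asynccontextmanager
    async def info_queue(
        self, user_id: int, bypass_user_limit: bool = False
    ) -> AsyncGenerator[bool, None]:
        acquired = await self.acquire_info_for_user(user_id, bypass_user_limit)
        try:
            # Yield the result so callers can branch on `if not acquired`.
            yield acquired
        finally:
            # Release only a slot that was actually taken.
            if acquired:
                await self.release_info_for_user(user_id)


async def demo() -> tuple[bool, bool, bool, int]:
    queue = SketchQueueManager(max_user_queue=1)
    async with queue.info_queue(42) as first:
        async with queue.info_queue(42) as second:
            async with queue.info_queue(42, bypass_user_limit=True) as inline:
                pass
    return first, second, inline, len(queue._user_info_counts)
```

With a limit of 1, the nested second request is refused while the bypassed request still gets a slot, and the count map is empty once every block has exited. Releasing in `finally` means a slot is returned even if the extraction raises.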
Slot acquisition
Acquire logic
# From misc/queue_manager.py:72
async def acquire_info_for_user(
    self, user_id: int, bypass_user_limit: bool = False
) -> bool:
    """
    Acquire info slot for a user.

    Returns:
        True if acquired successfully, False if user limit exceeded
    """
    async with self._lock:
        if not bypass_user_limit and self.max_user_queue > 0:
            current_count = self._user_info_counts.get(user_id, 0)
            if current_count >= self.max_user_queue:
                logger.debug(
                    f"User {user_id} rejected: {current_count}/{self.max_user_queue} in queue"
                )
                return False
        # Increment user count
        self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
        # Slot acquired, as described in the docstring
        return True
Release logic
# From misc/queue_manager.py:103
async def release_info_for_user(self, user_id: int) -> None:
    """Release info slot for a user.

    This method is async to properly acquire the lock and prevent
    race conditions when multiple coroutines release concurrently.
    """
    async with self._lock:
        if user_id in self._user_info_counts:
            self._user_info_counts[user_id] -= 1
            if self._user_info_counts[user_id] <= 0:
                del self._user_info_counts[user_id]
The release is async to prevent race conditions during concurrent releases.
Queue monitoring
Get user queue count
# From misc/queue_manager.py:68
def get_user_queue_count(self, user_id: int) -> int:
    """Get current queue count for a user."""
    return self._user_info_counts.get(user_id, 0)
Get active users count
# From misc/queue_manager.py:148
@property
def active_users_count(self) -> int:
    """Number of users currently with items in the info queue."""
    return len(self._user_info_counts)
Error messages
When the queue is full, users see a localized message with a retry button:
# From handlers/get_video.py:32
def try_again_button(lang: str):
    """Create a 'Try Again' button for queue full error."""
    keyb = InlineKeyboardBuilder()
    keyb.button(
        text=locale[lang]["try_again_button"],
        callback_data=RETRY_CALLBACK_PREFIX,
    )
    return keyb.as_markup()
The retry button re-processes the original message:
# From handlers/get_video.py:322
@video_router.callback_query(F.data == RETRY_CALLBACK_PREFIX)
async def handle_retry_callback(callback: CallbackQuery):
    """Handle 'Try Again' button click for queue full error."""
    # Get the original message that contains the TikTok link
    original_message = callback.message.reply_to_message
    if not original_message or not original_message.text:
        await callback.answer("Original message not found", show_alert=True)
        return
    # Delete the error message with the button
    try:
        if hasattr(callback.message, "delete"):
            await callback.message.delete()
    except TelegramBadRequest:
        logging.debug("Retry button message already deleted")
    # Re-process the original message
    await send_tiktok_video(original_message)
Thread safety
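All operations that modify queue state are protected by an asyncio.Lock, which serializes coroutines running on a single event loop (it is not a cross-thread lock, and the read-only accessors shown above do not take it):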
# From misc/queue_manager.py:49
self._lock = asyncio.Lock()
This ensures:
- No race conditions during concurrent acquires/releases
- Accurate count tracking
- Safe dictionary modifications
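The guarantees listed above can be exercised with a small experiment: start many coroutines for one user at once and check that the count never exceeds the limit and drops back to empty afterwards. `TinyLimiter`, `worker`, and the `peak` attribute are hypothetical names for this sketch; it mirrors the acquire and release logic shown earlier rather than importing the real class.

```python
import asyncio


class TinyLimiter:
    """Hypothetical stand-in mirroring the acquire/release logic shown earlier."""

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self.counts: dict[int, int] = {}
        self.peak = 0  # highest count observed, for the demo only
        self._lock = asyncio.Lock()

    async def acquire(self, user_id: int) -> bool:
        async with self._lock:
            current = self.counts.get(user_id, 0)
            if self.max_user_queue > 0 and current >= self.max_user_queue:
                return False
            self.counts[user_id] = current + 1
            self.peak = max(self.peak, current + 1)
            return True

    async def release(self, user_id: int) -> None:
        async with self._lock:
            if user_id in self.counts:
                self.counts[user_id] -= 1
                if self.counts[user_id] <= 0:
                    del self.counts[user_id]


async def worker(limiter: TinyLimiter, user_id: int) -> bool:
    if not await limiter.acquire(user_id):
        return False  # rejected: user already at the limit
    try:
        await asyncio.sleep(0.01)  # simulate a slow info extraction
        return True
    finally:
        await limiter.release(user_id)


async def run_demo() -> tuple[int, int, dict[int, int]]:
    limiter = TinyLimiter(max_user_queue=3)
    # Ten requests from the same user start at once; only three can hold a slot.
    results = await asyncio.gather(*(worker(limiter, 1) for _ in range(10)))
    return sum(results), limiter.peak, limiter.counts
```

Calling `asyncio.run(run_demo())` returns the number of accepted requests, the peak count, and the final count map.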
Queue vs. no global queue
Important: The bot only limits concurrent info extractions per user. There is no global send queue:
# From handlers/get_video.py:199
# Send video/images (no global send queue - per-user limit only)
if video_info.is_slideshow: # Process images
This design:
- Prevents one user from blocking others during slow extractions
- Leaves parallel sends unthrottled by the bot (only Telegram's rate limits apply)
- Balances resource usage with responsiveness
Testing support
The singleton can be reset for testing:
# From misc/queue_manager.py:64
@classmethod
def reset_instance(cls) -> None:
    """Reset the singleton instance (useful for testing)."""
    cls._instance = None
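A sketch of how a test might use reset_instance to isolate cases from one another. `DemoManager` and the module-level `config` dict are stand-ins so the snippet runs on its own; a real test would import QueueManager and patch the bot's actual config instead.

```python
config = {"queue": {"max_user_queue_size": 3}}  # stand-in for the bot's config


class DemoManager:
    """Hypothetical stand-in reproducing only the singleton methods."""

    _instance = None

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue

    @classmethod
    def get_instance(cls) -> "DemoManager":
        if cls._instance is None:
            cls._instance = cls(
                max_user_queue=config["queue"]["max_user_queue_size"],
            )
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        cls._instance = None


def test_reset_picks_up_new_config() -> None:
    DemoManager.reset_instance()
    first = DemoManager.get_instance()
    assert DemoManager.get_instance() is first  # same object until reset

    # Without a reset, the cached instance would keep the old limit.
    config["queue"]["max_user_queue_size"] = 1
    DemoManager.reset_instance()
    second = DemoManager.get_instance()
    assert second is not first
    assert second.max_user_queue == 1
```

Because the instance caches the limit at creation time, resetting between tests is what lets each test see its own configuration.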
Logging
Queue operations are logged at DEBUG level:
logger.debug(
    f"User {user_id} acquired info slot "
    f"(user_count={self._user_info_counts.get(user_id, 0)})"
)
logger.debug(
    f"User {user_id} released info slot "
    f"(user_count={self._user_info_counts.get(user_id, 0)})"
)
Enable DEBUG logging to monitor queue behavior.
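A minimal sketch using the standard library's logging module. The logger name `misc.queue_manager` is an assumption based on the file path cited in the excerpts; the project may name or configure its logger differently.

```python
import logging

# Show DEBUG records from every logger on the console.
logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)s %(name)s %(levelname)s %(message)s",
)

# Or raise verbosity for the queue manager only (logger name is assumed).
logging.getLogger("misc.queue_manager").setLevel(logging.DEBUG)
```

Setting the level on a single named logger keeps the rest of the bot's output at its usual verbosity.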