Overview
The QueueManager class limits how many requests each user (or chat) can have in flight at once. This keeps a single user from exhausting server resources and keeps allocation fair across users.
Location: misc/queue_manager.py
Architecture
QueueManager is a singleton that tracks active requests per user. It guards its counters with an asyncio.Lock, which makes updates safe across coroutines on a single event loop (not across OS threads), and it provides an async context manager for automatic resource cleanup.
Design Pattern
- Singleton: One instance shared across the application
- Context Manager: Automatic acquisition and release
- Per-User Tracking: Independent limits per user/chat
Class Definition
class QueueManager:
    """Singleton queue manager for controlling concurrent operations."""

    _instance: QueueManager | None = None

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()
Attributes:
max_user_queue - Maximum concurrent requests per user
_user_info_counts - Dict mapping user_id to active request count
_lock - asyncio.Lock that serializes updates to the counts across coroutines
Methods
get_instance()
Location: misc/queue_manager.py:54
Gets or creates the singleton instance.
@classmethod
def get_instance(cls) -> QueueManager:
    """Get or create the singleton instance."""
    if cls._instance is None:
        queue_config = config["queue"]
        cls._instance = cls(
            max_user_queue=queue_config["max_user_queue_size"],
        )
    return cls._instance
Returns: QueueManager singleton instance
Example:
from misc.queue_manager import QueueManager
queue = QueueManager.get_instance()
reset_instance()
Location: misc/queue_manager.py:64
Resets the singleton instance (useful for testing).
@classmethod
def reset_instance(cls) -> None:
    """Reset the singleton instance (useful for testing)."""
    cls._instance = None
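The effect of get_instance() and reset_instance() on object identity can be illustrated with a minimal sketch. SketchQueueManager is a hypothetical stand-in written for this example, and it uses a literal limit instead of reading config["queue"], so treat it as an illustration of the pattern rather than the actual class.

```python
from typing import Optional


class SketchQueueManager:
    """Hypothetical stand-in mirroring the documented singleton accessors."""

    _instance: Optional["SketchQueueManager"] = None

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue

    @classmethod
    def get_instance(cls) -> "SketchQueueManager":
        if cls._instance is None:
            # Assumption: a literal replaces the config lookup of the real class.
            cls._instance = cls(max_user_queue=3)
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        cls._instance = None


first = SketchQueueManager.get_instance()
second = SketchQueueManager.get_instance()
SketchQueueManager.reset_instance()
third = SketchQueueManager.get_instance()

print(first is second)  # True: repeated calls share one object
print(first is third)   # False: a reset forces a fresh instance
```

Any code that cached the old instance before a reset keeps its own counters, which is why resetting is best confined to tests.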
get_user_queue_count()
Location: misc/queue_manager.py:68
Gets current active request count for a user.
def get_user_queue_count(self, user_id: int) -> int:
    """Get current queue count for a user."""
    return self._user_info_counts.get(user_id, 0)
Parameters:
user_id - Telegram user/chat ID
Returns: Number of active requests for user
Example:
queue = QueueManager.get_instance()
count = queue.get_user_queue_count(message.chat.id)
print(f"User has {count} active requests")
acquire_info_for_user()
Location: misc/queue_manager.py:72
Acquires a queue slot for a user.
async def acquire_info_for_user(
    self, user_id: int, bypass_user_limit: bool = False
) -> bool:
    """
    Acquire info slot for a user.

    Args:
        user_id: Telegram user/chat ID
        bypass_user_limit: If True, skip per-user limit check (for inline)

    Returns:
        True if acquired successfully, False if user limit exceeded
    """
    async with self._lock:
        if not bypass_user_limit and self.max_user_queue > 0:
            current_count = self._user_info_counts.get(user_id, 0)
            if current_count >= self.max_user_queue:
                logger.debug(
                    f"User {user_id} rejected: {current_count}/{self.max_user_queue} in queue"
                )
                return False
        # Increment user count
        self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
        return True
Parameters:
user_id - Telegram user/chat ID
bypass_user_limit - If True, skip the limit check (for inline downloads); the slot is still counted toward the user's total
Returns: True if slot acquired, False if limit exceeded
Example:
queue = QueueManager.get_instance()

# Normal acquisition (with limit check)
acquired = await queue.acquire_info_for_user(user_id)
if not acquired:
    await message.reply("Queue full, please wait...")
    return

# Bypass limit (for inline downloads)
acquired = await queue.acquire_info_for_user(user_id, bypass_user_limit=True)
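To make the limit and bypass behaviour concrete, here is a small self-contained sketch. SketchQueue is a hypothetical stand-in that copies the acquire logic shown above (logging omitted); the limit of 2 and user ID 42 are arbitrary values chosen for the demonstration.

```python
import asyncio


class SketchQueue:
    """Hypothetical stand-in reproducing the documented acquire logic."""

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    async def acquire_info_for_user(
        self, user_id: int, bypass_user_limit: bool = False
    ) -> bool:
        async with self._lock:
            if not bypass_user_limit and self.max_user_queue > 0:
                if self._user_info_counts.get(user_id, 0) >= self.max_user_queue:
                    return False
            # The count is incremented on every successful acquire,
            # including bypassed ones.
            self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
            return True


async def demo():
    queue = SketchQueue(max_user_queue=2)
    # Three normal attempts against a limit of two.
    normal = [await queue.acquire_info_for_user(42) for _ in range(3)]
    # A bypassed attempt succeeds even though the user is at the limit.
    bypassed = await queue.acquire_info_for_user(42, bypass_user_limit=True)
    return normal, bypassed, queue._user_info_counts[42]


normal, bypassed, count = asyncio.run(demo())
print(normal, bypassed, count)  # [True, True, False] True 3
```

Because bypassed slots are still counted, an inline download in progress reduces the room left for the same user's regular requests.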
release_info_for_user()
Location: misc/queue_manager.py:103
Releases a queue slot for a user.
async def release_info_for_user(self, user_id: int) -> None:
    """Release info slot for a user.

    This method is async to properly acquire the lock and prevent
    race conditions when multiple coroutines release concurrently.
    """
    async with self._lock:
        if user_id in self._user_info_counts:
            self._user_info_counts[user_id] -= 1
            if self._user_info_counts[user_id] <= 0:
                del self._user_info_counts[user_id]
Parameters:
user_id - Telegram user/chat ID
Note: Pair every call with a successful acquire_info_for_user(), or use the info_queue() context manager, which handles the release automatically.
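The release logic has two edge cases worth seeing in action: a user's entry is deleted once the count reaches zero, and releasing for a user with no entry is a no-op. The sketch below uses a hypothetical SketchQueue that copies only the release method shown above and seeds the counts dictionary by hand.

```python
import asyncio


class SketchQueue:
    """Hypothetical stand-in reproducing the documented release logic."""

    def __init__(self) -> None:
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    async def release_info_for_user(self, user_id: int) -> None:
        async with self._lock:
            if user_id in self._user_info_counts:
                self._user_info_counts[user_id] -= 1
                if self._user_info_counts[user_id] <= 0:
                    del self._user_info_counts[user_id]


async def demo():
    queue = SketchQueue()
    queue._user_info_counts[7] = 2  # pretend user 7 already holds two slots
    await queue.release_info_for_user(7)
    after_first = dict(queue._user_info_counts)
    await queue.release_info_for_user(7)
    after_second = dict(queue._user_info_counts)
    await queue.release_info_for_user(7)  # extra release: nothing to remove
    after_extra = dict(queue._user_info_counts)
    return after_first, after_second, after_extra


after_first, after_second, after_extra = asyncio.run(demo())
print(after_first, after_second, after_extra)  # {7: 1} {} {}
```

An unmatched release does not raise, but if the same user still holds other slots it would decrement one of those, which is why the pairing matters.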
info_queue() (Context Manager)
Location: misc/queue_manager.py:120
Context manager for automatic queue slot management.
@asynccontextmanager
async def info_queue(
    self, user_id: int, bypass_user_limit: bool = False
) -> AsyncGenerator[bool, None]:
    """
    Context manager for info queue with per-user limiting.

    Args:
        user_id: Telegram user/chat ID
        bypass_user_limit: If True, skip per-user limit check (for inline)

    Yields:
        True if acquired successfully, False if user limit exceeded

    Usage:
        async with queue.info_queue(user_id) as acquired:
            if not acquired:
                await message.reply("Queue full, please wait...")
                return
            # Do work...
    """
    acquired = await self.acquire_info_for_user(user_id, bypass_user_limit)
    try:
        yield acquired
    finally:
        if acquired:
            await self.release_info_for_user(user_id)
Parameters:
user_id - Telegram user/chat ID
bypass_user_limit - If True, skip limit check
Yields: True if slot acquired, False if limit exceeded
Ensures: Automatic release in finally block
Example:
from misc.queue_manager import QueueManager

queue = QueueManager.get_instance()

# Normal usage (with limit check)
async with queue.info_queue(message.chat.id) as acquired:
    if not acquired:
        await message.reply(
            locale[lang]["error_queue_full"].format(
                queue.get_user_queue_count(message.chat.id)
            )
        )
        return
    # Process download (slot automatically released on exit)
    video_info = await api.video(video_link)
    await send_video_result(message.chat.id, video_info, lang, file_mode)

# Inline usage (bypass limit)
async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
    if not acquired:
        await bot.edit_message_text(
            inline_message_id=message_id,
            text=locale[lang]["error"]
        )
        return
    # Process inline download
    video_info = await api.video(video_link)
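The main benefit of the context manager is that the slot is released even when the work inside it raises. The following sketch demonstrates this with a hypothetical SketchQueue that combines the acquire, release and info_queue logic documented above; the RuntimeError stands in for a failed download.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncGenerator


class SketchQueue:
    """Hypothetical stand-in combining the documented methods."""

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    async def acquire_info_for_user(self, user_id: int, bypass_user_limit: bool = False) -> bool:
        async with self._lock:
            if not bypass_user_limit and self.max_user_queue > 0:
                if self._user_info_counts.get(user_id, 0) >= self.max_user_queue:
                    return False
            self._user_info_counts[user_id] = self._user_info_counts.get(user_id, 0) + 1
            return True

    async def release_info_for_user(self, user_id: int) -> None:
        async with self._lock:
            if user_id in self._user_info_counts:
                self._user_info_counts[user_id] -= 1
                if self._user_info_counts[user_id] <= 0:
                    del self._user_info_counts[user_id]

    @asynccontextmanager
    async def info_queue(
        self, user_id: int, bypass_user_limit: bool = False
    ) -> AsyncGenerator[bool, None]:
        acquired = await self.acquire_info_for_user(user_id, bypass_user_limit)
        try:
            yield acquired
        finally:
            if acquired:
                await self.release_info_for_user(user_id)


async def demo():
    queue = SketchQueue(max_user_queue=1)
    try:
        async with queue.info_queue(99) as acquired:
            held = queue._user_info_counts.get(99, 0)  # count while inside the block
            raise RuntimeError("simulated download failure")
    except RuntimeError:
        pass
    # The finally clause ran, so the slot is free again.
    return acquired, held, queue._user_info_counts.get(99, 0)


acquired, held, after = asyncio.run(demo())
print(acquired, held, after)  # True 1 0
```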
active_users_count (Property)
Location: misc/queue_manager.py:148
Gets number of users with active requests.
@property
def active_users_count(self) -> int:
    """Number of users currently with items in the info queue."""
    return len(self._user_info_counts)
Returns: Count of users with active requests
Example:
queue = QueueManager.get_instance()
print(f"{queue.active_users_count} users currently downloading")
Configuration
Queue settings are configured via data/config.py:
config["queue"] = {
    "max_user_queue_size": 3,  # Max concurrent requests per user (0 = unlimited)
}
Environment Variable: MAX_USER_QUEUE_SIZE
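How data/config.py turns the environment variable into the config entry is not shown here. One plausible approach is sketched below; load_queue_config is a hypothetical helper, and the fallback of 3 is assumed from the default mentioned elsewhere on this page, so the project's real loader may differ.

```python
import os
from typing import Mapping, Optional


def load_queue_config(env: Optional[Mapping[str, str]] = None) -> dict:
    """Hypothetical loader: build the queue config from MAX_USER_QUEUE_SIZE."""
    env = os.environ if env is None else env
    raw = env.get("MAX_USER_QUEUE_SIZE", "3")  # assumed default of 3
    try:
        size = int(raw)
    except ValueError:
        size = 3  # fall back to the assumed default on malformed input
    # 0 means unlimited; negative values are clamped to 0 in this sketch.
    return {"max_user_queue_size": max(size, 0)}


print(load_queue_config({"MAX_USER_QUEUE_SIZE": "5"}))  # {'max_user_queue_size': 5}
print(load_queue_config({}))                            # {'max_user_queue_size': 3}
```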
Usage Patterns
Standard Download Handler
from misc.queue_manager import QueueManager
from data.config import config, locale

queue = QueueManager.get_instance()
queue_config = config["queue"]

# Pre-check before acquiring slot
max_queue = queue_config["max_user_queue_size"]
if max_queue > 0:
    user_queue_count = queue.get_user_queue_count(message.chat.id)
    if user_queue_count >= max_queue:
        await message.reply(
            locale[lang]["error_queue_full"].format(user_queue_count)
        )
        return

# Acquire slot and process
async with queue.info_queue(message.chat.id) as acquired:
    if not acquired:
        # Rare: another request from the same user took the last slot
        # between the pre-check and the acquire
        await message.reply(locale[lang]["error"])
        return
    # Process download
    video_info = await api.video(video_link)
    await send_video_result(message.chat.id, video_info, lang, file_mode)
Inline Download Handler
from misc.queue_manager import QueueManager

queue = QueueManager.get_instance()

# Bypass user limit for inline downloads
async with queue.info_queue(user_id, bypass_user_limit=True) as acquired:
    if not acquired:
        await bot.edit_message_text(
            inline_message_id=message_id,
            text="Error: Server busy"
        )
        return
    # Process download
    video_info = await api.video(video_link)
    file_id = await upload_video_to_storage(video_info.data, video_info)
    await bot.edit_message_media(
        inline_message_id=message_id,
        media=InputMediaVideo(media=file_id)
    )
Why Per-User Limits?
Without Limits:
- One user could spam requests
- Server resources exhausted
- Other users blocked
With Per-User Limits:
- Fair resource allocation
- Prevents abuse
- Better user experience for all
Bypass Option:
- Inline downloads use bypass_user_limit=True
- Allows inline queries even when user’s regular queue is full
- Prevents inline query failures
Thread Safety
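Acquire and release hold an asyncio.Lock while they update the shared counts. The lock serializes coroutines running on the same event loop; asyncio.Lock itself is not thread-safe, so the manager should not be shared across OS threads. The read-only accessors (get_user_queue_count() and active_users_count) do not take the lock.
async with self._lock:
    # Atomic operations on shared state
    self._user_info_counts[user_id] = count
This ensures:
- No lost updates when coroutines acquire and release concurrently
- Accurate counts
- Safe concurrent access from coroutines on the same event loop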
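The guarantee can be checked with a concurrent sketch: ten coroutines race for a limit of three slots. SketchQueue is a hypothetical stand-in, and unlike the documented method it deliberately awaits inside the critical section (asyncio.sleep(0)) so that the lock is the only thing preventing every coroutine from reading the same stale count.

```python
import asyncio


class SketchQueue:
    """Hypothetical stand-in with an artificial await inside the critical section."""

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    async def acquire_info_for_user(self, user_id: int) -> bool:
        async with self._lock:
            current = self._user_info_counts.get(user_id, 0)
            if self.max_user_queue > 0 and current >= self.max_user_queue:
                return False
            # Assumption for the demo: yield to the event loop mid-update.
            # The lock keeps other coroutines out until the count is written.
            await asyncio.sleep(0)
            self._user_info_counts[user_id] = current + 1
            return True


async def demo():
    queue = SketchQueue(max_user_queue=3)
    # Launch ten acquisitions for the same user at once.
    results = await asyncio.gather(
        *(queue.acquire_info_for_user(1) for _ in range(10))
    )
    return results, queue._user_info_counts.get(1, 0)


results, final_count = asyncio.run(demo())
print(sum(results), final_count)  # 3 3
```

Removing the `async with self._lock` line from this sketch would let all ten coroutines observe a count of zero and succeed.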
Logging
Queue operations are logged for debugging:
logger.debug(f"User {user_id} acquired info slot (user_count={count})")
logger.debug(f"User {user_id} released info slot (user_count={count})")
logger.debug(f"User {user_id} rejected: {count}/{max} in queue")
Best Practices
- Always use context manager - Ensures automatic cleanup
- Pre-check before acquiring - Better UX (show error earlier)
- Use bypass sparingly - Only for inline downloads
- Set reasonable limits - Balance throughput and fairness (default: 3)
- Handle acquisition failure - Always check the acquired flag
- Don’t hold slots unnecessarily - Keep work inside context manager minimal
- Use get_instance() - Never create QueueManager directly
Testing
from misc.queue_manager import QueueManager
# Reset singleton for test isolation
QueueManager.reset_instance()
# Create a fresh instance (max_user_queue is read from config["queue"])
queue = QueueManager.get_instance()
# Test user limits
await queue.acquire_info_for_user(123)
assert queue.get_user_queue_count(123) == 1
await queue.release_info_for_user(123)
assert queue.get_user_queue_count(123) == 0
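Since the calls above use await, they need to run inside an async test. One way to structure this with the standard library is unittest.IsolatedAsyncioTestCase, sketched below. SketchQueueManager is a hypothetical stand-in with a hard-coded limit of 2 so that the example runs without the project's config; in a real test suite the import from misc.queue_manager would replace it.

```python
import asyncio
import unittest
from typing import Optional


class SketchQueueManager:
    """Hypothetical stand-in for QueueManager with a fixed limit of 2."""

    _instance: Optional["SketchQueueManager"] = None

    def __init__(self, max_user_queue: int):
        self.max_user_queue = max_user_queue
        self._user_info_counts: dict[int, int] = {}
        self._lock = asyncio.Lock()

    @classmethod
    def get_instance(cls) -> "SketchQueueManager":
        if cls._instance is None:
            cls._instance = cls(max_user_queue=2)  # assumed test limit
        return cls._instance

    @classmethod
    def reset_instance(cls) -> None:
        cls._instance = None

    def get_user_queue_count(self, user_id: int) -> int:
        return self._user_info_counts.get(user_id, 0)

    async def acquire_info_for_user(self, user_id: int) -> bool:
        async with self._lock:
            if self.max_user_queue > 0 and self.get_user_queue_count(user_id) >= self.max_user_queue:
                return False
            self._user_info_counts[user_id] = self.get_user_queue_count(user_id) + 1
            return True

    async def release_info_for_user(self, user_id: int) -> None:
        async with self._lock:
            if user_id in self._user_info_counts:
                self._user_info_counts[user_id] -= 1
                if self._user_info_counts[user_id] <= 0:
                    del self._user_info_counts[user_id]


class QueueLimitTest(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        # Reset the singleton so counts never leak between tests.
        SketchQueueManager.reset_instance()
        self.queue = SketchQueueManager.get_instance()

    async def test_acquire_and_release(self) -> None:
        self.assertTrue(await self.queue.acquire_info_for_user(123))
        self.assertEqual(self.queue.get_user_queue_count(123), 1)
        await self.queue.release_info_for_user(123)
        self.assertEqual(self.queue.get_user_queue_count(123), 0)

    async def test_limit_rejects_extra_request(self) -> None:
        self.assertTrue(await self.queue.acquire_info_for_user(123))
        self.assertTrue(await self.queue.acquire_info_for_user(123))
        self.assertFalse(await self.queue.acquire_info_for_user(123))
```

Saved as a test module, this can be run with `python -m unittest`.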