Overview
PAS2 is designed for performance, using parallel API calls, efficient progress tracking, and optimized resource management. Understanding these mechanisms helps you tune the system for your specific workload.
Parallel response generation
PAS2 retrieves responses for paraphrased queries in parallel, significantly reducing total processing time.
Thread pool configuration
```python
from typing import List
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

def get_responses(self, queries: List[str]) -> List[str]:
    """Get responses from Mistral API for each query in parallel"""
    with ThreadPoolExecutor(max_workers=min(len(queries), 5)) as executor:
        # Map each submitted future back to the index of its query so results
        # land in the right slot regardless of completion order
        future_to_index = {
            executor.submit(self._get_single_response, query, i): i
            for i, query in enumerate(queries)
        }
        responses = [""] * len(queries)
        completed_count = 0
        for future in concurrent.futures.as_completed(future_to_index):
            index = future_to_index[future]
            responses[index] = future.result()
            completed_count += 1
    return responses
```
Worker pool sizing
The thread pool is capped at 5 workers:
```python
max_workers=min(len(queries), 5)
```
The 5-worker limit prevents overwhelming the API with concurrent requests while maximizing parallelism. Most API providers have rate limits that make higher concurrency counterproductive.
With 4 queries (the original plus 3 paraphrases):
| Approach | Total Time | Speedup |
|---|---|---|
| Sequential | ~20 seconds | 1x |
| Parallel (5 workers) | ~5 seconds | 4x |
For custom deployments with higher rate limits, adjust `max_workers` based on your API tier:
```python
max_workers=min(len(queries), 10)  # For higher rate limits
```
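If the cap needs to vary per deployment, one option is to read it from configuration instead of hard-coding it. A minimal sketch, assuming a hypothetical `PAS2_MAX_WORKERS` environment variable (not an existing setting):

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Hypothetical: let each deployment choose the worker cap via an
# environment variable, defaulting to the documented value of 5.
MAX_API_WORKERS = int(os.environ.get("PAS2_MAX_WORKERS", "5"))

def get_responses(self, queries):
    with ThreadPoolExecutor(max_workers=min(len(queries), MAX_API_WORKERS)) as executor:
        # ... submit and collect futures exactly as shown above ...
        ...
```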
Progress callback optimization
Progress callbacks enable real-time UI updates without blocking the main thread.
Callback design
```python
def __init__(self, mistral_api_key=None, openai_api_key=None, progress_callback=None):
    self.progress_callback = progress_callback
    # ...

def get_responses(self, queries: List[str]) -> List[str]:
    # ...
    for future in concurrent.futures.as_completed(future_to_index):
        index = future_to_index[future]
        responses[index] = future.result()
        completed_count += 1
        if self.progress_callback:
            self.progress_callback(
                "responses_progress",
                completed_responses=completed_count,
                total_responses=len(queries),
            )
```
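For reference, a callback a caller might register could look like the sketch below. The handler name, the shared `progress_state` dict, and the `PAS2(...)` instantiation are illustrative assumptions; only the stage name and keyword arguments come from the code above.

```python
progress_state = {"completed": 0, "total": 0}

def on_progress(stage, **kwargs):
    # Lightweight: update shared state only; no blocking work here
    if stage == "responses_progress":
        progress_state["completed"] = kwargs.get("completed_responses", 0)
        progress_state["total"] = kwargs.get("total_responses", 0)

# Illustrative wiring; the constructor signature matches the excerpt above,
# but the class name is assumed here.
detector = PAS2(mistral_api_key="...", progress_callback=on_progress)
```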
Minimizing callback overhead
Callbacks are designed to be lightweight:
- No blocking operations - Callbacks update state only
- Thread-safe updates - Uses locks for shared state
- Conditional execution - Only fires when callback is registered
```python
def update_stage(self, stage, **kwargs):
    """Update the current stage and trigger callback"""
    with self._lock:
        if stage in self.STAGES:
            self.stage = stage
            # ... update state ...
            if self._status_callback:
                self._status_callback(self.get_html_status())
```
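The excerpt above relies on `self._lock`, `self.STAGES`, and `self._status_callback`. A minimal sketch of how such a tracker could be set up; the class name and stage names here are illustrative, not taken from the source:

```python
import threading

class StatusTracker:
    # Illustrative stage names; the real STAGES list is defined elsewhere
    STAGES = ("paraphrasing", "getting_responses", "judging", "complete")

    def __init__(self, status_callback=None):
        self._lock = threading.Lock()   # guards stage updates across worker threads
        self._status_callback = status_callback
        self.stage = None
```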
Gradio interface optimization
Queue configuration
Gradio’s interface is configured for optimal throughput:
```python
interface.launch(
    show_api=False,
    quiet=True,
    share=False,
    max_threads=10,
    debug=False
)
```
Key settings
- `max_threads=10` - Allows up to 10 concurrent interface operations
- `show_api=False` - Disables API endpoint generation for faster startup
- `quiet=True` - Reduces logging overhead in production
Event handler optimization
The submit button uses a two-stage approach:
```python
submit_button.click(
    fn=start_processing,
    inputs=[query_input],
    outputs=[progress_display, results_accordion, feedback_accordion, hidden_results],
    queue=False  # Immediate execution for UI updates
).then(
    fn=process_query_and_display_results,  # Long-running operation
    inputs=[query_input],
    outputs=[progress_display, results_accordion, feedback_accordion, hidden_results]
)
```
Using queue=False for start_processing ensures immediate UI feedback before the long-running detection starts.
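A sketch of what the first-stage handler could look like, assuming it only resets the displayed components and returns immediately; the exact return values are illustrative, not taken from the source:

```python
import gradio as gr

def start_processing(query):
    # Runs with queue=False: return placeholder UI state right away,
    # before any API calls are made
    return (
        gr.update(value="Processing...", visible=True),  # progress_display
        gr.update(visible=False),                        # results_accordion
        gr.update(visible=False),                        # feedback_accordion
        None,                                            # hidden_results
    )
```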
Connection management
PAS2 uses a simple connect-execute-close pattern for SQLite:
```python
def save_feedback(self, results, feedback):
    conn = sqlite3.connect(self.db_path)
    cursor = conn.cursor()
    # ... execute query ...
    conn.commit()
    conn.close()
```
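The same pattern can be made robust to exceptions by letting context managers handle cleanup. A minimal sketch using only the standard library: `closing()` guarantees the connection is closed even if the query raises, and the inner `with conn:` commits on success and rolls back on error.

```python
import sqlite3
from contextlib import closing

def save_feedback(self, results, feedback):
    with closing(sqlite3.connect(self.db_path)) as conn:
        with conn:
            cursor = conn.cursor()
            # ... execute query ...
```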
When to use connection pooling
For high-volume deployments, consider connection pooling:
```python
import sqlite3
import threading
from contextlib import contextmanager

class DatabasePool:
    def __init__(self, db_path, pool_size=5):
        self.db_path = db_path
        # check_same_thread=False allows pooled connections to be borrowed
        # by worker threads other than the one that created them
        self.pool = [
            sqlite3.connect(db_path, check_same_thread=False)
            for _ in range(pool_size)
        ]
        self.available = self.pool.copy()
        self.lock = threading.Lock()

    @contextmanager
    def get_connection(self):
        with self.lock:
            if not self.available:
                # Create new connection if pool exhausted
                conn = sqlite3.connect(self.db_path, check_same_thread=False)
            else:
                conn = self.available.pop()
        try:
            yield conn
        finally:
            with self.lock:
                self.available.append(conn)
```
Connection pooling adds complexity. Only implement it if you’re handling >100 requests per minute.
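Usage then looks like the sketch below; the database filename, table, and column names are hypothetical.

```python
pool = DatabasePool("pas2_feedback.db")

def save_feedback(results, feedback):
    with pool.get_connection() as conn:
        conn.execute(
            "INSERT INTO feedback (results, feedback) VALUES (?, ?)",
            (results, feedback),
        )
        conn.commit()
```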
API rate limit handling
Built-in retry logic
The system includes error handling for API failures:
```python
def _get_single_response(self, query: str, index: int = None) -> str:
    try:
        # messages is built from the query earlier in the method (omitted here)
        response = self.mistral_client.chat.complete(
            model=self.mistral_model,
            messages=messages
        )
        return response.choices[0].message.content
    except Exception as e:
        error_msg = f"Error getting response for query '{query}': {e}"
        logger.error(error_msg, exc_info=True)
        return "Error: Failed to get response for this query."
```
Implementing exponential backoff
For production deployments, add retry logic:
```python
import time
from functools import wraps

def retry_with_backoff(max_retries=3, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_retries):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_retries - 1:
                        raise
                    delay = base_delay * (2 ** attempt)
                    logger.warning(f"Attempt {attempt+1} failed, retrying in {delay}s")
                    time.sleep(delay)
        return wrapper
    return decorator

@retry_with_backoff(max_retries=3)
def _get_single_response(self, query: str, index: int = None) -> str:
    # ... existing code ...
```
Memory optimization
Response storage
Responses are stored in lists, not accumulated strings:
responses = [""] * len(queries) # Pre-allocate
for future in concurrent.futures.as_completed(future_to_index):
index = future_to_index[future]
responses[index] = future.result() # Direct assignment
This avoids repeated string concatenation, which creates intermediate objects.
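For comparison, a minimal sketch of the pattern this avoids; `results` here is just an illustrative list of response strings:

```python
# Anti-pattern: each += creates a new intermediate string object
combined = ""
for text in results:
    combined += text

# Preferred: collect into a list and join once, only if a single string is needed
parts = list(results)
combined = "".join(parts)
```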
Logging optimization
Logging uses lazy evaluation:
logger.info("Received response for %s (%.2f seconds)", query_description, elapsed_time)
The string formatting only occurs if the log level permits the message.
In production, set logging to WARNING or ERROR to reduce overhead:
```python
logging.basicConfig(level=logging.WARNING)
```
Built-in timing
The system tracks execution time for all major operations:
```python
def generate_paraphrases(self, query: str, n_paraphrases: int = 3) -> List[str]:
    start_time = time.time()
    # ... generate paraphrases ...
    elapsed_time = time.time() - start_time
    logger.info("Generated %d paraphrases in %.2f seconds", len(paraphrases), elapsed_time)
```
Key metrics to monitor:
- Paraphrase generation time - Typically 2-3 seconds
- Response retrieval time - 1-2 seconds per response (parallel)
- Judgment time - 3-5 seconds
- Total detection time - Usually 8-12 seconds
Track metrics over time:
```python
import time

class PerformanceMetrics:
    def __init__(self):
        self.metrics = []

    def record(self, operation, duration):
        self.metrics.append({
            'timestamp': time.time(),
            'operation': operation,
            'duration': duration
        })

    def get_averages(self, window_size=100):
        recent = self.metrics[-window_size:]
        by_operation = {}
        for m in recent:
            if m['operation'] not in by_operation:
                by_operation[m['operation']] = []
            by_operation[m['operation']].append(m['duration'])
        return {
            op: sum(durations) / len(durations)
            for op, durations in by_operation.items()
        }
```
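One possible way to wire this into the built-in timing above; the `detector` instance name and the operation label are illustrative:

```python
metrics = PerformanceMetrics()

start_time = time.time()
paraphrases = detector.generate_paraphrases(query)
metrics.record("paraphrase_generation", time.time() - start_time)

# e.g. {'paraphrase_generation': 2.4}
print(metrics.get_averages(window_size=100))
```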
Optimization checklist
Before deploying to production:
- Set `max_workers` to match your API tier's rate limits
- Add retry logic with exponential backoff to `_get_single_response`
- Raise the logging level to WARNING or ERROR
- Consider connection pooling if you expect more than 100 requests per minute
- Track the timing metrics above so you can spot regressions
Performance characteristics vary based on API latency, network conditions, and query complexity. Always profile your specific deployment before optimizing.