Skip to content

Module 2: Best Practices

Overview

This guide covers best practices for implementing Azure AI Search basic operations in production applications. These practices ensure optimal performance, security, maintainability, and user experience.

🔍 Search Query Best Practices

Query Design

Use Appropriate Search Types

# For exact matches: double quotes turn the terms into a single phrase
results = search_client.search('"machine learning"')  # Phrase search

# For flexible matching: a document matches when it contains any of the terms
results = search_client.search('machine learning', search_mode='any')  # Any terms

# For precise matching: the AND / OR / NOT keywords are operators only in the
# full Lucene syntax, so it has to be requested explicitly. With the default
# simple syntax the word "AND" would be searched for as an ordinary term.
results = search_client.search('machine AND learning', query_type='full')  # Boolean search

# For partial matching: a trailing * matches any term starting with the prefix
results = search_client.search('mach*')  # Wildcard search

Optimize Query Structure

# Good: Specific and targeted
query = 'python tutorial beginner'

# Better: Use field-specific search when possible
# (search_fields targets only the listed fields instead of every field)
results = search_client.search(
    'python tutorial',
    search_fields=['title', 'description']
)

# Best: Combine with filters for precision
# (the filter keeps only documents whose difficulty is 'Beginner' and whose
# rating is 4.0 or higher)
results = search_client.search(
    'python tutorial',
    search_fields=['title', 'description'],
    filter="difficulty eq 'Beginner' and rating ge 4.0"
)

Handle User Input Safely

import re

def sanitize_query(user_input, max_length=1000):
    """Turn raw user input into a query string that is safe to search with.

    Angle brackets are removed, every run of whitespace is collapsed to a
    single space, surrounding whitespace is dropped and the result is capped
    at ``max_length`` characters.

    Args:
        user_input: Raw text supplied by the user; may be ``None`` or empty.
        max_length: Maximum number of characters to keep (default 1000).

    Returns:
        The cleaned query string, or ``None`` when nothing searchable is left.
    """
    if not user_input:
        return None

    # Remove potentially problematic characters
    sanitized = re.sub(r'[<>]', '', user_input)

    # Normalize whitespace before truncating so the limit applies to the text
    # that is actually sent; stripping afterwards also removes the space that
    # a deleted bracket can leave at either end (e.g. "python <").
    sanitized = re.sub(r'\s+', ' ', sanitized).strip()

    # Limit length, and do not leave a dangling space at the cut point
    sanitized = sanitized[:max_length].rstrip()

    # Input such as "<>" is empty once cleaned: report it like empty input
    return sanitized or None

# Usage
# sanitize_query returns None for empty or whitespace-only input, so the
# search below only runs when there is something to search for.
user_query = sanitize_query(request.get('q'))
if user_query:
    results = search_client.search(user_query)

Performance Optimization

Limit Result Sets

# Always specify reasonable limits
# (top caps how many results come back; skip is the offset of the first one,
# so skip=0 is the first page)
results = search_client.search(
    query,
    top=20,  # Don't return more than needed
    skip=0   # For pagination
)

Select Only Required Fields

# Instead of returning all fields
results = search_client.search(query)

# Return only what you need
results = search_client.search(
    query,
    select=['id', 'title', 'author', 'publishedDate']
)
# A separate practice from `select` above: restrict which fields are searched.
# More efficient than searching all fields
results = search_client.search(
    query,
    search_fields=['title', 'description']  # Target specific fields
)

🛡️ Error Handling Best Practices

Comprehensive Error Handling

Handle Specific Error Types

from azure.core.exceptions import HttpResponseError
import logging

def safe_search(search_client, query, **kwargs):
    """Run a search and report failures as a message instead of raising.

    Args:
        search_client: Object whose ``search(query, **kwargs)`` is called.
        query: Search text passed through unchanged.
        **kwargs: Extra options forwarded to ``search``.

    Returns:
        A ``(results, error)`` pair: the materialized result list and ``None``
        on success, or an empty list and an error message on failure.
    """
    try:
        # list() forces the results to be consumed inside the try block
        documents = list(search_client.search(query, **kwargs))
    except HttpResponseError as http_err:
        message = handle_http_error(http_err)
        logging.error(f"Search HTTP error: {message}")
        return [], message
    except Exception as exc:
        message = f"Unexpected search error: {str(exc)}"
        logging.error(message)
        return [], message

    return documents, None

def handle_http_error(error):
    """Translate an HTTP error into a message suitable for end users.

    Args:
        error: Exception exposing ``status_code`` and ``message`` attributes.

    Returns:
        The friendly message for a known status code, otherwise a generic
        message built from ``error.message``.
    """
    friendly_by_status = {
        400: "Invalid search query. Please check your search terms.",
        401: "Authentication failed. Please check your credentials.",
        403: "Access denied. Insufficient permissions.",
        404: "Search index not found.",
        429: "Too many requests. Please wait and try again.",
        503: "Search service temporarily unavailable."
    }

    code = error.status_code
    # Built unconditionally, exactly as a dict.get() default would be
    generic = f"Search error: {error.message}"

    if code in friendly_by_status:
        return friendly_by_status[code]
    return generic

Implement Fallback Strategies

def search_with_fallback(search_client, query):
    """Search with progressively looser strategies until one returns results.

    The strategies are tried in order: exact phrase, all terms, any term and
    finally prefix (wildcard) matching of any term. "All" versus "any" is
    selected through the ``search_mode`` option rather than by inserting
    ``OR`` keywords, which are not operators in the default simple syntax.

    Args:
        search_client: Object whose ``search(text, **options)`` is called.
        query: User search text; blank input yields no search at all.

    Returns:
        The result list of the first strategy that finds anything, or an
        empty list when every strategy fails or finds nothing.
    """
    terms = query.split() if query else []
    if not terms:
        # Nothing to search for; avoid sending an empty phrase query
        return []

    normalized = ' '.join(terms)
    # (query text, search_mode, label used in log messages)
    strategies = [
        (f'"{normalized}"', 'all', "exact phrase"),
        (normalized, 'all', "all terms"),
        (normalized, 'any', "any terms"),
        (' '.join(f"{term}*" for term in terms), 'any', "wildcard"),
    ]

    for search_query, search_mode, strategy_name in strategies:
        try:
            results = list(
                search_client.search(search_query, search_mode=search_mode, top=10)
            )
            if results:
                logging.info(f"Found {len(results)} results using {strategy_name}")
                return results
        except Exception as e:
            # Best effort: a failing strategy must not stop the looser ones
            logging.warning(f"Strategy '{strategy_name}' failed: {e}")
            continue

    return []

Input Validation

Validate Before Searching

def validate_search_input(query, max_length=1000):
    """Check a search query and describe the first problem found.

    Args:
        query: Search text to check; may be ``None``.
        max_length: Longest accepted query, measured before stripping.

    Returns:
        A list holding one error message, or an empty list when the query
        is acceptable.
    """
    if not query:
        return ["Search query cannot be empty"]

    trimmed = query.strip()
    if not trimmed:
        return ["Search query cannot be just whitespace"]

    if len(query) > max_length:
        return [f"Search query too long (max {max_length} characters)"]

    if len(trimmed) < 2:
        return ["Search query must be at least 2 characters"]

    return []

# Usage
def perform_search(user_query):
    """Validate, sanitize and run a search for ``user_query``.

    Returns:
        ``{"errors": [...], "results": []}`` when validation fails, otherwise
        ``{"results": [...], "error": message_or_None}``.
    """
    problems = validate_search_input(user_query)
    if not problems:
        cleaned = sanitize_query(user_query)
        hits, failure = safe_search(search_client, cleaned)
        return {"results": hits, "error": failure}

    # Validation failed: report the problems without touching the service
    return {"errors": problems, "results": []}

📊 Result Processing Best Practices

Efficient Result Handling

Process Results Efficiently

def process_search_results(results, max_preview_length=200):
    """Reduce raw search hits to the fields needed for display.

    Args:
        results: Iterable of dict-like hits.
        max_preview_length: Maximum number of content characters kept in the
            preview before an ellipsis is appended.

    Returns:
        A list of dicts with ``id``, ``title``, ``score``, ``author`` and
        ``url`` (defaults filled in) plus ``preview`` when the hit has
        non-empty ``content``.
    """
    # (output key, key in the raw hit, default when the hit lacks it)
    core_fields = (
        ('id', 'id', ''),
        ('title', 'title', 'Untitled'),
        ('score', '@search.score', 0.0),
        ('author', 'author', 'Unknown'),
        ('url', 'url', '#'),
    )

    display_items = []
    for hit in results:
        item = {
            out_key: hit.get(source_key, fallback)
            for out_key, source_key, fallback in core_fields
        }

        text = hit.get('content', '')
        if text:
            snippet = text[:max_preview_length]
            if len(text) > max_preview_length:
                # Prefer ending on a word boundary, but only when that keeps
                # more than 80% of the allowed length
                cut = snippet.rfind(' ')
                if cut > max_preview_length * 0.8:
                    snippet = snippet[:cut]
                snippet = snippet + '...'
            item['preview'] = snippet

        display_items.append(item)

    return display_items

Implement Smart Pagination

class SearchPaginator:
    """Fetch search results one page at a time using ``top`` and ``skip``."""

    def __init__(self, search_client, page_size=20):
        """Store the client to query and the number of results per page."""
        self.search_client = search_client
        self.page_size = page_size

    def get_page(self, query, page_number=1, **search_options):
        """Get a specific page of results.

        Args:
            query: Search text.
            page_number: 1-based page to fetch; values below 1 are treated
                as the first page.
            **search_options: Extra options forwarded to ``search``.

        Returns:
            A dict with ``results``, ``page``, ``page_size``,
            ``total_results``, ``total_pages``, ``has_next`` and
            ``has_previous``. When the search fails the same keys are
            returned with empty/zero values plus an ``error`` message.
        """
        # A page below 1 would turn into a negative skip
        page_number = max(1, page_number)
        skip = (page_number - 1) * self.page_size

        try:
            results = self.search_client.search(
                query,
                top=self.page_size,
                skip=skip,
                include_total_count=True,
                **search_options
            )

            result_list = list(results)

            # A result object without get_count(), or one that reports None,
            # is counted as zero so the page arithmetic below cannot fail.
            count_getter = getattr(results, 'get_count', None)
            total_count = (count_getter() if callable(count_getter) else 0) or 0
            total_pages = (total_count + self.page_size - 1) // self.page_size

            return {
                'results': result_list,
                'page': page_number,
                'page_size': self.page_size,
                'total_results': total_count,
                'total_pages': total_pages,
                'has_next': page_number < total_pages,
                'has_previous': page_number > 1
            }

        except Exception as e:
            logging.error(f"Pagination error: {e}")
            # Same keys as the success payload so callers can render either
            return {
                'results': [],
                'error': str(e),
                'page': page_number,
                'page_size': self.page_size,
                'total_results': 0,
                'total_pages': 0,
                'has_next': False,
                'has_previous': page_number > 1
            }

Score Analysis

Analyze and Use Search Scores

def analyze_result_quality(results, min_quality_score=1.0):
    """Summarize the relevance scores of a result list.

    Args:
        results: List of dict-like hits; a missing ``@search.score`` counts
            as 0.
        min_quality_score: Score at or above which a hit is "high quality".

    Returns:
        For an empty list, a dict with ``quality`` ``"no_results"`` and a
        recommendation. Otherwise a dict with the result count, score range,
        average score, high-quality count, an overall ``quality`` label
        (``low``, ``high`` or ``mixed``) and a recommendation.
    """
    if not results:
        return {"quality": "no_results", "recommendation": "Try broader search terms"}

    scores = [hit.get('@search.score', 0) for hit in results]
    strong_hits = sum(1 for score in scores if score >= min_quality_score)

    # "high" needs at least 70% of the hits to reach the quality threshold
    if strong_hits == 0:
        label = 'low'
        advice = 'Try different search terms or use broader matching'
    elif strong_hits >= len(results) * 0.7:
        label = 'high'
        advice = 'Results look highly relevant'
    else:
        label = 'mixed'
        advice = 'Consider filtering by score or refining query'

    return {
        'total_results': len(results),
        'score_range': {'min': min(scores), 'max': max(scores)},
        'average_score': sum(scores) / len(scores),
        'high_quality_count': strong_hits,
        'quality': label,
        'recommendation': advice,
    }

🔒 Security Best Practices

API Key Management

Secure Credential Handling

import os
from azure.identity import DefaultAzureCredential
from azure.core.credentials import AzureKeyCredential

def get_search_client(use_managed_identity=False):
    """Get search client with secure credential handling.

    The endpoint and index name are read from the environment variables
    ``AZURE_SEARCH_SERVICE_ENDPOINT`` and ``AZURE_SEARCH_INDEX_NAME``.

    Args:
        use_managed_identity: When true, authenticate with
            ``DefaultAzureCredential``; otherwise use the API key from
            ``AZURE_SEARCH_API_KEY``.

    Returns:
        A ``SearchClient`` for the configured endpoint and index.

    Raises:
        ValueError: If the endpoint or index name is missing, or if key
            authentication is requested and no API key is set.
    """
    endpoint = os.getenv('AZURE_SEARCH_SERVICE_ENDPOINT')
    index_name = os.getenv('AZURE_SEARCH_INDEX_NAME')

    if not endpoint or not index_name:
        raise ValueError("Missing required environment variables")

    if use_managed_identity:
        # Preferred for production
        credential = DefaultAzureCredential()
    else:
        # For development/testing
        api_key = os.getenv('AZURE_SEARCH_API_KEY')
        if not api_key:
            raise ValueError("API key not found in environment variables")
        credential = AzureKeyCredential(api_key)

    # SearchClient was referenced here without being imported anywhere in the
    # surrounding snippet, which made this line fail with NameError.
    from azure.search.documents import SearchClient

    return SearchClient(endpoint, index_name, credential)

Use Appropriate Key Types

# For different environments
class SearchClientFactory:
    """Build a search client with the authentication suited to an environment."""

    @staticmethod
    def create_for_environment(environment='development'):
        """Create search client appropriate for environment.

        ``'production'`` uses managed identity and ``'development'`` uses an
        API key; any other value raises ``ValueError``.
        """
        if environment not in ('production', 'development'):
            raise ValueError(f"Unknown environment: {environment}")

        # Managed identity in production, API key for development
        return get_search_client(use_managed_identity=(environment == 'production'))

Input Sanitization

Prevent Injection Attacks

import html
import re

def secure_query_processing(user_input):
    """Securely process user search input.

    Dangerous patterns are removed from the raw text first, whitespace is
    normalized, and only then is the text HTML-escaped. Escaping last matters
    for two reasons: the script-tag pattern can only match while the angle
    brackets are still literal, and the ``;`` that escaping introduces
    (``&amp;``, ``&lt;`` ...) must not be seen by the ``;...drop table``
    pattern.

    Args:
        user_input: Raw text supplied by the user; may be ``None`` or empty.

    Returns:
        The cleaned, HTML-escaped text limited to 1000 characters, or
        ``None`` when ``user_input`` is empty.
    """
    if not user_input:
        return None

    # Remove potentially dangerous patterns
    # Remove script tags, SQL injection patterns, etc.
    dangerous_patterns = [
        r'<script.*?</script>',
        r'javascript:',
        r'vbscript:',
        r'onload=',
        r'onerror=',
        r'--',  # SQL comment
        r';.*drop\s+table',  # SQL injection
    ]

    cleaned = user_input
    for pattern in dangerous_patterns:
        # DOTALL lets a script block that spans several lines match as well
        cleaned = re.sub(pattern, '', cleaned, flags=re.IGNORECASE | re.DOTALL)

    # Normalize whitespace
    cleaned = re.sub(r'\s+', ' ', cleaned.strip())

    # HTML escape to prevent XSS
    escaped = html.escape(cleaned)

    # Limit length; if the cut falls inside an entity such as "&amp;", drop
    # the partial entity rather than returning broken markup
    if len(escaped) > 1000:
        escaped = re.sub(r'&[#\w]*$', '', escaped[:1000])

    return escaped

🚀 Performance Best Practices

Caching Strategies

Implement Result Caching

import hashlib
import json
from datetime import datetime, timedelta

class SearchCache:
    """In-memory cache of search results keyed by query text and options.

    Entries expire after ``cache_duration_minutes``. An expired entry is
    dropped when it is looked up; ``clear_expired`` removes all of them, and
    nothing else bounds the size of the cache.
    """

    def __init__(self, cache_duration_minutes=30):
        """Create an empty cache whose entries live for the given minutes."""
        self.cache = {}
        self.cache_duration = timedelta(minutes=cache_duration_minutes)

    def _get_cache_key(self, query, options):
        """Generate cache key from query and options."""
        cache_data = {'query': query, 'options': options}
        # default=repr: option values that JSON cannot encode (sets, SDK
        # objects, ...) are keyed by their repr instead of raising TypeError
        cache_string = json.dumps(cache_data, sort_keys=True, default=repr)
        # md5 is used only to shorten the key, not for security
        return hashlib.md5(cache_string.encode()).hexdigest()

    def get(self, query, options):
        """Get cached results if available and not expired.

        Returns:
            The cached results, or ``None`` when there is no entry or the
            entry has expired (the expired entry is removed).
        """
        cache_key = self._get_cache_key(query, options)
        cached_data = self.cache.get(cache_key)
        if cached_data is None:
            return None

        if datetime.now() - cached_data['timestamp'] < self.cache_duration:
            return cached_data['results']

        # Remove expired entry
        del self.cache[cache_key]
        return None

    def set(self, query, options, results):
        """Cache search results under the key for ``query`` and ``options``."""
        cache_key = self._get_cache_key(query, options)
        self.cache[cache_key] = {
            'results': results,
            'timestamp': datetime.now()
        }

    def clear_expired(self):
        """Remove expired cache entries."""
        now = datetime.now()
        # Collect first: the dict must not change size while being iterated
        expired_keys = [
            key for key, data in self.cache.items()
            if now - data['timestamp'] >= self.cache_duration
        ]
        for key in expired_keys:
            del self.cache[key]

# Usage
search_cache = SearchCache(cache_duration_minutes=15)

def cached_search(query, **options):
    """Return cached results for this query/options pair, searching on a miss."""
    hits = search_cache.get(query, options)
    if hits is None:
        # Cache miss (or expired entry): query the service and remember the
        # materialized list for next time
        hits = list(search_client.search(query, **options))
        search_cache.set(query, options, hits)

    return hits

Connection Management

Reuse Search Client Instances

# Good: Singleton pattern for search client
class SearchService:
    """Process-wide holder of one lazily created search client.

    Every ``SearchService()`` call returns the same instance.
    """

    _instance = None
    _client = None

    def __new__(cls):
        existing = cls._instance
        if existing is not None:
            return existing

        # First call: create the instance and remember it on the class
        cls._instance = super().__new__(cls)
        return cls._instance

    def get_client(self):
        """Return the search client, creating it on first use."""
        client = self._client
        if client is None:
            client = get_search_client()
            self._client = client
        return client

# Usage
# SearchService() always yields the same instance, and get_client() creates
# the client on the first call and returns that same client afterwards.
search_service = SearchService()
search_client = search_service.get_client()

📱 User Experience Best Practices

Search Interface Design

Provide Search Feedback

def search_with_feedback(query, search_client):
    """Search and return the results together with feedback for the user.

    Args:
        query: Search text.
        search_client: Object whose ``search(query, top=20)`` is called.

    Returns:
        A ``(results, feedback)`` pair. On success ``feedback`` holds the
        query, result count, elapsed seconds (rounded to milliseconds), a
        ``'success'`` status and, for fewer than five results, a suggestion.
        On failure ``results`` is empty and ``feedback`` holds an
        ``'error'`` status, the error text and a suggestion.
    """
    # The surrounding snippets never import ``time``; importing it here keeps
    # the function from failing with NameError before the search even starts.
    import time

    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments
    start_time = time.perf_counter()

    try:
        results = list(search_client.search(query, top=20))
        search_time = time.perf_counter() - start_time

        feedback = {
            'query': query,
            'results_count': len(results),
            'search_time': round(search_time, 3),
            'status': 'success'
        }

        if not results:
            feedback['suggestion'] = 'Try different keywords or check spelling'
        elif len(results) < 5:
            feedback['suggestion'] = 'Try broader search terms for more results'

        return results, feedback

    except Exception as e:
        # Deliberately broad: the caller always gets a feedback payload
        return [], {
            'query': query,
            'status': 'error',
            'error': str(e),
            'suggestion': 'Please try again or contact support'
        }

Implement Search Suggestions

def get_search_suggestions(partial_query, search_client):
    """Suggest up to five distinct titles matching the text typed so far.

    Args:
        partial_query: Text typed so far; fewer than two characters yields
            no suggestions and no search.
        search_client: Object whose ``search`` method is called.

    Returns:
        A list of unique, non-empty titles in result order, or an empty list
        when the search fails.
    """
    if len(partial_query) < 2:
        return []

    try:
        # Use wildcard search for suggestions
        matches = search_client.search(
            f"{partial_query}*",
            search_fields=['title'],
            select=['title'],
            top=5
        )

        titles = (match.get('title', '') for match in matches)
        # dict.fromkeys drops duplicates while keeping first-seen order
        distinct_titles = list(dict.fromkeys(title for title in titles if title))

        return distinct_titles[:5]  # Limit to 5 suggestions

    except Exception:
        return []  # Fail silently for suggestions

Result Presentation

Format Results for Different Contexts

class ResultFormatter:
    """Shape search hits for different consumers (web pages, API responses)."""

    @staticmethod
    def for_web(results):
        """Format results for web display.

        Each hit becomes a dict with ``id``, ``title``, ``snippet``,
        ``author``, ``score`` (rounded to two decimals) and ``url``, with
        display defaults for missing values.
        """
        formatted = []
        for hit in results:
            body = hit.get('content', '')
            formatted.append({
                'id': hit.get('id'),
                'title': hit.get('title', 'Untitled'),
                'snippet': ResultFormatter._create_snippet(body),
                'author': hit.get('author', 'Unknown'),
                'score': round(hit.get('@search.score', 0), 2),
                'url': hit.get('url', '#')
            })
        return formatted

    @staticmethod
    def for_api(results):
        """Format results for API response.

        Returns a dict with ``results`` (one entry per hit holding ``id``,
        ``title``, ``score`` and every field whose key does not start with
        ``@``) and ``count``.
        """
        entries = []
        for hit in results:
            public_fields = {}
            for key, value in hit.items():
                # Keys starting with '@' are left out of 'fields'
                if key.startswith('@'):
                    continue
                public_fields[key] = value

            entries.append({
                'id': hit.get('id'),
                'title': hit.get('title'),
                'score': hit.get('@search.score'),
                'fields': public_fields
            })

        return {'results': entries, 'count': len(results)}

    @staticmethod
    def _create_snippet(content, max_length=150):
        """Create content snippet of at most ``max_length`` characters plus '...'."""
        if not content:
            return content
        if len(content) <= max_length:
            return content

        head = content[:max_length]
        cut = head.rfind(' ')
        # Break on a word boundary only when it keeps over 80% of the length
        if cut > max_length * 0.8:
            head = head[:cut]

        return head + '...'

📊 Monitoring and Analytics

Search Analytics

Track Search Metrics

import logging
from datetime import datetime

class SearchAnalytics:
    """Write search events as JSON to the ``search_analytics`` logger."""

    def __init__(self):
        self.logger = logging.getLogger('search_analytics')

    def log_search(self, query, results_count, search_time, user_id=None):
        """Log search event for analytics.

        ``search_time`` is given in seconds and logged as milliseconds
        rounded to two decimals. The event is logged at INFO level.
        """
        payload = json.dumps({
            'timestamp': datetime.now().isoformat(),
            'query': query,
            'results_count': results_count,
            'search_time_ms': round(search_time * 1000, 2),
            'user_id': user_id
        })
        self.logger.info(f"SEARCH_EVENT: {payload}")

    def log_no_results(self, query, user_id=None):
        """Log when searches return no results (WARNING level)."""
        payload = json.dumps({
            'timestamp': datetime.now().isoformat(),
            'event_type': 'no_results',
            'query': query,
            'user_id': user_id
        })
        self.logger.warning(f"NO_RESULTS: {payload}")

# Usage
analytics = SearchAnalytics()

def monitored_search(query, user_id=None):
    """Search with monitoring.

    Runs the query, logs a search event with the result count and elapsed
    time, and additionally logs a no-results event when nothing was found.

    Args:
        query: Search text.
        user_id: Optional identifier recorded with the logged events.

    Returns:
        The list of results.

    Raises:
        Exception: Whatever the search raises; it is logged and re-raised.
    """
    # The surrounding snippets never import ``time``; importing it here keeps
    # the function from failing with NameError.
    import time

    # perf_counter is monotonic, so the measurement cannot be skewed by
    # system clock adjustments
    start_time = time.perf_counter()

    # Only the search itself sits in the try, so a failure while writing the
    # analytics events is not reported as a "Search error"
    try:
        results = list(search_client.search(query))
    except Exception as e:
        analytics.logger.error(f"Search error for query '{query}': {e}")
        raise

    search_time = time.perf_counter() - start_time

    analytics.log_search(query, len(results), search_time, user_id)

    if not results:
        analytics.log_no_results(query, user_id)

    return results

🔄 Testing Best Practices

Unit Testing

Test Search Functionality

import unittest
from unittest.mock import Mock, patch

class TestSearchFunctionality(unittest.TestCase):
    """Unit tests for safe_search and validate_search_input using a mock client."""

    def setUp(self):
        self.mock_client = Mock()
        self.search_service = SearchService()
        # SearchService is a process-wide singleton: put back whatever client
        # it held so the mock does not leak into code that runs after a test.
        self.addCleanup(
            setattr, self.search_service, '_client', self.search_service._client
        )
        self.search_service._client = self.mock_client

    def test_safe_search_success(self):
        """Test successful search"""
        # Mock successful response
        mock_results = [{'title': 'Test', '@search.score': 1.0}]
        self.mock_client.search.return_value = mock_results

        results, error = safe_search(self.mock_client, "test query")

        self.assertEqual(len(results), 1)
        self.assertIsNone(error)
        self.mock_client.search.assert_called_once_with("test query")

    def test_safe_search_http_error(self):
        """Test HTTP error handling"""
        from azure.core.exceptions import HttpResponseError

        # Mock HTTP error
        http_error = HttpResponseError("Bad request")
        http_error.status_code = 400
        self.mock_client.search.side_effect = http_error

        results, error = safe_search(self.mock_client, "bad query")

        self.assertEqual(len(results), 0)
        self.assertIn("Invalid search query", error)

    def test_query_validation(self):
        """Test input validation"""
        # Test empty query
        errors = validate_search_input("")
        self.assertIn("cannot be empty", errors[0])

        # Test too long query (one character over the default limit of 1000)
        long_query = "x" * 1001
        errors = validate_search_input(long_query)
        self.assertIn("too long", errors[0])

        # Test valid query
        errors = validate_search_input("valid query")
        self.assertEqual(len(errors), 0)

if __name__ == '__main__':
    unittest.main()

📋 Checklist for Production

Pre-Deployment Checklist

  • [ ] Security

    • [ ] API keys stored securely (environment variables/key vault)
    • [ ] Input validation implemented
    • [ ] Query sanitization in place
    • [ ] Appropriate authentication method chosen
  • [ ] Performance

    • [ ] Result limits implemented
    • [ ] Field selection optimized
    • [ ] Caching strategy in place
    • [ ] Search client instance reused across requests (connection reuse)
  • [ ] Error Handling

    • [ ] All error types handled
    • [ ] User-friendly error messages
    • [ ] Fallback strategies implemented
    • [ ] Logging configured
  • [ ] Monitoring

    • [ ] Search analytics implemented
    • [ ] Performance monitoring in place
    • [ ] Error tracking configured
    • [ ] Health checks implemented
  • [ ] Testing

    • [ ] Unit tests written
    • [ ] Integration tests completed
    • [ ] Load testing performed
    • [ ] Error scenarios tested

Ready for production? These best practices will help you build robust, secure, and performant search applications. Remember to adapt these patterns to your specific use case and requirements! 🚀