47d5af90d317bffde6f0843beabd2d72c88fbfce
lrnassar
Tue Mar 10 08:18:06 2026 -0700
Adding retry logic around the gmail API call, because it fails periodically (but then works next time it runs). Refs #36801
diff --git src/utils/qa/mlqAutomate.py src/utils/qa/mlqAutomate.py
index 748fee08c08..30d93b7f1bd 100755
--- src/utils/qa/mlqAutomate.py
+++ src/utils/qa/mlqAutomate.py
@@ -1,1901 +1,1903 @@
#!/usr/bin/env python3
"""
MLQ Automation Script
Monitors Gmail for mailing list emails, moderates pending messages,
and creates/updates Redmine tickets.
"""
import os
import sys
import argparse
import base64
import re
import logging
import html as html_module
import time
from datetime import datetime, timedelta
from difflib import SequenceMatcher
import email
from email import policy
from email.utils import parseaddr
from email.mime.text import MIMEText
from functools import wraps
import pytz
import requests
from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from google.auth.transport.requests import Request
from googleapiclient.discovery import build
+from googleapiclient.errors import HttpError
import anthropic
# Configuration
# Central runtime configuration. The two API keys are intentionally blank
# here; load_config_file() fills them in from ~/.hg.conf at startup.
CONFIG = {
    'REDMINE_URL': 'https://redmine.gi.ucsc.edu',
    'REDMINE_API_KEY': '',
    'REDMINE_PROJECT': 'maillists',
    # Team calendar consulted by get_current_mlm() for the MLM rotation
    'CALENDAR_ID': 'ucsc.edu_anbl4254jlssgo3gc2l5c8un5c@group.calendar.google.com',
    'CLAUDE_API_KEY': '',
    # Mailing lists
    'MODERATED_LISTS': ['genome@soe.ucsc.edu', 'genome-mirror@soe.ucsc.edu'],
    'UNMODERATED_LISTS': ['genome-www@soe.ucsc.edu'],
    # Name mapping from calendar to Redmine
    'NAME_MAPPING': {
        'Jairo': 'Jairo Navarro',
        'Lou': 'Lou Nassar',
        'Gerardo': 'Gerardo Perez',
        'Gera': 'Gerardo Perez',
        'Clay': 'Clay Fischer',
        'Matt': 'Matt Speir',
    },
    # Redmine User IDs
    'USER_IDS': {
        'Jairo Navarro': 163,
        'Lou Nassar': 171,
        'Gerardo Perez': 179,
        'Clay Fischer': 161,
        'Matt Speir': 150,
    },
    # Redmine field IDs
    'TRACKER_ID': 7,  # MLQ
    'PRIORITY_ID': 12,  # Unprioritized
    'STATUS_ID': 1,  # New
    'CUSTOM_FIELDS': {
        'MLQ Category - primary': 28,
        'Email': 40,
        'MLM': 9,
    },
}
# OAuth scopes requested for the Gmail and Calendar APIs used below
SCOPES = [
    'https://www.googleapis.com/auth/gmail.readonly',
    'https://www.googleapis.com/auth/gmail.send',
    'https://www.googleapis.com/auth/gmail.modify',
    'https://www.googleapis.com/auth/calendar.readonly',
]
# Valid values for the "MLQ Category - primary" Redmine custom field;
# Claude's suggested category is validated against this list.
MLQ_CATEGORIES = [
    "Other", "Alignments", "BLAT", "Bug Report", "CAPTCHA", "Command-line Utilities",
    "Conservation", "Custom Track", "Data - Availability (when)", "Data - Interpretation (what)",
    "Data - Location (where)", "Data Contribution", "Data Integrator", "Data Requests", "dbSNP",
    "Downloads", "ENCODE", "External Tools", "Feature Request", "GBiB", "GBiC",
    "Gene Interactions (hgGeneGraph)", "Gene Tracks", "Help Docs (Info)", "Hubs", "IP blocked",
    "JSON hubApi", "Licenses", "LiftOver", "Login", "Mirror - Asia", "Mirror - Europe",
    "Mirror Site & Utilities", "Multi-region", "MySQL", "PCR", "Publications & Citing",
    "Sessions", "Slow Performance", "Table Browser", "Track Collection Builder", "User Accounts",
    "Variant Annotation Integrator", "Widget"
]
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
PST = pytz.timezone('America/Los_Angeles')  # all scheduling decisions are PST-based
DRY_RUN = False  # when True, actions are logged but nothing is sent/modified
# Setup logging (file + stdout). Log path overridable via MLQ_LOG_FILE env var.
LOG_FILE = os.environ.get('MLQ_LOG_FILE', os.path.join(SCRIPT_DIR, 'mlq_automate.log'))
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(LOG_FILE),
        logging.StreamHandler(sys.stdout)
    ]
)
logger = logging.getLogger(__name__)
# Suppress httpx INFO logs (Anthropic client HTTP request logging)
logging.getLogger('httpx').setLevel(logging.WARNING)
# Config file path (standard UCSC kent configuration file)
MLQ_CONF_PATH = os.path.expanduser('~/.hg.conf')
def load_config_file():
    """Populate CONFIG's API keys from the ~/.hg.conf key=value file.

    Uses the standard UCSC kent configuration file. The file must exist and
    have exactly 600 permissions; the process exits with status 1 on a
    missing file, insecure permissions, or a missing required key.
    """
    if not os.path.exists(MLQ_CONF_PATH):
        logger.error(f"Configuration file not found: {MLQ_CONF_PATH}")
        logger.error("Please add the following keys to ~/.hg.conf:")
        logger.error(" redmine.apiKey=YOUR_REDMINE_API_KEY")
        logger.error(" claude.apiKey=YOUR_CLAUDE_API_KEY")
        sys.exit(1)
    # Refuse to run with a world/group-readable secrets file.
    perms = os.stat(MLQ_CONF_PATH).st_mode & 0o777
    if perms != 0o600:
        logger.error(f"Configuration file {MLQ_CONF_PATH} has insecure permissions: {oct(perms)}")
        logger.error("For security, this file must have 600 permissions.")
        logger.error(f"Run: chmod 600 {MLQ_CONF_PATH}")
        sys.exit(1)
    # Parse key=value pairs, skipping blanks and '#' comments.
    parsed = {}
    with open(MLQ_CONF_PATH, 'r') as fh:
        for lineno, raw in enumerate(fh, 1):
            entry = raw.strip()
            if not entry or entry.startswith('#'):
                continue
            if '=' not in entry:
                logger.warning(f"Skipping invalid line {lineno} in {MLQ_CONF_PATH}: {entry}")
                continue
            key, _, value = entry.partition('=')
            parsed[key.strip()] = value.strip()
    # Copy the recognized keys into the global CONFIG dict.
    for conf_key, config_key in (('redmine.apiKey', 'REDMINE_API_KEY'),
                                 ('claude.apiKey', 'CLAUDE_API_KEY')):
        if conf_key in parsed:
            CONFIG[config_key] = parsed[conf_key]
    # Both keys are mandatory for the script to do anything useful.
    if not CONFIG['REDMINE_API_KEY']:
        logger.error(f"Missing redmine.apiKey in {MLQ_CONF_PATH}")
        sys.exit(1)
    if not CONFIG['CLAUDE_API_KEY']:
        logger.error(f"Missing claude.apiKey in {MLQ_CONF_PATH}")
        sys.exit(1)
    logger.info(f"Loaded configuration from {MLQ_CONF_PATH}")
def retry(max_attempts=3, delay=2, backoff=2, exceptions=(Exception,)):
    """Decorator factory: retry the wrapped call with exponential backoff.

    Args:
        max_attempts: Total tries before the final exception is re-raised.
        delay: Seconds to sleep before the first retry.
        backoff: Multiplier applied to the sleep after each failure.
        exceptions: Exception class(es) that trigger a retry; anything
            else propagates immediately.
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            wait = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        logger.error(f"{func.__name__} failed after {max_attempts} attempts: {e}")
                        raise
                    logger.warning(f"{func.__name__} attempt {attempt} failed: {e}. Retrying in {wait}s...")
                    time.sleep(wait)
                    wait *= backoff
        return wrapper
    return decorator
def get_google_credentials():
    """Load cached Google OAuth credentials, refreshing or re-authorizing as needed.

    Reads the cached token from ~/.gmail_token.json when present. Expired
    tokens are refreshed; otherwise the interactive installed-app flow runs
    against ~/.gmail_credentials.json on local port 8080. The (possibly new)
    token is written back to the cache before returning.
    """
    token_path = os.path.expanduser('~/.gmail_token.json')
    creds_path = os.path.expanduser('~/.gmail_credentials.json')
    creds = None
    if os.path.exists(token_path):
        creds = Credentials.from_authorized_user_file(token_path, SCOPES)
    if creds and creds.valid:
        return creds
    if creds and creds.expired and creds.refresh_token:
        creds.refresh(Request())
    else:
        flow = InstalledAppFlow.from_client_secrets_file(creds_path, SCOPES)
        creds = flow.run_local_server(port=8080)
    # Persist the refreshed/new token for the next run.
    with open(token_path, 'w') as fh:
        fh.write(creds.to_json())
    return creds
def get_current_mlm():
    """Get the current MLM based on PST time rules.

    Time rules: after 5pm the next day's MLM is used; Fri 5pm+ through the
    weekend rolls forward to Monday's MLM; Tuesday is hard-coded to Matt.
    Otherwise the team Google Calendar is searched for an event titled
    "MLM: <name>" or "MLM Rotating: <name>" on the target day.

    Returns:
        Full Redmine name of the on-duty MLM (short calendar names mapped
        via CONFIG['NAME_MAPPING']), or None when no event matches.
    """
    creds = get_google_credentials()
    service = build('calendar', 'v3', credentials=creds, cache_discovery=False)
    now = datetime.now(PST)
    target_date = now
    # After 5pm, use next day's MLM
    if now.hour >= 17:
        target_date += timedelta(days=1)
    # Weekend handling: Fri 5pm+ through Mon 5pm uses Monday's MLM
    # (after the 5pm shift above, Fri evening lands on Saturday here)
    weekday = target_date.weekday()
    if weekday == 5:  # Saturday
        target_date += timedelta(days=2)
    elif weekday == 6:  # Sunday
        target_date += timedelta(days=1)
    # Tuesday is always Matt
    if target_date.weekday() == 1:  # Tuesday
        return 'Matt Speir'
    # Query the whole target day for candidate events
    start = target_date.replace(hour=0, minute=0, second=0, microsecond=0)
    end = target_date.replace(hour=23, minute=59, second=59, microsecond=0)
    events = service.events().list(
        calendarId=CONFIG['CALENDAR_ID'],
        timeMin=start.isoformat(),
        timeMax=end.isoformat(),
        singleEvents=True
    ).execute().get('items', [])
    for event in events:
        title = event.get('summary', '')
        match = re.search(r'MLM(?:\s+Rotating)?:\s*(\w+)', title, re.IGNORECASE)
        if match:
            cal_name = match.group(1)
            # Fall back to the raw calendar name if it has no mapping entry
            return CONFIG['NAME_MAPPING'].get(cal_name, cal_name)
    logger.warning(f"No MLM found for {target_date.date()}")
    return None
@retry(max_attempts=3, delay=2, exceptions=(anthropic.APIError,))
def analyze_email_with_claude(subject, body, sender):
    """
    Use Claude to analyze an email in a single call.
    Returns dict with: is_spam, category, draft_response

    The model is asked for a fixed "SPAM:/CATEGORY:/DRAFT_RESPONSE:" layout
    which is then parsed line by line; the category is validated against
    MLQ_CATEGORIES (case-insensitively) and falls back to "Other".
    Retries on transient Anthropic API errors via the @retry decorator.
    """
    client = anthropic.Anthropic(api_key=CONFIG['CLAUDE_API_KEY'])
    categories_list = ", ".join(MLQ_CATEGORIES)
    prompt = f"""Analyze this email for the UCSC Genome Browser support team.
From: {sender}
Subject: {subject}
Body:
{body[:3000]}
Provide your analysis in this exact format:
SPAM: [YES or NO]
CATEGORY: [Pick one from: {categories_list}]
DRAFT_RESPONSE: [If not spam, write a helpful, professional response under 200 words. If spam, write "N/A"]
Important:
- Mark as SPAM if it is:
- Conference/journal solicitations asking for paper submissions
- Promotions for workshops, courses, training programs, or webinars
- Marketing or promotional emails advertising services or products
- Mass-sent announcements unrelated to genome browser support
- Contains sensitive personal medical information (specific names with genetic test results, medical conditions, family medical history, or personal health details) - these are privacy concerns
- Mark as NOT SPAM if it is a genuine question about using the UCSC Genome Browser (general genetics questions without personal identifying info are OK)
- For CATEGORY, pick the most specific match. Use "Other" if unsure.
- For DRAFT_RESPONSE, be helpful and concise. Ask clarifying questions if needed. Point to relevant documentation when appropriate."""
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        max_tokens=800,
        messages=[{"role": "user", "content": prompt}]
    )
    result_text = response.content[0].text.strip()
    # Parse the response; defaults used when a field is absent or malformed
    is_spam = False
    category = "Other"
    draft_response = None
    for line in result_text.split('\n'):
        line = line.strip()
        if line.upper().startswith('SPAM:'):
            is_spam = 'YES' in line.upper()
        elif line.upper().startswith('CATEGORY:'):
            cat = line.split(':', 1)[1].strip()
            # Validate category (exact match first, then case-insensitive)
            if cat in MLQ_CATEGORIES:
                category = cat
            else:
                for c in MLQ_CATEGORIES:
                    if c.lower() == cat.lower():
                        category = c
                        break
        elif line.upper().startswith('DRAFT_RESPONSE:'):
            draft_response = line.split(':', 1)[1].strip()
    # Capture multi-line response: everything after the DRAFT_RESPONSE marker
    idx = result_text.find('DRAFT_RESPONSE:')
    if idx != -1:
        draft_response = result_text[idx + len('DRAFT_RESPONSE:'):].strip()
        if draft_response.upper() == 'N/A':
            draft_response = None
    return {
        'is_spam': is_spam,
        'category': category,
        'draft_response': draft_response
    }
@retry(max_attempts=3, delay=2, exceptions=(anthropic.APIError,))
def batch_check_spam_with_claude(messages):
    """
    Use Claude to determine spam status for multiple emails in one call.

    Args:
        messages: List of dicts with 'original_from', 'original_subject',
            and 'original_body' keys (as built by get_pending_moderation_emails).

    Returns:
        Dict mapping 0-based message index to True (spam) or False (not spam).
        Any message the model failed to give a verdict for defaults to False.
    Retries on transient Anthropic API errors via the @retry decorator.
    """
    if not messages:
        return {}
    client = anthropic.Anthropic(api_key=CONFIG['CLAUDE_API_KEY'])
    # Build the prompt with all messages
    emails_text = ""
    for i, msg in enumerate(messages, 1):
        emails_text += f"""
--- EMAIL {i} ---
From: {msg['original_from']}
Subject: {msg['original_subject']}
Body:
{msg['original_body'][:1500]}
"""
    prompt = f"""Analyze these emails and determine if each is spam. This is for a genome browser technical support mailing list.
{emails_text}
Mark as SPAM if it is:
- Conference/journal solicitations asking for paper submissions
- Promotions for workshops, courses, training programs, or webinars
- Marketing or promotional emails advertising services or products
- Phishing or scam attempts
- Mass-sent unsolicited emails unrelated to genome browser support
- Announcements about events, courses, or programs (not questions about the browser)
- Contains sensitive personal medical information (specific names with genetic test results, medical conditions, family medical history, or personal health details) - these are privacy concerns
Mark as NOT SPAM if it is:
- A genuine question about the UCSC Genome Browser
- A technical support request
- A follow-up to an existing conversation
- Someone asking how to use browser features for their research
- General questions about genetic data without personal identifying information
Reply with one line per email in this exact format:
EMAIL 1: SPAM or NOT SPAM
EMAIL 2: SPAM or NOT SPAM
(etc.)"""
    response = client.messages.create(
        model="claude-sonnet-4-20250514",
        # Bug fix: a fixed max_tokens=100 truncated the output past ~10
        # emails (callers pass up to 50), and every truncated verdict then
        # silently defaulted to "not spam" below. One "EMAIL N: NOT SPAM"
        # line is ~10 tokens, so scale the budget with the batch size.
        max_tokens=max(100, 12 * len(messages)),
        messages=[{"role": "user", "content": prompt}]
    )
    result_text = response.content[0].text.strip().upper()
    results = {}
    for line in result_text.split('\n'):
        line = line.strip()
        match = re.match(r'EMAIL\s*(\d+):\s*(SPAM|NOT SPAM)', line)
        if match:
            idx = int(match.group(1)) - 1  # Convert to 0-based index
            is_spam = match.group(2) == 'SPAM'
            results[idx] = is_spam
    # Default to not spam for any missing results
    for i in range(len(messages)):
        if i not in results:
            logger.warning(f"No spam result for message {i+1}, defaulting to not spam")
            results[i] = False
    return results
def parse_raw_email_headers(raw_email):
    """Extract (subject, sender, body) from a raw RFC 822 email string.

    Multipart messages are walked for the first text/plain part; when none
    exists, the first text/html part is converted via html_to_text().
    Sender priority: X-Original-Sender > Reply-To > From.
    """
    def _decode(part):
        # get_content() honours the declared charset/transfer-encoding;
        # fall back to a manual decode when the part is malformed.
        try:
            return part.get_content()
        except Exception:
            payload = part.get_payload(decode=True)
            if payload:
                charset = part.get_content_charset() or 'utf-8'
                return payload.decode(charset, errors='ignore')
            return None

    msg = email.message_from_string(raw_email, policy=policy.default)
    subject = msg.get('subject', '')
    # Google Groups may rewrite From; prefer the original-sender headers.
    sender = (
        msg.get('x-original-sender') or
        msg.get('reply-to') or
        msg.get('from', '')
    )
    plain = ''
    markup = ''
    if msg.is_multipart():
        for part in msg.walk():
            ctype = part.get_content_type()
            if ctype == 'text/plain' and not plain:
                plain = _decode(part) or ''
            elif ctype == 'text/html' and not markup:
                markup = _decode(part) or ''
    else:
        ctype = msg.get_content_type()
        content = _decode(msg)
        if content is None:
            content = msg.get_payload()
        if ctype == 'text/plain':
            plain = content
        elif ctype == 'text/html':
            markup = content
    # If no text/plain was found, convert the HTML part to text.
    if not plain and markup:
        plain = html_to_text(markup)
    return subject, sender, plain
@retry(max_attempts=3, delay=2, exceptions=(HttpError,))
def get_pending_moderation_emails(group_name):
    """Get pending moderation notification emails for a group.

    Searches Gmail for Google Groups "Message Pending" notifications for
    *group_name*, extracts the +msgappr@ approval address from the From
    header, and pulls the attached original message (either as a raw
    RFC 822 attachment or from Gmail's pre-parsed nested MIME structure).

    Retries on transient Gmail API failures (HttpError) — the API fails
    periodically but succeeds on the next attempt.

    Returns:
        List of dicts with gmail_id, approve_addr, original_subject,
        original_from, original_body, original_attachments,
        notification_message_id, and notification_subject.
    """
    creds = get_google_credentials()
    service = build('gmail', 'v1', credentials=creds, cache_discovery=False)
    # Search for pending moderation emails for this group (exclude trash)
    query = f'subject:"{group_name} - soe.ucsc.edu admins: Message Pending" -in:trash'
    results = service.users().messages().list(userId='me', q=query, maxResults=50).execute()
    pending = []
    for msg_ref in results.get('messages', []):
        msg = service.users().messages().get(userId='me', id=msg_ref['id'], format='full').execute()
        headers = {h['name']: h['value'] for h in msg['payload']['headers']}
        # Extract the approval address from the From header
        from_addr = headers.get('From', '')
        approve_addr = None
        if '+msgappr@' in from_addr:
            match = re.search(r'<([^>]+\+msgappr@[^>]+)>', from_addr)
            if match:
                approve_addr = match.group(1)
        # Extract the attached original message
        original_subject = ''
        original_from = ''
        original_body = ''
        original_attachments = []  # Will store attachment info for later extraction
        for part in msg['payload'].get('parts', []):
            if part['mimeType'] == 'message/rfc822':
                # Check if original email is in an attachment (needs separate fetch)
                if 'attachmentId' in part.get('body', {}):
                    att_id = part['body']['attachmentId']
                    att = service.users().messages().attachments().get(
                        userId='me', messageId=msg_ref['id'], id=att_id
                    ).execute()
                    raw_email = base64.urlsafe_b64decode(att['data']).decode('utf-8', errors='ignore')
                    original_subject, original_from, original_body = parse_raw_email_headers(raw_email)
                    # Note: Attachments in raw RFC822 would need MIME parsing - skip for now
                else:
                    # Fallback: nested parts format (Gmail pre-parsed the RFC822)
                    # Helper to recursively find content of specific MIME type
                    def find_content_recursive(p, mime_type):
                        if p.get('mimeType') == mime_type and p.get('body', {}).get('data'):
                            return base64.urlsafe_b64decode(p['body']['data']).decode('utf-8', errors='ignore')
                        for sp in p.get('parts', []):
                            result = find_content_recursive(sp, mime_type)
                            if result:
                                return result
                        return ''
                    # Helper to recursively find email headers (Subject/From) in nested MIME structure
                    def find_headers_recursive(p):
                        if 'headers' in p:
                            headers = {h['name']: h['value'] for h in p['headers']}
                            # Only return if we found actual email headers (not just Content-Type)
                            if 'Subject' in headers or 'From' in headers:
                                return headers
                        for sp in p.get('parts', []):
                            result = find_headers_recursive(sp)
                            if result:
                                return result
                        return {}
                    nested_headers = find_headers_recursive(part)
                    if nested_headers:
                        original_subject = nested_headers.get('Subject', original_subject)
                        # Sender priority mirrors parse_raw_email_headers()
                        original_from = (
                            nested_headers.get('X-Original-Sender') or
                            nested_headers.get('Reply-To') or
                            nested_headers.get('From') or
                            original_from
                        )
                    # Try text/plain first, fall back to HTML converted to text
                    original_body = find_content_recursive(part, 'text/plain')
                    if not original_body:
                        html_body = find_content_recursive(part, 'text/html')
                        if html_body:
                            original_body = html_to_text(html_body)
                    # Extract attachments from nested RFC822 structure
                    original_attachments = extract_email_attachments(service, msg_ref['id'], part)
        if approve_addr:
            # Get the notification's Message-Id and Subject for proper reply threading
            notification_message_id = headers.get('Message-Id') or headers.get('Message-ID', '')
            notification_subject = headers.get('Subject', '')
            pending.append({
                'gmail_id': msg_ref['id'],
                'approve_addr': approve_addr,
                'original_subject': original_subject,
                'original_from': original_from,
                'original_body': original_body,
                'original_attachments': original_attachments,
                'notification_message_id': notification_message_id,
                'notification_subject': notification_subject,
            })
    return pending
def moderate_message(pending_msg, approve=True):
    """Approve a pending Google Groups message by replying to its notification.

    Sends an empty email to the list's +msgappr@ address, threaded onto the
    original moderation notification via In-Reply-To/References. In dry-run
    mode only logs the action. Returns True on success, False on send failure.
    """
    if DRY_RUN:
        logger.info(f" [DRY RUN] Would approve: {pending_msg['original_subject'][:50]}")
        return True
    gmail = build('gmail', 'v1', credentials=get_google_credentials(), cache_discovery=False)
    # An empty body is sufficient; the +msgappr address itself is the action.
    reply = MIMEText('')
    reply['To'] = pending_msg['approve_addr']
    reply['From'] = 'gbauto@ucsc.edu'
    # Use Re: prefix on the notification subject to indicate a reply
    notif_subject = pending_msg.get('notification_subject', '')
    if notif_subject and not notif_subject.startswith('Re:'):
        reply['Subject'] = f'Re: {notif_subject}'
    else:
        reply['Subject'] = notif_subject or 'Re: Moderation'
    # Threading headers so Gmail/Groups treat this as a proper reply
    ref_id = pending_msg.get('notification_message_id', '')
    if ref_id:
        reply['In-Reply-To'] = ref_id
        reply['References'] = ref_id
    raw = base64.urlsafe_b64encode(reply.as_bytes()).decode('utf-8')
    try:
        gmail.users().messages().send(
            userId='me',
            body={'raw': raw}
        ).execute()
        return True
    except Exception as e:
        logger.error(f" Error approving message: {e}")
        return False
def send_error_notification(subject, body):
    """Send an error notification email to the QA team.

    Used to alert the team when Redmine API operations fail after retries.
    Sends to browserqa-group@ucsc.edu with an "[MLQ Automation Error]"
    subject prefix. Returns True on success (or dry-run), False otherwise.
    """
    if DRY_RUN:
        logger.info(f" [DRY RUN] Would send error notification: {subject}")
        return True
    try:
        gmail = build('gmail', 'v1', credentials=get_google_credentials(), cache_discovery=False)
        note = MIMEText(body)
        note['To'] = 'browserqa-group@ucsc.edu'
        note['From'] = 'gbauto@ucsc.edu'
        note['Subject'] = f'[MLQ Automation Error] {subject}'
        raw = base64.urlsafe_b64encode(note.as_bytes()).decode('utf-8')
        gmail.users().messages().send(
            userId='me',
            body={'raw': raw}
        ).execute()
        logger.info(f"Sent error notification email: {subject}")
        return True
    except Exception as e:
        logger.error(f"Failed to send error notification email: {e}")
        return False
def delete_moderation_email(gmail_id):
    """Move a processed moderation notification to Gmail's trash.

    No-op in dry-run mode; failures are logged rather than raised so one
    bad message never aborts the processing loop.
    """
    if DRY_RUN:
        return
    gmail = build('gmail', 'v1', credentials=get_google_credentials(), cache_discovery=False)
    try:
        gmail.users().messages().trash(userId='me', id=gmail_id).execute()
    except Exception as e:
        logger.error(f" Error trashing notification: {e}")
def get_emails_from_gmail(group_email, minutes_ago=60):
    """Get recent emails sent to a mailing list.

    Args:
        group_email: List address searched in the To: field.
        minutes_ago: Look-back window in minutes (default 60).

    Returns:
        List of dicts with id, thread_id, subject, from, to, cc, date,
        body (plain text), attachments, and timestamp (epoch seconds).
        Moderation notifications and the script's own replies are skipped.
    """
    creds = get_google_credentials()
    service = build('gmail', 'v1', credentials=creds, cache_discovery=False)
    cutoff = datetime.now(PST) - timedelta(minutes=minutes_ago)
    # Gmail's after: operator accepts an epoch-seconds timestamp
    query = f"to:{group_email} after:{int(cutoff.timestamp())}"
    results = service.users().messages().list(
        userId='me', q=query, maxResults=50
    ).execute()
    messages = []
    for msg_ref in results.get('messages', []):
        msg = service.users().messages().get(
            userId='me', id=msg_ref['id'], format='full'
        ).execute()
        headers = {h['name']: h['value'] for h in msg['payload']['headers']}
        subject = headers.get('Subject', '(no subject)')
        # Get original sender - Google Groups may rewrite From to the list address
        # Priority: X-Original-Sender > Reply-To > From
        from_addr = (
            headers.get('X-Original-Sender') or
            headers.get('Reply-To') or
            headers.get('From', '')
        )
        # Skip moderation-related emails (notifications and our own replies)
        if 'Message Pending' in subject:
            continue
        if 'Moderation response' in subject:
            continue
        if 'gbauto@ucsc.edu' in from_addr:
            continue
        if '+msgappr@' in from_addr or '+msgrej@' in from_addr:
            continue
        # Extract body - prefer text/plain, fall back to text/html converted to text
        def extract_body_content(payload, target_type):
            """Recursively find content of target_type in email payload."""
            if payload.get('mimeType') == target_type:
                data = payload.get('body', {}).get('data', '')
                if data:
                    return base64.urlsafe_b64decode(data).decode('utf-8', errors='ignore')
            if 'parts' in payload:
                for part in payload['parts']:
                    result = extract_body_content(part, target_type)
                    if result:
                        return result
            return ''
        # Try text/plain first, fall back to HTML converted to text
        body = extract_body_content(msg['payload'], 'text/plain')
        if not body:
            html_body = extract_body_content(msg['payload'], 'text/html')
            if html_body:
                body = html_to_text(html_body)
        # Extract attachments
        attachments = extract_email_attachments(service, msg['id'], msg['payload'])
        messages.append({
            'id': msg['id'],
            'thread_id': msg['threadId'],
            'subject': subject,
            'from': from_addr,
            'to': headers.get('To', ''),
            'cc': headers.get('Cc', ''),
            'date': headers.get('Date', ''),
            'body': body,
            'attachments': attachments,
            # internalDate is milliseconds since the epoch
            'timestamp': int(msg['internalDate']) / 1000,
        })
    return messages
def html_to_text(html):
    """Convert HTML to plain text by stripping tags and decoding entities.

    Used as a fallback when an email only has HTML content (no text/plain
    part). Block-level closers become newlines so paragraph structure
    survives, then all remaining tags are removed, entities are decoded,
    and whitespace is normalized.

    NOTE: the tag-matching regex literals below were corrupted in a previous
    revision (the patterns had been reduced to empty strings, so no tags
    were actually stripped); they are reconstructed here per the comments.
    """
    if not html:
        return ''
    # Remove script and style tags with their content
    text = re.sub(r'<script[^>]*>.*?</script>', '', html, flags=re.DOTALL | re.IGNORECASE)
    text = re.sub(r'<style[^>]*>.*?</style>', '', text, flags=re.DOTALL | re.IGNORECASE)
    # Convert <br> and block-level closers to newlines
    text = re.sub(r'<br\s*/?>', '\n', text, flags=re.IGNORECASE)
    text = re.sub(r'</p>', '\n\n', text, flags=re.IGNORECASE)
    text = re.sub(r'</div>', '\n', text, flags=re.IGNORECASE)
    text = re.sub(r'</li>', '\n', text, flags=re.IGNORECASE)
    # Remove all other HTML tags
    text = re.sub(r'<[^>]+>', '', text)
    # Decode HTML entities (&amp;, &lt;, &nbsp;, ...)
    text = html_module.unescape(text)
    # Normalize whitespace (collapse multiple spaces/newlines)
    text = re.sub(r'[ \t]+', ' ', text)  # Collapse horizontal whitespace
    text = re.sub(r'\n[ \t]+', '\n', text)  # Remove leading spaces on lines
    text = re.sub(r'[ \t]+\n', '\n', text)  # Remove trailing spaces on lines
    text = re.sub(r'\n{3,}', '\n\n', text)  # Collapse multiple newlines
    return text.strip()
def sanitize_for_redmine(text):
    """Clean up text for Redmine textile compatibility.

    Strips 4-byte characters (emoji), collapses Outlook's duplicated
    "URL<URL>" form, and escapes/softens line-leading constructs that
    Textile would render as headers, horizontal rules, code blocks, or
    unintended lists.
    """
    # Remove emojis and other 4-byte UTF-8 characters that cause MySQL utf8
    # encoding issues. MySQL utf8 is 3-byte max; utf8mb4 supports 4-byte but
    # many Redmine installs use utf8.
    text = re.sub(r'[\U00010000-\U0010FFFF]', '', text)
    # Remove Outlook duplicate URL format: URL<URL> -> URL.
    # Outlook often includes the URL twice: once as display text, once in
    # angle brackets. (Pattern reconstructed — the regex literal was
    # corrupted in a previous revision and could not match anything.)
    text = re.sub(
        r'(https?://[^\s<>\[\]]+)<[^\[\]>]+>',
        r'\1',
        text
    )
    # Note: We do NOT auto-link URLs here. Redmine uses Textile format by default
    # and has its own auto-linking for bare URLs. Adding Markdown-style [URL](URL)
    # links causes display issues in Textile mode.
    lines = text.split('\n')
    cleaned = []
    for line in lines:
        # Remove leading whitespace that would trigger code blocks (4+ spaces or tabs)
        stripped = line.lstrip(' \t')
        leading = len(line) - len(stripped)
        if leading >= 4:
            # Reduce to 2 spaces to preserve some structure without triggering code block
            line = '  ' + stripped
        elif leading > 0 and line.startswith('\t'):
            # Convert tabs to spaces
            line = '  ' + stripped
        # Escape # at start of line to prevent header formatting
        if line.startswith('#'):
            line = '\\' + line
        # Escape --- or *** that would become horizontal rules
        if re.match(r'^[-*_]{3,}\s*$', line):
            line = '\\' + line
        # Escape leading - or * followed by space that would become list items
        # (only if it doesn't look intentional, i.e., not followed by more text structure)
        if re.match(r'^[-*]\s+[a-z]', line, re.IGNORECASE):
            # Looks like prose starting with dash, not a list - escape it
            if not re.match(r'^[-*]\s+\S+\s*$', line):  # Single word items are likely lists
                line = '\\' + line
        cleaned.append(line)
    return '\n'.join(cleaned)
def strip_quoted_content(body):
    """Remove quoted reply content and Google Groups footer from email body.

    Handles three reply styles:
    1. Top-posted: New content at top, quoted content below (most common)
    2. Inline/interleaved: Short quotes with replies below each (less common)
    3. Bottom-posted: Quoted content at top, new content below (e.g., some
       Thunderbird/Linux mail clients)

    For bottom-posted replies, when a quote header ("On ... wrote:") appears
    within the first lines (at most one non-blank line above, e.g. a
    greeting), the quoted block is skipped and only the new content that
    follows is kept. Otherwise the first quote header / forward marker /
    Google Groups footer ends the message.
    """
    lines = body.split('\n')
    cleaned = []
    i = 0
    # Pattern to strip Unicode control characters (LTR/RTL marks, etc.)
    # These can appear at the start or end of lines in emails with mixed-direction text
    unicode_control_chars = '[\u200e\u200f\u202a-\u202e\u2066-\u2069]'
    unicode_control_pattern = re.compile(f'^{unicode_control_chars}+|{unicode_control_chars}+$')
    while i < len(lines):
        line = lines[i]
        # Strip Unicode control characters for pattern matching
        line_stripped = unicode_control_pattern.sub('', line)
        # Also strip leading quote markers ("> ") to catch quoted "On...wrote:" patterns
        # e.g., "> On Jan 15, 2026, at 12:01 AM, Name wrote:"
        line_unquoted = re.sub(r'^>\s*', '', line_stripped)
        # Detect "On <date> <name> wrote:" quote headers, which may span 1 or 2 lines
        is_quote_header = False
        quote_header_lines = 1  # How many lines this header spans
        if re.match(r'^On .+wrote:\s*$', line_unquoted, re.IGNORECASE):
            is_quote_header = True
        elif re.match(r'^On .+, at .+wrote:\s*$', line_unquoted, re.IGNORECASE):
            is_quote_header = True
        elif re.match(r'^On .+@.+>\s*$', line_unquoted) or re.match(r'^On .*\d{4}.*[AP]M .+', line_unquoted):
            # "On ... <addr@host>" or "On <date> <time>" with "...wrote:" on the next line
            if i + 1 < len(lines):
                next_stripped = unicode_control_pattern.sub('', lines[i + 1])
                next_unquoted = re.sub(r'^>\s*', '', next_stripped).strip().lower()
                if next_unquoted.endswith('wrote:'):
                    is_quote_header = True
                    quote_header_lines = 2
        elif line_unquoted.strip().lower().endswith('wrote:') and i > 0:
            # "...wrote:" on this line with the "On ..." part on the previous line;
            # drop that previous line from cleaned if it was already kept
            prev = lines[i - 1]
            prev_stripped = unicode_control_pattern.sub('', prev)
            prev_unquoted = re.sub(r'^>\s*', '', prev_stripped)
            if re.match(r'^On .+', prev_unquoted, re.IGNORECASE):
                if cleaned and cleaned[-1] == prev:
                    cleaned.pop()
                is_quote_header = True
        if is_quote_header:
            # Bottom-posted reply: quote header near the start means new content
            # is below the quoted block. Skip the quoted block, keep what follows.
            # Allow at most 1 non-blank line above (e.g., a greeting like "Hi,")
            non_blank_above = sum(1 for l in cleaned if l.strip())
            if non_blank_above <= 1:
                # Skip past the quote header
                i += quote_header_lines
                # Skip the quoted lines (starting with ">")
                while i < len(lines):
                    l = lines[i]
                    l_stripped = unicode_control_pattern.sub('', l).strip()
                    if l_stripped.startswith('>') or l_stripped == '':
                        i += 1
                    else:
                        break
                # Reset cleaned — anything before the quote header was blank/trivial
                cleaned = []
                continue
            else:
                # Top-posted reply: we already have real content above, stop here
                break
        if re.match(r'^-{4,}\s*Original Message\s*-{4,}', line_stripped, re.IGNORECASE):
            break
        # Gmail forwarded message format: "---------- Forwarded message ---------"
        if re.match(r'^-{4,}\s*Forwarded message\s*-{4,}', line_stripped, re.IGNORECASE):
            break
        # Outlook single-line forward format
        if re.match(r'^From:.*Sent:.*To:', line_stripped, re.IGNORECASE):
            break
        # Outlook multi-line forward format:
        #   From: Name
        #   Sent: Date
        #   To: Recipients
        if re.match(r'^From:\s*.+', line_stripped, re.IGNORECASE):
            # Look ahead for Sent: and To: on subsequent lines
            if i + 2 < len(lines):
                next1 = unicode_control_pattern.sub('', lines[i + 1])
                next2 = unicode_control_pattern.sub('', lines[i + 2])
                if re.match(r'^Sent:\s*.+', next1, re.IGNORECASE) and re.match(r'^To:\s*.+', next2, re.IGNORECASE):
                    break
        # Stop at Google Groups footer
        if 'You received this message because you are subscribed to the Google Groups' in line:
            break
        # Skip Google Groups unsubscribe line
        if 'To unsubscribe from this group and stop receiving emails from it' in line:
            i += 1
            continue
        # Skip bare "---" separators once we have some content
        if line.strip() == '---' and len(cleaned) > 0:
            i += 1
            continue
        # Handle quoted lines (starting with >)
        # Keep quoted lines for inline reply context - they provide important context
        # The "On ... wrote:" and other patterns above will stop at the full original message
        cleaned.append(line)
        i += 1
    # Remove trailing dashes and whitespace
    while cleaned and cleaned[-1].strip() in ('', '---', '--'):
        cleaned.pop()
    return '\n'.join(cleaned).rstrip()
# Attachment handling constants
SIGNATURE_ATTACHMENT_PATTERNS = [
    r'^logo',
    r'^signature',
    r'^banner',
    r'^icon',
    r'^footer',
    r'^divider',
    # Note: Removed image\d* pattern - Outlook uses this for legitimate inline images.
    # Size filter (MIN_ATTACHMENT_SIZE) handles small signature icons instead.
]
MIN_ATTACHMENT_SIZE = 1024  # Skip attachments smaller than 1KB (likely icons)
MAX_ATTACHMENT_SIZE = 10 * 1024 * 1024  # 10MB max per attachment
def is_signature_attachment(filename, size_bytes):
    """Return True when an attachment looks like a signature/icon to skip.

    Nameless attachments, anything under MIN_ATTACHMENT_SIZE bytes, and
    filenames (without extension) matching SIGNATURE_ATTACHMENT_PATTERNS
    are all considered signature noise rather than real attachments.
    """
    if not filename:
        return True
    # Skip very small images (likely icons/spacers)
    if size_bytes < MIN_ATTACHMENT_SIZE:
        return True
    base_name = filename.lower().rsplit('.', 1)[0]  # drop the extension
    return any(re.match(pat, base_name, re.IGNORECASE)
               for pat in SIGNATURE_ATTACHMENT_PATTERNS)
def extract_email_attachments(gmail_service, message_id, payload):
    """
    Extract all attachments from an email.
    Returns list of dicts with: filename, mimeType, data, size
    Filters out signature images and oversized files.

    Args:
        gmail_service: Authenticated Gmail API service, used to download
            attachment bodies referenced by attachmentId.
        message_id: Gmail message ID the payload belongs to.
        payload: The message's (possibly nested) MIME payload dict.
    """
    attachments = []
    def find_attachments_recursive(part):
        # Walk the MIME tree depth-first collecting downloadable attachments.
        mime_type = part.get('mimeType', '')
        filename = part.get('filename', '')
        body = part.get('body', {})
        attachment_id = body.get('attachmentId')
        size = body.get('size', 0)
        # Check if this part is an attachment (has attachmentId and a filename)
        if attachment_id and filename:
            # Skip signature/icon attachments
            # (Bug fix: these log messages previously printed the literal
            # "(unknown)" instead of the attachment's filename.)
            if is_signature_attachment(filename, size):
                logger.debug(f"Skipping signature attachment: {filename} ({size} bytes)")
            elif size > MAX_ATTACHMENT_SIZE:
                logger.warning(f"Skipping oversized attachment: {filename} ({size} bytes)")
            else:
                # Download the attachment
                try:
                    att_data = gmail_service.users().messages().attachments().get(
                        userId='me', messageId=message_id, id=attachment_id
                    ).execute()
                    file_data = base64.urlsafe_b64decode(att_data['data'])
                    attachments.append({
                        'filename': filename,
                        'mimeType': mime_type,
                        'data': file_data,
                        'size': len(file_data)
                    })
                except Exception as e:
                    logger.warning(f"Failed to download attachment {filename}: {e}")
        # Recurse into nested parts
        for subpart in part.get('parts', []):
            find_attachments_recursive(subpart)
    find_attachments_recursive(payload)
    return attachments
def upload_attachments_to_redmine(attachments):
    """
    Upload attachments to Redmine and return tokens.

    Args:
        attachments: list of dicts with 'filename', 'mimeType', 'data'.

    Returns list of dicts with: filename, token, content_type
    Upload failures are logged and the attachment is skipped.
    """
    if not attachments:
        return []
    from urllib.parse import quote
    uploaded = []
    for att in attachments:
        try:
            # Bug fix: percent-encode the filename. Raw spaces, '&', '#',
            # '%' or non-ASCII characters would corrupt the query string.
            encoded_name = quote(att['filename'], safe='')
            upload_url = f"{CONFIG['REDMINE_URL']}/uploads.json?filename={encoded_name}"
            headers = {
                'X-Redmine-API-Key': CONFIG['REDMINE_API_KEY'],
                'Content-Type': 'application/octet-stream'
            }
            resp = requests.post(upload_url, data=att['data'], headers=headers, timeout=60)
            if resp.status_code == 201:
                token = resp.json()['upload']['token']
                uploaded.append({
                    'filename': att['filename'],
                    'token': token,
                    'content_type': att['mimeType']
                })
                logger.debug(f"Uploaded attachment: {att['filename']}")
            elif resp.status_code == 422:
                # Redmine returns 422 when the file exceeds its upload limit
                logger.warning(f"Attachment too large for Redmine: {att['filename']}")
            else:
                logger.warning(f"Failed to upload {att['filename']}: {resp.status_code}")
        except Exception as e:
            logger.warning(f"Error uploading attachment {att['filename']}: {e}")
    return uploaded
def replace_inline_images(body, uploaded_attachments):
    """
    Replace inline image placeholders with Redmine inline image syntax.

    Handles:
      - Gmail format:   [image: filename.png]
      - Outlook format: [cid:filename.png@identifier]

    Placeholders that do not match an uploaded attachment are left as-is.
    """
    if not uploaded_attachments:
        return body
    # Case-insensitive lookup: lowercased filename -> filename as uploaded
    known_files = {}
    for att in uploaded_attachments:
        known_files[att['filename'].lower()] = att['filename']
    def to_redmine_markup(candidate, original_text):
        """Return !filename! markup when candidate names an upload, else the original text."""
        actual = known_files.get(candidate.lower())
        return f'!{actual}!' if actual else original_text
    def on_gmail_match(match):
        # Gmail style: [image: filename]
        return to_redmine_markup(match.group(1).strip(), match.group(0))
    def on_cid_match(match):
        # Outlook style: [cid:filename@identifier] or [cid:filename]
        cid_value = match.group(1)
        name_part = cid_value.split('@')[0] if '@' in cid_value else cid_value
        return to_redmine_markup(name_part, match.group(0))
    # Replace Gmail [image: filename.png] patterns
    body = re.sub(r'\[image:\s*([^\]]+)\]', on_gmail_match, body, flags=re.IGNORECASE)
    # Replace Outlook [cid:filename.png@identifier] patterns
    body = re.sub(r'\[cid:([^\]]+)\]', on_cid_match, body, flags=re.IGNORECASE)
    return body
def extract_email_address(from_header):
    """Extract just the lowercased email address from a From header.

    Returns '' when no address can be parsed from the header.
    """
    # Local name `addr` avoids shadowing the imported `email` module.
    _, addr = parseaddr(from_header)
    return addr.lower()
def extract_all_email_addresses(header_value):
    """Extract all lowercased email addresses from a header (To, CC can have multiple).

    Uses email.utils.getaddresses, which correctly handles commas inside
    quoted display names (e.g. '"Doe, John" <jd@x.com>') — a naive split
    on ',' would break such addresses apart.

    Returns an empty list for an empty/None header.
    """
    if not header_value:
        return []
    from email.utils import getaddresses
    # getaddresses yields (display_name, address) pairs; keep non-empty addresses
    return [addr.lower() for _, addr in getaddresses([header_value]) if addr]
def extract_hgusersuggestion_email(subject):
"""Extract user email from hgUserSuggestion form subjects.
hgUserSuggestion form submissions have subjects like:
'hgUserSuggestion mmcgary44@gmail.com 2026-01-28 05:59:21'
Returns the extracted email, or None if not an hgUserSuggestion subject.
"""
if not subject:
return None
# Check if this is an hgUserSuggestion subject
if 'hgusersuggestion' not in subject.lower():
return None
# Extract email using pattern: hgUserSuggestion