Security Best Practices
Production-grade security for data pipelines and ML systems
Why Security Matters in Data Engineering
Data pipelines handle sensitive information: customer PII, financial records, health data, API keys, and database credentials. A single security breach can cost millions in fines, reputation damage, and customer trust loss.
This guide covers production-grade security practices for data engineering and MLOps systems. You'll learn how to protect secrets, implement RBAC, encrypt data, enable audit logging, and meet compliance requirements (GDPR, HIPAA, SOC 2).
Critical: Security is not optional for production systems. Implement these practices from day one - retrofitting security later is 10× harder and riskier.
Core Security Principles
Least Privilege
Grant minimum permissions needed. Users and services should only access what they need for their job.
Defense in Depth
Multiple security layers. If one layer fails, others protect the system (encryption + RBAC + audit logs).
Zero Trust
Never trust, always verify. Authenticate and authorize every request, even from internal networks.
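The zero-trust rule can be made concrete: every request carries credentials, and every handler re-verifies them before acting, even for internal callers. A minimal sketch; the token store and permission map are illustrative stand-ins for your identity provider and RBAC system:

```python
# Illustrative stand-ins for an IdP token store and an RBAC permission map
VALID_TOKENS = {"tok-alice": "alice", "tok-bob": "bob"}
PERMISSIONS = {"alice": {"read:orders", "write:orders"}, "bob": {"read:orders"}}

def authorize(token, required_permission):
    """Authenticate the token, then check the permission. Raise on failure."""
    user = VALID_TOKENS.get(token)
    if user is None:
        raise PermissionError("authentication failed")
    if required_permission not in PERMISSIONS.get(user, set()):
        raise PermissionError(f"{user} lacks {required_permission}")
    return user

def handle_write_order(token, order_id):
    # Verified on every call, even from inside the network
    user = authorize(token, "write:orders")
    return f"order {order_id} written by {user}"

print(handle_write_order("tok-alice", 1))  # order 1 written by alice
```

The point is structural: no handler trusts the network location of its caller, only the verified credential.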
1. Secrets Management
Never hardcode secrets. API keys, database passwords, and tokens belong in secret managers, not in code or environment variables. Use AWS Secrets Manager, HashiCorp Vault, or Azure Key Vault.
❌ Common Mistakes (Don't Do This!)
# ❌ NEVER DO THIS
DB_PASSWORD = "super_secret_123"
API_KEY = "sk-abc123xyz"
conn = psycopg2.connect(
    host="prod-db.example.com",
    password="super_secret_123"  # Exposed in Git history!
)

# ❌ Better but still risky
import os
DB_PASSWORD = os.environ.get("DB_PASSWORD")  # Visible in process list, logs

# ❌ .env files in Git
# .env
DB_PASSWORD=super_secret_123
API_KEY=sk-abc123xyz
# If .env is committed to Git, secrets are exposed forever

AWS Secrets Manager
✅ Correct: Fetch Secrets at Runtime
import json
from functools import lru_cache

import boto3
import openai
import psycopg2

# Cache secrets in-process to reduce API calls.
# Note: lru_cache entries never expire; call get_secret.cache_clear()
# (or restart the process) after a rotation to pick up new values.
@lru_cache(maxsize=128)
def get_secret(secret_name, region_name="us-east-1"):
    """Securely fetch a secret from AWS Secrets Manager."""
    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )
    try:
        response = client.get_secret_value(SecretId=secret_name)
        return json.loads(response['SecretString'])
    except Exception as e:
        print(f"Error fetching secret {secret_name}: {e}")
        raise

# Usage in data pipeline
def connect_to_database():
    secrets = get_secret("prod/database/credentials")
    conn = psycopg2.connect(
        host=secrets['host'],
        port=secrets['port'],
        database=secrets['dbname'],
        user=secrets['username'],
        password=secrets['password']  # Never exposed in code
    )
    return conn

# Usage for API keys
def call_external_api():
    secrets = get_secret("prod/api/openai")
    openai.api_key = secrets['api_key']
    response = openai.ChatCompletion.create(...)
    return response

Creating Secrets (AWS CLI)
# Create database credentials secret
aws secretsmanager create-secret \
--name prod/database/credentials \
--description "Production database credentials" \
--secret-string '{
"host": "prod-db.example.com",
"port": 5432,
"dbname": "analytics",
"username": "app_user",
"password": "generated_strong_password_here"
}'
# Create API key secret
aws secretsmanager create-secret \
--name prod/api/openai \
--secret-string '{
"api_key": "sk-proj-...",
"organization_id": "org-..."
}'
# Enable automatic rotation (30 days)
aws secretsmanager rotate-secret \
--secret-id prod/database/credentials \
--rotation-lambda-arn arn:aws:lambda:...:function:SecretsManagerRotation \
--rotation-rules AutomaticallyAfterDays=30

HashiCorp Vault (Self-Hosted)
Vault Configuration
import os

import hvac
import psycopg2

# Connect to Vault
client = hvac.Client(
    url='https://vault.example.com:8200',
    token=os.environ.get('VAULT_TOKEN')  # Use AppRole in production
)

# Store secret
client.secrets.kv.v2.create_or_update_secret(
    path='database/prod',
    secret=dict(
        username='app_user',
        password='secure_password'
    )
)

# Read secret
secret = client.secrets.kv.v2.read_secret_version(path='database/prod')
db_creds = secret['data']['data']

# Use secret
conn = psycopg2.connect(
    host='prod-db.example.com',
    user=db_creds['username'],
    password=db_creds['password']
)

# Dynamic secrets (Vault generates temporary credentials)
db_creds = client.secrets.database.generate_credentials(name='my-role')
# Credentials auto-expire after TTL, no rotation needed

Secret Rotation Best Practices
- Rotate secrets every 30-90 days automatically
- Use dynamic secrets when possible (auto-expire, no rotation needed)
- Test rotation in staging before enabling in production
- Monitor failed rotation attempts (indicates potential breach)
- Have a runbook for emergency rotation (if a secret is leaked)
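The rotation cadence above can also be checked from a scheduled compliance job. A minimal sketch, assuming you feed it each secret's last-rotation timestamp (e.g. `LastRotatedDate` from a secrets inventory); the 90-day threshold is illustrative:

```python
from datetime import datetime, timedelta, timezone

def rotation_overdue(last_rotated, max_age_days=90, now=None):
    """True if a secret's last rotation is older than max_age_days."""
    now = now or datetime.now(timezone.utc)
    return now - last_rotated > timedelta(days=max_age_days)

# Example: as of 2024-06-01, a secret rotated on 2024-01-01 is overdue,
# one rotated on 2024-05-01 is not.
now = datetime(2024, 6, 1, tzinfo=timezone.utc)
print(rotation_overdue(datetime(2024, 1, 1, tzinfo=timezone.utc), 90, now))  # True
print(rotation_overdue(datetime(2024, 5, 1, tzinfo=timezone.utc), 90, now))  # False
```

A job like this only flags drift; the rotation itself should stay with the secret manager's rotation machinery.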
2. RBAC (Role-Based Access Control)
Implement least privilege access. Define roles (data-engineer, data-scientist, analyst) with specific permissions. Never give everyone admin access.
AWS IAM Policies by Role
Data Engineer Role (Read/Write)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::data-lake-raw/*",
        "arn:aws:s3:::data-lake-processed/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetTable",
        "glue:GetTables",
        "glue:CreateTable",
        "glue:UpdateTable"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": "arn:aws:secretsmanager:*:*:secret:dev/*"
    },
    {
      "Effect": "Deny",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::data-lake-production/*"
    }
  ]
}

Data Analyst Role (Read-Only)
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::data-lake-processed",
        "arn:aws:s3:::data-lake-processed/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults"
      ],
      "Resource": "*",
      "Condition": {
        "StringEquals": {
          "athena:WorkGroup": "analyst-workgroup"
        }
      }
    },
    {
      "Effect": "Deny",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "*"
    }
  ]
}

Database-Level RBAC
PostgreSQL Role Separation
-- Create roles with specific permissions

-- Read-only analyst role
CREATE ROLE analyst_role;
GRANT CONNECT ON DATABASE analytics TO analyst_role;
GRANT USAGE ON SCHEMA public TO analyst_role;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO analyst_role;

-- Data engineer role (read/write in staging, read-only in prod)
CREATE ROLE engineer_role;
GRANT CONNECT ON DATABASE analytics TO engineer_role;

-- Staging: full access
GRANT ALL PRIVILEGES ON SCHEMA staging TO engineer_role;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA staging TO engineer_role;

-- Production: read-only
GRANT USAGE ON SCHEMA production TO engineer_role;
GRANT SELECT ON ALL TABLES IN SCHEMA production TO engineer_role;

-- Service account for data pipelines (write to prod)
CREATE ROLE pipeline_service_account WITH LOGIN PASSWORD 'secure_password';
GRANT USAGE ON SCHEMA production TO pipeline_service_account;
GRANT INSERT, UPDATE ON ALL TABLES IN SCHEMA production TO pipeline_service_account;

-- Assign roles to users (dotted usernames must be quoted)
CREATE USER "john.doe" WITH PASSWORD 'secure_pass';
GRANT analyst_role TO "john.doe";

CREATE USER "jane.smith" WITH PASSWORD 'secure_pass';
GRANT engineer_role TO "jane.smith";

-- Revoke public access
REVOKE ALL ON SCHEMA public FROM PUBLIC;
Permission Matrix by Role
| Resource | Data Analyst | Data Engineer | ML Engineer | Admin |
|---|---|---|---|---|
| S3 Production Data | Read | Read | Read | Full |
| S3 Staging Data | None | Full | Full | Full |
| Database Prod Schema | Read | Read | Read | Full |
| Secrets Manager | None | Read (dev) | Read (dev) | Full |
| EMR/Spark Clusters | None | Create (dev) | Create (dev) | Full |
| SageMaker Models | None | None | Full | Full |
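A matrix like the one above can double as a policy fixture in CI, so permission drift fails a test instead of surfacing in production. A hedged sketch; the role and resource names mirror the table, and the grant labels ("full", "read", "read-dev", "create-dev") are an illustrative encoding, not any IAM syntax:

```python
# Permission matrix from the table above, encoded for automated checks
PERMISSION_MATRIX = {
    "s3_production_data": {"analyst": "read", "engineer": "read", "ml": "read", "admin": "full"},
    "s3_staging_data":    {"analyst": None, "engineer": "full", "ml": "full", "admin": "full"},
    "db_prod_schema":     {"analyst": "read", "engineer": "read", "ml": "read", "admin": "full"},
    "secrets_manager":    {"analyst": None, "engineer": "read-dev", "ml": "read-dev", "admin": "full"},
    "emr_spark_clusters": {"analyst": None, "engineer": "create-dev", "ml": "create-dev", "admin": "full"},
    "sagemaker_models":   {"analyst": None, "engineer": None, "ml": "full", "admin": "full"},
}

def allowed(role, resource, action):
    """Check an action ('read', 'write', 'create') against the matrix."""
    grant = PERMISSION_MATRIX.get(resource, {}).get(role)
    if grant == "full":
        return True
    if grant in ("read", "read-dev"):
        return action == "read"
    if grant == "create-dev":
        return action in ("read", "create")
    return False  # no grant: deny by default

print(allowed("analyst", "s3_production_data", "read"))   # True
print(allowed("analyst", "s3_production_data", "write"))  # False
```

Denying by default when a role or resource is missing from the matrix keeps the check aligned with least privilege.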
3. Data Encryption
Encrypt data at rest and in transit. Use TLS 1.2+ for network traffic and AES-256 for stored data. Never store PII or sensitive data unencrypted.
Encryption at Rest
S3 Bucket Encryption (SSE-KMS)
# Enable default encryption on S3 bucket
aws s3api put-bucket-encryption \
  --bucket my-data-lake \
  --server-side-encryption-configuration '{
    "Rules": [{
      "ApplyServerSideEncryptionByDefault": {
        "SSEAlgorithm": "aws:kms",
        "KMSMasterKeyID": "arn:aws:kms:us-east-1:123456789:key/abc-123"
      },
      "BucketKeyEnabled": true
    }]
  }'

# All new objects are automatically encrypted
# Use customer-managed KMS key for audit trail and key rotation

RDS Encryption
# Enable encryption when creating RDS instance
aws rds create-db-instance \
  --db-instance-identifier prod-analytics-db \
  --db-instance-class db.r5.xlarge \
  --engine postgres \
  --storage-encrypted \
  --kms-key-id arn:aws:kms:us-east-1:123456789:key/abc-123 \
  --backup-retention-period 7 \
  --enable-cloudwatch-logs-exports '["postgresql"]'

# Encrypted backups are automatic
# Cannot enable encryption on existing unencrypted DB (must create new)
Important: Encryption at rest protects against physical theft or unauthorized disk access. It does NOT protect against application-level attacks or insider threats with valid credentials.
Encryption in Transit
Database Connections (Enforce TLS)
import psycopg2

# ✅ Force TLS/SSL connection
conn = psycopg2.connect(
    host='prod-db.example.com',
    port=5432,
    database='analytics',
    user='app_user',
    password='from_secrets_manager',
    sslmode='verify-full',              # Require TLS and verify server identity
    sslrootcert='/path/to/ca-cert.pem'  # CA certificate used for verification
)
# Note: sslmode='require' encrypts traffic but does NOT verify the server
# certificate; use 'verify-ca' or 'verify-full' for that.

# PostgreSQL server configuration (postgresql.conf)
# ssl = on
# ssl_cert_file = '/path/to/server.crt'
# ssl_key_file = '/path/to/server.key'
# ssl_ca_file = '/path/to/ca.crt'

# Reject non-TLS connections (pg_hba.conf)
# hostssl all all 0.0.0.0/0 md5
# host    all all 0.0.0.0/0 reject   # Deny non-TLS

API Calls (HTTPS Only)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ✅ Configure HTTPS with certificate verification
session = requests.Session()

# Retry transient connection and server errors
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)
adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)

# Always use HTTPS, verify certificates
response = session.get(
    'https://api.example.com/data',
    headers={'Authorization': f'Bearer {token}'},
    verify=True,  # Verify SSL certificate (default)
    timeout=30
)

# ❌ NEVER disable SSL verification in production
# response = session.get(url, verify=False)  # DANGEROUS!

Column-Level Encryption (PII)
Encrypt Sensitive Columns
from cryptography.fernet import Fernet

class PIIEncryptor:
    """Encrypt PII fields (SSN, credit cards, emails) at column level"""

    def __init__(self, key):
        self.cipher = Fernet(key)

    @staticmethod
    def generate_key():
        """Generate encryption key (store in AWS Secrets Manager)"""
        return Fernet.generate_key()

    def encrypt(self, plaintext):
        """Encrypt sensitive data before storing in database"""
        if plaintext is None:
            return None
        return self.cipher.encrypt(plaintext.encode()).decode()

    def decrypt(self, ciphertext):
        """Decrypt when authorized user needs to view"""
        if ciphertext is None:
            return None
        return self.cipher.decrypt(ciphertext.encode()).decode()

# Usage in data pipeline
encryptor = PIIEncryptor(key=get_secret('encryption-key')['key'])

# Encrypt before insert
encrypted_ssn = encryptor.encrypt('123-45-6789')
encrypted_email = encryptor.encrypt('user@example.com')
cursor.execute(
    "INSERT INTO users (name, ssn_encrypted, email_encrypted) VALUES (%s, %s, %s)",
    ('John Doe', encrypted_ssn, encrypted_email)
)

# Decrypt when needed (with proper authorization check)
if user.has_permission('view_pii'):
    ssn = encryptor.decrypt(row['ssn_encrypted'])
else:
    ssn = '***-**-****'  # Masked

4. Audit Logging
Log all access to sensitive data: who accessed what, when, and from where. Essential for compliance (GDPR, HIPAA) and incident investigation.
AWS CloudTrail (API Audit Logs)
Enable CloudTrail for All Regions
# Create CloudTrail with data events logging
aws cloudtrail create-trail \
--name all-regions-audit-trail \
--s3-bucket-name audit-logs-bucket \
--is-multi-region-trail \
--enable-log-file-validation \
--kms-key-id arn:aws:kms:us-east-1:123456789:key/abc-123
# Enable data events for S3 (track object-level access)
aws cloudtrail put-event-selectors \
--trail-name all-regions-audit-trail \
--event-selectors '[{
"ReadWriteType": "All",
"IncludeManagementEvents": true,
"DataResources": [{
"Type": "AWS::S3::Object",
"Values": ["arn:aws:s3:::sensitive-data-bucket/*"]
}]
}]'
# Start logging
aws cloudtrail start-logging --name all-regions-audit-trail
# Query logs with Athena
# Who accessed this sensitive file in the last 30 days?
# (In the standard CloudTrail table, requestparameters is a JSON string,
# so extract fields with json_extract_scalar.)
SELECT
    useridentity.username,
    eventtime,
    sourceipaddress,
    json_extract_scalar(requestparameters, '$.bucketName') AS bucket_name,
    json_extract_scalar(requestparameters, '$.key') AS object_key
FROM cloudtrail_logs
WHERE json_extract_scalar(requestparameters, '$.bucketName') = 'sensitive-data-bucket'
  AND json_extract_scalar(requestparameters, '$.key') LIKE '%customers/pii%'
  AND from_iso8601_timestamp(eventtime) > date_add('day', -30, now())
ORDER BY eventtime DESC;

Application-Level Audit Logging
Log Data Access in Application
import json
import logging
from datetime import datetime, timezone

# Configure audit logger (separate from application logs)
audit_logger = logging.getLogger('audit')
audit_handler = logging.FileHandler('/var/log/data-access-audit.log')
audit_handler.setFormatter(logging.Formatter('%(message)s'))
audit_logger.addHandler(audit_handler)
audit_logger.setLevel(logging.INFO)

def log_data_access(user, action, resource, success, ip_address, details=None):
    """
    Log all access to sensitive data.
    Required for HIPAA, GDPR, SOC 2 compliance.
    """
    log_entry = {
        'timestamp': datetime.now(timezone.utc).isoformat(),
        'user': user,
        'action': action,      # read, write, delete, export
        'resource': resource,  # table name, file path
        'success': success,
        'ip_address': ip_address,
        'details': details or {}
    }
    audit_logger.info(json.dumps(log_entry))

# Usage in data pipeline
def export_customer_data(user_id, customer_ids, ip_address):
    """Export customer PII (requires logging for compliance)"""
    # Check authorization
    if not has_permission(user_id, 'export_pii'):
        log_data_access(
            user=user_id,
            action='export_pii',
            resource=f'customers table ({len(customer_ids)} records)',
            success=False,
            ip_address=ip_address,
            details={'reason': 'insufficient_permissions'}
        )
        raise PermissionError("User not authorized to export PII")

    # Log successful access
    log_data_access(
        user=user_id,
        action='export_pii',
        resource=f'customers table ({len(customer_ids)} records)',
        success=True,
        ip_address=ip_address,
        details={
            'customer_ids': customer_ids,
            'export_format': 'csv'
        }
    )

    # Perform export
    return export_to_csv(customer_ids)

What to Log
Events to log:
- PII/PHI data access (read, export)
- Database credential usage
- Failed authentication attempts
- Permission changes
- Data deletion operations
- Configuration changes

Fields to capture for each event:
- User ID (who)
- Timestamp (when)
- Resource (what)
- Action (read/write/delete)
- IP address (from where)
- Success/failure status
Audit Log Retention
- • GDPR: Keep access logs for 3-6 years
- • HIPAA: Retain audit logs for 6 years minimum
- • SOC 2: 1 year minimum, 7 years recommended
- • Store logs in tamper-proof location (S3 with object lock)
- • Encrypt audit logs (contain sensitive metadata)
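Tamper-proof storage can be complemented at the application level by hash-chaining audit entries: each record stores the hash of its predecessor, so editing any past record invalidates the chain from that point on. A minimal stdlib sketch of the idea (a complement to, not a substitute for, S3 Object Lock):

```python
import hashlib
import json

def append_entry(chain, record):
    """Append a record, linking it to the previous entry's hash."""
    prev_hash = chain[-1]["hash"] if chain else "0" * 64
    body = json.dumps(record, sort_keys=True)
    entry_hash = hashlib.sha256((prev_hash + body).encode()).hexdigest()
    chain.append({"record": record, "prev": prev_hash, "hash": entry_hash})
    return chain

def verify_chain(chain):
    """Recompute every hash; any edited record breaks verification."""
    prev_hash = "0" * 64
    for entry in chain:
        body = json.dumps(entry["record"], sort_keys=True)
        expected = hashlib.sha256((prev_hash + body).encode()).hexdigest()
        if entry["prev"] != prev_hash or entry["hash"] != expected:
            return False
        prev_hash = entry["hash"]
    return True

chain = []
append_entry(chain, {"user": "jane", "action": "export_pii"})
append_entry(chain, {"user": "john", "action": "read"})
print(verify_chain(chain))          # True
chain[0]["record"]["user"] = "bob"  # tamper with history
print(verify_chain(chain))          # False
```

Periodically anchoring the latest hash somewhere write-once (e.g. an Object Lock bucket) makes the whole log tamper-evident, not just individual entries.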
5. Compliance (GDPR, HIPAA, SOC 2)
GDPR (General Data Protection Regulation)
Key GDPR Requirements
def export_user_data(user_id):
    """GDPR Article 15: Export all data for user"""
    return {
        'profile': get_user_profile(user_id),
        'orders': get_user_orders(user_id),
        'activity_logs': get_user_activity(user_id),
        'ml_predictions': get_user_predictions(user_id)
    }

def delete_user_data(user_id):
    """GDPR Article 17: Delete all user data"""
    # Delete from all tables
    delete_from_postgres(user_id)
    delete_from_s3(f's3://data-lake/users/{user_id}/')
    delete_from_redshift(user_id)
    # Anonymize ML training data (can't delete without retraining)
    anonymize_training_data(user_id)
    # Log deletion for compliance audit
    log_deletion(user_id, reason='gdpr_request')

GDPR Fines: Up to €20M or 4% of global annual revenue (whichever is higher). Non-compliance is expensive!
HIPAA (Health Insurance Portability and Accountability Act)
HIPAA Requirements for PHI (Protected Health Information)
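HIPAA's "minimum necessary" standard means PHI identifiers should be stripped or masked before records reach general analytics tables. A hedged illustration; the field names and masking rules below are examples only, and a real pipeline must cover all 18 Safe Harbor identifiers:

```python
import re

def mask_phi(record):
    """Mask example PHI fields before loading into an analytics table."""
    masked = dict(record)
    if "ssn" in masked:
        masked["ssn"] = re.sub(r"\d", "*", masked["ssn"])  # digits -> *
    if "phone" in masked:
        masked["phone"] = masked["phone"][:3] + "-***-****"  # keep area code
    if "dob" in masked:
        masked["dob"] = masked["dob"][:4]  # keep year only
    return masked

print(mask_phi({"ssn": "123-45-6789", "dob": "1984-07-02", "diagnosis": "J45"}))
# {'ssn': '***-**-****', 'dob': '1984', 'diagnosis': 'J45'}
```

Clinical fields that analysts genuinely need (like the diagnosis code here) pass through; direct identifiers do not, and the unmasked originals stay in the access-controlled, audited source system.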
HIPAA-Compliant Data Pipeline Checklist
SOC 2 (System and Organization Controls)
SOC 2 Trust Principles
Security:
- Access controls and MFA
- Encryption at rest and in transit
- Network security (firewalls, VPCs)
- Vulnerability scanning

Availability:
- System uptime and performance
- Backup and disaster recovery
- Incident response procedures

Confidentiality:
- Data classification
- Encryption and access controls
- NDAs with employees/contractors

Processing Integrity:
- Data quality and validation
- Error handling and logging
- Authorized processing only
Production Security Checklist
Verify all items before deploying data pipelines to production
Secrets & Access
Encryption & Network
Logging & Monitoring
Compliance & Documentation
Complete all items before deploying to production. Security is non-negotiable for systems handling sensitive data. When in doubt, ask your security team or compliance officer.
Related Resources
Spark Cost Optimization
Reduce Spark costs with spot instances and resource management
LLM Cost Optimization
Cut LLM API costs with batching, caching, and model selection
MLOps Cost Optimization
Save on MLOps with artifact lifecycle and compute auto-shutdown
Industry Case Studies
Learn from real-world implementations at top tech companies