
Security Best Practices

Production-grade security for data pipelines and ML systems

Why Security Matters in Data Engineering

Data pipelines handle sensitive information: customer PII, financial records, health data, API keys, and database credentials. A single security breach can cost millions in fines, reputational damage, and lost customer trust.

This guide covers production-grade security practices for data engineering and MLOps systems. You'll learn how to protect secrets, implement RBAC, encrypt data, enable audit logging, and meet compliance requirements (GDPR, HIPAA, SOC 2).

Critical: Security is not optional for production systems. Implement these practices from day one; retrofitting security later is far harder and riskier.

Core Security Principles

Least Privilege

Grant minimum permissions needed. Users and services should only access what they need for their job.

Defense in Depth

Multiple security layers. If one layer fails, others protect the system (encryption + RBAC + audit logs).

Zero Trust

Never trust, always verify. Authenticate and authorize every request, even from internal networks.
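
As a minimal sketch of "authenticate every request", the snippet below signs each request with HMAC-SHA256 and rejects stale or tampered ones. The function names, the message layout, and the 5-minute freshness window are assumptions made for illustration, not part of any particular framework.

```python
import hashlib
import hmac
import time
from typing import Optional

MAX_SKEW_SECONDS = 300  # assumed freshness window: reject requests older than 5 minutes

def sign_request(secret: bytes, method: str, path: str, timestamp: int) -> str:
    """Return a hex HMAC-SHA256 signature over the method, path, and timestamp."""
    message = f"{method}\n{path}\n{timestamp}".encode()
    return hmac.new(secret, message, hashlib.sha256).hexdigest()

def verify_request(secret: bytes, method: str, path: str, timestamp: int,
                   signature: str, now: Optional[int] = None) -> bool:
    """Check freshness and signature; intended to run on every request, internal or not."""
    now = int(time.time()) if now is None else now
    if abs(now - timestamp) > MAX_SKEW_SECONDS:
        return False  # stale or future-dated request (possible replay)
    expected = sign_request(secret, method, path, timestamp)
    # compare_digest avoids leaking information through comparison timing
    return hmac.compare_digest(expected, signature)
```

A service would call verify_request before any authorization check; a request coming from an internal network gets no exemption.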

1. Secrets Management

Never hardcode secrets. API keys, database passwords, and tokens belong in secret managers, not in code or environment variables. Use AWS Secrets Manager, HashiCorp Vault, or Azure Key Vault.

❌ Common Mistakes (Don't Do This!)

Hardcoding in Code
# ❌ NEVER DO THIS
DB_PASSWORD = "super_secret_123"
API_KEY = "sk-abc123xyz"

conn = psycopg2.connect(
    host="prod-db.example.com",
    password="super_secret_123"  # Exposed in Git history!
)
Environment Variables in Code
# ❌ Better but still risky
import os
DB_PASSWORD = os.environ.get("DB_PASSWORD")  # Visible in process list, logs

# ❌ .env files in Git
# .env
DB_PASSWORD=super_secret_123
API_KEY=sk-abc123xyz
# If .env is committed to Git, secrets are exposed forever
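
One way to catch the mistakes above before they reach Git is a small scan run as a pre-commit check. The sketch below is illustrative only: the two regular expressions and the function name are assumptions, and dedicated secret scanners use far larger rule sets.

```python
import re

# Illustrative patterns only; they will miss many real secret formats.
SECRET_PATTERNS = [
    # NAME_WITH_PASSWORD = "literal value"
    re.compile(r"""(?i)(password|passwd|api_key|secret|token)\w*\s*=\s*["'][^"']+["']"""),
    # Key-like literals starting with "sk-"
    re.compile(r"\bsk-[A-Za-z0-9]{6,}"),
]

def find_hardcoded_secrets(source: str) -> list:
    """Return (line_number, line) pairs that look like hardcoded secrets."""
    findings = []
    for number, line in enumerate(source.splitlines(), start=1):
        if any(pattern.search(line) for pattern in SECRET_PATTERNS):
            findings.append((number, line.strip()))
    return findings
```

Reading a value from os.environ does not trigger the first pattern, because the right-hand side is a function call and not a quoted literal.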

AWS Secrets Manager

✅ Correct: Fetch Secrets at Runtime

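import json
import time

import boto3
import psycopg2

# Cache secrets for 5 minutes to reduce API calls
# (functools.lru_cache has no expiry, so a rotated secret would never be refreshed)
_CACHE_TTL_SECONDS = 300
_secret_cache = {}  # (secret_name, region_name) -> (fetched_at, secret dict)

def get_secret(secret_name, region_name="us-east-1"):
    """
    Securely fetch secret from AWS Secrets Manager
    """
    cache_key = (secret_name, region_name)
    cached = _secret_cache.get(cache_key)
    if cached and time.monotonic() - cached[0] < _CACHE_TTL_SECONDS:
        return cached[1]

    session = boto3.session.Session()
    client = session.client(
        service_name='secretsmanager',
        region_name=region_name
    )

    try:
        response = client.get_secret_value(SecretId=secret_name)
    except Exception as e:
        print(f"Error fetching secret {secret_name}: {e}")
        raise

    secret = json.loads(response['SecretString'])
    _secret_cache[cache_key] = (time.monotonic(), secret)
    return secret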

# Usage in data pipeline
def connect_to_database():
    secrets = get_secret("prod/database/credentials")

    conn = psycopg2.connect(
        host=secrets['host'],
        port=secrets['port'],
        database=secrets['dbname'],
        user=secrets['username'],
        password=secrets['password']  # Never exposed in code
    )
    return conn

# Usage for API keys (openai>=1.0 client interface)
from openai import OpenAI

def call_external_api():
    secrets = get_secret("prod/api/openai")
    client = OpenAI(api_key=secrets['api_key'])

    response = client.chat.completions.create(...)
    return response

Creating Secrets (AWS CLI)

# Create database credentials secret
aws secretsmanager create-secret \
  --name prod/database/credentials \
  --description "Production database credentials" \
  --secret-string '{
    "host": "prod-db.example.com",
    "port": 5432,
    "dbname": "analytics",
    "username": "app_user",
    "password": "generated_strong_password_here"
  }'

# Create API key secret
aws secretsmanager create-secret \
  --name prod/api/openai \
  --secret-string '{
    "api_key": "sk-proj-...",
    "organization_id": "org-..."
  }'

# Enable automatic rotation (30 days)
aws secretsmanager rotate-secret \
  --secret-id prod/database/credentials \
  --rotation-lambda-arn arn:aws:lambda:...:function:SecretsManagerRotation \
  --rotation-rules AutomaticallyAfterDays=30
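
The placeholder "generated_strong_password_here" above has to come from somewhere. A minimal sketch using Python's standard secrets module follows; the function name, the 16-character floor, and the symbol set (chosen to avoid characters that need escaping in JSON or shell quoting) are assumptions. The AWS CLI also offers `aws secretsmanager get-random-password` for the same purpose.

```python
import secrets
import string

def generate_password(length: int = 32) -> str:
    """Generate a random password from letters, digits, and a few safe symbols."""
    if length < 16:
        raise ValueError("use at least 16 characters for service credentials")
    # No quotes or backslashes, so the value drops into a JSON secret string unescaped
    alphabet = string.ascii_letters + string.digits + "!#%+-_=."
    return "".join(secrets.choice(alphabet) for _ in range(length))
```

The secrets module draws from the operating system's cryptographic random source, unlike the random module, which is not suitable for credentials.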

HashiCorp Vault (Self-Hosted)

Vault Configuration

import os

import hvac
import psycopg2

# Connect to Vault
client = hvac.Client(
    url='https://vault.example.com:8200',
    token=os.environ.get('VAULT_TOKEN')  # Use AppRole in production
)

# Store secret
client.secrets.kv.v2.create_or_update_secret(
    path='database/prod',
    secret=dict(
        username='app_user',
        password='secure_password'
    )
)

# Read secret
secret = client.secrets.kv.v2.read_secret_version(path='database/prod')
db_creds = secret['data']['data']

# Use secret
conn = psycopg2.connect(
    host='prod-db.example.com',
    user=db_creds['username'],
    password=db_creds['password']
)

# Dynamic secrets (Vault generates temporary credentials)
db_creds = client.secrets.database.generate_credentials(name='my-role')
# Credentials auto-expire after TTL, no rotation needed

Secret Rotation Best Practices

  • Rotate secrets every 30-90 days automatically
  • Use dynamic secrets when possible (auto-expire, no rotation needed)
  • Test rotation in staging before enabling in production
  • Monitor failed rotation attempts (indicates potential breach)
  • Have runbook for emergency rotation (if secret is leaked)
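
The rotation window can be monitored with a simple age check. The sketch below assumes you already hold an inventory mapping secret names to their last rotation time (for Secrets Manager, the DescribeSecret response includes a LastRotatedDate field); the function names and the inventory shape are hypothetical.

```python
from datetime import datetime, timedelta, timezone

def needs_rotation(last_rotated, max_age_days=90, now=None):
    """True once a secret is at least max_age_days old."""
    now = now or datetime.now(timezone.utc)
    return now - last_rotated >= timedelta(days=max_age_days)

def overdue_secrets(inventory, max_age_days=90, now=None):
    """Return the names of secrets past the rotation window, sorted."""
    return sorted(
        name for name, rotated in inventory.items()
        if needs_rotation(rotated, max_age_days, now)
    )
```

Running this on a schedule and alerting on a non-empty result gives early warning when automatic rotation has silently stopped.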

2. RBAC (Role-Based Access Control)

Implement least privilege access. Define roles (data-engineer, data-scientist, analyst) with specific permissions. Never give everyone admin access.

AWS IAM Policies by Role

Data Engineer Role (Read/Write)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": [
        "arn:aws:s3:::data-lake-raw/*",
        "arn:aws:s3:::data-lake-processed/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "glue:GetTable",
        "glue:GetTables",
        "glue:CreateTable",
        "glue:UpdateTable"
      ],
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "secretsmanager:GetSecretValue"
      ],
      "Resource": "arn:aws:secretsmanager:*:*:secret:dev/*"
    },
    {
      "Effect": "Deny",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "arn:aws:s3:::data-lake-production/*"
    }
  ]
}

Data Analyst Role (Read-Only)

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject",
        "s3:ListBucket"
      ],
      "Resource": [
        "arn:aws:s3:::data-lake-processed",
        "arn:aws:s3:::data-lake-processed/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "athena:StartQueryExecution",
        "athena:GetQueryExecution",
        "athena:GetQueryResults"
      ],
      "Resource": "arn:aws:athena:*:*:workgroup/analyst-workgroup"
    },
    {
      "Effect": "Deny",
      "Action": [
        "s3:PutObject",
        "s3:DeleteObject"
      ],
      "Resource": "*"
    }
  ]
}
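
The Deny statements in the policies above matter because an explicit Deny overrides any Allow. The sketch below is a deliberately simplified model of that evaluation order, not the real IAM engine: it ignores Condition blocks, principals, and case-insensitive action matching, and the sample policy is hypothetical, modeled on the Data Engineer role with a broader Allow added to show the override.

```python
from fnmatch import fnmatchcase

def _matches(patterns, value):
    """True if value matches any pattern; '*' is treated as a wildcard."""
    if isinstance(patterns, str):
        patterns = [patterns]
    return any(fnmatchcase(value, pattern) for pattern in patterns)

def is_allowed(policy, action, resource):
    """Simplified evaluation: explicit Deny wins, then Allow, else implicit deny."""
    allowed = False
    for statement in policy["Statement"]:
        if not (_matches(statement["Action"], action)
                and _matches(statement["Resource"], resource)):
            continue
        if statement["Effect"] == "Deny":
            return False  # an explicit Deny overrides every Allow
        allowed = True
    return allowed

# Hypothetical policy: a broad Allow on every data-lake bucket plus the production Deny
ENGINEER_POLICY = {
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Action": ["s3:GetObject", "s3:PutObject"],
         "Resource": "arn:aws:s3:::data-lake-*/*"},
        {"Effect": "Deny", "Action": ["s3:PutObject", "s3:DeleteObject"],
         "Resource": "arn:aws:s3:::data-lake-production/*"},
    ],
}
```

With this policy, reads from the production bucket succeed while writes to it are refused, even though the broad Allow covers s3:PutObject.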

Database-Level RBAC

PostgreSQL Role Separation

-- Create roles with specific permissions
-- Read-only analyst role
CREATE ROLE analyst_role;
GRANT CONNECT ON DATABASE analytics TO analyst_role;
GRANT USAGE ON SCHEMA public TO analyst_role;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO analyst_role;

-- Data engineer role (read/write in staging, read-only in prod)
CREATE ROLE engineer_role;
GRANT CONNECT ON DATABASE analytics TO engineer_role;

-- Staging: full access
GRANT ALL PRIVILEGES ON SCHEMA staging TO engineer_role;
GRANT ALL PRIVILEGES ON ALL TABLES IN SCHEMA staging TO engineer_role;

-- Production: read-only
GRANT USAGE ON SCHEMA production TO engineer_role;
GRANT SELECT ON ALL TABLES IN SCHEMA production TO engineer_role;

-- Service account for data pipelines (write to prod)
CREATE ROLE pipeline_service_account WITH LOGIN PASSWORD 'secure_password';
GRANT USAGE ON SCHEMA production TO pipeline_service_account;
GRANT INSERT, UPDATE ON ALL TABLES IN SCHEMA production TO pipeline_service_account;

-- Assign roles to users (names containing a dot must be double-quoted)
CREATE USER "john.doe" WITH PASSWORD 'secure_pass';
GRANT analyst_role TO "john.doe";

CREATE USER "jane.smith" WITH PASSWORD 'secure_pass';
GRANT engineer_role TO "jane.smith";

-- Revoke public access
REVOKE ALL ON SCHEMA public FROM PUBLIC;

Permission Matrix by Role

Resource               | Data Analyst | Data Engineer | ML Engineer  | Admin
S3 Production Data     | Read         | Read          | Read         | Full
S3 Staging Data        | None         | Full          | Full         | Full
Database Prod Schema   | Read         | Read          | Read         | Full
Secrets Manager        | None         | Read (dev)    | Read (dev)   | Full
EMR/Spark Clusters     | None         | Create (dev)  | Create (dev) | Full
SageMaker Models       | None         | None          | Full         | Full
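
The permission matrix can also live in code so that pipelines check it before acting. The sketch below encodes the matrix as data; the role keys, the Grant type, and the rule that "create" also implies "read" are assumptions of this example, not something the matrix itself states.

```python
from typing import NamedTuple

class Grant(NamedTuple):
    level: str         # "none", "read", "create", or "full"
    env: str = "any"   # "dev" limits the grant to development resources

# Which requested actions each level satisfies (an assumption of this sketch)
LEVEL_ALLOWS = {
    "none": set(),
    "read": {"read"},
    "create": {"read", "create"},
    "full": {"read", "create", "write", "delete"},
}

ROLES = ("analyst", "data_engineer", "ml_engineer", "admin")

def _row(*grants):
    """Pair each grant with its role, in the column order of the matrix."""
    return dict(zip(ROLES, grants))

MATRIX = {
    "S3 Production Data": _row(Grant("read"), Grant("read"), Grant("read"), Grant("full")),
    "S3 Staging Data": _row(Grant("none"), Grant("full"), Grant("full"), Grant("full")),
    "Database Prod Schema": _row(Grant("read"), Grant("read"), Grant("read"), Grant("full")),
    "Secrets Manager": _row(Grant("none"), Grant("read", "dev"), Grant("read", "dev"), Grant("full")),
    "EMR/Spark Clusters": _row(Grant("none"), Grant("create", "dev"), Grant("create", "dev"), Grant("full")),
    "SageMaker Models": _row(Grant("none"), Grant("none"), Grant("full"), Grant("full")),
}

def can(role, resource, action, env="prod"):
    """True if the role may perform the action on the resource in that environment."""
    grant = MATRIX.get(resource, {}).get(role, Grant("none"))
    if grant.env != "any" and grant.env != env:
        return False
    return action in LEVEL_ALLOWS[grant.level]
```

Unknown roles and resources fall through to Grant("none"), so the default is to refuse.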

3. Data Encryption

Encrypt data at rest and in transit. Use TLS 1.2+ for network traffic and AES-256 for stored data. Never store PII or sensitive data unencrypted.

Encryption at Rest

S3 Bucket Encryption (SSE-KMS)

# Enable default encryption on S3 bucket
aws s3api put-bucket-encryption \
  --bucket my-data-lake \
  --server-side-encryption-configuration '{
    "Rules": [{
      "ApplyServerSideEncryptionByDefault": {
        "SSEAlgorithm": "aws:kms",
        "KMSMasterKeyID": "arn:aws:kms:us-east-1:123456789:key/abc-123"
      },
      "BucketKeyEnabled": true
    }]
  }'

# All new objects are automatically encrypted
# Use customer-managed KMS key for audit trail and key rotation

RDS Encryption

# Enable encryption when creating RDS instance
aws rds create-db-instance \
  --db-instance-identifier prod-analytics-db \
  --db-instance-class db.r5.xlarge \
  --engine postgres \
  --storage-encrypted \
  --kms-key-id arn:aws:kms:us-east-1:123456789:key/abc-123 \
  --backup-retention-period 7 \
  --enable-cloudwatch-logs-exports '["postgresql"]'

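# Automated backups and snapshots of an encrypted instance are encrypted as well
# Encryption cannot be switched on for an existing unencrypted instance:
# snapshot it, copy the snapshot with encryption enabled, then restore from the copy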

Important: Encryption at rest protects against physical theft or unauthorized disk access. It does NOT protect against application-level attacks or insider threats with valid credentials.

Encryption in Transit

Database Connections (Enforce TLS)

import psycopg2

# ✅ Force TLS and verify the server certificate
conn = psycopg2.connect(
    host='prod-db.example.com',
    port=5432,
    database='analytics',
    user='app_user',
    password='from_secrets_manager',
    sslmode='verify-full',  # Require TLS, validate the CA chain and the hostname
    sslrootcert='/path/to/ca-cert.pem'  # CA certificate used for verification
)

# PostgreSQL server configuration (postgresql.conf)
# ssl = on
# ssl_cert_file = '/path/to/server.crt'
# ssl_key_file = '/path/to/server.key'
# ssl_ca_file = '/path/to/ca.crt'

# Reject non-TLS connections (pg_hba.conf)
# hostssl    all  all  0.0.0.0/0  scram-sha-256
# hostnossl  all  all  0.0.0.0/0  reject  # Deny non-TLS

API Calls (HTTPS Only)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# ✅ Configure HTTPS with certificate verification
session = requests.Session()

# Retry on connection errors
retry = Retry(
    total=3,
    backoff_factor=1,
    status_forcelist=[500, 502, 503, 504]
)

adapter = HTTPAdapter(max_retries=retry)
session.mount('https://', adapter)

# Always use HTTPS, verify certificates
response = session.get(
    'https://api.example.com/data',
    headers={'Authorization': f'Bearer {token}'},
    verify=True,  # Verify SSL certificate (default)
    timeout=30
)

# ❌ NEVER disable SSL verification in production
# response = session.get(url, verify=False)  # DANGEROUS!
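
The "TLS 1.2+" requirement can be enforced on the client side as well. The sketch below uses only the standard library's ssl module and is not tied to the requests session shown above; the function name and the optional ca_file parameter are assumptions.

```python
import ssl

def strict_tls_context(ca_file=None):
    """Build a client-side context that verifies certificates and refuses TLS below 1.2."""
    # create_default_context enables certificate and hostname verification
    context = ssl.create_default_context(cafile=ca_file)
    context.minimum_version = ssl.TLSVersion.TLSv1_2
    return context

# Example use with the standard library (not executed here):
#   import urllib.request
#   urllib.request.urlopen("https://api.example.com/data", context=strict_tls_context())
```

Setting minimum_version makes the handshake fail outright against a server that only offers older protocol versions, so the connection never quietly downgrades.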

Column-Level Encryption (PII)

Encrypt Sensitive Columns

from cryptography.fernet import Fernet
import base64

class PIIEncryptor:
    """Encrypt PII fields (SSN, credit cards, emails) at column level"""

    def __init__(self, key):
        self.cipher = Fernet(key)

    @staticmethod
    def generate_key():
        """Generate encryption key (store in AWS Secrets Manager)"""
        return Fernet.generate_key()

    def encrypt(self, plaintext):
        """Encrypt sensitive data before storing in database"""
        if plaintext is None:
            return None
        return self.cipher.encrypt(plaintext.encode()).decode()

    def decrypt(self, ciphertext):
        """Decrypt when authorized user needs to view"""
        if ciphertext is None:
            return None
        return self.cipher.decrypt(ciphertext.encode()).decode()

# Usage in data pipeline
encryptor = PIIEncryptor(key=get_secret('encryption-key')['key'])

# Encrypt before insert
encrypted_ssn = encryptor.encrypt('123-45-6789')
encrypted_email = encryptor.encrypt('user@example.com')

cursor.execute(
    "INSERT INTO users (name, ssn_encrypted, email_encrypted) VALUES (%s, %s, %s)",
    ('John Doe', encrypted_ssn, encrypted_email)
)

# Decrypt when needed (with proper authorization check)
if user.has_permission('view_pii'):
    ssn = encryptor.decrypt(row['ssn_encrypted'])
else:
    ssn = '***-**-****'  # Masked
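
Full masking is not always what a support screen needs; showing the last few characters is a common compromise. The helper below is a hypothetical sketch of that idea: the name mask_value and its defaults are assumptions.

```python
def mask_value(value: str, visible: int = 4, mask_char: str = "*") -> str:
    """Mask all but the last `visible` alphanumeric characters, keeping separators."""
    remaining = visible
    masked = []
    for ch in reversed(value):
        if not ch.isalnum():
            masked.append(ch)          # keep separators such as "-" or " "
        elif remaining > 0:
            masked.append(ch)          # still within the visible tail
            remaining -= 1
        else:
            masked.append(mask_char)
    return "".join(reversed(masked))
```

Calling mask_value(ssn, visible=0) reproduces the fully masked form used above.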

4. Audit Logging

Log all access to sensitive data: who accessed what, when, and from where. Essential for compliance (GDPR, HIPAA) and incident investigation.

AWS CloudTrail (API Audit Logs)

Enable CloudTrail for All Regions

# Create CloudTrail with data events logging
aws cloudtrail create-trail \
  --name all-regions-audit-trail \
  --s3-bucket-name audit-logs-bucket \
  --is-multi-region-trail \
  --enable-log-file-validation \
  --kms-key-id arn:aws:kms:us-east-1:123456789:key/abc-123

# Enable data events for S3 (track object-level access)
aws cloudtrail put-event-selectors \
  --trail-name all-regions-audit-trail \
  --event-selectors '[{
    "ReadWriteType": "All",
    "IncludeManagementEvents": true,
    "DataResources": [{
      "Type": "AWS::S3::Object",
      "Values": ["arn:aws:s3:::sensitive-data-bucket/*"]
    }]
  }]'

# Start logging
aws cloudtrail start-logging --name all-regions-audit-trail

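# Query logs with Athena
-- Who accessed this sensitive file in the last 30 days?
-- Assumes the standard CloudTrail table definition, where requestparameters
-- is a JSON string and eventtime is an ISO 8601 string
SELECT
  useridentity.username,
  eventtime,
  sourceipaddress,
  json_extract_scalar(requestparameters, '$.bucketName') AS bucket_name,
  json_extract_scalar(requestparameters, '$.key') AS object_key
FROM cloudtrail_logs
WHERE json_extract_scalar(requestparameters, '$.bucketName') = 'sensitive-data-bucket'
  AND json_extract_scalar(requestparameters, '$.key') LIKE '%customers/pii%'
  AND from_iso8601_timestamp(eventtime) > date_add('day', -30, now())
ORDER BY eventtime DESC;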

Application-Level Audit Logging

Log Data Access in Application

import logging
import json
from datetime import datetime, timezone

# Configure audit logger (separate from application logs)
audit_logger = logging.getLogger('audit')
audit_handler = logging.FileHandler('/var/log/data-access-audit.log')
audit_handler.setFormatter(logging.Formatter('%(message)s'))
audit_logger.addHandler(audit_handler)
audit_logger.setLevel(logging.INFO)

def log_data_access(user, action, resource, success, ip_address, details=None):
    """
    Log all access to sensitive data
    Required for HIPAA, GDPR, SOC 2 compliance
    """
    log_entry = {
        'timestamp': datetime.now(timezone.utc).isoformat(),  # timezone-aware UTC
        'user': user,
        'action': action,  # read, write, delete, export
        'resource': resource,  # table name, file path
        'success': success,
        'ip_address': ip_address,
        'details': details or {}
    }
    audit_logger.info(json.dumps(log_entry))

# Usage in data pipeline
def export_customer_data(user_id, customer_ids, ip_address):
    """Export customer PII (requires logging for compliance)"""

    # Check authorization
    if not has_permission(user_id, 'export_pii'):
        log_data_access(
            user=user_id,
            action='export_pii',
            resource=f'customers table ({len(customer_ids)} records)',
            success=False,
            ip_address=ip_address,
            details={'reason': 'insufficient_permissions'}
        )
        raise PermissionError("User not authorized to export PII")

    # Log successful access
    log_data_access(
        user=user_id,
        action='export_pii',
        resource=f'customers table ({len(customer_ids)} records)',
        success=True,
        ip_address=ip_address,
        details={
            'customer_ids': customer_ids,
            'export_format': 'csv'
        }
    )

    # Perform export
    return export_to_csv(customer_ids)
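
Because each audit entry is one JSON object per line, the log is easy to analyze later. The sketch below counts failed accesses per user and flags anyone at or above a threshold; the function names and the threshold of 3 are assumptions, and the entries are expected to carry the 'user' and 'success' fields written by log_data_access.

```python
import json
from collections import Counter

def failed_access_counts(log_lines):
    """Count failed access attempts per user from JSON-lines audit entries."""
    counts = Counter()
    for line in log_lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        entry = json.loads(line)
        if not entry.get("success", False):
            counts[entry["user"]] += 1
    return counts

def users_over_threshold(log_lines, threshold=3):
    """Return users with at least `threshold` failed attempts, sorted by name."""
    counts = failed_access_counts(log_lines)
    return sorted(user for user, n in counts.items() if n >= threshold)
```

In practice the lines would come from iterating over the open audit log file, for example the path configured in the FileHandler.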

What to Log

Always Log:
  • PII/PHI data access (read, export)
  • Database credential usage
  • Failed authentication attempts
  • Permission changes
  • Data deletion operations
  • Configuration changes
Include in Logs:
  • User ID (who)
  • Timestamp (when)
  • Resource (what)
  • Action (read/write/delete)
  • IP address (from where)
  • Success/failure status

Audit Log Retention

  • GDPR: Sets no fixed period; keep access logs only as long as their purpose requires (3-6 years is typical)
  • HIPAA: Retain audit logs for 6 years minimum
  • SOC 2: 1 year minimum, 7 years recommended
  • Store logs in tamper-proof location (S3 with object lock)
  • Encrypt audit logs (contain sensitive metadata)
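
Storage-level protection such as S3 Object Lock can be complemented by making the log itself tamper-evident. The sketch below shows a hash chain, a different technique from Object Lock: each record stores the hash of the previous one, so editing or removing an earlier entry breaks verification. The record layout and function names are assumptions.

```python
import hashlib
import json

GENESIS_HASH = "0" * 64  # placeholder "previous hash" for the first record

def _digest(prev_hash, entry):
    """Hash the previous record's hash together with a canonical form of the entry."""
    payload = json.dumps(entry, sort_keys=True)
    return hashlib.sha256((prev_hash + payload).encode()).hexdigest()

def append_entry(chain, entry):
    """Append an audit entry linked to the hash of the previous record."""
    prev_hash = chain[-1]["hash"] if chain else GENESIS_HASH
    record = {"entry": entry, "prev_hash": prev_hash, "hash": _digest(prev_hash, entry)}
    chain.append(record)
    return record

def verify_chain(chain):
    """Recompute every hash; False if any record was altered, reordered, or removed mid-chain."""
    prev_hash = GENESIS_HASH
    for record in chain:
        if record["prev_hash"] != prev_hash:
            return False
        if record["hash"] != _digest(prev_hash, record["entry"]):
            return False
        prev_hash = record["hash"]
    return True
```

A chain alone does not detect truncation of the most recent records, so the latest hash should also be stored somewhere the log writer cannot modify.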

5. Compliance (GDPR, HIPAA, SOC 2)

GDPR (General Data Protection Regulation)

Key GDPR Requirements

1. Right to Access (Article 15)

Users can request all data you have about them

def export_user_data(user_id):
    """GDPR Article 15: Export all data for user"""
    return {
        'profile': get_user_profile(user_id),
        'orders': get_user_orders(user_id),
        'activity_logs': get_user_activity(user_id),
        'ml_predictions': get_user_predictions(user_id)
    }

2. Right to Erasure (Article 17)

Users can request data deletion ("right to be forgotten")

def delete_user_data(user_id):
    """GDPR Article 17: Delete all user data"""
    # Delete from all tables
    delete_from_postgres(user_id)
    delete_from_s3(f's3://data-lake/users/{user_id}/')
    delete_from_redshift(user_id)

    # Anonymize ML training data (can't delete without retraining)
    anonymize_training_data(user_id)

    # Log deletion for compliance audit
    log_deletion(user_id, reason='gdpr_request')

3. Data Portability (Article 20)

Export data in machine-readable format (JSON, CSV)

4. Consent Management

Track and honor user consent for data processing
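
Consent tracking can start as a small registry that pipelines consult before processing. The sketch below keeps the latest decision per user and purpose in memory; the class and function names are hypothetical, and a real system would persist the records and keep their full history.

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class ConsentRegistry:
    # (user_id, purpose) -> (granted, recorded_at); the latest decision wins
    _records: dict = field(default_factory=dict)

    def record(self, user_id, purpose, granted, at=None):
        """Store the user's most recent consent decision for a purpose."""
        at = at or datetime.now(timezone.utc)
        self._records[(user_id, purpose)] = (granted, at)

    def has_consent(self, user_id, purpose):
        """True only if the latest recorded decision is a grant; the default is no consent."""
        granted, _ = self._records.get((user_id, purpose), (False, None))
        return granted

def filter_consented(registry, user_ids, purpose):
    """Keep only users who currently consent to the given processing purpose."""
    return [u for u in user_ids if registry.has_consent(u, purpose)]
```

A pipeline step would call filter_consented before, say, building a marketing feature set, so a withdrawn consent takes effect on the next run.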

GDPR Fines: Up to €20M or 4% of global annual turnover, whichever is higher. Non-compliance is expensive!

HIPAA (Health Insurance Portability and Accountability Act)

HIPAA Requirements for PHI (Protected Health Information)

Encryption: PHI must be encrypted at rest and in transit (AES-256, TLS 1.2+)
Access Controls: Role-based access, minimum necessary principle
Audit Logs: Log all access to PHI, retain for 6 years
Business Associate Agreements (BAA): Required for cloud providers
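Breach Notification: Notify affected individuals within 60 days of discovery; breaches affecting 500+ individuals must also be reported to HHS within the same 60 days (smaller breaches are reported to HHS annually)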
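
The 60-day notification window is easy to track in an incident runbook. A minimal sketch, assuming the clock starts on the discovery date; the function names are hypothetical, and the actual deadline for a given incident should be confirmed with your compliance officer.

```python
from datetime import date, timedelta

HIPAA_NOTIFICATION_DAYS = 60  # window taken from the requirement above

def notification_deadline(discovered_on: date) -> date:
    """Latest notification date, counting 60 days from discovery."""
    return discovered_on + timedelta(days=HIPAA_NOTIFICATION_DAYS)

def days_remaining(discovered_on: date, today: date) -> int:
    """Days left before the deadline; negative once it has passed."""
    return (notification_deadline(discovered_on) - today).days
```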

HIPAA-Compliant Data Pipeline Checklist

AWS/GCP/Azure HIPAA BAA signed and active
All S3 buckets with PHI have encryption enabled (SSE-KMS)
All database connections use TLS 1.2+ (no plaintext connections)
Audit logging enabled for all PHI access (CloudTrail + app logs)
Access controls: least privilege, no shared accounts
Data retention policy: delete PHI after required retention period
Incident response plan: breach notification procedure documented

SOC 2 (System and Organization Controls)

SOC 2 Trust Services Criteria

Security (required):
  • Access controls and MFA
  • Encryption at rest and in transit
  • Network security (firewalls, VPCs)
  • Vulnerability scanning
Availability (optional):
  • System uptime and performance
  • Backup and disaster recovery
  • Incident response procedures
Confidentiality (optional):
  • Data classification
  • Encryption and access controls
  • NDAs with employees/contractors
Processing Integrity (optional):
  • Data quality and validation
  • Error handling and logging
  • Authorized processing only

Production Security Checklist

Verify all items before deploying data pipelines to production

Secrets & Access

No hardcoded secrets in code or configs
All secrets in AWS Secrets Manager or Vault
Automatic secret rotation enabled (30-90 days)
RBAC implemented with least privilege
No shared accounts, everyone has individual login
MFA enabled for all production access

Encryption & Network

S3 buckets encrypted with KMS (SSE-KMS)
RDS/Redshift encrypted at rest
All database connections use TLS 1.2+
VPC with private subnets for databases
Security groups: whitelist only (no 0.0.0.0/0 for SSH/DB)
PII columns encrypted at application level
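
The security group item can be checked automatically. The sketch below scans rule data shaped like the SecurityGroups list returned by EC2's DescribeSecurityGroups call and reports SSH or database ports open to the whole internet; the function name, the port list, and the decision to inspect only TCP and "all traffic" rules are assumptions.

```python
# Ports this sketch treats as sensitive (an assumption; extend for your stack)
SENSITIVE_PORTS = {22: "SSH", 3306: "MySQL", 5432: "PostgreSQL", 5439: "Redshift"}
OPEN_CIDRS = {"0.0.0.0/0", "::/0"}

def find_open_ingress(security_groups):
    """Return (group_id, port, service) for sensitive ports reachable from anywhere."""
    findings = []
    for group in security_groups:
        for perm in group.get("IpPermissions", []):
            cidrs = {r.get("CidrIp") for r in perm.get("IpRanges", [])}
            cidrs |= {r.get("CidrIpv6") for r in perm.get("Ipv6Ranges", [])}
            if not cidrs & OPEN_CIDRS:
                continue  # rule is restricted to specific networks
            protocol = perm.get("IpProtocol")
            if protocol == "-1":
                exposed = sorted(SENSITIVE_PORTS)  # "all traffic" rule exposes every port
            elif protocol == "tcp":
                low, high = perm["FromPort"], perm["ToPort"]
                exposed = [p for p in sorted(SENSITIVE_PORTS) if low <= p <= high]
            else:
                continue  # UDP/ICMP rules are outside this sketch
            for port in exposed:
                findings.append((group["GroupId"], port, SENSITIVE_PORTS[port]))
    return findings
```

An empty result is what the checklist asks for; any finding names the group and port to tighten.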

Logging & Monitoring

CloudTrail enabled for all regions
S3 data events logged for sensitive buckets
Application audit logs for PII/PHI access
Failed login attempts logged and alerted
Audit log retention meets compliance (3-6 years)
GuardDuty/Security Hub enabled for threat detection

Compliance & Documentation

Data classification documented (PII, PHI, confidential)
GDPR data deletion procedure implemented
HIPAA BAA signed with cloud provider (if applicable)
Incident response plan documented and tested
Data retention policy enforced (auto-delete old data)
Security training completed by all team members

Complete all items before deploying to production. Security is non-negotiable for systems handling sensitive data. When in doubt, ask your security team or compliance officer.
