# Configuration Reference
OSO Kafka Backup uses YAML configuration files for backup and restore operations. This reference documents all available options.
## Configuration Structure

```yaml
# Required: Operation mode
mode: backup # or "restore"

# Required: Unique backup identifier
backup_id: "my-backup-001"

# Source/Target Kafka cluster configuration
source: # For backup mode
target: # For restore mode
  bootstrap_servers: []
  security: {}
  topics: {}

# Storage backend configuration
storage:
  backend: filesystem # or s3, azure, gcs
  # Backend-specific options...

# Mode-specific options
backup: {}  # Backup options
restore: {} # Restore options
```
## Common Configuration

### mode

Required. Operation mode.

```yaml
mode: backup  # Run a backup operation
mode: restore # Run a restore operation
```
### backup_id

Required. Unique identifier for the backup.

```yaml
backup_id: "production-daily-001"
backup_id: "dr-backup-$(date +%Y%m%d)"
```

Note that `$(date +%Y%m%d)` in the second example is shell command substitution, expanded by whatever generates the file; the tool's built-in substitution handles `${VAR_NAME}`-style environment variables (see Environment Variable Substitution below).
## Kafka Cluster Configuration

Used as `source` in backup mode and `target` in restore mode.

### bootstrap_servers

Required. List of Kafka broker addresses.

```yaml
source:
  bootstrap_servers:
    - broker-1.kafka.svc:9092
    - broker-2.kafka.svc:9092
    - broker-3.kafka.svc:9092
```
### security

Optional security configuration for the Kafka connection.

```yaml
source:
  security:
    # Security protocol
    security_protocol: SASL_SSL # PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL

    # SASL configuration
    sasl_mechanism: SCRAM-SHA-256 # PLAIN, SCRAM-SHA-256, SCRAM-SHA-512
    sasl_username: backup-user
    sasl_password: ${KAFKA_PASSWORD} # Environment variable substitution

    # SSL/TLS configuration
    ssl_ca_location: /etc/kafka/ca.crt
    ssl_certificate_location: /etc/kafka/client.crt
    ssl_key_location: /etc/kafka/client.key
    ssl_key_password: ${SSL_KEY_PASSWORD}
```
#### Security Protocol Options

| Protocol | Description |
|---|---|
| PLAINTEXT | No encryption, no authentication |
| SSL | TLS encryption, optional mTLS |
| SASL_PLAINTEXT | SASL authentication, no encryption |
| SASL_SSL | SASL authentication with TLS encryption |
#### SASL Mechanism Options

| Mechanism | Description |
|---|---|
| PLAIN | Simple username/password |
| SCRAM-SHA-256 | Salted Challenge Response (SHA-256) |
| SCRAM-SHA-512 | Salted Challenge Response (SHA-512) |
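For clusters that authenticate with mutual TLS instead of SASL, a minimal sketch (certificate paths are placeholders) combines the `SSL` protocol with a client certificate and key:

```yaml
source:
  security:
    security_protocol: SSL
    ssl_ca_location: /etc/kafka/ca.crt              # CA that signed the broker certificates
    ssl_certificate_location: /etc/kafka/client.crt # Presenting a client cert enables mTLS
    ssl_key_location: /etc/kafka/client.key
    ssl_key_password: ${SSL_KEY_PASSWORD}           # Only needed if the key is encrypted
```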
### connection

Optional TCP connection settings. These are particularly important for cloud-hosted Kafka services like Confluent Cloud that terminate idle connections.

```yaml
source:
  connection:
    # Enable TCP keepalive to prevent idle connection termination
    tcp_keepalive: true # Default: true

    # Time in seconds before first keepalive probe is sent
    keepalive_time_secs: 60 # Default: 60

    # Interval in seconds between keepalive probes
    keepalive_interval_secs: 20 # Default: 20

    # Enable TCP_NODELAY (disable Nagle's algorithm) for lower latency
    tcp_nodelay: true # Default: true
```
#### Connection Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| tcp_keepalive | bool | true | Enable TCP keepalive probes |
| keepalive_time_secs | int | 60 | Seconds idle before first probe |
| keepalive_interval_secs | int | 20 | Seconds between probes |
| tcp_nodelay | bool | true | Disable Nagle's algorithm |
Confluent Cloud terminates idle TCP connections after ~5 minutes. The default keepalive settings (60s time, 20s interval) prevent this. If you're experiencing "Broken pipe" errors with Confluent Cloud, ensure TCP keepalive is enabled.
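As an illustrative sketch for Confluent Cloud (the bootstrap address is a placeholder; Confluent Cloud authenticates over SASL_SSL with the PLAIN mechanism, using an API key and secret as the username and password):

```yaml
source:
  bootstrap_servers:
    - pkc-xxxxx.us-west-2.aws.confluent.cloud:9092
  security:
    security_protocol: SASL_SSL
    sasl_mechanism: PLAIN
    sasl_username: ${CONFLUENT_API_KEY}
    sasl_password: ${CONFLUENT_API_SECRET}
  connection:
    tcp_keepalive: true         # Defaults shown explicitly; they counter the ~5 minute idle timeout
    keepalive_time_secs: 60
    keepalive_interval_secs: 20
```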
### topics

Topic selection for backup or restore.

```yaml
source:
  topics:
    # Include specific topics or patterns
    include:
      - orders        # Exact topic name
      - payments      # Another exact name
      - "events-*"    # Wildcard pattern
      - "logs-2024-*" # Date-based pattern

    # Exclude topics (applied after include)
    exclude:
      - "__consumer_offsets" # Internal Kafka topic
      - "_schemas"           # Schema Registry topic
      - "*-internal"         # Pattern exclusion
```
## Storage Configuration

### backend

Required. Storage backend type.

```yaml
storage:
  backend: filesystem # Local filesystem or mounted volume
  backend: s3         # Amazon S3 or S3-compatible storage
  backend: azure      # Azure Blob Storage
  backend: gcs        # Google Cloud Storage
```
### Filesystem Storage

```yaml
storage:
  backend: filesystem
  path: "/var/lib/kafka-backup/data"
  prefix: "cluster-prod" # Optional subdirectory
```
### S3 Storage

```yaml
storage:
  backend: s3
  bucket: my-kafka-backups
  region: us-west-2
  prefix: backups/production # Optional key prefix

  # Optional: Custom endpoint for MinIO, Ceph, etc.
  endpoint: https://minio.example.com:9000

  # Credentials (optional - uses the AWS credential chain if not specified)
  access_key: ${AWS_ACCESS_KEY_ID}
  secret_key: ${AWS_SECRET_ACCESS_KEY}
```
### Azure Blob Storage

```yaml
storage:
  backend: azure
  container: kafka-backups
  account_name: mystorageaccount
  prefix: backups/production

  # Credentials
  account_key: ${AZURE_STORAGE_KEY}
  # Or use a connection string:
  # connection_string: ${AZURE_STORAGE_CONNECTION_STRING}
```
### Google Cloud Storage

```yaml
storage:
  backend: gcs
  bucket: my-kafka-backups
  prefix: backups/production

  # Credentials (uses GOOGLE_APPLICATION_CREDENTIALS if not specified)
  service_account_json: /etc/gcp/service-account.json
```
## Backup Configuration

Options specific to backup mode.

```yaml
backup:
  # Compression settings
  compression: zstd    # Options: zstd, lz4, none
  compression_level: 3 # 1-22 for zstd (default: 3)

  # Starting offset
  start_offset: earliest # earliest, latest, or a specific offset

  # Segment settings
  segment_max_bytes: 134217728   # 128 MB - roll segment after this size
  segment_max_interval_ms: 60000 # 60 sec - roll segment after this time

  # Continuous backup mode
  continuous: false # true for streaming backup

  # Internal topics
  include_internal_topics: false # Include __consumer_offsets, etc.

  # Checkpointing
  checkpoint_interval_secs: 30 # Save progress every 30 seconds
  sync_interval_secs: 60       # Sync to storage every 60 seconds

  # Offset headers (required for consumer offset reset)
  include_offset_headers: true

  # Source cluster identifier
  source_cluster_id: "prod-cluster-east"

  # Snapshot backup mode (v0.8.0+)
  # Captures high watermarks at start and exits when all partitions reach them.
  # Incompatible with continuous: true.
  stop_at_current_offsets: false

  # Performance tuning (v0.8.0+)
  max_concurrent_partitions: 8 # Parallel partition processing (default: 8)
  poll_interval_ms: 100        # Delay between poll attempts in ms (default: 100)
```
### Backup Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| compression | string | zstd | Compression algorithm |
| compression_level | int | 3 | Compression level (1-22 for zstd) |
| start_offset | string | earliest | Starting offset: earliest, latest |
| segment_max_bytes | int | 134217728 | Max segment size in bytes |
| segment_max_interval_ms | int | 60000 | Max segment duration in ms |
| continuous | bool | false | Enable continuous/streaming backup |
| include_internal_topics | bool | false | Include internal Kafka topics |
| checkpoint_interval_secs | int | 30 | Checkpoint frequency |
| sync_interval_secs | int | 60 | Storage sync frequency |
| include_offset_headers | bool | true | Store original offsets |
| source_cluster_id | string | - | Cluster identifier for tracking |
| stop_at_current_offsets | bool | false | Snapshot mode: stop after reaching current high watermarks (v0.8.0+) |
| max_concurrent_partitions | int | 8 | Maximum parallel partition processing (v0.8.0+) |
| poll_interval_ms | int | 100 | Delay in ms between consumer poll attempts (v0.8.0+) |
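For example, a nightly snapshot-style backup that exits once every partition reaches the high watermark captured at startup might look like this (broker address and paths are illustrative):

```yaml
mode: backup
backup_id: "nightly-snapshot-${BACKUP_DATE}"

source:
  bootstrap_servers:
    - kafka:9092
  topics:
    include:
      - orders

storage:
  backend: filesystem
  path: "/data/backups"

backup:
  compression: zstd
  continuous: false             # Snapshot mode cannot be combined with continuous mode
  stop_at_current_offsets: true # Capture high watermarks at start, exit when reached (v0.8.0+)
```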
## Restore Configuration

Options specific to restore mode.

```yaml
restore:
  # Point-in-Time Recovery (PITR)
  time_window_start: 1701417600000 # Unix milliseconds (optional)
  time_window_end: 1701504000000   # Unix milliseconds (optional)

  # Auto-create topics if they don't exist (v0.3.0+)
  create_topics: true
  default_replication_factor: 3 # Replication factor for new topics

  # Partition filtering
  source_partitions: # Only restore specific partitions
    - 0
    - 1
    - 2

  # Partition mapping (remap partitions during restore)
  partition_mapping:
    0: 0
    1: 2 # Source partition 1 -> target partition 2

  # Topic remapping
  topic_mapping:
    orders: orders_restored # orders -> orders_restored
    payments: payments_dr   # payments -> payments_dr

  # Consumer offset strategy
  consumer_group_strategy: skip # skip, header-based, timestamp-based, cluster-scan, manual

  # Dry run mode
  dry_run: false # Validate without executing

  # Include original offset in headers
  include_original_offset_header: true

  # Rate limiting
  rate_limit_records_per_sec: null # null for unlimited
  rate_limit_bytes_per_sec: null   # null for unlimited

  # Performance tuning
  max_concurrent_partitions: 4 # Parallel partition processing
  produce_batch_size: 1000     # Records per produce batch

  # Resumable restores
  checkpoint_state: null       # Path to checkpoint file
  checkpoint_interval_secs: 60 # Checkpoint frequency

  # Offset mapping report
  offset_report: /tmp/offset-mapping.json # Save offset mapping

  # Consumer group offset reset
  reset_consumer_offsets: false # Reset offsets after restore
  consumer_groups: # Groups to reset
    - my-consumer-group
    - analytics-consumer
```
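Before running a large restore, `dry_run: true` validates the operation without executing it; a minimal sketch (broker address and path are placeholders):

```yaml
mode: restore
backup_id: "prod-backup-20241201"

target:
  bootstrap_servers:
    - kafka:9092

storage:
  backend: filesystem
  path: "/data/backups"

restore:
  dry_run: true # Validate the restore plan without producing any records
  topic_mapping:
    orders: orders_restored
```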
### Restore Options Reference

| Option | Type | Default | Description |
|---|---|---|---|
| time_window_start | int | - | PITR start timestamp (Unix ms) |
| time_window_end | int | - | PITR end timestamp (Unix ms) |
| create_topics | bool | false | Auto-create topics if they don't exist (v0.3.0+) |
| default_replication_factor | int | 1 | Replication factor for auto-created topics |
| source_partitions | list | - | Partitions to restore |
| partition_mapping | map | - | Partition remapping |
| topic_mapping | map | - | Topic remapping |
| consumer_group_strategy | string | skip | Offset handling strategy |
| dry_run | bool | false | Validate without executing |
| include_original_offset_header | bool | true | Add original offset header |
| rate_limit_records_per_sec | int | - | Rate limit (records/sec) |
| rate_limit_bytes_per_sec | int | - | Rate limit (bytes/sec) |
| max_concurrent_partitions | int | 4 | Parallel partitions |
| produce_batch_size | int | 1000 | Records per produce batch |
| checkpoint_state | string | null | Path to checkpoint file for resumable restores |
| checkpoint_interval_secs | int | 60 | Checkpoint frequency |
| offset_report | string | - | Path to save the offset mapping report |
| reset_consumer_offsets | bool | false | Reset consumer offsets |
| consumer_groups | list | - | Consumer groups to reset |
### Consumer Group Strategies

| Strategy | Description |
|---|---|
| skip | Don't modify consumer offsets |
| header-based | Use offset mapping from backup headers |
| timestamp-based | Reset to timestamp-based offsets |
| cluster-scan | Scan target cluster for offset mapping |
| manual | Generate a script for manual reset |
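For instance, to carry consumer offsets across using the offset headers written at backup time (this relies on the backup having run with `include_offset_headers: true`), a restore block might look like the following sketch:

```yaml
restore:
  consumer_group_strategy: header-based   # Use the offset mappings stored in backup headers
  offset_report: /tmp/offset-mapping.json # Save the mapping for auditing
  reset_consumer_offsets: true
  consumer_groups:
    - my-consumer-group
```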
## Complete Examples

### Basic Backup

```yaml
mode: backup
backup_id: "daily-backup"

source:
  bootstrap_servers:
    - kafka:9092
  topics:
    include:
      - orders
      - payments

storage:
  backend: filesystem
  path: "/data/backups"

backup:
  compression: zstd
```
### Production S3 Backup

```yaml
mode: backup
backup_id: "prod-backup-${BACKUP_DATE}"

source:
  bootstrap_servers:
    - broker-1.prod.kafka:9092
    - broker-2.prod.kafka:9092
    - broker-3.prod.kafka:9092
  security:
    security_protocol: SASL_SSL
    sasl_mechanism: SCRAM-SHA-256
    sasl_username: backup-service
    sasl_password: ${KAFKA_PASSWORD}
    ssl_ca_location: /etc/kafka/ca.crt
  # Connection settings (recommended for cloud Kafka services)
  connection:
    tcp_keepalive: true
    keepalive_time_secs: 60
    keepalive_interval_secs: 20
    tcp_nodelay: true
  topics:
    include:
      - "*"
    exclude:
      - "__consumer_offsets"
      - "_schemas"
      - "*-internal"

storage:
  backend: s3
  bucket: company-kafka-backups
  region: us-west-2
  prefix: production/${CLUSTER_NAME}

backup:
  compression: zstd
  compression_level: 5
  checkpoint_interval_secs: 60
  include_offset_headers: true
  source_cluster_id: "prod-us-west-2"
```
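Since `${BACKUP_DATE}` and `${CLUSTER_NAME}` are resolved through environment variable substitution, export them before invoking the tool (the config file name here is a placeholder):

```bash
export BACKUP_DATE="$(date +%Y%m%d)"
export CLUSTER_NAME="prod-cluster"
export KAFKA_PASSWORD="..."

kafka-backup backup --config prod-backup.yaml
```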
### Point-in-Time Restore

```yaml
mode: restore
backup_id: "prod-backup-20241201"

target:
  bootstrap_servers:
    - dr-broker-1:9092
    - dr-broker-2:9092

storage:
  backend: s3
  bucket: company-kafka-backups
  region: us-west-2
  prefix: production/prod-cluster

restore:
  # Restore only data from Dec 1, 2024 10:00 to 14:00 UTC
  time_window_start: 1733047200000
  time_window_end: 1733061600000
  topic_mapping:
    orders: orders_restored
  consumer_group_strategy: header-based
  reset_consumer_offsets: true
  consumer_groups:
    - order-processor
    - analytics-service
```
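The window bounds are Unix epoch milliseconds; on a system with GNU date they can be computed like so (an illustrative snippet, not part of the tool):

```bash
# 10:00 UTC on Dec 1, 2024, expressed in Unix milliseconds
echo $(( $(date -u -d "2024-12-01T10:00:00Z" +%s) * 1000 ))
# 1733047200000
```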
### Disaster Recovery Restore

```yaml
mode: restore
backup_id: "prod-backup-latest"

target:
  bootstrap_servers:
    - dr-broker-1.dr.kafka:9092
    - dr-broker-2.dr.kafka:9092
    - dr-broker-3.dr.kafka:9092
  security:
    security_protocol: SASL_SSL
    sasl_mechanism: SCRAM-SHA-256
    sasl_username: restore-service
    sasl_password: ${DR_KAFKA_PASSWORD}
    ssl_ca_location: /etc/kafka/ca.crt

storage:
  backend: s3
  bucket: company-kafka-backups
  region: us-east-1 # DR region
  prefix: production/prod-cluster

restore:
  dry_run: false
  max_concurrent_partitions: 8
  produce_batch_size: 5000
  consumer_group_strategy: header-based
  reset_consumer_offsets: true
  consumer_groups:
    - order-service
    - payment-service
    - notification-service
    - analytics-pipeline
```
## Environment Variable Substitution

Configuration files support environment variable substitution using the `${VAR_NAME}` syntax:

```yaml
source:
  security:
    sasl_password: ${KAFKA_PASSWORD}

storage:
  backend: s3
  access_key: ${AWS_ACCESS_KEY_ID}
  secret_key: ${AWS_SECRET_ACCESS_KEY}
```
Set the variables before running:

```bash
export KAFKA_PASSWORD="secret123"
export AWS_ACCESS_KEY_ID="AKIA..."
export AWS_SECRET_ACCESS_KEY="..."

kafka-backup backup --config backup.yaml
```