# Active Job Patterns: Continuations vs. Checkpointing

This document explains two distinct patterns for managing complex background jobs in Rails.

## Pattern 1: Job Chaining (Continuations)

**Purpose:** Schedule follow-up jobs after the current job completes successfully.

**Use Case:** Sequential workflows where Job A must complete before Job B starts.

**Implementation:** `Continuable` concern (`app/jobs/concerns/continuable.rb`)

### Example:

```ruby
class SendWelcomeEmailJob < ApplicationJob
  include Continuable

  # These jobs run AFTER welcome email is sent successfully
  continuations do |user_id|
    SendOnboardingTipsJob.set(wait: 5.minutes).perform_later(user_id)
    NotifyAdminJob.perform_later(user_id)
  end

  def perform(user_id)
    UserMailer.welcome_email(user_id).deliver_now
  end
end
```

### Flow:

```
SendWelcomeEmailJob completes
         ↓
Continuations triggered
         ↓
├── SendOnboardingTipsJob scheduled (5 min delay)
└── NotifyAdminJob scheduled (immediate)
```

### Key Features:

- ✅ **Success-only execution** - Continuations only run if the parent's `perform` returns without raising
- ✅ **Job chaining** - Build multi-step workflows
- ✅ **Error isolation** - Once enqueued, a continuation runs as its own job, so its failure does not fail or retry the parent
- ✅ **Declarative** - Clear job dependencies at class level

---

## Pattern 2: Job Checkpointing (Steps)

**Purpose:** Break long-running jobs into resumable steps with progress tracking.

**Use Case:** Processing large datasets, imports, and bulk operations that can be interrupted.

**Implementation:** `Steppable` concern (`app/jobs/concerns/steppable.rb`)

### Example:

```ruby
class ProcessImportJob < ApplicationJob
  include Steppable

  def perform(import_id)
    @import = Import.find(import_id)

    # Step 1: Initialize
    step :initialize do
      @import.update!(status: 'processing')
    end

    # Step 2: Process with checkpointing
    step :process do |step|
      @import.records.find_each(start: step.cursor) do |record|
        process_record(record)
        step.advance! from: record.id # Save progress
      end
    end

    # Step 3: Finalize (no block: calls the private method of the same name)
    step :finalize
  end

  private

  def finalize
    @import.update!(status: 'completed')
  end
end
```

### Flow:

```
Step 1: Initialize ✓
Step 2: Process
├── Record 1 ✓ (checkpoint saved)
├── Record 2 ✓ (checkpoint saved)
├── Record 3 ❌ (job crashes/interrupted)
└── [Job restarts at Record 2, the last checkpoint; find_each(start:) is inclusive]
Step 3: Finalize ✓
```

### Key Features:

- ✅ **Resumable** - Jobs can be interrupted and resumed from the last checkpoint
- ✅ **Fault-tolerant** - Partial progress survives an interruption as long as the cache entry is retained (checkpoints expire after 24 hours)
- ✅ **Progress tracking** - The cursor records the last record processed in a long-running step
- ✅ **Idempotent** - Safe to re-run from any checkpoint, provided `process_record` tolerates reprocessing the last checkpointed record

---

## When to Use Each Pattern

### Use Continuations When:

- ✓ You need to chain multiple jobs sequentially
- ✓ Each job is independent and relatively quick
- ✓ You want clean separation between jobs
- ✓ You need conditional job chains

**Examples:**

- Email workflows (welcome → onboarding → metrics)
- Post-registration tasks
- Multi-stage notifications
- Async API calls with callbacks

### Use Checkpointing When:

- ✓ Processing large datasets (thousands+ records)
- ✓ Jobs take minutes or hours to complete
- ✓ There is a risk of interruption (server restarts, timeouts)
- ✓ You need to show progress to users

**Examples:**

- Bulk email sending
- Data imports/exports
- Report generation
- Database migrations
- Image/video processing pipelines

---

## Combining Both Patterns

You can use both patterns together for complex workflows:

```ruby
class BulkWelcomeEmailJob < ApplicationJob
  include Steppable   # For resumable processing
  include Continuable # For follow-up jobs

  # Continuation: runs AFTER all steps complete
  continuations do |options|
    TrackBulkEmailMetricsJob.perform_later(options)
  end

  def perform(options)
    # Step 1: Initialize
    step :initialize_scope do
      @users = User.where(...)
    end

    # Step 2: Process with checkpointing
    step :send_emails do |step|
      @users.find_each(start: step.cursor) do |user|
        send_email(user)
        step.advance! from: user.id
      end
    end

    # Step 3: Finalize
    step :finalize
  end
end
```

**Flow:**

```
Step 1: Initialize ✓
Step 2: Send emails (resumable)
├── User 1 ✓
├── User 2 ✓
└── ... (thousands more)
Step 3: Finalize ✓
         ↓
Continuation triggered
         ↓
TrackBulkEmailMetricsJob scheduled
```

---

## Implementation Details

### Continuable Concern

**File:** `app/jobs/concerns/continuable.rb`

```ruby
module Continuable
  extend ActiveSupport::Concern

  included do
    # Holds the block passed to `continuations`; inherited by subclasses
    class_attribute :continuation_block, instance_accessor: false
  end

  class_methods do
    def continuations(&block)
      self.continuation_block = block
      after_perform :execute_continuations
    end
  end

  private

  def execute_continuations
    # Runs the block in the job instance with the job's own arguments
    instance_exec(*arguments, &self.class.continuation_block)
  end
end
```

**How it works:**

1. `continuations` block is defined at class level and stored on the job class
2. `after_perform` callback is registered
3. When `perform` returns without raising, the callback executes the block with the job's arguments
4. Block schedules follow-up jobs

### Steppable Concern

**File:** `app/jobs/concerns/steppable.rb`

```ruby
module Steppable
  class Step
    def initialize(name, job)
      @name = name
      @job = job
    end

    def advance!(from:)
      Rails.cache.write(cursor_key, from, expires_in: 24.hours)
    end

    def cursor
      Rails.cache.read(cursor_key)
    end

    # Assumed behavior: drop the checkpoint once the step finishes
    def complete!
      Rails.cache.delete(cursor_key)
    end

    private

    # Assumed key scheme: job class + arguments + step name
    def cursor_key
      "steppable/#{@job.class.name}/#{@job.arguments.join('/')}/#{@name}"
    end
  end

  # Without a block, calls the job method named after the step
  def step(name, &block)
    step_obj = Step.new(name, self)
    block ? block.call(step_obj) : send(name)
    step_obj.complete!
  end
end
```

**How it works:**

1. `step` method creates a Step object
2. Step object tracks the cursor in `Rails.cache` (the key scheme and `complete!` behavior shown above are assumptions for illustration)
3. `step.advance!` saves progress
4. On restart, `step.cursor` loads the last saved position, or `nil` on a fresh run
5. `find_each(start: step.cursor)` resumes from the checkpoint; `start:` is inclusive, so the checkpointed record is processed again

---

## Testing

### Testing Continuations:

```ruby
RSpec.describe SendWelcomeEmailJob do
  # Assumes `user` is defined elsewhere (e.g. with `let`)
  it "schedules continuation jobs after success" do
    expect(SendOnboardingTipsJob).to receive(:set)
      .with(wait: 5.minutes)
      .and_return(double(perform_later: true))

    SendWelcomeEmailJob.perform_now(user.id)
  end

  it "does not schedule continuations if job fails" do
    allow(UserMailer).to receive(:welcome_email).and_raise(StandardError)

    expect(SendOnboardingTipsJob).not_to receive(:set)

    # Name the error class; a bare `raise_error` risks false positives
    expect {
      SendWelcomeEmailJob.perform_now(user.id)
    }.to raise_error(StandardError)
  end
end
```

### Testing Checkpointing:

```ruby
RSpec.describe ProcessImportJob do
  # Assumes `import`, `job`, and `last_processed_id` are defined elsewhere,
  # and that `process_record` is stubbed to raise partway through the first run
  it "can resume from checkpoint after failure" do
    # First attempt: fails midway
    expect {
      ProcessImportJob.perform_now(import.id)
    }.to raise_error(StandardError)

    # Verify checkpoint was saved
    step = Steppable::Step.new(:process, job)
    expect(step.cursor).to eq(last_processed_id)

    # Second attempt: resumes from checkpoint
    ProcessImportJob.perform_now(import.id)
    expect(import.reload.status).to eq('completed')
  end
end
```

---

## Real-World Examples in Our App

### Continuations (Already Implemented):

- ✅ `SendWelcomeEmailJob` → `SendOnboardingTipsJob` → `TrackSignupMetricsJob`
- ✅ `SendWelcomeEmailJob` → `NotifyAdminJob` (parallel)

### Checkpointing (Example Implementations):

- 📧 `BulkWelcomeEmailJob` - Send welcome emails to thousands of users
- 📦 `ProcessImportJob` - Process large data imports
- 📊 `GenerateReportJob` - Generate reports from large datasets

---

## Performance Considerations

### Continuations:

- **Overhead:** Minimal (just scheduling next jobs)
- **Scalability:** Excellent (each job is independent)
- **Resource usage:** Low (jobs are small and quick)

### Checkpointing:

- **Overhead:** Small (one cache write per checkpoint; in the examples above, one per record)
- **Scalability:** Good (single job processes many records)
- **Resource usage:** Moderate (long-running, but resumable)

---

## Summary

| Feature | Continuations | Checkpointing |
|---------|--------------|---------------|
| **Purpose** | Chain jobs | Resume jobs |
| **Granularity** | Job-level | Record-level |
| **Resumable** | No | Yes |
| **Best for** | Workflows | Large datasets |
| **Complexity** | Low | Medium |
| **Example** | Email sequences | Bulk operations |

**Bottom line:**

- Use **Continuations** for job workflows
- Use **Checkpointing** for resumable processing
- Use **Both** for complex, resumable workflows

---

**See also:**

- `docs/ACTIVE_JOBS_EMAIL_SETUP.md` - Email signup implementation guide
- `app/jobs/concerns/continuable.rb` - Continuations implementation
- `app/jobs/concerns/steppable.rb` - Checkpointing implementation
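
---

## Appendix: Checkpoint Resume Sketch

The resume behavior described for checkpointing can be tried outside Rails. The following is a minimal plain-Ruby sketch, not the app's `Steppable` implementation: `MemoryStore` (a stand-in for `Rails.cache`), `SketchStep`, and `run_import` are hypothetical names introduced only for illustration. It assumes the cursor holds the last successfully processed id and that resumption starts at that id inclusively, as `find_each(start:)` does.

```ruby
# Hypothetical in-memory stand-in for Rails.cache
class MemoryStore
  def initialize
    @data = {}
  end

  def write(key, value)
    @data[key] = value
  end

  def read(key)
    @data[key]
  end
end

# Hypothetical simplified step: tracks one cursor under a fixed key
class SketchStep
  def initialize(store, key)
    @store = store
    @key = key
  end

  # Save the id of the record that was just processed
  def advance!(from:)
    @store.write(@key, from)
  end

  # Last saved id, or nil on a fresh run
  def cursor
    @store.read(@key)
  end
end

# Processes ids in ascending order, starting at the saved cursor (inclusive).
# Raises when it reaches fail_at to simulate a crash or interruption.
def run_import(ids, step, processed, fail_at: nil)
  start = step.cursor || ids.min
  ids.sort.each do |id|
    next if id < start
    raise "interrupted at #{id}" if id == fail_at

    processed << id
    step.advance!(from: id)
  end
end

store = MemoryStore.new
step = SketchStep.new(store, "import/1/process")
processed = []

begin
  # First attempt: stops when it reaches record 3
  run_import([1, 2, 3, 4], step, processed, fail_at: 3)
rescue RuntimeError
  # The checkpoint for record 2 is already saved at this point
end

cursor_after_crash = step.cursor

# Second attempt: resumes from the checkpoint and finishes
run_import([1, 2, 3, 4], step, processed)
```

After the simulated crash, `cursor_after_crash` is `2`. After the second run, `processed` is `[1, 2, 2, 3, 4]`: record 2 is handled twice because the resume point is inclusive, which is why per-record work should be idempotent when this cursor convention is used.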