Workflow performance: step processing time grows with completed step count #1315

@okomarov

Description

Summary

When a workflow processes items sequentially (one 'use step' per item), the time to execute each new step grows linearly with the number of previously completed steps. This makes workflows with thousands of items impractical — we observe a 2.7x slowdown over just 100 items, and need to process ~24,000.

Environment

  • workflow: 4.2.0-beta.67
  • Node.js: 22.7.0
  • Next.js: 15.5.10 (App Router)
  • Deployed on Vercel

Observed behavior

We have a workflow that processes documents one at a time. Each item triggers an async call to an external document-extraction API in a fire-and-forget fashion (the result arrives later via a webhook outside the workflow), then writes a progress count to the database; both happen within a single step.

Over 100 consecutive items, the gap between each step completing and the next one starting grows from ~2.7s to ~7.2s (trend: +0.62s per minute of runtime):

[chart: inter-step gap vs. step index, rising from ~2.7s to ~7.2s over 100 items]

The external service itself runs at a steady 4–8s per call — the slowdown is entirely in the workflow runtime overhead between steps.
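For reference, this is roughly how we computed the gap and its trend from logged step-start timestamps. The helper names are ours, not part of the workflow runtime:

```typescript
// Gap between consecutive step starts (ms). Subtracting the steady 4-8s
// external-call time from each gap approximates pure runtime overhead.
export function stepGaps(stepStartsMs: number[]): number[] {
  const gaps: number[] = []
  for (let i = 1; i < stepStartsMs.length; i++) {
    gaps.push(stepStartsMs[i] - stepStartsMs[i - 1])
  }
  return gaps
}

// Least-squares slope of gap vs. step index, used to quantify the
// linear growth (reported above as +0.62s per minute of runtime).
export function trendPerStep(gaps: number[]): number {
  const n = gaps.length
  const meanX = (n - 1) / 2
  const meanY = gaps.reduce((a, b) => a + b, 0) / n
  let num = 0
  let den = 0
  for (let i = 0; i < n; i++) {
    num += (i - meanX) * (gaps[i] - meanY)
    den += (i - meanX) ** 2
  }
  return num / den
}
```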

Expected behavior

Step execution overhead should be roughly constant, or have negligible degradation, regardless of how many steps have already completed in the workflow run.

Root cause hypothesis

In our case, each processOneItem step receives and returns a counts object (with an accumulating errors array). After N completed steps, the runtime must deserialize N step results on every resume, and the serialized payload grows as errors accumulate. However, we see at most 1-2 errors per 1,000 items, so error accumulation alone can't explain the growth of the event log.
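To make the hypothesis concrete, this is the cost model we are assuming (a sketch, not the runtime's actual implementation): even if each step's serialized result stays at a constant size, re-reading all prior results on every resume makes the total deserialization work quadratic in step count:

```typescript
// Assumed model: resuming before step k requires reading k prior step
// results of (roughly constant) size bytesPerResult. Summed over a run
// of nSteps steps, that is bytesPerResult * n*(n-1)/2 -- quadratic in n,
// which would produce exactly the linear per-step growth we observe.
export function totalReplayBytes(nSteps: number, bytesPerResult: number): number {
  let total = 0
  for (let k = 0; k < nSteps; k++) {
    total += k * bytesPerResult
  }
  return total
}
```

Under this model, going from 100 to 24,000 items multiplies the total replay work by roughly 58,000x, which is why we consider the current behavior impractical at our scale.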

Minimal reproduction

The workflow below distills our actual code. It processes items from a database, one step per item, passing an accumulator through each step:

interface JobCounts {
  processedCount: number
  skippedCount: number
  errorCount: number
  errors: { itemId: string; error: string }[]
  stopped?: boolean
}

const PAGE_SIZE = 100

export async function batchProcessWorkflow(jobId: string) {
  'use workflow'

  try {
    await markJobRunning(jobId)

    const total = await countItemsToProcess()
    await updateJobTotal(jobId, total)

    if (total === 0) {
      await completeJob(jobId, { processedCount: 0, skippedCount: 0, errorCount: 0, errors: [] })
      return
    }

    // Paginate item IDs to avoid loading all 24k into a single step return value
    // (that return value would be deserialized on every replay)
    let counts: JobCounts = { processedCount: 0, skippedCount: 0, errorCount: 0, errors: [] }
    let offset = 0

    while (offset < total) {
      const page = await getItemsPage(offset)

      for (const item of page) {
        // Single step per item: calls external service + writes progress
        // Pass counts as param, get new counts back (value semantics for replay safety)
        counts = await processOneItem(jobId, item.id, counts)
        if (counts.stopped) return
      }

      if (page.length === 0) break
      offset += page.length
    }

    await completeJob(jobId, counts)
  } catch (err) {
    await failJob(jobId, err instanceof Error ? err.message : String(err))
    throw err
  }
}

Step implementations (simplified). Note: every step uses dynamic `await import()` because 'use step' requires it; these imports run on every replay too:

async function markJobRunning(jobId: string) {
  'use step'

  const { db } = await import('@/db')
  const { jobsTable } = await import('@/db/schema/jobs')
  const { eq } = await import('drizzle-orm')

  await db.update(jobsTable).set({ status: 'running' }).where(eq(jobsTable.id, jobId))
}

async function countItemsToProcess(): Promise<number> {
  'use step'

  const { db } = await import('@/db')
  const { itemsTable } = await import('@/db/schema/items')
  const { sql } = await import('drizzle-orm')

  const [row] = await db
    .select({ count: sql<number>`count(*)` })
    .from(itemsTable)

  return Number(row.count)
}

async function getItemsPage(offset: number): Promise<{ id: string }[]> {
  'use step'

  const { db } = await import('@/db')
  const { itemsTable } = await import('@/db/schema/items')
  const { sql } = await import('drizzle-orm')

  // Only fetch 100 IDs at a time to keep step return values small
  return db
    .select({ id: itemsTable.id })
    .from(itemsTable)
    .orderBy(sql`updated_at ASC NULLS FIRST`)
    .limit(PAGE_SIZE)
    .offset(offset)
}

async function processOneItem(jobId: string, itemId: string, counts: JobCounts): Promise<JobCounts> {
  'use step'

  const { db } = await import('@/db')
  const { itemsTable } = await import('@/db/schema/items')
  const { jobsTable } = await import('@/db/schema/jobs')
  const { processDocument } = await import('@/lib/extraction-service')
  const { eq } = await import('drizzle-orm')

  // Create new object (value semantics — mutations don't survive replay)
  const updated: JobCounts = { ...counts, errors: [...counts.errors] }

  try {
    // External async API call (~4-8s) — the actual work
    await processDocument(itemId)
    updated.processedCount++
  } catch (err) {
    updated.errorCount++
    updated.errors.push({ itemId, error: err instanceof Error ? err.message : String(err) })
  }

  // Write progress to DB and check for user-initiated stop signal
  const [row] = await db
    .update(jobsTable)
    .set({
      processedCount: updated.processedCount,
      skippedCount: updated.skippedCount,
      errorCount: updated.errorCount,
      errors: updated.errors.length > 0 ? updated.errors : undefined,
    })
    .where(eq(jobsTable.id, jobId))
    .returning({ status: jobsTable.status })

  if (row?.status === 'stopped') return { ...updated, stopped: true }
  return updated
}

async function updateJobTotal(jobId: string, total: number) {
  'use step'

  const { db } = await import('@/db')
  const { jobsTable } = await import('@/db/schema/jobs')
  const { eq } = await import('drizzle-orm')

  await db.update(jobsTable).set({ totalCount: total }).where(eq(jobsTable.id, jobId))
}

async function completeJob(jobId: string, counts: JobCounts) {
  'use step'

  const { db } = await import('@/db')
  const { jobsTable } = await import('@/db/schema/jobs')
  const { eq } = await import('drizzle-orm')

  await db
    .update(jobsTable)
    .set({
      status: 'completed',
      completedAt: new Date(),
      processedCount: counts.processedCount,
      skippedCount: counts.skippedCount,
      errorCount: counts.errorCount,
      errors: counts.errors,
    })
    .where(eq(jobsTable.id, jobId))
}

async function failJob(jobId: string, errorMessage: string) {
  'use step'

  const { db } = await import('@/db')
  const { jobsTable } = await import('@/db/schema/jobs')
  const { eq } = await import('drizzle-orm')

  await db
    .update(jobsTable)
    .set({ status: 'failed', errorMessage, completedAt: new Date() })
    .where(eq(jobsTable.id, jobId))
}

Mitigations we've already applied

  1. Paginated ID fetching — load 100 item IDs at a time instead of all 24k upfront, keeping each getItemsPage step return value small
  2. Single step per item — merged what was previously 2 steps (process + write progress) into one, halving the replay log entries
  3. Value semantics — pass counts as a parameter and return a new object, rather than mutating a closure variable (which doesn't survive replay anyway)

These helped but didn't eliminate the linear growth.
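For context on mitigation 3, this is a sketch of the closure-mutation anti-pattern we moved away from, and the replay-safe shape we use now. (The `'use step'` directives are shown for illustration; the stub bodies here are ours.)

```typescript
// Anti-pattern (removed): a closure-captured accumulator mutated inside
// the step. On replay the step body is skipped and the recorded return
// value is substituted, so the in-place mutation never re-runs and the
// closure state silently diverges from the recorded history.
let closureCounts = { processedCount: 0 }

async function processOneItemBad(itemId: string): Promise<void> {
  'use step'
  closureCounts.processedCount++ // lost on replay
}

// Replay-safe shape (current): state flows in as a parameter and out as
// a fresh return value, so the recorded step result carries the updated
// accumulator and replay reconstructs it deterministically.
async function processOneItemGood(
  itemId: string,
  counts: { processedCount: number },
): Promise<{ processedCount: number }> {
  'use step'
  return { ...counts, processedCount: counts.processedCount + 1 }
}
```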

Questions

  1. Are we fundamentally misusing workflows, or is there an unintended memory leak into the log that degrades performance? Sending documents in batches is potentially an option, but we'd prefer to start with an atomic flow and only paginate/batch later...

  2. Is child workflow composition the recommended pattern for large item counts? e.g., spawn a child workflow per page of 100 items via start(), so each child's replay log stays bounded. The composition docs mention this but don't address the performance motivation.

  3. Is there a way to "checkpoint" or compact the event log mid-workflow, so completed steps don't need to be replayed?
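For question 2, this is the shape we have in mind, sketched with a hypothetical `startChild` function standing in for the real composition API (we haven't confirmed its actual signature): the parent spawns one child per page of 100 items, so no single run's event log grows beyond ~100 step entries.

```typescript
type PageResult = { processedCount: number; errorCount: number }

// Hypothetical: whatever the real start()/composition API is, modeled
// here as a function that runs a child workflow over one page of IDs.
type StartChild = (itemIds: string[]) => Promise<PageResult>

// Parent workflow sketch: each awaited child is a single entry in the
// parent's log, and each child replays at most one page's worth of steps.
export async function batchParent(
  pages: string[][],
  startChild: StartChild,
): Promise<PageResult> {
  const totals: PageResult = { processedCount: 0, errorCount: 0 }
  for (const page of pages) {
    const result = await startChild(page)
    totals.processedCount += result.processedCount
    totals.errorCount += result.errorCount
  }
  return totals
}
```

If this is the recommended pattern for large item counts, a note in the composition docs about the performance motivation would help.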
