Batch from a CSV
Process a spreadsheet of inputs with idempotency, parallelism, and backoff.
The bread-and-butter pattern for production FLORA workflows: read structured input from a CSV (or DB, or API), run a Technique per row, save the outputs by row key, handle errors and retries.
What you’ll build: a script that processes markets.csv (12 rows, with columns market_code and headline) and produces out/thumbnail_DE.png, out/thumbnail_FR.png, …
TypeScript
import Flora, { APIError } from '@flora-ai/flora';
import fs from 'node:fs/promises';
import path from 'node:path';
import { parse } from 'csv-parse/sync';
const client = new Flora({ apiKey: process.env.FLORA_API_KEY });
type Row = { market_code: string; headline: string };
async function processRow(row: Row, references: string[]): Promise<string> {
  // Stable, deterministic idempotency key — same row + same headline = same key
  const idempotencyKey = `q3-thumb-${row.market_code}-${hash(row.headline)}`;

  const run = await withRetry(() =>
    client.techniques.runs.create('thumbnail-v3', {
      inputs: [
        { id: 'headline', type: 'text', value: row.headline },
        ...references.map((url, i) => ({
          id: `reference_${i}`,
          type: 'imageUrl' as const,
          value: url,
        })),
      ],
      mode: 'async',
      idempotency_key: idempotencyKey,
    })
  );

  // Poll until done
  let status = run.status;
  let result = run;
  while (status === 'pending' || status === 'running') {
    await sleep(2000);
    result = await client.techniques.runs.retrieve(run.runId, { techniqueId: 'thumbnail-v3' });
    status = result.status;
  }
  if (status === 'failed') throw new Error(`${row.market_code}: ${result.errorMessage}`);

  return result.outputs[0].url;
}
async function downloadTo(url: string, dest: string) {
  const res = await fetch(url);
  const buf = Buffer.from(await res.arrayBuffer());
  await fs.writeFile(dest, buf);
}
async function withRetry<T>(fn: () => Promise<T>, max = 5): Promise<T> {
  let lastErr: unknown;
  for (let i = 0; i < max; i++) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (err instanceof APIError && [400, 401, 403, 404, 422].includes(err.status)) {
        throw err; // permanent — don't retry
      }
      const delay = Math.min(1000 * 2 ** i, 30_000);
      await sleep(delay);
    }
  }
  throw lastErr;
}
async function parallelMap<T, R>(
  items: T[],
  concurrency: number,
  fn: (item: T) => Promise<R>
): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  await Promise.all(
    Array.from({ length: concurrency }, async () => {
      while (true) {
        const i = next++;
        if (i >= items.length) return;
        results[i] = await fn(items[i]);
      }
    })
  );
  return results;
}
function hash(s: string): string {
  // Stable, dependency-free hash. Use crypto.createHash for stronger guarantees.
  let h = 0;
  for (const c of s) h = ((h << 5) - h + c.charCodeAt(0)) | 0;
  return Math.abs(h).toString(16);
}
function sleep(ms: number) {
  return new Promise((r) => setTimeout(r, ms));
}
// === Main ===
async function main() {
  const csv = await fs.readFile('./markets.csv', 'utf8');
  const rows: Row[] = parse(csv, { columns: true, skip_empty_lines: true });
  const references = [
    'https://ik.imagekit.io/flora/...approved_still_4.png',
    'https://ik.imagekit.io/flora/...approved_still_7.png',
  ];
  await fs.mkdir('out', { recursive: true });

  const urls = await parallelMap(rows, 4, (row) => processRow(row, references));

  await Promise.all(
    urls.map((url, i) =>
      downloadTo(url, path.join('out', `thumbnail_${rows[i].market_code}.png`))
    )
  );

  console.log(`Done. Wrote ${urls.length} files to ./out/`);
}
main().catch((err) => {
  console.error(err);
  process.exit(1);
});
Go
func processRow(ctx context.Context, c *flora.Client, row Row, refs []string) (string, error) {
	key := fmt.Sprintf("q3-thumb-%s-%s", row.MarketCode, hash(row.Headline))
	inputs := []flora.TechniqueRunNewParamsInput{
		{ID: "headline", Type: "text", Value: row.Headline},
	}
	for i, url := range refs {
		inputs = append(inputs, flora.TechniqueRunNewParamsInput{
			ID:    fmt.Sprintf("reference_%d", i),
			Type:  "imageUrl",
			Value: url,
		})
	}
	run, err := withRetry(func() (*flora.TechniqueRunNewResponse, error) {
		return c.Techniques.Runs.New(ctx, "thumbnail-v3", flora.TechniqueRunNewParams{
			Inputs:         inputs,
			Mode:           flora.TechniqueRunNewParamsModeAsync,
			IdempotencyKey: param.Field(key),
		})
	})
	if err != nil {
		return "", err
	}
	for {
		result, err := c.Techniques.Runs.Get(ctx, run.RunID, flora.TechniqueRunGetParams{
			TechniqueID: "thumbnail-v3",
		})
		if err != nil {
			return "", err
		}
		switch result.Status {
		case "completed":
			return result.Outputs[0].URL, nil
		case "failed":
			return "", fmt.Errorf("%s: %s", row.MarketCode, result.ErrorMessage)
		}
		time.Sleep(2 * time.Second)
	}
}
// Bounded parallelism via a worker pool. Standard Go pattern — see x/sync/errgroup.

Resumability
Because every row uses a deterministic idempotency key, the script is safe to re-run. Already-completed rows hit the idempotency cache (within 24 hours) and return cached results without re-charging credits. Rows that failed retry cleanly.
Even safer: write a small checkpoint file. After each successful row, append the market_code to out/completed.txt. On startup, skip rows already in that file.
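The skip logic can be sketched as a small pure helper, assuming completed.txt holds one market_code per line (filterPending is a hypothetical name, not part of the FLORA SDK):

```typescript
type Row = { market_code: string; headline: string };

// Return only the rows whose market_code is not yet in the checkpoint file.
function filterPending(rows: Row[], completedText: string): Row[] {
  const done = new Set(
    completedText.split('\n').map((line) => line.trim()).filter(Boolean)
  );
  return rows.filter((row) => !done.has(row.market_code));
}
```

On startup, read out/completed.txt (treating a missing file as empty) and pass its contents in; after each successful row, append that row's market_code plus a newline to the same file.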
Cost pre-check
Before kicking off:
const tech = await client.techniques.retrieve('thumbnail-v3');
const total = tech.runCost * rows.length;
console.log(`Estimated cost: ${total} credits across ${rows.length} rows`);

If the workspace balance is too low, fail fast — no point starting a batch that will hit 402 insufficient_credits halfway through.
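One way to sketch the fail-fast guard; FLORA_BATCH_BUDGET is an assumed operator-set environment variable, not a FLORA feature:

```typescript
// Abort before the first run if the estimate exceeds a configured credit budget.
function assertWithinBudget(runCost: number, rowCount: number, budgetCredits: number): number {
  const total = runCost * rowCount;
  if (total > budgetCredits) {
    throw new Error(`Estimated ${total} credits exceeds budget of ${budgetCredits}`);
  }
  return total;
}

// e.g. assertWithinBudget(tech.runCost, rows.length, Number(process.env.FLORA_BATCH_BUDGET ?? Infinity));
```

Throwing before the first run means a mis-sized batch costs zero credits instead of burning through half the balance before hitting 402.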
Tuning parallelism
Section titled “Tuning parallelism”- Start with
concurrency: 4. Most workspaces tolerate this without throttling. - If you see
429 rate_limited, drop to 2 and let the backoff inwithRetryhandle the rest. - For very long runs (motion, video), keep concurrency low — you’re rate-limited on concurrent runs, not just requests.
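One refinement worth considering, sketched here as an assumption rather than documented SDK behavior: add jitter to the exponential backoff so that parallel workers rate-limited at the same moment don't retry in lockstep.

```typescript
// "Full jitter": pick a uniform random delay in [0, min(base * 2^attempt, cap)).
function backoffDelay(attempt: number, baseMs = 1000, capMs = 30_000): number {
  const ceiling = Math.min(baseMs * 2 ** attempt, capMs);
  return Math.floor(Math.random() * ceiling);
}
```

Swapping this into the delay calculation inside withRetry keeps the same worst-case cap while spreading retries over time.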
Failure handling per row
The script above throws on the first failed row. For batch jobs you want to keep going and report failures at the end:
const results = await Promise.allSettled(rows.map(row => processRow(row, references)));
const failures = results
  .map((r, i) => (r.status === 'rejected' ? { row: rows[i], reason: r.reason } : null))
  .filter(Boolean);
if (failures.length) {
  console.error(`${failures.length} rows failed`);
  await fs.writeFile('out/failures.json', JSON.stringify(failures, null, 2));
}

Schedule it
Wrap the script in a cron job or CI workflow. Because the idempotency keys are deterministic, a scheduled rerun on the same CSV is a no-op. Edits to the CSV produce new keys for the changed rows, which get re-run.
Related
- Idempotency — full semantics of idempotency_key.
- Errors — what to retry vs surface.
- Generate a grid — the single-call building block.
- Iterate on outputs — the pattern this batch is built on.