
Batch from a CSV

Process a spreadsheet of inputs with idempotency, parallelism, and backoff.


The bread-and-butter pattern for production FLORA workflows: read structured input from a CSV (or DB, or API), run a Technique per row, save the outputs by row key, handle errors and retries.

What you’ll build: a script that processes markets.csv (12 rows, each with a market_code and a headline) and produces out/thumbnail_DE.png, out/thumbnail_FR.png, …

This pattern uses three production-grade features together: idempotency keys (so retries don't double-charge), bounded parallelism (so you don't get rate-limited), and exponential backoff (for transient errors). Use all three.
import Flora, { APIError } from '@flora-ai/flora';
import fs from 'node:fs/promises';
import path from 'node:path';
import { parse } from 'csv-parse/sync';
const client = new Flora({ apiKey: process.env.FLORA_API_KEY });
type Row = { market_code: string; headline: string };
async function processRow(row: Row, references: string[]): Promise<string> {
  // Stable, deterministic idempotency key — same row + same headline = same key
  const idempotencyKey = `q3-thumb-${row.market_code}-${hash(row.headline)}`;

  const run = await withRetry(() =>
    client.techniques.runs.create('thumbnail-v3', {
      inputs: [
        { id: 'headline', type: 'text', value: row.headline },
        ...references.map((url, i) => ({
          id: `reference_${i}`, type: 'imageUrl' as const, value: url,
        })),
      ],
      mode: 'async',
      idempotency_key: idempotencyKey,
    })
  );

  // Poll until the run reaches a terminal state
  let status = run.status;
  let result = run;
  while (status === 'pending' || status === 'running') {
    await sleep(2000);
    result = await client.techniques.runs.retrieve(run.runId, { techniqueId: 'thumbnail-v3' });
    status = result.status;
  }

  // Anything other than 'completed' (failed, cancelled, …) is an error for this batch
  if (status !== 'completed') {
    throw new Error(`${row.market_code}: ${result.errorMessage ?? `run ended in status "${status}"`}`);
  }
  return result.outputs[0].url;
}
async function downloadTo(url: string, dest: string) {
  const res = await fetch(url);
  if (!res.ok) throw new Error(`Download failed (${res.status}): ${url}`); // fetch doesn't throw on HTTP errors
  const buf = Buffer.from(await res.arrayBuffer());
  await fs.writeFile(dest, buf);
}
async function withRetry<T>(fn: () => Promise<T>, max = 5): Promise<T> {
  let lastErr: unknown;
  for (let i = 0; i < max; i++) {
    try {
      return await fn();
    } catch (err) {
      lastErr = err;
      if (err instanceof APIError && [400, 401, 403, 404, 422].includes(err.status)) {
        throw err; // permanent — don't retry
      }
      const delay = Math.min(1000 * 2 ** i, 30_000);
      await sleep(delay);
    }
  }
  throw lastErr;
}
async function parallelMap<T, R>(items: T[], concurrency: number, fn: (item: T) => Promise<R>): Promise<R[]> {
  const results: R[] = new Array(items.length);
  let next = 0;
  // N workers each claim the next unprocessed index until the list is exhausted
  await Promise.all(
    Array.from({ length: concurrency }, async () => {
      while (true) {
        const i = next++;
        if (i >= items.length) return;
        results[i] = await fn(items[i]);
      }
    })
  );
  return results;
}
function hash(s: string): string {
  // Stable, dependency-free hash. Use crypto.createHash for stronger guarantees.
  let h = 0;
  for (const c of s) h = ((h << 5) - h + c.charCodeAt(0)) | 0;
  return Math.abs(h).toString(16);
}
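// If headline collisions matter, a sketch of the stronger variant mentioned above,
// using Node's built-in node:crypto (swap in for hash() if you need it):
//   import { createHash } from 'node:crypto';
//   const hash = (s: string) => createHash('sha256').update(s).digest('hex').slice(0, 16);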
function sleep(ms: number) {
  return new Promise((r) => setTimeout(r, ms));
}
// === Main ===
async function main() {
  const csv = await fs.readFile('./markets.csv', 'utf8');
  const rows: Row[] = parse(csv, { columns: true, skip_empty_lines: true });

  const references = [
    'https://ik.imagekit.io/flora/...approved_still_4.png',
    'https://ik.imagekit.io/flora/...approved_still_7.png',
  ];

  await fs.mkdir('out', { recursive: true });
  const urls = await parallelMap(rows, 4, (row) => processRow(row, references));
  await Promise.all(
    urls.map((url, i) => downloadTo(url, path.join('out', `thumbnail_${rows[i].market_code}.png`)))
  );
  console.log(`Done. Wrote ${urls.length} files to ./out/`);
}

main().catch((err) => {
  console.error(err);
  process.exit(1);
});
The same pattern in Go:

func processRow(ctx context.Context, c *flora.Client, row Row, refs []string) (string, error) {
	// Same deterministic idempotency key as the TypeScript version
	key := fmt.Sprintf("q3-thumb-%s-%s", row.MarketCode, hash(row.Headline))

	inputs := []flora.TechniqueRunNewParamsInput{
		{ID: "headline", Type: "text", Value: row.Headline},
	}
	for i, url := range refs {
		inputs = append(inputs, flora.TechniqueRunNewParamsInput{
			ID: fmt.Sprintf("reference_%d", i), Type: "imageUrl", Value: url,
		})
	}

	run, err := withRetry(func() (*flora.TechniqueRunNewResponse, error) {
		return c.Techniques.Runs.New(ctx, "thumbnail-v3", flora.TechniqueRunNewParams{
			Inputs:         inputs,
			Mode:           flora.TechniqueRunNewParamsModeAsync,
			IdempotencyKey: param.Field(key),
		})
	})
	if err != nil {
		return "", err
	}

	// Poll every two seconds until the run reaches a terminal state
	for {
		result, err := c.Techniques.Runs.Get(ctx, run.RunID, flora.TechniqueRunGetParams{
			TechniqueID: "thumbnail-v3",
		})
		if err != nil {
			return "", err
		}
		switch result.Status {
		case "completed":
			return result.Outputs[0].URL, nil
		case "failed":
			return "", fmt.Errorf("%s: %s", row.MarketCode, result.ErrorMessage)
		}
		time.Sleep(2 * time.Second)
	}
}
// Bounded parallelism via a worker pool. Standard Go pattern — see x/sync/errgroup.

Because every row uses a deterministic idempotency key, the script is safe to re-run. Already-completed rows hit the idempotency cache (within 24 hours) and return cached results without re-charging credits; rows that failed on the previous run are retried cleanly.

Even safer: write a small checkpoint file. After each successful row, append the market_code to out/completed.txt. On startup, skip rows already in that file.
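
A minimal sketch of that checkpoint, building on the script above (loadCompleted and markCompleted are illustrative helpers, not part of the SDK):

async function loadCompleted(file = 'out/completed.txt'): Promise<Set<string>> {
  try {
    const text = await fs.readFile(file, 'utf8');
    return new Set(text.split('\n').filter(Boolean));
  } catch {
    return new Set(); // first run: no checkpoint yet
  }
}

async function markCompleted(marketCode: string, file = 'out/completed.txt') {
  await fs.appendFile(file, marketCode + '\n');
}

// In main(), filter before processing, then checkpoint after each success:
//   const done = await loadCompleted();
//   const pending = rows.filter((r) => !done.has(r.market_code));
//   ...and call markCompleted(row.market_code) once a row's file is written.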

Before kicking off, estimate the cost of the whole batch:

const tech = await client.techniques.retrieve('thumbnail-v3');
const total = tech.runCost * rows.length;
console.log(`Estimated cost: ${total} credits across ${rows.length} rows`);

If the workspace balance is too low, fail fast — no point starting a batch that will hit 402 insufficient_credits halfway through.
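
One way to wire that up (a sketch: the current balance is read from an environment variable here for illustration, since where you get it depends on your setup):

const balance = Number(process.env.FLORA_CREDIT_BALANCE ?? '0'); // illustrative source
if (balance < total) {
  throw new Error(`Batch needs ${total} credits but only ${balance} are available; aborting before any runs start.`);
}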

  • Start with concurrency: 4. Most workspaces tolerate this without throttling.
  • If you see 429 rate_limited, drop to 2 and let the backoff in withRetry handle the rest.
  • For very long runs (motion, video), keep concurrency low — you’re rate-limited on concurrent runs, not just requests.

The script above throws on the first failed row. For batch jobs you usually want to keep going and report failures at the end. Keep routing rows through parallelMap so concurrency stays bounded, and capture each outcome instead of throwing:

const results = await parallelMap(rows, 4, (row) =>
  processRow(row, references)
    .then((value) => ({ status: 'fulfilled' as const, value }))
    .catch((reason) => ({ status: 'rejected' as const, reason: String(reason) }))
);
const failures = results
  .map((r, i) => (r.status === 'rejected' ? { row: rows[i], reason: r.reason } : null))
  .filter(Boolean);
if (failures.length) {
  console.error(`${failures.length} rows failed`);
  await fs.writeFile('out/failures.json', JSON.stringify(failures, null, 2));
}

Wrap the script in a cron job or CI workflow. Because the idempotency keys are deterministic, a scheduled rerun on the same CSV is a no-op. Edits to the CSV produce new keys for the changed rows, which get re-run.