OpenAI Assistants API & Real Project Patterns
OpenAI Assistants API (Threads)
The Assistants API is a higher-level alternative to Chat Completions. Instead of you managing message history, OpenAI stores it server-side in a Thread. You add messages to the thread, create a Run to process them, then poll until the run completes.
| Use Assistants when... | Use Chat Completions when... |
|---|---|
| You want OpenAI to manage conversation history | You need full control over the context window |
| You need file attachments or Code Interpreter | You want the simplest path to real-time token streaming |
| You've configured a persistent Assistant via the dashboard | You need low latency |
Setup
Create an Assistant once in the OpenAI dashboard and save its ID:
OPENAI_ASSISTANT_ID=asst_...
import OpenAI from 'openai';
import { env } from './env.js';
export const openai = env.OPENAI_API_KEY
? new OpenAI({ apiKey: env.OPENAI_API_KEY })
: null;
export const ASSISTANT_ID = env.OPENAI_ASSISTANT_ID ?? '';
The null guard lets the app start without an API key and return 503 when accessed, rather than crashing at boot.
Thread execution with polling
import { openai, ASSISTANT_ID } from '../config/openai.js';
import { AppError } from '../interfaces/base.js';
async function executeAssistantRun(
threadId: string,
message: string,
skipMessageCreate = false,
): Promise<{ text: string; tokensUsed: number | null }> {
if (!openai) throw new AppError(503, 'OpenAI not configured');
if (!skipMessageCreate) {
await openai.beta.threads.messages.create(threadId, { role: 'user', content: message });
}
let run = await openai.beta.threads.runs.create(threadId, { assistant_id: ASSISTANT_ID });
// Poll until complete (90 second timeout)
const startTime = Date.now();
while (run.status === 'queued' || run.status === 'in_progress') {
if (Date.now() - startTime > 90_000) throw new AppError(504, 'OpenAI run timeout');
await new Promise(resolve => setTimeout(resolve, 1200));
run = await openai.beta.threads.runs.retrieve(threadId, run.id);
}
if (run.status !== 'completed') {
throw new AppError(502, `Assistant run failed with status: ${run.status}`);
}
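// Newest message first: 'desc' is the API default, stated explicitly because data[0] depends on it
const msgs = await openai.beta.threads.messages.list(threadId, { order: 'desc', limit: 1 });
const assistantMsg = msgs.data?.[0];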
const text = assistantMsg?.content?.[0]?.type === 'text'
? assistantMsg.content[0].text.value
: '';
return { text, tokensUsed: run.usage?.total_tokens ?? null };
}
run.status values:
| Status | Meaning |
|---|---|
| queued | Waiting for a worker |
| in_progress | Assistant is generating |
| completed | Done; fetch messages |
| requires_action | Tool call needed |
| failed | Errored; check run.last_error |
| expired | Exceeded the 10-minute timeout |
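The status handling in the polling loop can be isolated into a pure function, which makes the mapping from run status to HTTP outcome easy to unit-test. The following is a minimal sketch, not the original implementation: `decideNextStep` and `PollDecision` are hypothetical names, and only the statuses listed in the table are modeled (the API may report others).

```typescript
// Statuses from the table above. The API can report further values that are not modeled here.
type RunStatus =
  | 'queued' | 'in_progress' | 'completed'
  | 'requires_action' | 'failed' | 'expired';

// Hypothetical result type for this sketch.
type PollDecision =
  | { action: 'wait' }
  | { action: 'done' }
  | { action: 'fail'; httpStatus: number; reason: string };

// Mirrors the loop in executeAssistantRun: keep waiting while the run is active,
// give up with 504 after the timeout, and map every other non-completed status to 502.
function decideNextStep(status: RunStatus, elapsedMs: number, timeoutMs = 90_000): PollDecision {
  if (status === 'completed') return { action: 'done' };
  if (status === 'queued' || status === 'in_progress') {
    return elapsedMs > timeoutMs
      ? { action: 'fail', httpStatus: 504, reason: 'OpenAI run timeout' }
      : { action: 'wait' };
  }
  return { action: 'fail', httpStatus: 502, reason: `Assistant run failed with status: ${status}` };
}

console.log(decideNextStep('in_progress', 5_000));     // still within budget: wait
console.log(decideNextStep('queued', 95_000));         // past 90 s: 504
console.log(decideNextStep('requires_action', 1_000)); // no tool handling here: 502
```

Note that `requires_action` ends up as a 502 in this flow because the polling code does not submit tool outputs; an Assistant configured with function tools would need an extra branch.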
Async Background Jobs (202 Pattern)
For long-running assistant calls, return immediately with a job ID and let the client poll for status. This prevents HTTP timeouts.
async create(userId: string, query: string): Promise<Simulation> {
if (!openai) throw new AppError(503, 'OpenAI not configured');
const user = await this.userRepo.findById(userId);
if (!user) throw new AppError(404, 'User not found');
if (user.monthlySimCount >= user.simRateLimit) {
throw new AppError(429, 'Monthly simulation limit reached');
}
// Create DB record immediately (status: PENDING)
const sim = await this.repo.create({ userId, query: query.slice(0, 12_000) });
// Fire-and-forget — do NOT await
this.runSimulation(sim.id, query).catch(err => {
console.error('Simulation run failed:', err);
});
await this.userRepo.incrementSimCount(userId);
return sim; // return PENDING record immediately
}
private async runSimulation(simId: string, query: string): Promise<void> {
try {
await this.repo.update(simId, { status: 'RUNNING' });
const thread = await openai!.beta.threads.create();
await openai!.beta.threads.messages.create(thread.id, { role: 'user', content: query });
const answer = await executeAssistantRun(thread.id, query, true);
await this.repo.update(simId, {
status: 'COMPLETED',
threadId: thread.id,
answer: sanitizeAnswer(answer.text),
tokensUsed: answer.tokensUsed ?? 0,
});
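} catch (err) {
// Log here: this catch handles the failure, so the .catch() in create() never sees it
console.error('Simulation run failed:', err);
await this.repo.update(simId, { status: 'FAILED' });
}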
}
// Returns 202 immediately with a PENDING job
router.post('/simulations', requireAuth, async (req, res, next) => {
try {
const sim = await simulationService.create(req.user!.id, req.body.query);
res.status(202).json(sim);
} catch (err) { next(err); }
});
// Client polls until status is 'COMPLETED' or 'FAILED'
router.get('/simulations/:id', requireAuth, async (req, res, next) => {
try {
// Assumes getById (or an earlier guard) verifies the simulation belongs to req.user
res.json(await simulationService.getById(req.params.id));
} catch (err) { next(err); }
});
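On the client side, the 202 pattern needs a loop that stops on either terminal status and gives up after a bounded number of attempts. The sketch below is one way to write it, under stated assumptions: `pollSimulation`, `PollOptions`, and the `SimulationView` shape are hypothetical, and the fetch step is injected so the function does not depend on a particular HTTP client.

```typescript
type SimulationStatus = 'PENDING' | 'RUNNING' | 'COMPLETED' | 'FAILED';

// Hypothetical client-side view of the record returned by GET /simulations/:id.
interface SimulationView { id: string; status: SimulationStatus; answer?: string }

interface PollOptions {
  intervalMs?: number;                     // delay between polls
  maxAttempts?: number;                    // upper bound so the client never polls forever
  sleep?: (ms: number) => Promise<void>;   // injectable for tests
}

const defaultSleep = (ms: number): Promise<void> =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

async function pollSimulation(
  fetchSimulation: () => Promise<SimulationView>,
  options: PollOptions = {},
): Promise<SimulationView> {
  const { intervalMs = 2000, maxAttempts = 60, sleep = defaultSleep } = options;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    const sim = await fetchSimulation();
    // Both COMPLETED and FAILED are terminal; anything else means keep waiting.
    if (sim.status === 'COMPLETED' || sim.status === 'FAILED') return sim;
    await sleep(intervalMs);
  }
  throw new Error('Simulation did not finish within the polling budget');
}
```

A caller would pass something like `() => fetch('/simulations/' + id).then(r => r.json())` as `fetchSimulation`; the URL here simply mirrors the route shown above.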
Keyword-Based Knowledge Base Injection
A lightweight alternative to vector search: a curated keyword→article-ID map is deterministic, adds no embedding or index-lookup latency, and works well for bounded domain vocabularies.
import { KnowledgeRepository } from '../repositories/knowledge.repository.js';
import type { KnowledgeBaseArticle } from '../entities/index.js';
const KB_KEYWORD_MAP: Record<string, string[]> = {
'threat assessment': ['srm-intro-001', 'field-security-handbook-001'],
'access control': ['field-security-handbook-001'],
'vulnerability': ['srm-intro-001', 'infosec-intro-001'],
'information security': ['infosec-intro-001'],
'encryption': ['infosec-intro-001'],
// add entries as your domain grows...
};
export class KnowledgeService {
private repo = new KnowledgeRepository();
findRelevantArticleIds(query: string): string[] {
const lower = query.toLowerCase();
const matched = new Set<string>();
for (const [keyword, ids] of Object.entries(KB_KEYWORD_MAP)) {
if (lower.includes(keyword)) ids.forEach(id => matched.add(id));
}
return Array.from(matched);
}
async getArticlesByIds(ids: string[]): Promise<KnowledgeBaseArticle[]> {
return this.repo.findByIds(ids);
}
buildKBContext(articles: KnowledgeBaseArticle[], userLevel: string): string {
if (articles.length === 0) return '';
let context = '\n\n---\nKNOWLEDGE BASE CONTEXT:\n';
context += `[User Level: ${userLevel}]\n\n`;
for (const article of articles) {
const truncated = article.text.length > 4000
? article.text.slice(0, 4000) + '\n\n[...truncated]'
: article.text;
context += `### ${article.title} (${article.category})\n`;
context += `Tags: ${article.tags.join(', ')}\n\n`;
context += truncated + '\n\n---\n';
}
context += '\nUse the above knowledge base content to inform your response. Cite specific concepts when relevant.\n';
return context;
}
}
Inject before sending to the LLM:
const relevantIds = kbService.findRelevantArticleIds(userQuery);
let enrichedQuery = userQuery;
if (relevantIds.length > 0) {
const articles = await kbService.getArticlesByIds(relevantIds);
const userLevel = user.role === 'ADMIN' ? 'advanced' : 'standard';
enrichedQuery = `${userQuery}\n${kbService.buildKBContext(articles.slice(0, 3), userLevel)}`;
}
| Approach | Best for | Trade-off |
|---|---|---|
| Keyword map | Bounded domain, fast iteration | Manual maintenance, literal substring match only (no stemming or synonyms) |
| Vector search | Open-ended queries, large corpora | Infrastructure cost, embedding latency |
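The matching trade-off is easiest to see on concrete queries. The snippet below reuses the same `includes`-based logic as `findRelevantArticleIds` in a standalone function (`matchArticleIds` and `DEMO_KEYWORD_MAP` are illustrative names, with a subset of the map above) to show case-insensitive hits, deduplication, and a plural form that is missed.

```typescript
const DEMO_KEYWORD_MAP: Record<string, string[]> = {
  'access control': ['field-security-handbook-001'],
  'vulnerability': ['srm-intro-001', 'infosec-intro-001'],
  'encryption': ['infosec-intro-001'],
};

// Same logic as findRelevantArticleIds, with the map passed in explicitly.
function matchArticleIds(query: string, map: Record<string, string[]>): string[] {
  const lower = query.toLowerCase();
  const matched = new Set<string>();
  for (const [keyword, ids] of Object.entries(map)) {
    if (lower.includes(keyword)) ids.forEach(id => matched.add(id));
  }
  return Array.from(matched);
}

console.log(matchArticleIds('How do I review Access Control policies?', DEMO_KEYWORD_MAP));
// [ 'field-security-handbook-001' ]  (query is lowercased before matching)

console.log(matchArticleIds('vulnerability and encryption basics', DEMO_KEYWORD_MAP));
// [ 'srm-intro-001', 'infosec-intro-001' ]  (the Set removes the duplicate ID)

console.log(matchArticleIds('List known vulnerabilities', DEMO_KEYWORD_MAP));
// []  ('vulnerabilities' does not contain the literal string 'vulnerability')
```

If plural or inflected forms matter, the simplest remedy within this approach is to add them as extra keys in the map.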
Output Sanitization
LLMs don't always follow formatting instructions perfectly. Post-process output before returning it to clients.
export function sanitizeAnswer(text: string): string {
try {
const original = String(text || '');
let out = original;
// Strip section headers the model might add despite instructions
const headerRx = /^(?:\s*(?:#{1,6}\s*)?(?:\d+\.\s*)?)?(Executive Summary|Situational Synthesis|What It Means|What to Watch|Follow-?up Questions|Red Team(?:\s*[–-]?\s*Consideration)?|Interpretive Note)\s*[:\-—]?\s*$/gmi;
out = out.replace(headerRx, '');
out = out.replace(/(Executive Summary|Situational Synthesis|Interpretive Note)\s*[:\-—]?\s*/gi, '');
// Remove risk disclaimer blocks
out = out.replace(/(?:^|\n)\s*(?:>\s*)?(?:\*\*|__)?\s*Note on Risk[:\-]?[\s\S]*?(?=\n{2,}|$)/gi, '');
out = out.replace(/(?:^|\n)\s*(?:>\s*)?(?:\*\*|__)?\s*Risk disclaimer[:\-]?[\s\S]*?(?=\n{2,}|$)/gi, '');
out = out.replace(/\n{3,}/g, '\n\n').trim();
if (out.length < 40 && original.length > 0) {
return 'No detailed analysis is available at this time. Please refine your query.';
}
return out;
} catch {
return String(text || '');
}
}
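To test the header-stripping step in isolation, it can help to work with a reduced version of the pattern. The sketch below is a simplification, not a replacement for `sanitizeAnswer`: `stripKnownHeaders` is a hypothetical helper that takes the header list as a parameter and handles only an optional markdown `#` prefix and an optional trailing colon.

```typescript
// Removes lines that consist solely of a known header, then collapses the blank lines left behind.
function stripKnownHeaders(text: string, headers: string[]): string {
  // Escape regex metacharacters so header names are matched literally.
  const escaped = headers.map(h => h.replace(/[.*+?^${}()|[\]\\]/g, '\\$&'));
  const headerRx = new RegExp(
    `^\\s*(?:#{1,6}\\s*)?(?:${escaped.join('|')})\\s*:?\\s*$`,
    'gim', // global, case-insensitive, and ^/$ match per line
  );
  return text.replace(headerRx, '').replace(/\n{3,}/g, '\n\n').trim();
}

const rawAnswer = '## Executive Summary\n\nRisk is moderate.\n\n\n\nWhat to Watch:\nPort closures.';
console.log(JSON.stringify(stripKnownHeaders(rawAnswer, ['Executive Summary', 'What to Watch'])));
// "Risk is moderate.\n\nPort closures."
```

Because the pattern is anchored to whole lines, a sentence that merely mentions "executive summary" in running text is left untouched.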
Typed Error Handling
export class AppError extends Error {
constructor(public statusCode: number, message: string) {
super(message);
this.name = 'AppError';
}
}
import type { Request, Response, NextFunction } from 'express';
import { AppError } from '../interfaces/base.js';
import { env } from '../config/env.js';
export function errorHandler(err: Error, _req: Request, res: Response, _next: NextFunction): void {
if (err instanceof AppError) {
res.status(err.statusCode).json({ error: err.name, message: err.message, statusCode: err.statusCode });
return;
}
console.error('Unhandled error:', err);
res.status(500).json({
error: 'InternalServerError',
message: env.NODE_ENV === 'production' ? 'An unexpected error occurred' : err.message,
statusCode: 500,
});
}
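The mapping from a thrown error to a response body can also be expressed as a pure function, which is convenient for unit tests that should not spin up Express. This is a sketch under stated assumptions: `toErrorBody` and `ErrorBody` are hypothetical names, and `AppError` is redeclared here only to keep the snippet self-contained.

```typescript
class AppError extends Error {
  statusCode: number;
  constructor(statusCode: number, message: string) {
    super(message);
    this.name = 'AppError';
    this.statusCode = statusCode;
    // Keeps instanceof working if the code is compiled down to ES5.
    Object.setPrototypeOf(this, AppError.prototype);
  }
}

interface ErrorBody { error: string; message: string; statusCode: number }

// Same decisions as errorHandler: AppError passes through, anything else becomes a 500
// whose message is hidden in production.
function toErrorBody(err: unknown, isProduction: boolean): ErrorBody {
  if (err instanceof AppError) {
    return { error: err.name, message: err.message, statusCode: err.statusCode };
  }
  const message = !isProduction && err instanceof Error
    ? err.message
    : 'An unexpected error occurred';
  return { error: 'InternalServerError', message, statusCode: 500 };
}

console.log(toErrorBody(new AppError(429, 'Monthly simulation limit reached'), true));
console.log(toErrorBody(new Error('db connection refused'), true));  // message is masked
console.log(toErrorBody(new Error('db connection refused'), false)); // message is shown
```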
Zod Environment Validation
npm install zod dotenv
import { z } from 'zod';
import { config } from 'dotenv';
import { resolve } from 'path';
// import.meta.dirname requires Node 20.11+ (ES modules only)
config({ path: resolve(import.meta.dirname, '..', '..', '.env') });
const envSchema = z.object({
DATABASE_URL: z.string().url(),
JWT_SECRET: z.string().min(32),
PORT: z.coerce.number().default(3001),
NODE_ENV: z.enum(['development', 'production', 'test']).default('development'),
FRONTEND_URL: z.string().url().default('http://localhost:3000'),
OPENAI_API_KEY: z.string().startsWith('sk-').optional().or(z.literal('')),
OPENAI_ASSISTANT_ID: z.string().startsWith('asst_').optional().or(z.literal('')),
});
export type Env = z.infer<typeof envSchema>;
function loadEnv(): Env {
const result = envSchema.safeParse(process.env);
if (!result.success) {
console.error('Invalid environment variables:', result.error.flatten().fieldErrors);
process.exit(1);
}
return result.data;
}
export const env = loadEnv();
Benefits over manual checks:
- Coerces types (PORT string → number)
- Reports all missing variables at once
- Optional values are type-safe (string | undefined) at the call site
RBAC for LLM Routes
LLM endpoints are expensive — protect them with both authentication and authorization.
import type { Request, Response, NextFunction } from 'express';
export function requireRole(...roles: string[]) {
return (req: Request, res: Response, next: NextFunction): void => {
if (!req.user) { res.status(401).json({ error: 'Unauthorized' }); return; }
if (!roles.includes(req.user.role)) {
res.status(403).json({ error: 'Forbidden', message: `Requires role: ${roles.join(' or ')}` });
return;
}
next();
};
}
export function requireModule(moduleName: string) {
return (req: Request, res: Response, next: NextFunction): void => {
if (!req.user) { res.status(401).json({ error: 'Unauthorized' }); return; }
if (!req.user.assignedModules.includes(moduleName)) {
res.status(403).json({ error: 'Forbidden', message: `No access to module: ${moduleName}` });
return;
}
next();
};
}
import { requireAuth } from '../middleware/auth.middleware.js';
import { requireRole, requireModule } from '../middleware/rbac.js';
router.post('/simulations', requireAuth, requireModule('simulations'), SimulationController.create);
router.get('/knowledge/search', requireAuth, requireModule('knowledge-base'), KnowledgeController.search);
router.post('/knowledge/articles', requireAuth, requireRole('ADMIN'), KnowledgeController.create);
router.post('/agents/:id/run', requireAuth, requireRole('ADMIN'), AgentController.manualRun);
requireRole checks the user's global role. requireModule checks feature-level access — letting you grant a non-admin user access to simulations without promoting them to ADMIN.
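The difference between the two checks can be illustrated without Express by reducing each middleware to the status code it would produce. In this sketch, `checkRole`, `checkModule`, and `DemoUser` are hypothetical stand-ins, and the `ANALYST` role is an invented example of a non-admin role.

```typescript
// Minimal stand-in for the authenticated user attached to req.user (shape assumed from the middleware above).
interface DemoUser { role: string; assignedModules: string[] }

// Mirrors requireRole: 401 without a user, 403 without a matching role, otherwise 200.
function checkRole(user: DemoUser | undefined, ...roles: string[]): number {
  if (!user) return 401;
  return roles.includes(user.role) ? 200 : 403;
}

// Mirrors requireModule: access depends on the module list, not on the global role.
function checkModule(user: DemoUser | undefined, moduleName: string): number {
  if (!user) return 401;
  return user.assignedModules.includes(moduleName) ? 200 : 403;
}

const analyst: DemoUser = { role: 'ANALYST', assignedModules: ['simulations'] };

console.log(checkModule(analyst, 'simulations'));    // 200: module granted to a non-admin
console.log(checkRole(analyst, 'ADMIN'));            // 403: admin-only route stays closed
console.log(checkModule(analyst, 'knowledge-base')); // 403: module not assigned
console.log(checkModule(undefined, 'simulations'));  // 401: not authenticated
```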