# Queues & Processing Pipelines
For the general philosophy behind queues, see Queues & Async. This page covers the specific pipelines in the backend.
The backend uses BullMQ backed by Redis for all asynchronous processing. Each queue has dedicated processors that can be scaled independently.
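The queue names used throughout this page can be gathered into a single typed registry. The sketch below is illustrative only: the constant names and the `registerWorker` helper are assumptions, not the codebase's actual API.

```typescript
// Queue names referenced on this page, collected into one typed registry.
// (Sketch: the real constants in the codebase may be organized differently.)
const QUEUES = {
  chatReceive: 'chat.user-message.receive',
  attachmentsProcess: 'chat.attachments.process',
  attachmentsAnalyze: 'chat.attachments.analyze',
  makeCalls: 'company-scheduler.make-calls',
  syncConfig: 'company-scheduler.sync-config',
  managerReports: 'company-scheduler.manager-reports',
} as const;

type QueueName = (typeof QUEUES)[keyof typeof QUEUES];

// One dedicated worker per queue is what lets each queue scale
// independently; this helper only illustrates the shape of that wiring.
function registerWorker(queue: QueueName, concurrency: number): string {
  return `worker(${queue}) x${concurrency}`;
}
```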
## Queue Overview
## Chat & Notes Pipeline
This is the main processing path for both chat messages and map notes.
### Stage 1: Message Receive

Queue: `chat.user-message.receive`
Triggered when a user sends a message in chat. The processor:
- Creates or updates the conversation's active `Draft`
- Converts the message to a `DraftMessage`
- Detects GPS coordinates in the message text (GPT-4o-mini)
- Extracts GPS from attachment EXIF metadata
- Enqueues attachments for analysis (Stage 2)
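The Stage 1 steps above can be sketched as pure logic. This is a hypothetical sketch: the real processor detects coordinates with GPT-4o-mini, for which a naive `lat,lng` regex stands in here, and all type names are illustrative.

```typescript
// Illustrative shape of the Stage 1 processor's core logic.
interface IncomingMessage {
  conversationId: string;
  text: string;
  attachmentIds: string[];
}

interface Stage1Result {
  draftMessageText: string;
  coordinates: { lat: number; lng: number }[];
  attachmentJobs: { attachmentId: string }[]; // enqueued for Stage 2
}

function receiveUserMessage(msg: IncomingMessage): Stage1Result {
  const coordinates: { lat: number; lng: number }[] = [];
  // Naive regex stand-in for the LLM-based coordinate detection.
  const re = /(-?\d{1,2}\.\d+),\s*(-?\d{1,3}\.\d+)/g;
  for (const m of msg.text.matchAll(re)) {
    coordinates.push({ lat: Number(m[1]), lng: Number(m[2]) });
  }
  return {
    draftMessageText: msg.text,
    coordinates,
    attachmentJobs: msg.attachmentIds.map((attachmentId) => ({ attachmentId })),
  };
}
```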
### Stage 2: Attachment Processing

Queue: `chat.attachments.process`
Lightweight first pass on attachments:
- Extracts text content (OCR for images, text extraction for PDFs)
- Stores extracted text as attachment chunks
- Enqueues for deep analysis (Stage 3)
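Storing extracted text as chunks might look like the minimal sketch below; the chunk size is illustrative, not the real value.

```typescript
// Split extracted attachment text into fixed-size chunks for storage.
// (Sketch: real chunking may use overlap or token-aware boundaries.)
function chunkText(text: string, size = 1000): string[] {
  const chunks: string[] = [];
  for (let i = 0; i < text.length; i += size) {
    chunks.push(text.slice(i, i + size));
  }
  return chunks;
}
```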
### Stage 3: Attachment Analysis + Parsing

Queue: `chat.attachments.analyze`

The heavy-lifting stage, also used directly by map notes (`createFromMapNote()`):
- Runs deep attachment analysis (OpenAI Vision, Reducto for documents)
- Calls `DraftParseUtil.regenerateParseResults()`:
  - Assembles all content (messages + transcript + locations + attachment chunks)
  - Sends to `ParseProcessService.parseTranscript()` for LLM extraction
  - Saves ParseResults to the database
  - Updates the draft: status → 'stored', sets `name` and `transcriptProcessed`
- Emits `draft_updated` via WebSocket
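The assembly step inside `regenerateParseResults()` could be sketched as flattening all draft content into one transcript-like string before the LLM call. The field names below are illustrative assumptions, not the real schema.

```typescript
// Illustrative sketch of assembling draft content for LLM extraction.
interface DraftContent {
  messages: string[];
  transcript?: string;
  locations: string[];       // e.g. "47.6062,-122.3321"
  attachmentChunks: string[];
}

function assembleParseInput(d: DraftContent): string {
  return [
    ...d.messages,
    ...(d.transcript ? [d.transcript] : []),
    ...d.locations.map((l) => `Location: ${l}`),
    ...d.attachmentChunks,
  ].join('\n');
}
```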
## Transcription Pipeline
Handles audio-to-text conversion for voice recordings.
Three transcription models run in parallel for accuracy:
- GPT-4o – highest quality, multi-language
- Whisper-1 – OpenAI's dedicated speech model
- Qwen3 – alternative provider
Results are deduplicated by the SHA-256 hash of the transcription text before saving.
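The hash-based deduplication described above can be sketched as follows: identical transcripts from the three models collapse into a single record. A minimal sketch, not the actual code.

```typescript
import { createHash } from 'node:crypto';

// Keep only transcripts whose SHA-256 hash has not been seen yet.
function dedupeByHash(transcripts: string[]): string[] {
  const seen = new Set<string>();
  const unique: string[] = [];
  for (const t of transcripts) {
    const hash = createHash('sha256').update(t).digest('hex');
    if (!seen.has(hash)) {
      seen.add(hash);
      unique.push(t);
    }
  }
  return unique;
}
```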
### WebSocket Event

```typescript
// Event: 'transcription_completed'
// Room: 'user:<userId>'
{
  callLogId: string;
  status: 'completed' | 'ai-processed' | 'ai-processing-failed' | 'error';
  transcript?: string;
  error?: string;
}
```

## Parse Processing Pipeline
The AI extraction pipeline is orchestrated by `ParseProcessService`, which chains multiple services:
| Service | Responsibility |
|---|---|
| `TranscriptPreprocessingService` | Clean and normalize text |
| `IntentDetectionService` | LLM-based intent classification |
| `ParseService` | LLM-based structured data extraction (OpenAI GPT-4o) |
| `WorkspaceValidationService` | Validate extracted fields against workspace schema |
| `ReferenceResolutionService` | Resolve field names → ObjectIds, fuzzy matching |
| `ReferenceValidationService` | Confirm resolved references exist in the database |
| `CallLogManagementService` | Link call log IDs to parse results |
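The chaining in the table above amounts to running each service's output into the next stage's input. The sketch below is a hedged stand-in: the stage type and the two example stage bodies are illustrative, not the real service interfaces.

```typescript
// Each stage is an async step whose output feeds the next one.
type Stage = (input: string) => Promise<string>;

async function runPipeline(input: string, stages: Stage[]): Promise<string> {
  let current = input;
  for (const stage of stages) {
    current = await stage(current); // a failure here rejects the whole run
  }
  return current;
}

// Illustrative stand-ins for the first two stages in the table:
const preprocess: Stage = async (t) => t.trim().replace(/\s+/g, ' ');
const detectIntent: Stage = async (t) => `[intent:note] ${t}`;
```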
## Scheduled Jobs
Company-scoped scheduled tasks run via BullMQ repeatable jobs:
| Queue | Purpose |
|---|---|
| `company-scheduler.make-calls` | Trigger scheduled outbound calls |
| `company-scheduler.sync-config` | Sync company configuration |
| `company-scheduler.manager-reports` | Generate periodic manager reports |
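Registering these as BullMQ repeatable jobs might look like the data below; the cron patterns are assumptions for illustration, not taken from the codebase.

```typescript
// Illustrative repeat configuration for the company-scoped schedulers.
// BullMQ repeatable jobs take a cron pattern under `repeat.pattern`;
// the patterns here are placeholders, not the real schedules.
const REPEATABLE_JOBS = [
  { queue: 'company-scheduler.make-calls', repeat: { pattern: '*/15 * * * *' } },
  { queue: 'company-scheduler.sync-config', repeat: { pattern: '0 * * * *' } },
  { queue: 'company-scheduler.manager-reports', repeat: { pattern: '0 6 * * 1' } },
] as const;
```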
## Error Handling
- Each queue step retries independently: a transient OpenAI timeout retries only that step, not the entire pipeline
- Failed jobs are tracked with BullMQ's built-in retry mechanism
- After exhausting retries, drafts are marked with `status: 'error'`
- The mobile app's offline queue has its own retry logic (exponential backoff, max 5 retries)
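The mobile offline queue's retry policy described above (exponential backoff, capped at 5 attempts) can be sketched as a delay schedule; the base delay below is an assumption.

```typescript
// Exponential backoff for the offline queue, capped at MAX_RETRIES.
// BASE_DELAY_MS is an illustrative assumption, not the real value.
const MAX_RETRIES = 5;
const BASE_DELAY_MS = 1_000;

function nextRetryDelay(attempt: number): number | null {
  if (attempt >= MAX_RETRIES) return null; // retries exhausted: give up
  return BASE_DELAY_MS * 2 ** attempt;     // 1s, 2s, 4s, 8s, 16s
}
```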
## Related

- Queues & Async (architecture) – the design philosophy
- Scheduling – repeatable jobs with Redis locking
- Drafts (Notes) – the main consumer of these pipelines