
Queues & Processing Pipelines ​

For the general philosophy behind queues, see Queues & Async. This page covers the specific pipelines in the backend.

The backend uses BullMQ backed by Redis for all asynchronous processing. Each queue has dedicated processors that can be scaled independently.

Queue Overview ​

Chat & Notes Pipeline ​

This is the main processing path for both chat messages and map notes.

Stage 1: Message Receive ​

Queue: chat.user-message.receive

Triggered when a user sends a message in chat. The processor:

  1. Creates or updates the conversation's active Draft
  2. Converts the message to a DraftMessage
  3. Detects GPS coordinates in the message text (GPT-4o-mini)
  4. Extracts GPS from attachment EXIF metadata
  5. Enqueues attachments for analysis (Stage 2)
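The steps above can be sketched in plain TypeScript. Every type, name, and the regex-based coordinate detector here are stand-ins for illustration (the real detector is GPT-4o-mini, and persistence goes through the database and BullMQ, not in-memory structures):

```typescript
// Illustrative shapes, not the backend's actual types.
export interface Attachment { id: string; exifGps?: { lat: number; lng: number } }
export interface DraftMessage { text: string; gps?: { lat: number; lng: number } }
export interface Draft { conversationId: string; messages: DraftMessage[] }

export const drafts = new Map<string, Draft>(); // stands in for the database
export const attachmentQueue: Attachment[] = []; // stands in for chat.attachments.process

// Cheap stand-in for the GPT-4o-mini detector: a decimal "lat, lng" pair.
function detectGps(text: string): { lat: number; lng: number } | undefined {
  const m = text.match(/(-?\d{1,2}\.\d+)\s*,\s*(-?\d{1,3}\.\d+)/);
  return m ? { lat: Number(m[1]), lng: Number(m[2]) } : undefined;
}

export function processUserMessage(
  conversationId: string,
  text: string,
  attachments: Attachment[] = [],
): Draft {
  // 1. Create or update the conversation's active Draft.
  const draft = drafts.get(conversationId) ?? { conversationId, messages: [] };
  drafts.set(conversationId, draft);

  // 2-4. Convert to a DraftMessage, taking GPS from text or EXIF metadata.
  const gps = detectGps(text) ?? attachments.find((a) => a.exifGps)?.exifGps;
  draft.messages.push({ text, gps });

  // 5. Enqueue attachments for Stage 2.
  attachmentQueue.push(...attachments);
  return draft;
}
```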

Stage 2: Attachment Processing ​

Queue: chat.attachments.process

Lightweight first pass on attachments:

  • Extracts text content (OCR for images, text extraction for PDFs)
  • Stores extracted text as attachment chunks
  • Enqueues for deep analysis (Stage 3)
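The chunking step might look like the sketch below. The chunk size and overlap are assumptions, not the backend's actual values:

```typescript
// Split extracted attachment text into overlapping fixed-size chunks
// before storage. Size and overlap are illustrative defaults.
export function chunkText(text: string, size = 1000, overlap = 100): string[] {
  const chunks: string[] = [];
  for (let start = 0; start < text.length; start += size - overlap) {
    chunks.push(text.slice(start, start + size));
    if (start + size >= text.length) break; // last chunk reached the end
  }
  return chunks;
}
```

Overlap keeps sentences that straddle a chunk boundary intact in at least one chunk, which helps downstream LLM extraction.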

Stage 3: Attachment Analysis + Parsing ​

Queue: chat.attachments.analyze

The heavy-lifting stage, also used directly by map notes (createFromMapNote()):

  1. Runs deep attachment analysis (OpenAI Vision, Reducto for documents)
  2. Calls DraftParseUtil.regenerateParseResults():
    • Assembles all content (messages + transcript + locations + attachment chunks)
    • Sends to ParseProcessService.parseTranscript() for LLM extraction
  3. Saves ParseResults to the database
  4. Updates draft: status → 'stored', sets name and transcriptProcessed
  5. Emits draft_updated via WebSocket
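Steps 3–5 above can be sketched as follows; the type shapes and function signatures are assumptions beyond the field names quoted on this page:

```typescript
export interface Draft {
  id: string;
  status: string;
  name?: string;
  transcriptProcessed?: boolean;
}

type Emit = (event: string, payload: unknown) => void;

export function finalizeDraft(
  draft: Draft,
  parseResults: { name: string },
  saveParseResults: (r: unknown) => void,
  emit: Emit,
): Draft {
  // 3. Save ParseResults to the database.
  saveParseResults(parseResults);

  // 4. Update the draft: status → 'stored', set name and transcriptProcessed.
  draft.status = 'stored';
  draft.name = parseResults.name;
  draft.transcriptProcessed = true;

  // 5. Emit draft_updated via WebSocket.
  emit('draft_updated', { draftId: draft.id, status: draft.status });
  return draft;
}
```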

Transcription Pipeline ​

Handles audio-to-text conversion for voice recordings.

Three transcription models run in parallel for accuracy:

  • GPT-4o – highest quality, multi-language
  • Whisper-1 – OpenAI's dedicated speech model
  • Qwen3 – alternative provider

Results are deduplicated by SHA256 hash of the transcription text before saving.
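The dedup step might be implemented like this, using Node's built-in crypto module; the result shape is an assumption:

```typescript
import { createHash } from 'node:crypto';

// Illustrative shape for one model's output.
export interface Transcription { model: string; text: string }

// Keep only the first transcription for each distinct SHA256 of the text,
// so identical outputs from parallel models are stored once.
export function dedupeTranscriptions(results: Transcription[]): Transcription[] {
  const seen = new Set<string>();
  return results.filter((r) => {
    const hash = createHash('sha256').update(r.text).digest('hex');
    if (seen.has(hash)) return false;
    seen.add(hash);
    return true;
  });
}
```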

WebSocket Event ​

```typescript
// Event: 'transcription_completed'
// Room: 'user:<userId>'
{
  callLogId: string;
  status: 'completed' | 'ai-processed' | 'ai-processing-failed' | 'error';
  transcript?: string;
  error?: string;
}
```

Parse Processing Pipeline ​

The AI extraction pipeline is orchestrated by ParseProcessService and chains multiple services:

| Service | Responsibility |
| --- | --- |
| TranscriptPreprocessingService | Clean and normalize text |
| IntentDetectionService | LLM-based intent classification |
| ParseService | LLM-based structured data extraction (OpenAI GPT-4o) |
| WorkspaceValidationService | Validate extracted fields against workspace schema |
| ReferenceResolutionService | Resolve field names → ObjectIds, fuzzy matching |
| ReferenceValidationService | Confirm resolved references exist in the database |
| CallLogManagementService | Link call log IDs to parse results |
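The chain above, reduced to its essence: each service receives the output of the previous one. The context shape and the stage implementations are assumptions for illustration; the real services are async and far richer:

```typescript
// Shared context threaded through the pipeline (illustrative shape).
export interface ParseContext {
  text: string;
  intent?: string;
  fields?: Record<string, unknown>;
}

export type Stage = (ctx: ParseContext) => ParseContext;

// ParseProcessService's orchestration, sketched as a fold over stages.
export function runPipeline(stages: Stage[], input: ParseContext): ParseContext {
  return stages.reduce((ctx, stage) => stage(ctx), input);
}

// Toy stand-ins for the first two services in the table.
export const preprocess: Stage = (ctx) => ({ ...ctx, text: ctx.text.trim() });
export const detectIntent: Stage = (ctx) => ({
  ...ctx,
  intent: ctx.text.includes('?') ? 'question' : 'statement',
});
```

Because every stage shares one context type, services can be reordered or skipped without changing the orchestrator.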

Scheduled Jobs ​

Company-scoped scheduled tasks run via BullMQ repeatable jobs:

| Queue | Purpose |
| --- | --- |
| company-scheduler.make-calls | Trigger scheduled outbound calls |
| company-scheduler.sync-config | Sync company configuration |
| company-scheduler.manager-reports | Generate periodic manager reports |

Error Handling ​

  • Each queue step retries independently: a transient OpenAI timeout retries only that step, not the entire pipeline
  • Failed jobs are tracked with BullMQ's built-in retry mechanism
  • After exhausting retries, drafts are marked with status: 'error'
  • Mobile app's offline queue has its own retry logic (exponential backoff, max 5 retries)
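The mobile queue's policy above can be sketched as follows; the base delay is an assumption, only the exponential shape and the 5-retry cap come from this page:

```typescript
export const MAX_RETRIES = 5;

// Delay before the given retry attempt (1-based), doubling each time:
// with a 1s base, attempts 1..5 wait 1s, 2s, 4s, 8s, 16s.
export function backoffDelayMs(attempt: number, baseMs = 1000): number {
  if (attempt < 1 || attempt > MAX_RETRIES) return -1; // retry budget exhausted
  return baseMs * 2 ** (attempt - 1);
}
```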