Queues in CluedIn
On this page
- Introduction
- Source queues
- Data pipeline queues
- Stream queues
- Distributed jobs subsystem queues
- Robust callback subsystem queue
- Remote events
In this article, you will learn about the queue-based service bus architecture used by CluedIn and find a complete list of queues involved in its operations.
Introduction
Queues are central to how CluedIn operates. Each operation or record in CluedIn is represented as a message in a queue. This design means that to truly understand how CluedIn functions under the hood, you need to understand how these queues behave.
Some of these queues are visible directly in the CluedIn UI—for example, on the Monitoring tab of a data set or within the Processing Pipeline section in Engine Room. For customers running CluedIn in a self-managed environment, a great deal of insight can be gained simply by observing the state of these queues.
You might ask questions like:
-
How many messages are currently in a queue?
-
How many consumers are connected to a given queue?
-
Are there any dead-letter messages?
Why CluedIn uses a service bus instead of an event log?
A common question is: why did CluedIn adopt a queue-based service bus architecture (e.g., RabbitMQ) instead of a log-based event-driven system (e.g., Kafka)? The answer lies in a combination of operational simplicity, data safety, and reliability.
While both approaches are valid and widely used in modern architectures, event-driven platforms like Kafka come with added complexity—especially in areas like infrastructure (e.g., managing Zookeeper), monitoring, and tracing data lineage. They also follow a different philosophy: Kafka acts more like a distributed commit log where messages are retained for a period and can be replayed, while RabbitMQ uses a work queue model where messages are consumed and removed.
CluedIn made an early architectural decision to use a service bus and an actor model. This approach provided a few key advantages:
-
Easier traceability – it’s more straightforward to track “who did what and when” in a service bus pattern.
-
Durability – messages in CluedIn are stored on disk, so even if a service crashes or is restarted (even abruptly), the messages remain intact, virtually eliminating the risk of data loss.
-
Snapshotting and recovery – the queue-based model makes it easier to pause, inspect, or recover processing flows.
-
Operational resilience – RabbitMQ offers robust support for acknowledgment, retries, and dead-lettering, which are vital for maintaining a healthy pipeline under load.
After years of investment in this architecture, we’ve reached a point where CluedIn’s pipeline is stable, performant, and resilient, even under heavy record volumes. This is why we don’t anticipate major architectural changes to this part of the system in the near future.
Source queues
The source queues are used for data ingestion in CluedIn.
General ingestion queues
The following queues are not linked to a specific data source. They are cross-source queues, primarily used for service-to-service communication and to enable asynchronous execution of long-running jobs.
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Ingestion Data Set | ingestion-data-set | General-purpose queue used to ingest records from all source types (files, endpoints, databases, and so on). | If a failure occurs, the data source is set to an error state, and a message is displayed in the UI. |
Commit Data Set | commit-data-set | General-purpose queue used to initiate processing of a source of any type. | In case of an error, the message is retried after 30 seconds; errors are logged in the logs. |
File Recovery Queue | file-recovery | Used when a file upload fails during processing. Upon pod restart, a recovery message is sent to retry parsing/loading the file. | If recovery fails, the data source is set to an error state, and the error is visible in the UI. |
Cloning Data Set Queue | cloning-data-set | Instructs the system to set a source in edit mode and clone the data; can also revert the data set to its original state. | In case of an error, it is logged in the data set logs and chunk loading may fail. |
Source Operation Queue | operation-data-set | Stores all jobs related to operations on a data set, especially actions performed in edit mode. | If a failure occurs, the error is logged. |
Source Anomaly Detection | profiling-anomaly-detection-job | Verifies if ingestions are in a correct state, attempts self-healing, and triggers alerts if needed. | If a failure occurs, the error is logged, and the message is moved to the dead letter queue after 5 attempts. |
Source Anomaly Detection Dead Letter | profiling-anomaly-detection-job_dead_letter | Stores messages that failed after 5 retry attempts in the anomaly detection queue. | If moved back manually to profiling-anomaly-detection-job , the job will be retried. |
Source Key Metrics Queue | profiling-ingestion-metrics | Scheduled job that collects key ingestion metrics. | Retries up to 5 times; if a failure occurs, the message is stored in the dedicated dead letter queue; the dead letter queue also serves as a log for critical failures. Messages can be manually retried. |
Source Key Metrics Dead Letter Queue | profiling-ingestion-metrics_dead_letter | Stores messages that failed after 5 retry attempts in the key metrics queue. | Moving the message back to profiling-ingestion-metrics will retry the job. |
Duplicate Check | profiling-duplicate-check-job | Scheduled job to detect duplicate records based on identifiers. | After 5 retries, the message is moved to the profiling-duplicate-check-job_dead_letter queue. |
Duplicate Check Dead-Letter Queue | profiling-duplicate-check-job_dead_letter | Backup queue for messages that failed duplicate checks. | Messages can be manually moved back to profiling-duplicate-check-job to retry. |
Exporter Queue | exporter | Sends export information to consumers. | If a failure occurs, the message is rejected, and an error is logged. |
Dedicated ingestion queues
The following queues are dedicated to individual data sources. Depending on the source type and the actions performed, a separate set of queues may be created for each source.
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Source Mapping Queue | clue_datasource_submit_[datasetId] | Receives JSON data and applies mapping to transform it into a clue. | If mapping fails, an error log is added to the data set logs in the UI. |
Source Processing Queue | clue_datasource_process_[datasetId] | Sends clues to the data processing pipelines. | In case of an error, logs are generated, and a Retry button appears on the Process tab of the data set. |
Source Error Processing Queue | clue_datasource_process_[datasetId]_error | Stores clues that failed during processing. | Temporarily stores clues that could not be successfully sent. |
Manual Data Entry Mapping Queue | clue_manual_entry_submit_[manualProjectId] | Receives user-entered data and transforms it into clues. | If mapping fails, an error log is added to the logs in the UI. |
Manual Data Entry Processing Queue | clue_manual_entry_process_[manualProjectId] | Sends manually entered clues to the data pipelines. | In case of an error, logs are generated and a Retry button appears on the Process tab of the data set. |
Manual Data Entry Error Processing Queue | clue_manual_entry_process_[manualProjectId]_error | Stores clues that failed during manual data entry processing. | Temporarily stores clues that could not be successfully sent. |
Quarantine Mapping Queue | source_mapping_quarantine_[sourceId] | Reverse maps clues for approval validation before resending them to the quarantine processing queue. | In case of an error, a log message is shown in the UI. |
Quarantine Processing Queue | source_process_quarantine_[sourceId] | Sends approved or quarantined clues to the data processing pipeline. | In case of an error, logs are generated, and a Retry button appears on the Process tab of the data set. |
Quarantine Error Processing Queue | source_process_quarantine_[sourceId]_error | Stores clues that failed during quarantine processing. | Temporarily stores clues that could not be successfully sent. |
Source Failed Loading Queue | dataset_failed_elastic_search_bulk_[datasetId] | Stores failed chunks when saving records to Elasticsearch. | If a failure occurs, a Retry button is shown in the UI. |
Manual Data Entry Failed Loading Queue | manual-data-entry-session_failed_elastic_search_bulk_[manualProjectId] | Stores failed chunks when saving manually entered records to Elasticsearch. | If a failure occurs, a Retry button is shown in the UI. |
Data pipeline queues
Clue processing queues
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Default Clue Processing | CluedIn.Core.Messages.Processing.IProcessingCommand , CluedIn.Core_CluedIn_Clues | General clue data ingestion. | Default processing error handling |
High Priority Clue Processing | CluedIn.Core.Messages.Processing.ProcessPrioritizedClueCommand , CluedIn.Core_CluedIn | Prioritized clue ingestion. | Default processing error handling |
Big Clue Processing | CluedIn.Core.Messages.Processing.ProcessBigClueCommand , CluedIn.Core_CluedIn | Clues with raw byte size larger than 500 KB threshold to be executed with less concurrency. | Default processing error handling |
Low Priority Clue Processing | CluedIn.Core.Messages.Processing.ProcessLowPriorityClueCommand , CluedIn.Core_CluedIn | Clue data ingestion by low priority crawling jobs. | Default processing error handling |
Edge processing queue
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Edge Processing Queue | CluedIn.Core.Messages.Processing.ProcessEdgesCommand , CluedIn.Core_CluedIn | Resolves edges and creates shadow entities for missing reference points. | Default processing error handling |
Parent Processing Queue | CluedIn.Core.Messages.Processing.ParentsProcessingCommand , CluedIn.Core_CluedIn_ParentIds | Old queue, disabled by default. | Default processing error handling |
Metrics processing queue
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Entity Metrics Queue | CluedIn.Core.Messages.Processing.Metrics.ProcessEntityMetricsCommand , CluedIn.Core_CluedIn | Calculates entity level metric values. | Default processing error handling |
Global Metrics Queue | CluedIn.Core.Messages.Processing.Metrics.ProcessGlobalMetricsCommand , CluedIn.Core_CluedIn | Scheduled job to calculate global dimension metric values. | Default processing error handling |
Archive Metrics Queue | CluedIn.Core.Messages.Processing.Metrics.ArchiveMetricsValuesCommand , CluedIn.Core_CluedIn | Creates history of entity level metric values. | Default processing error handling |
Commands queue
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Deduplicate Entity | CluedIn.Core.Messages.Processing.DeduplicateEntityCommand , CluedIn.Core_CluedIn | Deduplicates entity by overlapping entity codes or any enabled fuzzy matchers. | Default processing error handling |
Anonymise | CluedIn.Core.Messages.Processing.AnonymiseDataCommand , CluedIn.Core_CluedIn | Anonymizes entity data processing. | Default processing error handling |
Mesh Commands | CluedIn.Core.Messages.Processing.MeshDataCommand , CluedIn.Core_CluedIn | Mesh command execution from mesh center. | Default processing error handling |
Delete Entity | CluedIn.Core.Messages.Processing.DeleteEntityCommand , CluedIn.Core_CluedIn | Deletion of entities. | Default processing error handling |
Merge Entity | CluedIn.Core.Messages.Processing.MergeEntitiesCommand , CluedIn.Core_CluedIn | Merging of multiple entities and single entity, either by automatic entity deduplication or manual merging. | Default processing error handling |
Deduplicate | CluedIn.Core.Messages.Processing.DeduplicateCommand , CluedIn.Core_CluedIn | Used for background admin job to execute entity deduplication for all entities or by a specific business domain. | Default processing error handling |
Remove From Processing | CluedIn.Core.Messages.Processing.RemoveFromProcessingDataCommand , CluedIn.Core_CluedIn | Mesh command to mark entities as removed from processing. | Default processing error handling |
Split Entity | CluedIn.Core.Messages.Processing.SplitEntityCommand , CluedIn.Core_CluedIn | Re-evaluates entity code overlap and fuzzy matching overlap of records inside an entity and splits entity into several golden records if required. | Default processing error handling |
Ensure No Entity Code Overlap | CluedIn.Core.Messages.Processing.EnsureNoDuplicatedEntitiesForEntityCodesCommand , CluedIn.Core_CluedIn | Variant of Deduplicate Entity triggered by clue ingestion to validate if deduplication is needed. | Default processing error handling |
Send Mail | CluedIn.Core.Messages.SendMailCommand , CluedIn.Core_CluedIn | Used to send emails to users. | Default processing error handling |
Enricher queue
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Entity Enrichment Queue | CluedIn.ExternalSearch.ExternalSearchCommand , CluedIn.ExternalSearch_CluedIn | Performs enrichment for an entity. | Default processing error handling |
Public Enricher Queue | CluedIn.PublicApi.PublicApiEnrichmentCommand , CluedIn.PublicApi_CluedIn | Performs async enrichment of data submitted via the public API endpoint to enrich arbitrary data that is not ingested into CluedIn. | Default processing error handling |
Processing retry handling
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Dedicated Retry Queue | CluedIn.Core.Messages.Processing.RetryCommand , CluedIn.Core_CluedIn | Used when in-queue retry exceeds threshold (default 2) retry count. Messages will be serialized in fashion to avoid concurrency of processing the same subject at the same time. | In-queue retry until max retry count is reached, then message is sent to dead letter. |
Processing error handling queues
Default processing error handling goes through the following steps:
-
In-place “fast” retry with exponential backoff (default 3 attempts).
-
In-queue retry – commands erroring due to transient error will be re-queued to the same queue (default 10 times for general transient errors, default 30 times for database concurrency errors).
-
Dedicated retry queue – once in-queue retry attempts have reached threshold (default after 2 retry attempts), the message is routed to dedicated retry queue that will be serialized in a fashion to avoid concurrent processing of the same records.
-
When all retries have been exhausted, the message is sent to the dead-letter queue.
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Processing Dead-Letter Queue | DeadLetterCommands | Contains processing dead-letter messages when all reties have been exhausted. | Messages can be retried by moving them to CluedIn.Core.Messages.Processing.DeadLetterCommand, CluedIn.Core_CluedIn_Retry . |
Recoverable Dead-Letter Queue | CluedIn_DeadLetterMessages_Recoverable | Contains messages that were dead-lettered due to identified transient error. | Messages can be retried by moving them to CluedIn_DeadLetterMessages_Retry . |
Non-Recoverable Dead-Letter Queue | CluedIn_DeadLetterMessages_NonRecoverable | Contains messages that was dead-lettered due to non-transient error. | Messages can be retried by moving them to CluedIn_DeadLetterMessages_Retry . |
Retry Dead-Letter Processing Command Queue | CluedIn.Core.Messages.Processing.DeadLetterCommand , CluedIn.Core_CluedIn_Retry | Re-queues processing commands to be retried. | N/A |
Retry Dead-Letter Command Qqueue | CluedIn_DeadLetterMessages_Retry | Re-queues generic commands to be retried. | N/A |
Stream queues
Name | Queue name | Purpose | Error handling |
---|---|---|---|
StreamControlCommands Queue | StreamControlCommands | Used for stream maintenance and state change. | Default stream error handling |
Stream Subscribe Queue | StreamSubscriber_[streamId] | Receives messages containing entities to be handled by the stream. | Default stream error handling |
Stream Events Queue | StreamEvents_[machinename] | Used for managing control events such as start, stop and pause. | Default stream error handling |
Stream error handling queues
Stream error handling is similar to processing error handling and goes through the following steps:
-
In-place “fast” retry with exponential backoff (default 3 attempts).
-
In-queue retry – commands erroring due to transient error will be re-queued to the same queue (default 10 times for general transient errors, default 30 times for database concurrency errors).
-
When all retries have been exhausted, the message is sent to the dead-letter queue.
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Stream Dead-Letter Queue | StreamSubscriber-deadLetter-[streamid] | Dead-etter queue for stream. | Messages can be moved to retry queue or admin Web API can be used to retry messages. |
Distributed jobs subsystem queues
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Distributed Job Control Queue | CluedIn.DistributedJobs.Commands.IDistributedJobsControlCommand , CluedIn.DistributedJobs_DistributedJobQueueManager | Used by distributed jobs subsystem to handle job lifecycle events, | In-place retry; otherwise, always use non-recoverable dead-letter queue (except for completion callback commands). |
Distributed Job Queue | DistributedJob_[job id] | Created per job and removed when job is done. Used to store and process job-specific work item commands. | Perform in-place retry, then silently drop command if cannot process after all tries. |
Robust callback subsystem queue
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Robust Callbacks Queue | CluedIn.Processing.RobustCallback.RobustCallbackCommand , CluedIn.Processing_CluedIn | Used to store and process robust callbacks. | Default robust messaging error handling. |
Remote events
Name | Queue name | Purpose | Error handling |
---|---|---|---|
Remove Events Queue | RemoteEvents_[machinename] | Synchronizes events between machines. | N/A |