Ingestion Pipeline: Idempotent Data Ingestion

Status: Final | Version: 2.0


Table of Contents

  1. Purpose
  2. Pipeline Overview
  3. UUID-Based Deduplication
  4. Idempotency Implementation
  5. Pipeline Stages
  6. Error Recovery Procedures
  7. Retry Mechanisms
  8. Message Queue Architecture
  9. Dead Letter Queue Management
  10. Monitoring and Alerting
  11. Performance Considerations
  12. Security Requirements
  13. Testing Strategy
  14. Implementation Guide

Purpose

The ingestion pipeline is a mission-critical backend system that handles metric submissions from mobile collectors with:

  • Idempotency Guarantees: Same submission UUID never creates duplicates
  • UUID-Based Deduplication: Client-generated UUIDs prevent race conditions
  • Automatic Retry: Failed jobs retry with exponential backoff
  • Error Recovery: Dead letter queue captures permanent failures
  • Tenant Isolation: Multi-tenant architecture with strict data isolation
  • Audit Trail: Complete chain-of-custody for compliance

This pipeline ensures data integrity for ESG regulatory reporting where duplicate or lost submissions could result in compliance violations.


Pipeline Overview

High-Level Architecture

┌─────────────┐       ┌──────────────┐       ┌──────────────┐
│   Mobile    │──────▶│   API        │──────▶│  Database    │
│   Collector │◀──────│   Gateway    │◀──────│  (Primary)   │
└─────────────┘       └──────────────┘       └──────────────┘
                             │                       │
                             ▼                       ▼
                      ┌──────────────┐       ┌──────────────┐
                      │ Idempotency  │       │   RabbitMQ   │
                      │    Cache     │       │    Queue     │
                      │   (Redis)    │       └──────────────┘
                      └──────────────┘              │
                                            ┌──────────────┐
                                            │  Validation  │
                                            │    Worker    │
                                            └──────────────┘
                                            ┌──────────────┐
                                            │  Processing  │
                                            │    Worker    │
                                            └──────────────┘

Data Flow Stages

1. HTTP Request Received
2. Authentication & Authorization (JWT validation)
3. Tenant Isolation Check (tenant_id from token)
4. Idempotency Check (Redis cache lookup)
         ↓ (cache miss)
5. Database Transaction Begin
6. UUID Deduplication Check (SELECT ... FOR UPDATE)
         ↓ (not exists)
7. Insert Raw Submission (state: RECEIVED)
8. Queue Validation Job (RabbitMQ)
9. Transaction Commit
10. Return 201 Created Response
11. ASYNC: Validation Job Executes
12. ASYNC: Processing Job Executes
13. ASYNC: Notification to Mobile App

If Duplicate Detected at Step 4 or 6:

Idempotency Cache Hit (Step 4) → Return 200 OK with existing submission
         OR
Database UUID Exists (Step 6) → Return 200 OK with existing submission
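
The duplicate path above can be illustrated with a wire-level sketch (illustrative UUID and headers; the endpoint is the collector submission resource defined later in this document):

```
POST /api/v1/collector/submissions HTTP/1.1
Authorization: Bearer <jwt>
Content-Type: application/json

{ "submissionUuid": "7d9f2c6e-1b4a-4c3d-9e8f-2a5b6c7d8e9f", "value": 123.45, ... }

First attempt       → HTTP/1.1 201 Created   (submission persisted, validation job queued)
Retry, same UUID    → HTTP/1.1 200 OK        (existing submission returned, nothing re-queued)
```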


UUID-Based Deduplication

Core Principle

Mobile app generates UUIDs (not the server). This enables:

  • Idempotent retries: same UUID = same submission
  • Offline-first architecture: no server roundtrip needed
  • Race condition prevention: unique constraint enforcement

UUID Generation

Mobile (Kotlin/Android):

import java.util.UUID

data class SubmissionDraft(
    val submissionUuid: UUID = UUID.randomUUID(),  // Client-generated
    val metricDefinitionId: UUID,
    val reportingPeriodId: UUID,
    val siteId: UUID,
    val value: Double,
    val unitOfMeasure: String,
    val metadata: Map<String, Any> = emptyMap()
)

// Example usage in ViewModel
fun createDraft() {
    val draft = SubmissionDraft(
        submissionUuid = UUID.randomUUID(),  // Generate once when draft created
        metricDefinitionId = selectedMetric.id,
        reportingPeriodId = selectedPeriod.id,
        siteId = selectedSite.id,
        value = 123.45,
        unitOfMeasure = "kWh"
    )

    // Save to local database with this UUID
    database.submissionDao().insert(draft)
}

CRITICAL: The UUID must be generated when the draft is created (not when submitted). This ensures the same UUID is used for all retry attempts.

Database Constraint

-- PostgreSQL migration
CREATE UNIQUE INDEX idx_metric_submissions_uuid_unique
ON metric_submissions (tenant_id, submission_uuid);

-- This prevents duplicates even if application-level checks fail
-- The database will enforce uniqueness via constraint violation

Why Composite Key (tenant_id, submission_uuid)?

  • Tenant isolation: Different tenants can use the same UUID (extremely unlikely but possible)
  • Performance: An index on tenant_id is needed for all queries anyway
  • Security: Prevents cross-tenant UUID collision attacks
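
The constraint can be exercised directly in psql. A sketch with made-up values and an assumed minimal column list; the second insert with the same (tenant_id, submission_uuid) pair is rejected by the database regardless of application behavior:

```sql
-- Hypothetical values for illustration
INSERT INTO metric_submissions (id, tenant_id, submission_uuid, state)
VALUES (gen_random_uuid(),
        '11111111-2222-3333-4444-555555555555',
        '7d9f2c6e-1b4a-4c3d-9e8f-2a5b6c7d8e9f',
        'RECEIVED');

-- Same (tenant_id, submission_uuid) pair again:
-- ERROR: duplicate key value violates unique constraint "idx_metric_submissions_uuid_unique"
INSERT INTO metric_submissions (id, tenant_id, submission_uuid, state)
VALUES (gen_random_uuid(),
        '11111111-2222-3333-4444-555555555555',
        '7d9f2c6e-1b4a-4c3d-9e8f-2a5b6c7d8e9f',
        'RECEIVED');
```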

Deduplication Algorithm

// Quarkus Service (Kotlin)
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import com.fasterxml.jackson.databind.ObjectMapper
import org.jboss.logging.Logger
import java.time.Duration
import java.time.Instant
import java.util.UUID

@ApplicationScoped
class SubmissionIngestionService @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val idempotencyCache: IdempotencyCache,
    @Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>,
    private val auditLogService: AuditLogService,
    private val objectMapper: ObjectMapper
) {
    private val logger = Logger.getLogger(SubmissionIngestionService::class.java)

    @Transactional
    fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
        // Step 1: Check idempotency cache (fast path)
        idempotencyCache.get(request.submissionUuid)?.let { cachedSubmission ->
            logger.info("Idempotency cache hit for UUID: ${request.submissionUuid}")
            return SubmissionResponse.fromEntity(cachedSubmission)
        }

        // Step 2: Check database with pessimistic lock (prevents race conditions)
        val existing = submissionRepository.findByTenantIdAndSubmissionUuidForUpdate(
            tenantId = tenantId,
            submissionUuid = request.submissionUuid
        )

        if (existing != null) {
            // Duplicate detected in database
            logger.info("Duplicate submission UUID detected: ${request.submissionUuid}")

            // Cache for future requests (24-hour TTL)
            idempotencyCache.put(request.submissionUuid, existing, Duration.ofHours(24))

            return SubmissionResponse.fromEntity(existing)
        }

        // Step 3: Create new submission (no duplicate exists)
        val submission = MetricSubmission(
            id = UUID.randomUUID(),  // Database primary key
            tenantId = tenantId,
            submissionUuid = request.submissionUuid,  // Client-generated UUID
            reportingPeriodId = request.reportingPeriodId,
            siteId = request.siteId,
            metricDefinitionId = request.metricDefinitionId,
            value = request.value,
            unitOfMeasure = request.unitOfMeasure,
            metadata = request.metadata,
            state = SubmissionState.RECEIVED,
            submittedByUserId = userId,
            submittedAt = Instant.now(),
            rawData = objectMapper.writeValueAsString(request)  // Store original request
        )

        // Step 4: Persist to database (Panache persist() returns Unit, so reuse the entity)
        submissionRepository.persist(submission)

        // Step 5: Cache for idempotency
        idempotencyCache.put(request.submissionUuid, submission, Duration.ofHours(24))

        // Step 6: Audit log (async with CDI events)
        auditLogService.logSubmissionReceived(submission, userId)

        // Step 7: Queue validation job using Emitter (Quarkus Reactive Messaging)
        validationEmitter.send(ValidationJobMessage(submissionId = submission.id))

        logger.info("Submission ingested successfully: ${submission.id} (UUID: ${request.submissionUuid})")

        return SubmissionResponse.fromEntity(submission)
    }
}

Repository Implementation

import io.quarkus.hibernate.orm.panache.kotlin.PanacheRepositoryBase
import jakarta.enterprise.context.ApplicationScoped
import jakarta.persistence.LockModeType

@ApplicationScoped
class SubmissionRepository : PanacheRepositoryBase<MetricSubmission, UUID> {

    /**
     * Find submission by tenant and UUID with pessimistic write lock.
     * This prevents race conditions when multiple requests arrive simultaneously.
     */
    fun findByTenantIdAndSubmissionUuidForUpdate(
        tenantId: UUID,
        submissionUuid: UUID
    ): MetricSubmission? {
        return find(
            "tenantId = ?1 AND submissionUuid = ?2",
            tenantId,
            submissionUuid
        )
        .withLock(LockModeType.PESSIMISTIC_WRITE)
        .firstResult()
    }
}
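
For reference, with LockModeType.PESSIMISTIC_WRITE Hibernate appends a row lock to the generated query, which on PostgreSQL looks roughly like this (column names assumed from the entity mapping):

```sql
SELECT *
FROM metric_submissions
WHERE tenant_id = ? AND submission_uuid = ?
FOR UPDATE;
```

This is the "SELECT ... FOR UPDATE" referenced in step 6 of the data flow: a second concurrent request with the same UUID blocks here until the first transaction commits, then sees the inserted row.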

Idempotency Implementation

Why Two-Level Idempotency?

  1. Redis Cache (Fast Path):
     • O(1) lookup, ~1ms response time
     • Handles 99% of duplicate requests (client retries)
     • 24-hour TTL (covers typical retry window)

  2. Database Check (Slow Path):
     • O(log n) lookup via B-tree index, ~10ms response time
     • Handles cache misses (first request, cache eviction, cold start)
     • Permanent record (survives cache expiry)
Idempotency Cache Service

import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.value.ValueCommands
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import com.fasterxml.jackson.databind.ObjectMapper
import org.jboss.logging.Logger
import java.time.Duration
import java.util.UUID

@ApplicationScoped
class IdempotencyCache @Inject constructor(
    private val redisDataSource: RedisDataSource,
    private val objectMapper: ObjectMapper
) {
    private val logger = Logger.getLogger(IdempotencyCache::class.java)
    private val cacheTTL = Duration.ofHours(24)
    private val valueCommands: ValueCommands<String, String> = redisDataSource.value(String::class.java)

    fun get(submissionUuid: UUID): MetricSubmission? {
        val key = "idempotency:submission:$submissionUuid"
        val cached = valueCommands.get(key) ?: return null

        return try {
            objectMapper.readValue(cached, MetricSubmission::class.java)
        } catch (e: Exception) {
            logger.error("Failed to deserialize cached submission", e)
            null
        }
    }

    fun put(submissionUuid: UUID, submission: MetricSubmission, ttl: Duration = cacheTTL) {
        val key = "idempotency:submission:$submissionUuid"
        val value = objectMapper.writeValueAsString(submission)

        valueCommands.setex(key, ttl.seconds, value)
    }

    fun invalidate(submissionUuid: UUID) {
        val key = "idempotency:submission:$submissionUuid"
        valueCommands.getdel(key)
    }
}
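
The resulting key layout can be inspected with redis-cli. A sketch with a made-up UUID; SETEX stores the serialized entity with the 24-hour TTL (86400 seconds), and TTL shows the remaining lifetime:

```
> SETEX idempotency:submission:7d9f2c6e-1b4a-4c3d-9e8f-2a5b6c7d8e9f 86400 '{"state":"RECEIVED"}'
OK
> TTL idempotency:submission:7d9f2c6e-1b4a-4c3d-9e8f-2a5b6c7d8e9f
(integer) 86395
```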

Cache Invalidation Strategy

When to invalidate:

  • Submission state changes (RECEIVED → VALIDATED → PROCESSED → APPROVED)
  • Submission is rejected or resubmitted
  • Admin manually modifies submission

Why invalidate?

  • Ensure clients get the latest state on retry
  • The cache contains stale state after updates

import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional

@ApplicationScoped
class SubmissionStateService @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val idempotencyCache: IdempotencyCache
) {

    @Transactional
    fun updateState(submissionId: UUID, newState: SubmissionState, reason: String?) {
        val submission = submissionRepository.findByIdOptional(submissionId)
            .orElseThrow { NotFoundException("Submission not found: $submissionId") }

        submission.state = newState
        submission.updatedAt = Instant.now()

        submissionRepository.persist(submission)

        // Invalidate cache to force clients to fetch updated state
        idempotencyCache.invalidate(submission.submissionUuid)

        logger.info("Submission ${submission.id} state updated: $newState")
    }
}

Pipeline Stages

Stage 1: HTTP Request Reception

Controller Endpoint:

import jakarta.annotation.security.RolesAllowed
import jakarta.inject.Inject
import jakarta.validation.Valid
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.core.Response
import io.quarkus.security.identity.SecurityIdentity

@Path("/api/v1/collector")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class CollectorSubmissionResource @Inject constructor(
    private val ingestionService: SubmissionIngestionService,
    private val securityIdentity: SecurityIdentity
) {

    @POST
    @Path("/submissions")
    @RolesAllowed("collector.submissions.create")
    fun submitMetric(
        @Valid request: SubmissionRequest,
        @HeaderParam("Idempotency-Key") idempotencyKey: String?  // Optional header
    ): Response {
        val principal = securityIdentity.principal as AuthenticatedUser

        // Resolve tenant from JWT
        val tenantId = principal.tenantId
        val userId = principal.userId

        // Ingest submission
        val response = ingestionService.ingestSubmission(request, tenantId, userId)

        // Return 201 Created for new submission, 200 OK for duplicate
        val status = if (response.isNewSubmission) 201 else 200

        return Response.status(status)
            .header("X-Submission-Id", response.id.toString())
            .header("X-Submission-UUID", response.submissionUuid.toString())
            .entity(response)
            .build()
    }
}

Request Validation:

import jakarta.validation.constraints.DecimalMin
import jakarta.validation.constraints.NotBlank
import jakarta.validation.constraints.NotNull
import java.util.UUID

data class SubmissionRequest(
    @field:NotNull(message = "submission_uuid is required")
    val submissionUuid: UUID,

    @field:NotNull(message = "reporting_period_id is required")
    val reportingPeriodId: UUID,

    @field:NotNull(message = "site_id is required")
    val siteId: UUID,

    @field:NotNull(message = "metric_definition_id is required")
    val metricDefinitionId: UUID,

    @field:NotNull(message = "value is required")
    @field:DecimalMin(value = "0.0", message = "value must be non-negative")
    val value: Double,

    @field:NotBlank(message = "unit_of_measure is required")
    val unitOfMeasure: String,

    val metadata: Map<String, Any> = emptyMap(),

    val evidenceFileIds: List<UUID> = emptyList()
)
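
A request body satisfying these constraints might look like the following (illustrative values; JSON field names assume the default Jackson mapping of the Kotlin property names):

```json
{
  "submissionUuid": "7d9f2c6e-1b4a-4c3d-9e8f-2a5b6c7d8e9f",
  "reportingPeriodId": "0b2c4d6e-8f01-4a23-b456-7890c1d2e3f4",
  "siteId": "5a6b7c8d-9e0f-4123-a456-b789c0d1e2f3",
  "metricDefinitionId": "c1d2e3f4-a5b6-4c7d-8e9f-0a1b2c3d4e5f",
  "value": 123.45,
  "unitOfMeasure": "kWh",
  "metadata": { "source": "manual-entry" },
  "evidenceFileIds": []
}
```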

Stage 2: Validation Job

Quarkus Reactive Messaging Consumer (SmallRye):

import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject

@ApplicationScoped
class ValidationJobListener @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val validationEngine: ValidationEngine,
    @Channel("processing-queue") private val processingEmitter: Emitter<ProcessingJobMessage>
) {

    @Incoming("validation-queue")
    @Blocking  // Process on worker thread, not event loop
    fun handleValidationJob(message: ValidationJobMessage) {
        logger.info("Processing validation job for submission: ${message.submissionId}")

        val submission = submissionRepository.findByIdOptional(message.submissionId)
            .orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }

        try {
            // Run all 6 validation types
            val validationResult = validationEngine.validate(submission)

            if (validationResult.isValid) {
                // Update state to VALIDATED
                submission.state = SubmissionState.VALIDATED
                submission.validatedAt = Instant.now()
                submissionRepository.persist(submission)

                // Queue processing job using Emitter
                processingEmitter.send(ProcessingJobMessage(submissionId = submission.id))

                logger.info("Submission ${submission.id} validated successfully")
            } else {
                // Validation failed
                submission.state = SubmissionState.VALIDATION_FAILED
                submission.validationErrors = validationResult.errors
                submissionRepository.persist(submission)

                logger.warn("Submission ${submission.id} validation failed: ${validationResult.errors}")
            }
        } catch (e: Exception) {
            logger.error("Validation job failed for submission ${submission.id}", e)
            throw e  // Re-throw to trigger retry
        }
    }
}

Retry Configuration (application.properties):

# Quarkus Reactive Messaging with RabbitMQ connector
# Validation queue configuration
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
mp.messaging.incoming.validation-queue.queue.durable=true
mp.messaging.incoming.validation-queue.failure-strategy=dead-letter-queue
mp.messaging.incoming.validation-queue.dead-letter-queue.name=validation-dlq
mp.messaging.incoming.validation-queue.dead-letter-exchange.name=dlx-exchange
mp.messaging.incoming.validation-queue.max-retries=3
mp.messaging.incoming.validation-queue.retry-interval=2000
mp.messaging.incoming.validation-queue.retry-multiplier=2.0
mp.messaging.incoming.validation-queue.retry-max-interval=30000

# Processing queue configuration
mp.messaging.outgoing.processing-queue.connector=smallrye-rabbitmq
mp.messaging.outgoing.processing-queue.exchange.name=processing-exchange
mp.messaging.outgoing.processing-queue.exchange.type=topic
mp.messaging.outgoing.processing-queue.routing-keys=processing.submission.validated
mp.messaging.outgoing.processing-queue.default-routing-key=processing.submission.validated
mp.messaging.outgoing.processing-queue.durable=true

# RabbitMQ connection
rabbitmq-host=${RABBITMQ_HOST:localhost}
rabbitmq-port=${RABBITMQ_PORT:5672}
rabbitmq-username=${RABBITMQ_USERNAME:guest}
rabbitmq-password=${RABBITMQ_PASSWORD:guest}
rabbitmq-virtual-host=${RABBITMQ_VHOST:/}

# Profile-specific overrides
%dev.rabbitmq-host=localhost
%test.rabbitmq-host=localhost

SmallRye Fault Tolerance for Retry:

import org.eclipse.microprofile.faulttolerance.Retry
import org.eclipse.microprofile.faulttolerance.Fallback
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.jboss.logging.Logger
import java.util.UUID

@ApplicationScoped
class ValidationService @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val validationEngine: ValidationEngine
) {
    private val logger = Logger.getLogger(ValidationService::class.java)

    @Retry(
        maxRetries = 3,
        delay = 2000,          // 2 seconds
        maxDuration = 30000,   // 30 seconds max
        jitter = 200           // Random jitter to prevent thundering herd
    )
    @Fallback(fallbackMethod = "fallbackValidation")
    fun validateSubmissionWithRetry(submissionId: UUID): ValidationResult {
        return validationEngine.validate(submissionRepository.findByIdOptional(submissionId).orElseThrow())
    }

    private fun fallbackValidation(submissionId: UUID): ValidationResult {
        logger.error("Validation failed after retries for submission: $submissionId")
        return ValidationResult(isValid = false, errors = listOf("Validation service unavailable"))
    }
}

Stage 3: Processing Job

Processing Job Listener (Quarkus Reactive Messaging):

import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import io.smallrye.mutiny.Uni
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.time.Instant

@ApplicationScoped
class ProcessingJobListener @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val calculationService: CalculationService,
    private val notificationService: NotificationService
) {

    @Incoming("processing-queue")
    @Blocking  // Process on worker thread for blocking I/O operations
    fun handleProcessingJob(message: ProcessingJobMessage) {
        logger.info("Processing job for submission: ${message.submissionId}")

        val submission = submissionRepository.findByIdOptional(message.submissionId)
            .orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }

        try {
            // Perform calculations (e.g., carbon emission factors)
            val calculatedMetrics = calculationService.calculate(submission)

            // Update submission with calculated values
            submission.state = SubmissionState.PROCESSED
            submission.calculatedValues = calculatedMetrics
            submission.processedAt = Instant.now()
            submissionRepository.persist(submission)

            // Notify mobile app (async with virtual threads)
            notificationService.notifySubmissionProcessed(submission)

            logger.info("Submission ${submission.id} processed successfully")
        } catch (e: Exception) {
            logger.error("Processing job failed for submission ${submission.id}", e)
            throw e  // Re-throw to trigger retry
        }
    }
}

/**
 * Alternative: Non-blocking reactive processing with Mutiny.
 * NOTE: this variant assumes a Hibernate Reactive Panache repository
 * (hypothetical ReactiveSubmissionRepository, where findById returns
 * Uni<MetricSubmission?> and persist returns Uni<MetricSubmission>)
 * and async service methods returning Uni.
 */
@ApplicationScoped
class ReactiveProcessingJobListener @Inject constructor(
    private val submissionRepository: ReactiveSubmissionRepository,
    private val calculationService: CalculationService,
    private val notificationService: NotificationService
) {

    @Incoming("processing-queue")
    fun handleProcessingJobReactive(message: ProcessingJobMessage): Uni<Void> {
        logger.info("Processing job for submission: ${message.submissionId}")

        return submissionRepository.findById(message.submissionId)
            .onItem().ifNull().failWith { NotFoundException("Submission not found: ${message.submissionId}") }
            .onItem().transformToUni { submission ->
                // Perform calculations asynchronously
                calculationService.calculateAsync(submission)
                    .onItem().transform { calculatedMetrics ->
                        submission.state = SubmissionState.PROCESSED
                        submission.calculatedValues = calculatedMetrics
                        submission.processedAt = Instant.now()
                        submission
                    }
            }
            .onItem().transformToUni { submission ->
                submissionRepository.persist(submission)
                    .onItem().transformToUni {
                        // Notify mobile app asynchronously
                        notificationService.notifySubmissionProcessedAsync(submission)
                    }
            }
            .onItem().invoke { _ ->
                logger.info("Submission ${message.submissionId} processed successfully")
            }
            .onFailure().invoke { e ->
                logger.error("Processing job failed for submission ${message.submissionId}", e)
            }
            .replaceWithVoid()
    }
}


Error Recovery Procedures

Scenario 1: Network Timeout During Upload

Problem: Mobile app sends POST request, but network drops before response received.

Client Behavior:

// Mobile app (Kotlin)
suspend fun uploadSubmission(submission: SubmissionDraft): Result<SubmissionResponse> {
    return withContext(Dispatchers.IO) {
        retryWithExponentialBackoff(
            maxRetries = 5,
            initialDelay = 2000L,
            maxDelay = 32000L
        ) {
            try {
                // Use SAME UUID for all retries
                val response = apiService.submitMetric(submission)

                // Mark as uploaded locally
                database.submissionDao().updateState(
                    submissionUuid = submission.submissionUuid,
                    state = SubmissionState.RECEIVED,
                    serverId = response.id
                )

                Result.success(response)
            } catch (e: SocketTimeoutException) {
                // Network timeout - retry with same UUID
                logger.warn("Network timeout for submission ${submission.submissionUuid}, retrying...")
                throw e  // Trigger retry
            }
        }
    }
}

Server Behavior:

  • First request: Creates submission, returns 201 Created
  • Retry requests: Idempotency check returns the existing submission, returns 200 OK
  • Outcome: No duplicate submission created

Scenario 2: Validation Job Fails (Transient Error)

Problem: Validation job encounters transient error (e.g., database connection timeout).

Retry Strategy:

  • Attempt 1: Immediate execution
  • Attempt 2: After 2 seconds (exponential backoff)
  • Attempt 3: After 4 seconds
  • After 3 failures: Move to Dead Letter Queue

Code (Quarkus Fault Tolerance):

import org.eclipse.microprofile.faulttolerance.Retry
import org.eclipse.microprofile.faulttolerance.Fallback
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.sql.SQLTransientException
import java.util.concurrent.TimeoutException

@ApplicationScoped
class ValidationJobListener @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val validationEngine: ValidationEngine,
    private val adminNotificationService: AdminNotificationService
) {

    @Incoming("validation-queue")
    @Blocking
    @Retry(
        maxRetries = 3,
        delay = 2000,         // Initial delay: 2 seconds
        maxDuration = 30000,  // Max duration: 30 seconds
        jitter = 200,         // Random jitter: 200ms
        retryOn = [SQLTransientException::class, TimeoutException::class]
    )
    @Fallback(fallbackMethod = "recoverFromValidationFailure")
    fun handleValidationJob(message: ValidationJobMessage) {
        logger.info("Processing validation job for submission: ${message.submissionId}")

        val submission = submissionRepository.findByIdOptional(message.submissionId)
            .orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }

        // Validation logic
        val result = validationEngine.validate(submission)

        if (result.isValid) {
            submission.state = SubmissionState.VALIDATED
            submission.validatedAt = Instant.now()
            submissionRepository.persist(submission)
        } else {
            submission.state = SubmissionState.VALIDATION_FAILED
            submission.validationErrors = result.errors
            submissionRepository.persist(submission)
        }
    }

    // Fallback method - called after all retries exhausted
    private fun recoverFromValidationFailure(message: ValidationJobMessage) {
        logger.error("Validation job failed after 3 attempts: ${message.submissionId}")

        // Update submission state
        val submission = submissionRepository.findByIdOptional(message.submissionId).orElse(null)
        submission?.let {
            it.state = SubmissionState.FAILED
            it.errorMessage = "Validation failed after maximum retry attempts"
            submissionRepository.persist(it)
        }

        // Notify admin
        adminNotificationService.notifyValidationFailure(message.submissionId, 
            RuntimeException("Max retries exceeded"))
    }
}

Scenario 3: Database Deadlock

Problem: Multiple concurrent requests cause database deadlock.

Detection:

import jakarta.persistence.PessimisticLockException

try {
    submissionRepository.persist(submission)
} catch (e: PessimisticLockException) {
    // PostgreSQL deadlock / lock conflict detected
    logger.warn("Deadlock detected for submission ${submission.submissionUuid}, rolling back")

    throw ServiceUnavailableException(
        code = "SYSTEM_DEADLOCK",
        message = "Database is temporarily busy. Please retry in a few seconds.",
        retryAfter = 5  // seconds
    )
}

Client Behavior:

// Mobile app handles 503 Service Unavailable
if (response.statusCode == 503) {
    val retryAfter = response.headers["Retry-After"]?.toIntOrNull() ?: 5
    delay(retryAfter * 1000L)

    // Retry with SAME UUID
    return uploadSubmission(submission)
}

Scenario 4: Duplicate Submission from Multiple Devices

Problem: User logs in on two devices, both upload same submission.

Prevention:

  • Mobile app generates the UUID when the draft is created (not when submitted)
  • Both devices use the SAME UUID (stored in the local database)
  • Server idempotency check prevents the duplicate

Flow:

Device A: POST /submissions (UUID: abc-123) → Server creates submission
Device B: POST /submissions (UUID: abc-123) → Server returns existing submission (200 OK)


Retry Mechanisms

Client-Side Retry (Mobile App)

Exponential Backoff Implementation:

suspend fun <T> retryWithExponentialBackoff(
    maxRetries: Int = 5,
    initialDelay: Long = 2000L,  // 2 seconds
    maxDelay: Long = 32000L,      // 32 seconds
    factor: Double = 2.0,
    block: suspend () -> T
): T {
    var currentDelay = initialDelay
    repeat(maxRetries) { attempt ->
        try {
            return block()
        } catch (e: Exception) {
            if (attempt == maxRetries - 1) throw e

            // Log retry attempt
            logger.info("Retry attempt ${attempt + 1}/$maxRetries after ${currentDelay}ms")

            // Wait before retry
            delay(currentDelay)

            // Increase delay exponentially
            currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
        }
    }
    throw IllegalStateException("Retry logic failed")
}
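
To make the timing concrete, the wait sequence produced by the defaults above can be computed standalone (helper name is illustrative, not part of the app code):

```kotlin
// Computes the waits between attempts for the defaults used above.
// After the final attempt fails there is no further wait, hence maxRetries - 1 entries.
fun backoffSchedule(
    maxRetries: Int = 5,
    initialDelay: Long = 2000L,
    maxDelay: Long = 32000L,
    factor: Double = 2.0
): List<Long> {
    val delays = mutableListOf<Long>()
    var current = initialDelay
    repeat(maxRetries - 1) {
        delays += current
        current = (current * factor).toLong().coerceAtMost(maxDelay)
    }
    return delays
}

// backoffSchedule() == [2000, 4000, 8000, 16000]
```

With these defaults a submission is abandoned after roughly 30 seconds of cumulative waiting, which stays well inside the 24-hour idempotency window.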

Retry Decision Matrix:

| Error Type         | HTTP Status | Retry? | Max Retries | Notes                      |
|--------------------|-------------|--------|-------------|----------------------------|
| Network timeout    | N/A         | ✅ Yes | 5           | Use exponential backoff    |
| DNS failure        | N/A         | ✅ Yes | 3           | May indicate network issue |
| SSL error          | N/A         | ❌ No  | 0           | Certificate problem        |
| 400 Bad Request    | 400         | ❌ No  | 0           | Client error, fix request  |
| 401 Unauthorized   | 401         | ✅ Yes | 1           | Refresh token, retry once  |
| 403 Forbidden      | 403         | ❌ No  | 0           | Permission denied          |
| 404 Not Found      | 404         | ❌ No  | 0           | Resource doesn't exist     |
| 409 Conflict       | 409         | ❌ No  | 0           | Duplicate detected         |
| 422 Validation     | 422         | ❌ No  | 0           | Invalid data               |
| 429 Rate Limit     | 429         | ✅ Yes | 3           | Respect Retry-After header |
| 500 Internal Error | 500         | ✅ Yes | 5           | Server error, retry        |
| 503 Unavailable    | 503         | ✅ Yes | 5           | Server overload, retry     |
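
The matrix can be encoded as a small client-side helper (hypothetical functions, sketched for illustration; the single 401 retry assumes the token refresh happens elsewhere first):

```kotlin
// Encodes the retry decision matrix for HTTP responses.
// Network-level failures (timeout, DNS) are decided before an HTTP status exists.
fun shouldRetry(httpStatus: Int, tokenRefreshed: Boolean = false): Boolean = when (httpStatus) {
    429, 500, 503 -> true      // rate limit / server errors: retry with backoff
    401 -> !tokenRefreshed     // retry once after refreshing the token
    else -> false              // 400, 403, 404, 409, 422: permanent, do not retry
}

fun maxRetriesFor(httpStatus: Int): Int = when (httpStatus) {
    500, 503 -> 5
    429 -> 3
    401 -> 1
    else -> 0
}
```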

Server-Side Retry (Background Jobs)

Quarkus Reactive Messaging Configuration (application.properties):

# Validation queue with retry and DLQ
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
mp.messaging.incoming.validation-queue.queue.durable=true
mp.messaging.incoming.validation-queue.queue.x-dead-letter-exchange=dlx-exchange
mp.messaging.incoming.validation-queue.queue.x-dead-letter-routing-key=validation.dlq
mp.messaging.incoming.validation-queue.queue.x-message-ttl=300000
mp.messaging.incoming.validation-queue.failure-strategy=dead-letter-queue
mp.messaging.incoming.validation-queue.dead-letter-queue.exchange.name=dlx-exchange
mp.messaging.incoming.validation-queue.dead-letter-queue.routing-key=validation.dlq
mp.messaging.incoming.validation-queue.max-retries=3

# Processing queue with retry and DLQ
mp.messaging.incoming.processing-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.processing-queue.queue.name=processing-queue
mp.messaging.incoming.processing-queue.queue.durable=true
mp.messaging.incoming.processing-queue.queue.x-dead-letter-exchange=dlx-exchange
mp.messaging.incoming.processing-queue.queue.x-dead-letter-routing-key=processing.dlq
mp.messaging.incoming.processing-queue.queue.x-message-ttl=600000
mp.messaging.incoming.processing-queue.failure-strategy=dead-letter-queue

Job Retry Logic with Acknowledgment Strategy:

import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Message
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.sql.SQLTransientException
import java.util.UUID
import java.util.concurrent.CompletionStage

@ApplicationScoped
class ValidationJobProcessor @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val validationEngine: ValidationEngine
) {

    @Incoming("validation-queue")
    fun process(message: Message<ValidationJobMessage>): CompletionStage<Void> {
        val payload = message.payload

        return try {
            // Process validation
            validateSubmission(payload.submissionId)

            // Acknowledge success
            message.ack()

        } catch (e: TransientDataAccessException) {
            // Transient error - NACK to trigger retry
            logger.warn("Transient error, will retry: ${e.message}")
            message.nack(e)  // Quarkus will requeue based on failure-strategy
            CompletableFuture.completedFuture(null)

        } catch (e: PermanentValidationException) {
            // Permanent error - acknowledge but mark as failed
            logger.error("Permanent error, moving to DLQ: ${e.message}")
            message.nack(e)  // Will go to DLQ after retries exhausted
            CompletableFuture.completedFuture(null)
        }
    }

    private fun validateSubmission(submissionId: UUID) {
        val submission = submissionRepository.findByIdOptional(submissionId)
            .orElseThrow { NotFoundException("Submission not found: $submissionId") }

        val result = validationEngine.validate(submission)

        if (result.isValid) {
            submission.state = SubmissionState.VALIDATED
            submission.validatedAt = Instant.now()
        } else {
            submission.state = SubmissionState.VALIDATION_FAILED
            submission.validationErrors = result.errors
        }

        submissionRepository.persist(submission)
    }
}
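The processor above (and the queue examples that follow) exchange a `ValidationJobMessage` payload that the document never defines; a minimal assumed shape would be:

```kotlin
import java.util.UUID

// Assumed message payload for the validation queue. Carrying only the
// submission ID keeps messages small and forces workers to reload current
// state from the database, avoiding stale data on redelivery.
data class ValidationJobMessage(
    val submissionId: UUID
)
```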


Message Queue Architecture

Queue Topology

┌──────────────────┐
│  validation-     │
│  exchange        │──┐
│  (topic)         │  │
└──────────────────┘  │
                      ├──▶ validation-queue ──▶ ValidationWorker
                      │        (3 retries)
                      └──▶ validation-dlq ──▶ ManualReview
                               (no retry)

┌──────────────────┐
│  processing-     │
│  exchange        │──┐
│  (topic)         │  │
└──────────────────┘  │
                      ├──▶ processing-queue ──▶ ProcessingWorker
                      │        (3 retries)
                      └──▶ processing-dlq ──▶ ManualReview
                               (no retry)

Queue Configuration

Validation Queue (application.properties):

# Validation incoming channel (consumer)
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
mp.messaging.incoming.validation-queue.queue.durable=true
mp.messaging.incoming.validation-queue.queue.declare=true
mp.messaging.incoming.validation-queue.queue.x-dead-letter-exchange=dlx-exchange
mp.messaging.incoming.validation-queue.queue.x-dead-letter-routing-key=validation.dlq
mp.messaging.incoming.validation-queue.queue.x-message-ttl=300000
mp.messaging.incoming.validation-queue.queue.x-max-priority=10
mp.messaging.incoming.validation-queue.exchange.name=validation-exchange
mp.messaging.incoming.validation-queue.exchange.type=topic
mp.messaging.incoming.validation-queue.routing-keys=validation.#

# Validation outgoing channel (producer)
mp.messaging.outgoing.validation-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.validation-events.exchange.name=validation-exchange
mp.messaging.outgoing.validation-events.exchange.type=topic
mp.messaging.outgoing.validation-events.exchange.durable=true
mp.messaging.outgoing.validation-events.default-routing-key=validation.submission.received

Processing Queue (application.properties):

# Processing incoming channel
mp.messaging.incoming.processing-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.processing-queue.queue.name=processing-queue
mp.messaging.incoming.processing-queue.queue.durable=true
mp.messaging.incoming.processing-queue.queue.declare=true
mp.messaging.incoming.processing-queue.queue.x-dead-letter-exchange=dlx-exchange
mp.messaging.incoming.processing-queue.queue.x-dead-letter-routing-key=processing.dlq
mp.messaging.incoming.processing-queue.queue.x-message-ttl=600000
mp.messaging.incoming.processing-queue.exchange.name=processing-exchange
mp.messaging.incoming.processing-queue.exchange.type=topic
mp.messaging.incoming.processing-queue.routing-keys=processing.#

# Processing outgoing channel
mp.messaging.outgoing.processing-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.processing-events.exchange.name=processing-exchange
mp.messaging.outgoing.processing-events.exchange.type=topic
mp.messaging.outgoing.processing-events.exchange.durable=true
mp.messaging.outgoing.processing-events.default-routing-key=processing.submission.validated

Priority Queuing

Use Case: Urgent submissions (e.g., near deadline) should process first.

import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import org.eclipse.microprofile.reactive.messaging.Message
import io.smallrye.reactive.messaging.rabbitmq.OutgoingRabbitMQMetadata
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject

@ApplicationScoped
class SubmissionQueueService @Inject constructor(
    @Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>
) {

    fun queueValidationJob(submission: MetricSubmission) {
        val priority = if (submission.isUrgent) 9 else 5

        // Create message with priority metadata
        val metadata = OutgoingRabbitMQMetadata.builder()
            .withPriority(priority)
            .withRoutingKey("validation.submission.received")
            .build()

        val message = Message.of(ValidationJobMessage(submissionId = submission.id))
            .addMetadata(metadata)

        validationEmitter.send(message)
        logger.info("Queued validation job for submission ${submission.id} with priority $priority")
    }
}

Dead Letter Queue Management

DLQ Purpose

Dead Letter Queue (DLQ) captures messages that:

- Failed after max retries (3 attempts)
- Expired (TTL exceeded)
- Rejected by consumer (permanent errors)

DLQ Configuration

application.properties:

# Dead Letter Exchange
mp.messaging.outgoing.dlx-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.dlx-events.exchange.name=dlx-exchange
mp.messaging.outgoing.dlx-events.exchange.type=direct
mp.messaging.outgoing.dlx-events.exchange.durable=true

# Validation DLQ
mp.messaging.incoming.validation-dlq.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-dlq.queue.name=validation-dlq
mp.messaging.incoming.validation-dlq.queue.durable=true
mp.messaging.incoming.validation-dlq.queue.declare=true
mp.messaging.incoming.validation-dlq.queue.x-queue-mode=lazy
mp.messaging.incoming.validation-dlq.exchange.name=dlx-exchange
mp.messaging.incoming.validation-dlq.exchange.type=direct
mp.messaging.incoming.validation-dlq.routing-keys=validation.dlq

# Processing DLQ
mp.messaging.incoming.processing-dlq.connector=smallrye-rabbitmq
mp.messaging.incoming.processing-dlq.queue.name=processing-dlq
mp.messaging.incoming.processing-dlq.queue.durable=true
mp.messaging.incoming.processing-dlq.queue.declare=true
mp.messaging.incoming.processing-dlq.queue.x-queue-mode=lazy
mp.messaging.incoming.processing-dlq.exchange.name=dlx-exchange
mp.messaging.incoming.processing-dlq.exchange.type=direct
mp.messaging.incoming.processing-dlq.routing-keys=processing.dlq

DLQ Consumer (for monitoring/reprocessing):

import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Message
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.time.Instant
import java.util.UUID
import java.util.concurrent.CompletionStage

@ApplicationScoped
class ValidationDLQConsumer @Inject constructor(
    private val alertService: AlertService,
    private val auditLogService: AuditLogService
) {

    @Incoming("validation-dlq")
    @Blocking
    fun handleDLQMessage(message: Message<ValidationJobMessage>): CompletionStage<Void> {
        val payload = message.payload

        logger.error("Message in validation DLQ: ${payload.submissionId}")

        // Log to audit trail
        auditLogService.logDLQMessage(
            queueName = "validation-queue",
            submissionId = payload.submissionId,
            timestamp = Instant.now()
        )

        // Send alert
        alertService.sendAlert(
            severity = AlertSeverity.HIGH,
            message = "Submission ${payload.submissionId} failed validation after max retries",
            tags = listOf("dlq", "validation")
        )

        return message.ack()
    }
}

DLQ Monitoring

Scheduled Job to Check DLQ Depth (Quarkus Scheduler):

import io.quarkus.scheduler.Scheduled
import com.rabbitmq.client.ConnectionFactory
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.eclipse.microprofile.config.inject.ConfigProperty

@ApplicationScoped
class DLQMonitor @Inject constructor(
    private val alertService: AlertService,
    @ConfigProperty(name = "rabbitmq-host") private val rabbitHost: String,
    @ConfigProperty(name = "rabbitmq-port") private val rabbitPort: Int,
    @ConfigProperty(name = "rabbitmq-username") private val rabbitUsername: String,
    @ConfigProperty(name = "rabbitmq-password") private val rabbitPassword: String
) {

    @Scheduled(every = "60s")  // Every minute
    fun checkDLQDepth() {
        val factory = ConnectionFactory().apply {
            host = rabbitHost
            port = rabbitPort
            username = rabbitUsername
            password = rabbitPassword
        }

        factory.newConnection().use { connection ->
            connection.createChannel().use { channel ->
                // Get validation DLQ depth
                val validationDLQDepth = try {
                    channel.queueDeclarePassive("validation-dlq").messageCount
                } catch (e: Exception) {
                    logger.warn("Failed to check validation DLQ depth", e)
                    0
                }

                // Get processing DLQ depth
                val processingDLQDepth = try {
                    channel.queueDeclarePassive("processing-dlq").messageCount
                } catch (e: Exception) {
                    logger.warn("Failed to check processing DLQ depth", e)
                    0
                }

                // Alert if DLQ depth exceeds threshold
                if (validationDLQDepth > 10) {
                    alertService.sendAlert(
                        severity = AlertSeverity.HIGH,
                        message = "Validation DLQ depth is $validationDLQDepth (threshold: 10)",
                        tags = listOf("dlq", "validation")
                    )
                }

                if (processingDLQDepth > 10) {
                    alertService.sendAlert(
                        severity = AlertSeverity.HIGH,
                        message = "Processing DLQ depth is $processingDLQDepth (threshold: 10)",
                        tags = listOf("dlq", "processing")
                    )
                }

                logger.debug("DLQ depths - validation: $validationDLQDepth, processing: $processingDLQDepth")
            }
        }
    }
}

Manual DLQ Reprocessing

Admin Endpoint to Retry Failed Jobs (JAX-RS):

import jakarta.annotation.security.RolesAllowed
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.core.Response
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import com.rabbitmq.client.ConnectionFactory
import com.fasterxml.jackson.databind.ObjectMapper
import org.eclipse.microprofile.config.inject.ConfigProperty
import java.util.UUID

@Path("/api/v1/admin/dlq")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class DLQManagementResource @Inject constructor(
    @Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>,
    private val objectMapper: ObjectMapper,
    @ConfigProperty(name = "rabbitmq-host") private val rabbitHost: String,
    @ConfigProperty(name = "rabbitmq-port") private val rabbitPort: Int,
    @ConfigProperty(name = "rabbitmq-username") private val rabbitUsername: String,
    @ConfigProperty(name = "rabbitmq-password") private val rabbitPassword: String
) {

    @POST
    @Path("/validation/retry/{submissionId}")
    @RolesAllowed("admin.dlq.retry")
    fun retryValidationJob(@PathParam("submissionId") submissionId: UUID): Response {
        // Re-queue message from DLQ
        validationEmitter.send(ValidationJobMessage(submissionId = submissionId))

        logger.info("Re-queued validation job for submission: $submissionId")

        return Response.accepted().build()
    }

    @GET
    @Path("/validation/list")
    @RolesAllowed("admin.dlq.view")
    fun listValidationDLQ(): Response {
        val messages = mutableListOf<DLQMessage>()

        val factory = ConnectionFactory().apply {
            host = rabbitHost
            port = rabbitPort
            username = rabbitUsername
            password = rabbitPassword
        }

        factory.newConnection().use { connection ->
            connection.createChannel().use { channel ->
                var messageCount = channel.queueDeclarePassive("validation-dlq").messageCount

                // Peek at up to 100 messages without consuming them. Messages
                // stay unacknowledged during iteration so basicGet cannot
                // return the same message twice; they are requeued in bulk
                // afterwards. (NACK-ing each message immediately with requeue
                // would make it fetchable again and produce duplicate entries.)
                var lastDeliveryTag = -1L
                while (messageCount > 0 && messages.size < 100) {
                    val response = channel.basicGet("validation-dlq", false) ?: break
                    val message = objectMapper.readValue(
                        response.body,
                        ValidationJobMessage::class.java
                    )
                    messages.add(DLQMessage(
                        submissionId = message.submissionId,
                        deliveryTag = response.envelope.deliveryTag,
                        redelivered = response.envelope.isRedeliver
                    ))
                    lastDeliveryTag = response.envelope.deliveryTag
                    messageCount--
                }

                // NACK everything fetched in one call (multiple = true,
                // requeue = true) so the messages stay in the queue
                if (lastDeliveryTag >= 0) {
                    channel.basicNack(lastDeliveryTag, true, true)
                }
            }
        }

        return Response.ok(messages).build()
    }
}

data class DLQMessage(
    val submissionId: UUID,
    val deliveryTag: Long,
    val redelivered: Boolean
)


Monitoring and Alerting

Key Metrics to Monitor

  1. Ingestion Rate: Submissions received per minute
  2. Idempotency Cache Hit Rate: % of requests served from cache
  3. Validation Success Rate: % of submissions passing validation
  4. Processing Success Rate: % of submissions processed successfully
  5. Queue Depth: Number of messages in validation/processing queues
  6. DLQ Depth: Number of messages in dead letter queues
  7. Average Processing Time: Time from RECEIVED to PROCESSED
  8. Error Rate: 4xx and 5xx responses per minute

Metrics Implementation

Micrometer Metrics:

@ApplicationScoped
class SubmissionIngestionService(
    private val meterRegistry: MeterRegistry
) {
    private val ingestCounter = meterRegistry.counter("submissions.ingested", "status", "success")
    private val duplicateCounter = meterRegistry.counter("submissions.duplicate")
    private val errorCounter = meterRegistry.counter("submissions.ingested", "status", "error")
    private val ingestionTimer = meterRegistry.timer("submissions.ingestion.duration")

    @Transactional
    fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
        return ingestionTimer.recordCallable {
            try {
                val response = doIngest(request, tenantId, userId)

                if (response.isNewSubmission) {
                    ingestCounter.increment()
                } else {
                    duplicateCounter.increment()
                }

                response
            } catch (e: Exception) {
                errorCounter.increment()
                throw e
            }
        }!!
    }
}
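The counters and timer above cover metrics 1-4 and 8. Queue and DLQ depth (metrics 5 and 6) fit Micrometer gauges better; a sketch with assumed names (a scheduled job like the `DLQMonitor` above would call `update()`) could look like:

```kotlin
import io.micrometer.core.instrument.MeterRegistry
import io.micrometer.core.instrument.Tag
import jakarta.enterprise.context.ApplicationScoped
import java.util.concurrent.atomic.AtomicLong

// Sketch: queue depths exposed as gauges. Micrometer samples the backing
// AtomicLong lazily on each scrape, so no extra timer is needed here.
@ApplicationScoped
class QueueDepthMetrics(meterRegistry: MeterRegistry) {

    private val validationDepth = meterRegistry.gauge(
        "queue.depth", listOf(Tag.of("queue", "validation-queue")), AtomicLong(0)
    )!!
    private val validationDlqDepth = meterRegistry.gauge(
        "queue.depth", listOf(Tag.of("queue", "validation-dlq")), AtomicLong(0)
    )!!

    // Called by the scheduled depth check after querying RabbitMQ
    fun update(queueDepth: Long, dlqDepth: Long) {
        validationDepth.set(queueDepth)
        validationDlqDepth.set(dlqDepth)
    }
}
```

Publishing these under one metric name with a `queue` tag matches the `rabbitmq_queue_messages{queue=~".*-dlq"}` pattern used by the Prometheus alerts below.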

Alerting Rules

Prometheus Alerts:

groups:
  - name: ingestion_pipeline
    interval: 30s
    rules:
      - alert: HighErrorRate
        expr: rate(submissions_ingested_total{status="error"}[5m]) > 0.05
        for: 5m
        labels:
          severity: critical
        annotations:
          summary: "High error rate in ingestion pipeline"
          description: "Error rate is {{ $value }} (threshold: 0.05)"

      - alert: HighDLQDepth
        expr: rabbitmq_queue_messages{queue=~".*-dlq"} > 10
        for: 2m
        labels:
          severity: high
        annotations:
          summary: "Dead letter queue depth exceeds threshold"
          description: "DLQ {{ $labels.queue }} has {{ $value }} messages"

      - alert: LowValidationSuccessRate
        expr: rate(validation_success_total[5m]) / rate(validation_total[5m]) < 0.90
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "Validation success rate below 90%"
          description: "Success rate is {{ $value }}"

      - alert: SlowProcessing
        expr: histogram_quantile(0.95, rate(submissions_ingestion_duration_seconds_bucket[5m])) > 5
        for: 10m
        labels:
          severity: warning
        annotations:
          summary: "95th percentile ingestion time exceeds 5 seconds"
          description: "P95 latency is {{ $value }}s"

Logging Strategy

Structured Logging:

@ApplicationScoped
class SubmissionIngestionService {
    // KLogger is not CDI-injectable; declare it as a property, not a constructor parameter
    private val logger = KotlinLogging.logger {}
    fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
        logger.info {
            mapOf(
                "event" to "submission_ingestion_started",
                "submissionUuid" to request.submissionUuid,
                "tenantId" to tenantId,
                "userId" to userId,
                "metricDefinitionId" to request.metricDefinitionId
            )
        }

        // ... ingestion logic ...

        logger.info {
            mapOf(
                "event" to "submission_ingestion_completed",
                "submissionUuid" to request.submissionUuid,
                "submissionId" to response.id,
                "isNewSubmission" to response.isNewSubmission,
                "durationMs" to duration
            )
        }

        return response
    }
}


Performance Considerations

Database Optimization

Indexes:

-- Primary index for deduplication checks
CREATE UNIQUE INDEX idx_submissions_uuid
ON metric_submissions (tenant_id, submission_uuid);

-- Index for queue job lookups
CREATE INDEX idx_submissions_state
ON metric_submissions (tenant_id, state, created_at);

-- Index for audit log queries
CREATE INDEX idx_audit_logs_submission
ON submission_audit_logs (submission_id, created_at DESC);

-- Partial index for active queue
CREATE INDEX idx_submissions_queue
ON metric_submissions (tenant_id, created_at)
WHERE state IN ('RECEIVED', 'VALIDATED');

Connection Pooling:

# application.properties
# Quarkus uses Agroal connection pool by default

# Datasource configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=${DB_USERNAME:esg_user}
quarkus.datasource.password=${DB_PASSWORD}
quarkus.datasource.jdbc.url=jdbc:postgresql://${DB_HOST:localhost}:${DB_PORT:5432}/${DB_NAME:esg_db}

# Agroal connection pool settings
quarkus.datasource.jdbc.max-size=20
quarkus.datasource.jdbc.min-size=5
quarkus.datasource.jdbc.acquisition-timeout=30s
quarkus.datasource.jdbc.idle-removal-interval=10m
quarkus.datasource.jdbc.max-lifetime=30m

# Profile-specific overrides
%dev.quarkus.datasource.jdbc.max-size=5
%test.quarkus.datasource.jdbc.max-size=10

Caching Strategy

Redis Configuration:

# Quarkus Redis Client (quarkus-redis-client extension)
quarkus.redis.hosts=redis://${REDIS_HOST:redis.internal}:${REDIS_PORT:6379}
quarkus.redis.password=${REDIS_PASSWORD}
quarkus.redis.timeout=2s
quarkus.redis.max-pool-size=20
quarkus.redis.max-waiting-handlers=10

# Profile-specific Redis configuration
%dev.quarkus.redis.hosts=redis://localhost:6379
%dev.quarkus.redis.password=
%test.quarkus.redis.hosts=redis://localhost:6379

Redis Client Usage:

@ApplicationScoped
class IdempotencyCacheService(
    private val redisClient: RedisDataSource
) {
    fun checkIdempotency(idempotencyKey: String): Boolean {
        val commands = redisClient.value(String::class.java)
        return commands.get(idempotencyKey) != null
    }

    fun cacheSubmission(idempotencyKey: String, submissionId: UUID, ttl: Duration) {
        val commands = redisClient.value(String::class.java)
        commands.setex(idempotencyKey, ttl.seconds, submissionId.toString())
    }
}

Cache Hit Rate Target: > 95% for idempotency cache
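A minimal sketch of how that target could be tracked, assuming `checkIdempotency` records a hit or miss on each lookup (names here are illustrative, not part of the service above):

```kotlin
import java.util.concurrent.atomic.AtomicLong

// Tracks the observed idempotency-cache hit rate against a target (default 95%).
class CacheHitRateTracker(private val targetRate: Double = 0.95) {
    private val hits = AtomicLong(0)
    private val misses = AtomicLong(0)

    fun recordHit() { hits.incrementAndGet() }
    fun recordMiss() { misses.incrementAndGet() }

    // Hit rate over all recorded lookups; 1.0 before any traffic
    fun hitRate(): Double {
        val total = hits.get() + misses.get()
        return if (total == 0L) 1.0 else hits.get().toDouble() / total
    }

    fun belowTarget(): Boolean = hitRate() < targetRate
}
```

In production this would more likely be a Micrometer counter pair queried from Prometheus, but the arithmetic is the same.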

Async Processing

All validation and processing should be async:

- HTTP response time: < 200ms (just save to database)
- Validation time: 1-5 seconds (async job)
- Processing time: 5-30 seconds (async job)

Quarkus Async Patterns:

1. Reactive Messaging (SmallRye) - Recommended for message-driven async:

import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional

@ApplicationScoped
class SubmissionIngestionService @Inject constructor(
    private val submissionRepository: MetricSubmissionRepository,
    @Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>
) {
    @Transactional
    fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
        // Synchronous: Save to database (fast). Panache persist() returns Unit,
        // so keep working with the entity reference directly
        submissionRepository.persist(newSubmission)

        // Asynchronous: Queue validation job using Emitter (don't wait)
        validationEmitter.send(ValidationJobMessage(submissionId = newSubmission.id))

        // Return immediately
        return SubmissionResponse.fromEntity(newSubmission)
    }
}

2. Virtual Threads (@RunOnVirtualThread) - For blocking I/O operations:

import io.smallrye.common.annotation.RunOnVirtualThread
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType

@Path("/api/v1/reports")
@Produces(MediaType.APPLICATION_JSON)
class ReportGenerationResource @Inject constructor(
    private val reportService: ReportService
) {

    @GET
    @Path("/{reportId}/generate")
    @RunOnVirtualThread  // Dispatches this endpoint on a virtual thread - it can block safely
    fun generateReport(@PathParam("reportId") reportId: UUID): Report {
        // Blocking I/O is fine here: virtual threads are lightweight, so
        // thousands can block concurrently without exhausting a worker pool
        return reportService.generateBlockingReport(reportId)
    }
}

@ApplicationScoped
class ReportService {

    // Note: @RunOnVirtualThread applies to entry points (REST endpoints,
    // reactive messaging methods), not arbitrary CDI methods. This method
    // simply runs on whatever thread calls it - here, the endpoint's virtual thread.
    fun generateBlockingReport(reportId: UUID): Report {
        // Blocking operations are fine on virtual threads
        val data = fetchDataFromDatabase()   // Blocks
        val processed = processData(data)    // Blocks
        return createReport(processed)       // Blocks
    }
}

3. Mutiny (Reactive Streams) - For reactive, non-blocking operations:

import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.Multi
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject

@ApplicationScoped
class ReactiveSubmissionService @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val calculationService: CalculationService
) {

    // Uni<T> - Single async result (like CompletableFuture)
    // Assumes a Hibernate Reactive Panache repository, where findById returns Uni<T>
    // (findByIdOptional is the blocking API and would not compose into this chain)
    fun processSubmissionAsync(submissionId: UUID): Uni<SubmissionResult> {
        return submissionRepository.findById(submissionId)
            .onItem().ifNull().failWith { NotFoundException("Submission not found") }
            .onItem().transformToUni { submission ->
                // Chain async operations
                calculationService.calculateAsync(submission)
                    .onItem().transform { calculatedMetrics ->
                        submission.calculatedValues = calculatedMetrics
                        submission.state = SubmissionState.PROCESSED
                        submission
                    }
            }
            .onItem().transformToUni { submission ->
                submissionRepository.persist(submission)
            }
            .onItem().transform { submission ->
                SubmissionResult.fromEntity(submission)
            }
    }

    // Multi<T> - Stream of async results
    fun streamSubmissionsAsync(tenantId: UUID): Multi<Submission> {
        return submissionRepository.streamByTenantId(tenantId)
            .onItem().transform { submission ->
                // Process each item asynchronously
                enrichSubmissionData(submission)
            }
    }
}

4. CDI Events (@ObservesAsync) - For async event handling:

import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.event.Event
import jakarta.enterprise.event.ObservesAsync
import jakarta.inject.Inject

// Event payload
data class SubmissionReceivedEvent(
    val submissionId: UUID,
    val tenantId: UUID,
    val timestamp: Instant
)

@ApplicationScoped
class SubmissionEventPublisher @Inject constructor(
    private val event: Event<SubmissionReceivedEvent>
) {
    fun publishSubmissionReceived(submission: MetricSubmission) {
        // Fire event asynchronously
        event.fireAsync(SubmissionReceivedEvent(
            submissionId = submission.id,
            tenantId = submission.tenantId,
            timestamp = Instant.now()
        ))
    }
}

@ApplicationScoped
class SubmissionEventListener @Inject constructor(
    private val notificationService: NotificationService,
    private val analyticsService: AnalyticsService
) {

    // Async observer - runs on different thread
    fun onSubmissionReceived(@ObservesAsync event: SubmissionReceivedEvent) {
        // Send notification
        notificationService.notifyDataTeam(event.submissionId)

        // Update analytics
        analyticsService.trackSubmission(event.tenantId, event.timestamp)

        logger.info("Processed submission received event: ${event.submissionId}")
    }
}

5. ManagedExecutor - For custom thread pools:

import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Produces
import jakarta.inject.Inject
import org.eclipse.microprofile.context.ManagedExecutor
import java.util.UUID
import java.util.concurrent.CompletableFuture

@ApplicationScoped
class ExecutorConfiguration {

    @Produces
    @ApplicationScoped
    fun createManagedExecutor(): ManagedExecutor {
        return ManagedExecutor.builder()
            .maxAsync(50)       // Max concurrent tasks
            .maxQueued(100)     // Max queued tasks
            .build()
    }
}

@ApplicationScoped
class AsyncReportService @Inject constructor(
    private val executor: ManagedExecutor
) {

    fun generateReportAsync(reportId: UUID): CompletableFuture<Report> {
        return executor.supplyAsync {
            // Runs on managed thread pool
            // Context propagation handled automatically
            generateReport(reportId)
        }
    }
}

Pattern Selection Guide:

| Use Case           | Pattern             | When to Use                             |
|--------------------|---------------------|-----------------------------------------|
| Message queues     | Reactive Messaging  | RabbitMQ, Kafka, AMQP                   |
| Blocking I/O       | @RunOnVirtualThread | Database calls, file I/O, external APIs |
| Reactive pipelines | Mutiny Uni/Multi    | Non-blocking chains, streams            |
| Event bus          | CDI @ObservesAsync  | Internal application events             |
| Custom threading   | ManagedExecutor     | Fine-grained thread pool control        |


Security Requirements

Tenant Isolation

CRITICAL: Every query must include tenant_id filter.

Enforcement via Hibernate Filter:

@Entity
@Table(name = "metric_submissions")
@FilterDef(name = "tenantFilter", parameters = [ParamDef(name = "tenantId", type = UUID::class)])  // Hibernate 6: ParamDef type is a class, not a string
@Filter(name = "tenantFilter", condition = "tenant_id = :tenantId")
class MetricSubmission(
    @Id
    val id: UUID,

    @Column(name = "tenant_id", nullable = false)
    val tenantId: UUID,

    // ... other fields
)

Activate Filter on Every Request (JAX-RS ContainerRequestFilter):

import jakarta.ws.rs.container.ContainerRequestContext
import jakarta.ws.rs.container.ContainerRequestFilter
import jakarta.ws.rs.container.ContainerResponseContext
import jakarta.ws.rs.container.ContainerResponseFilter
import jakarta.ws.rs.ext.Provider
import jakarta.persistence.EntityManager
import jakarta.inject.Inject
import io.quarkus.security.identity.SecurityIdentity

@Provider
class TenantIsolationFilter @Inject constructor(
    private val entityManager: EntityManager,
    private val securityIdentity: SecurityIdentity
) : ContainerRequestFilter, ContainerResponseFilter {

    override fun filter(requestContext: ContainerRequestContext) {
        if (securityIdentity.isAnonymous) {
            return
        }

        // Extract tenant ID from authenticated user
        val tenantId = securityIdentity.getAttribute<UUID>("tenantId")

        if (tenantId != null) {
            // Enable Hibernate filter for tenant isolation
            val session = entityManager.unwrap(org.hibernate.Session::class.java)
            session.enableFilter("tenantFilter").setParameter("tenantId", tenantId)

            logger.debug("Enabled tenant filter for tenant: $tenantId")
        }
    }

    override fun filter(
        requestContext: ContainerRequestContext,
        responseContext: ContainerResponseContext
    ) {
        // Disable filter after request completes
        if (!securityIdentity.isAnonymous) {
            val session = entityManager.unwrap(org.hibernate.Session::class.java)
            session.disableFilter("tenantFilter")
        }
    }
}

Idempotency Cache Security

Prevent cross-tenant access:

fun getCacheKey(submissionUuid: UUID, tenantId: UUID): String {
    return "idempotency:$tenantId:$submissionUuid"
}
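A self-contained check of this property (redefining `getCacheKey` locally for the example):

```kotlin
import java.util.UUID

fun getCacheKey(submissionUuid: UUID, tenantId: UUID): String =
    "idempotency:$tenantId:$submissionUuid"

// Same client-generated submission UUID, different tenants -> distinct cache
// keys, so one tenant's idempotency hit can never short-circuit another
// tenant's submission.
fun main() {
    val submissionUuid = UUID.randomUUID()
    val keyTenantA = getCacheKey(submissionUuid, UUID.randomUUID())
    val keyTenantB = getCacheKey(submissionUuid, UUID.randomUUID())
    check(keyTenantA != keyTenantB)
}
```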

Audit Logging

Log all ingestion events:

@ApplicationScoped
class AuditLogService @Inject constructor(
    private val auditLogRepository: SubmissionAuditLogRepository,  // illustrative repository type
    private val objectMapper: ObjectMapper
) {
    fun logSubmissionReceived(submission: MetricSubmission, userId: UUID) {
        auditLogRepository.persist(SubmissionAuditLog(
            id = UUID.randomUUID(),
            submissionId = submission.id,
            tenantId = submission.tenantId,
            action = AuditAction.SUBMISSION_RECEIVED,
            actorUserId = userId,
            timestamp = Instant.now(),
            ipAddress = getCurrentRequest().remoteAddr,
            userAgent = getCurrentRequest().getHeader("User-Agent"),
            changes = objectMapper.writeValueAsString(submission)
        ))
    }
}


Testing Strategy

Unit Tests

Test Idempotency:

@Test
fun `test duplicate submission returns existing submission`() {
    val submissionUuid = UUID.randomUUID()
    val tenantId = UUID.randomUUID()
    val userId = UUID.randomUUID()

    // First submission
    val request1 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
    val response1 = ingestionService.ingestSubmission(request1, tenantId, userId)

    assertThat(response1.isNewSubmission).isTrue()

    // Duplicate submission (same UUID)
    val request2 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
    val response2 = ingestionService.ingestSubmission(request2, tenantId, userId)

    assertThat(response2.isNewSubmission).isFalse()
    assertThat(response2.id).isEqualTo(response1.id)

    // Verify only one row in database
    val count = submissionRepository.countByTenantIdAndSubmissionUuid(tenantId, submissionUuid)
    assertThat(count).isEqualTo(1)
}

Test Tenant Isolation:

@Test
fun `test tenant isolation prevents cross-tenant access`() {
    val submissionUuid = UUID.randomUUID()
    val tenant1 = UUID.randomUUID()
    val tenant2 = UUID.randomUUID()

    // Tenant 1 creates submission
    val request1 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
    ingestionService.ingestSubmission(request1, tenant1, UUID.randomUUID())

    // Tenant 2 tries to create submission with same UUID (should succeed - different tenant)
    val request2 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
    val response2 = ingestionService.ingestSubmission(request2, tenant2, UUID.randomUUID())

    assertThat(response2.isNewSubmission).isTrue()

    // Verify two rows exist (one per tenant)
    val count = submissionRepository.countBySubmissionUuid(submissionUuid)
    assertThat(count).isEqualTo(2)
}
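The rule the two tests above encode is "unique per (tenant, submission UUID)": a duplicate within one tenant is rejected, while the same UUID under a different tenant is a new submission. A minimal sketch of that invariant in plain Kotlin, independent of the database layer (the `DeduplicationIndex` name is illustrative; in production this is enforced by a composite unique constraint on `tenant_id` and `submission_uuid`):

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// The deduplication key is the (tenantId, submissionUuid) pair, mirroring a
// composite unique constraint on (tenant_id, submission_uuid) in the database.
class DeduplicationIndex {
    private val seen = ConcurrentHashMap.newKeySet<Pair<UUID, UUID>>()

    /** Returns true if this is the first time the pair is seen (a new submission). */
    fun markSeen(tenantId: UUID, submissionUuid: UUID): Boolean =
        seen.add(tenantId to submissionUuid)
}
```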

Integration Tests

Test End-to-End Flow:

@QuarkusTest
// IngestionTestProfile is an assumed QuarkusTestProfile implementation
@TestProfile(IngestionTestProfile::class)
class IngestionPipelineIntegrationTest {

    @Inject
    lateinit var submissionRepository: SubmissionRepository

    @Test
    fun `test submission flows through entire pipeline`() {
        // Step 1: Submit metric
        val request = SubmissionRequest(submissionUuid = UUID.randomUUID(), /* ... */)
        val response = given()
            .contentType(ContentType.JSON)
            .body(request)
            .`when`()
            .post("/api/v1/collector/submissions")
            .then()
            .statusCode(201)
            .extract()
            .`as`(SubmissionResponse::class.java)

        val submissionId = response.id

        // Step 2: Wait for validation job to complete (async)
        await().atMost(Duration.ofSeconds(10)).until {
            val submission = submissionRepository.findById(submissionId).orElse(null)
            submission?.state == SubmissionState.VALIDATED
        }

        // Step 3: Wait for processing job to complete
        await().atMost(Duration.ofSeconds(10)).until {
            val submission = submissionRepository.findById(submissionId).orElse(null)
            submission?.state == SubmissionState.PROCESSED
        }

        // Step 4: Verify final state
        val finalSubmission = submissionRepository.findById(submissionId).orElseThrow()
        assertThat(finalSubmission.state).isEqualTo(SubmissionState.PROCESSED)
        assertThat(finalSubmission.calculatedValues).isNotNull()
    }
}

Load Tests

Test Throughput:

@Test
fun `test ingestion handles 1000 concurrent submissions`() {
    val tenantId = UUID.randomUUID()
    val latch = CountDownLatch(1000)

    val executor = Executors.newFixedThreadPool(100)

    repeat(1000) { i ->
        executor.submit {
            try {
                val request = SubmissionRequest(submissionUuid = UUID.randomUUID(), /* ... */)
                ingestionService.ingestSubmission(request, tenantId, UUID.randomUUID())
            } finally {
                latch.countDown()
            }
        }
    }

    // Wait for all submissions to complete (max 60 seconds)
    val completed = latch.await(60, TimeUnit.SECONDS)
    assertThat(completed).isTrue()

    // Verify all submissions were created
    val count = submissionRepository.countByTenantId(tenantId)
    assertThat(count).isEqualTo(1000)
}


Implementation Guide

Quarkus JAX-RS Resource

import jakarta.annotation.security.RolesAllowed
import jakarta.inject.Inject
import jakarta.validation.Valid
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.core.Response
import io.quarkus.security.identity.SecurityIdentity

@Path("/api/v1/collector")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@RolesAllowed("collector.submissions.create")
class CollectorSubmissionResource @Inject constructor(
    private val ingestionService: SubmissionIngestionService,
    private val securityIdentity: SecurityIdentity
) {

    @POST
    @Path("/submissions")
    fun submitMetric(@Valid request: SubmissionRequest): Response {
        // Extract tenant and user from security context
        val tenantId = securityIdentity.getAttribute<UUID>("tenantId")
            ?: throw ForbiddenException("Tenant ID not found in security context")
        val userId = securityIdentity.getAttribute<UUID>("userId")
            ?: throw ForbiddenException("User ID not found in security context")

        val response = ingestionService.ingestSubmission(request, tenantId, userId)

        val status = if (response.isNewSubmission) Response.Status.CREATED else Response.Status.OK

        return Response.status(status)
            .header("X-Submission-Id", response.id.toString())
            .header("X-Submission-UUID", response.submissionUuid.toString())
            .entity(response)
            .build()
    }
}

Service Layer

import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import com.fasterxml.jackson.databind.ObjectMapper
import java.time.Duration
import java.time.Instant
import java.util.UUID

@ApplicationScoped
class SubmissionIngestionService @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val idempotencyCache: IdempotencyCache,
    @Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>,
    private val auditLogService: AuditLogService,
    private val objectMapper: ObjectMapper
) {

    @Transactional
    fun ingestSubmission(
        request: SubmissionRequest,
        tenantId: UUID,
        userId: UUID
    ): SubmissionResponse {

        // Check idempotency cache first (key is tenant-scoped, see Idempotency Cache Security)
        idempotencyCache.get(request.submissionUuid, tenantId)?.let { return it.toResponse(false) }

        // Check database with pessimistic lock to close the race window on cache misses
        val existing = submissionRepository.findByTenantIdAndSubmissionUuidForUpdate(
            tenantId, request.submissionUuid
        )

        if (existing != null) {
            idempotencyCache.put(request.submissionUuid, tenantId, existing, Duration.ofHours(24))
            return existing.toResponse(false)
        }

        // Create new submission
        val submission = MetricSubmission(
            id = UUID.randomUUID(),
            tenantId = tenantId,
            submissionUuid = request.submissionUuid,
            reportingPeriodId = request.reportingPeriodId,
            siteId = request.siteId,
            metricDefinitionId = request.metricDefinitionId,
            value = request.value,
            unitOfMeasure = request.unitOfMeasure,
            metadata = request.metadata,
            state = SubmissionState.RECEIVED,
            submittedByUserId = userId,
            submittedAt = Instant.now(),
            rawData = objectMapper.writeValueAsString(request)
        )

        submissionRepository.persist(submission)

        // Cache for idempotency (tenant-scoped key)
        idempotencyCache.put(request.submissionUuid, tenantId, submission, Duration.ofHours(24))

        // Audit log
        auditLogService.logSubmissionReceived(submission, userId)

        // Queue validation job (Quarkus Reactive Messaging)
        validationEmitter.send(ValidationJobMessage(submissionId = submission.id))

        return submission.toResponse(true)
    }
}
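The service relies on a `toResponse(isNewSubmission)` mapping that the document does not show. A minimal sketch of the response shape and mapping assumed throughout this section (the field set and the `MetricSubmissionView` stand-in are illustrative, not the full entity):

```kotlin
import java.time.Instant
import java.util.UUID

// Illustrative response DTO; the isNewSubmission flag drives the
// 201-vs-200 status decision in the resource layer.
data class SubmissionResponse(
    val id: UUID,
    val submissionUuid: UUID,
    val state: String,
    val submittedAt: Instant,
    val isNewSubmission: Boolean
)

// Minimal stand-in for the persisted entity, just enough to show the mapping.
data class MetricSubmissionView(
    val id: UUID,
    val submissionUuid: UUID,
    val state: String,
    val submittedAt: Instant
)

fun MetricSubmissionView.toResponse(isNewSubmission: Boolean) = SubmissionResponse(
    id = id,
    submissionUuid = submissionUuid,
    state = state,
    submittedAt = submittedAt,
    isNewSubmission = isNewSubmission
)
```

Note that both the first response and the duplicate response carry the same `id`; only the flag differs, which is what the idempotency unit test asserts.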

Background Jobs

import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import org.eclipse.microprofile.faulttolerance.Retry
import org.eclipse.microprofile.faulttolerance.Fallback
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
import java.time.Instant

@ApplicationScoped
class ValidationJobListener @Inject constructor(
    private val submissionRepository: SubmissionRepository,
    private val validationEngine: ValidationEngine,
    @Channel("processing-events") private val processingEmitter: Emitter<ProcessingJobMessage>
) {

    private val logger = org.jboss.logging.Logger.getLogger(ValidationJobListener::class.java)

    @Incoming("validation-queue")
    @Blocking
    @Transactional
    @Retry(
        maxRetries = 3,
        delay = 2000,
        maxDuration = 30000,
        jitter = 200
    )
    @Fallback(fallbackMethod = "handleValidationFailure")
    fun handleValidation(message: ValidationJobMessage) {
        val submission = submissionRepository.findByIdOptional(message.submissionId)
            .orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }

        val result = validationEngine.validate(submission)

        if (result.isValid) {
            submission.state = SubmissionState.VALIDATED
            submission.validatedAt = Instant.now()
            submissionRepository.persist(submission)

            // Queue processing (Quarkus Reactive Messaging)
            processingEmitter.send(ProcessingJobMessage(submissionId = submission.id))

            logger.info("Submission ${submission.id} validated successfully")
        } else {
            submission.state = SubmissionState.VALIDATION_FAILED
            submission.validationErrors = result.errors
            submissionRepository.persist(submission)

            logger.warn("Submission ${submission.id} validation failed: ${result.errors}")
        }
    }

    // Fallback method, invoked after all retries are exhausted. It must match the
    // parameter list of the guarded method and be visible to the FT interceptor.
    @Transactional
    fun handleValidationFailure(message: ValidationJobMessage) {
        logger.error("Validation failed after all retries for submission: ${message.submissionId}")

        val submission = submissionRepository.findByIdOptional(message.submissionId).orElse(null)
        submission?.let {
            it.state = SubmissionState.FAILED
            it.errorMessage = "Validation failed after maximum retry attempts"
            submissionRepository.persist(it)
        }
    }
}
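The `validation-events` emitter and the `validation-queue` consumer are wired together in `application.properties`. A sketch assuming the SmallRye RabbitMQ connector; the exchange and queue names are assumptions, not values from this document:

```properties
# Outgoing channel used by SubmissionIngestionService's Emitter
mp.messaging.outgoing.validation-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.validation-events.exchange.name=ingestion

# Incoming channel consumed by ValidationJobListener
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
```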

Cross-References


Change Log

| Version | Date       | Author                   | Changes |
|---------|------------|--------------------------|---------|
| 1.0     | 2026-01-03 | Senior Product Architect | Initial ingestion pipeline specification |
| 2.0     | 2026-01-11 | Ralph Agent              | Comprehensive expansion with UUID deduplication, idempotency implementation, retry mechanisms, message queue architecture, DLQ management, monitoring, security, testing, and complete Kotlin/Quarkus examples |