Ingestion Pipeline: Idempotent Data Ingestion
Status: Final
Version: 2.0
Table of Contents
- Purpose
- Pipeline Overview
- UUID-Based Deduplication
- Idempotency Implementation
- Pipeline Stages
- Error Recovery Procedures
- Retry Mechanisms
- Message Queue Architecture
- Dead Letter Queue Management
- Monitoring and Alerting
- Performance Considerations
- Security Requirements
- Testing Strategy
- Implementation Guide
Purpose
The ingestion pipeline is a mission-critical backend system that handles metric submissions from mobile collectors with:
- Idempotency Guarantees: Same submission UUID never creates duplicates
- UUID-Based Deduplication: Client-generated UUIDs prevent race conditions
- Automatic Retry: Failed jobs retry with exponential backoff
- Error Recovery: Dead letter queue captures permanent failures
- Tenant Isolation: Multi-tenant architecture with strict data isolation
- Audit Trail: Complete chain-of-custody for compliance
This pipeline ensures data integrity for ESG regulatory reporting where duplicate or lost submissions could result in compliance violations.
Pipeline Overview
High-Level Architecture
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ Mobile │──────▶│ API │──────▶│ Database │
│ Collector │◀──────│ Gateway │◀──────│ (Primary) │
└─────────────┘ └──────────────┘ └──────────────┘
│ │
▼ ▼
┌──────────────┐ ┌──────────────┐
│ Idempotency │ │ RabbitMQ │
│ Cache │ │ Queue │
│ (Redis) │ └──────────────┘
└──────────────┘ │
▼
┌──────────────┐
│ Validation │
│ Worker │
└──────────────┘
│
▼
┌──────────────┐
│ Processing │
│ Worker │
└──────────────┘
Data Flow Stages
1. HTTP Request Received
↓
2. Authentication & Authorization (JWT validation)
↓
3. Tenant Isolation Check (tenant_id from token)
↓
4. Idempotency Check (Redis cache lookup)
↓ (cache miss)
5. Database Transaction Begin
↓
6. UUID Deduplication Check (SELECT ... FOR UPDATE)
↓ (not exists)
7. Insert Raw Submission (state: RECEIVED)
↓
8. Queue Validation Job (RabbitMQ)
↓
9. Transaction Commit
↓
10. Return 201 Created Response
↓
11. ASYNC: Validation Job Executes
↓
12. ASYNC: Processing Job Executes
↓
13. ASYNC: Notification to Mobile App
If Duplicate Detected at Step 4 or 6:
Idempotency Cache Hit (Step 4) → Return 200 OK with existing submission
OR
Database UUID Exists (Step 6) → Return 200 OK with existing submission
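The status-code contract above can be sketched with in-memory stand-ins. This is only an illustration under stated assumptions: `InMemoryIngestion`, `IngestResult`, and the two maps are hypothetical names that take the place of Redis and PostgreSQL, and authentication, transactions, and queueing are left out.

```kotlin
import java.util.UUID

// Hypothetical result type: HTTP status plus the server-side submission id.
data class IngestResult(val status: Int, val submissionId: UUID)

// In-memory stand-ins for the Redis cache (step 4) and the database (step 6).
class InMemoryIngestion {
    private val cache = HashMap<Pair<UUID, UUID>, UUID>()
    private val table = HashMap<Pair<UUID, UUID>, UUID>()

    fun ingest(tenantId: UUID, submissionUuid: UUID): IngestResult {
        val key = tenantId to submissionUuid
        // Step 4: a cache hit returns the existing submission with 200 OK.
        cache[key]?.let { return IngestResult(200, it) }
        // Step 6: a database hit also returns 200 OK and repopulates the cache.
        table[key]?.let { existing ->
            cache[key] = existing
            return IngestResult(200, existing)
        }
        // Steps 7-10: first time this UUID is seen, so insert and return 201 Created.
        val id = UUID.randomUUID()
        table[key] = id
        cache[key] = id
        return IngestResult(201, id)
    }

    // Simulates cache expiry or eviction.
    fun evictCache() = cache.clear()
}

fun main() {
    val pipeline = InMemoryIngestion()
    val tenant = UUID.randomUUID()
    val uuid = UUID.randomUUID()
    val first = pipeline.ingest(tenant, uuid)
    val retry = pipeline.ingest(tenant, uuid)      // served from the cache
    pipeline.evictCache()
    val lateRetry = pipeline.ingest(tenant, uuid)  // served from the table
    println(listOf(first.status, retry.status, lateRetry.status)) // [201, 200, 200]
}
```

Both duplicate paths return the same submission id as the first request, which is what lets the client treat 200 and 201 as the same successful outcome.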
UUID-Based Deduplication
Core Principle
The mobile app generates UUIDs (not the server). This enables:
- Idempotent retries (same UUID = same submission)
- Offline-first architecture (no server round trip needed)
- Race condition prevention (unique constraint enforcement)
UUID Generation
Mobile (Kotlin/Android):
import java.util.UUID
data class SubmissionDraft(
val submissionUuid: UUID = UUID.randomUUID(), // Client-generated
val metricDefinitionId: UUID,
val reportingPeriodId: UUID,
val siteId: UUID,
val value: Double,
val unitOfMeasure: String,
val metadata: Map<String, Any> = emptyMap()
)
// Example usage in ViewModel
fun createDraft() {
val draft = SubmissionDraft(
submissionUuid = UUID.randomUUID(), // Generate once when draft created
metricDefinitionId = selectedMetric.id,
reportingPeriodId = selectedPeriod.id,
siteId = selectedSite.id,
value = 123.45,
unitOfMeasure = "kWh"
)
// Save to local database with this UUID
database.submissionDao().insert(draft)
}
CRITICAL: The UUID must be generated when the draft is created (not when submitted). This ensures the same UUID is used for all retry attempts.
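To see why the generation point matters, the following sketch compares a UUID fixed at draft creation with a UUID generated on every upload attempt. `Draft` and `RecordingServer` are hypothetical stand-ins introduced for illustration; they are not the app's real classes.

```kotlin
import java.util.UUID

// Hypothetical draft model: the UUID is fixed when the draft object is created.
data class Draft(val value: Double, val submissionUuid: UUID = UUID.randomUUID())

// Hypothetical server stub that records every distinct UUID it receives.
class RecordingServer {
    val distinctUuids = mutableSetOf<UUID>()
    fun submit(uuid: UUID) { distinctUuids += uuid }
}

fun main() {
    val draft = Draft(value = 123.45)

    // Correct: every retry resends the UUID stored with the draft.
    val server = RecordingServer()
    repeat(3) { server.submit(draft.submissionUuid) }
    println("stable UUID: ${server.distinctUuids.size} logical submission(s)")

    // Anti-pattern: a fresh UUID per attempt looks like unrelated submissions.
    val confused = RecordingServer()
    repeat(3) { confused.submit(UUID.randomUUID()) }
    println("fresh UUID per attempt: ${confused.distinctUuids.size} logical submission(s)")
}
```

With a stable UUID the server can recognise all three attempts as one submission; with a fresh UUID per attempt it has nothing to deduplicate on.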
Database Constraint
-- PostgreSQL migration
CREATE UNIQUE INDEX idx_metric_submissions_uuid_unique
ON metric_submissions (tenant_id, submission_uuid);
-- This prevents duplicates even if application-level checks fail
-- The database will enforce uniqueness via constraint violation
Why Composite Key (tenant_id, submission_uuid)?
- Tenant isolation: Different tenants can use same UUID (extremely unlikely but possible)
- Performance: Index on tenant_id is needed for all queries anyway
- Security: Prevents cross-tenant UUID collision attacks
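The effect of the composite unique index can be approximated in memory. In this sketch `ConcurrentHashMap.putIfAbsent` plays the role of the database constraint: exactly one insert per (tenant, UUID) pair wins, and every later attempt is handed the existing row. `SubmissionKey` and `SubmissionTable` are hypothetical names, not part of the service.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

// Composite key mirroring the (tenant_id, submission_uuid) unique index.
data class SubmissionKey(val tenantId: UUID, val submissionUuid: UUID)

// Hypothetical in-memory table; putIfAbsent stands in for the unique index.
class SubmissionTable {
    private val rows = ConcurrentHashMap<SubmissionKey, UUID>()

    // Returns the row id and whether this call created the row.
    fun insertIfAbsent(key: SubmissionKey): Pair<UUID, Boolean> {
        val candidate = UUID.randomUUID()
        val existing = rows.putIfAbsent(key, candidate)
        return if (existing == null) candidate to true else existing to false
    }
}

fun main() {
    val table = SubmissionTable()
    val tenantA = UUID.randomUUID()
    val tenantB = UUID.randomUUID()
    val sharedUuid = UUID.randomUUID()

    val (idA, createdA) = table.insertIfAbsent(SubmissionKey(tenantA, sharedUuid))
    val (idB, createdB) = table.insertIfAbsent(SubmissionKey(tenantB, sharedUuid))
    val (idA2, createdA2) = table.insertIfAbsent(SubmissionKey(tenantA, sharedUuid))

    println("tenant A created=$createdA, tenant B created=$createdB")
    println("tenant A retry created=$createdA2, same row=${idA == idA2}, isolated=${idA != idB}")
}
```

The same UUID is accepted once per tenant, and a repeat within a tenant never produces a second row, which is the behaviour the index enforces even when application-level checks are bypassed.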
Deduplication Algorithm
// Quarkus Service (Kotlin)
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import com.fasterxml.jackson.databind.ObjectMapper
import java.time.Duration
import java.time.Instant
import java.util.UUID
@ApplicationScoped
class SubmissionIngestionService @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val idempotencyCache: IdempotencyCache,
@Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>,
private val auditLogService: AuditLogService,
private val objectMapper: ObjectMapper
) {
@Transactional
fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
// Step 1: Check idempotency cache (fast path)
// The cache key is the bare UUID, so confirm the tenant before trusting a hit
idempotencyCache.get(request.submissionUuid)
?.takeIf { it.tenantId == tenantId }
?.let { cachedSubmission ->
logger.info("Idempotency cache hit for UUID: ${request.submissionUuid}")
return SubmissionResponse.fromEntity(cachedSubmission)
}
// Step 2: Check database with pessimistic lock (prevents race conditions)
val existing = submissionRepository.findByTenantIdAndSubmissionUuidForUpdate(
tenantId = tenantId,
submissionUuid = request.submissionUuid
)
if (existing != null) {
// Duplicate detected in database
logger.info("Duplicate submission UUID detected: ${request.submissionUuid}")
// Cache for future requests (24-hour TTL)
idempotencyCache.put(request.submissionUuid, existing, Duration.ofHours(24))
return SubmissionResponse.fromEntity(existing)
}
// Step 3: Create new submission (no duplicate exists)
val submission = MetricSubmission(
id = UUID.randomUUID(), // Database primary key
tenantId = tenantId,
submissionUuid = request.submissionUuid, // Client-generated UUID
reportingPeriodId = request.reportingPeriodId,
siteId = request.siteId,
metricDefinitionId = request.metricDefinitionId,
value = request.value,
unitOfMeasure = request.unitOfMeasure,
metadata = request.metadata,
state = SubmissionState.RECEIVED,
submittedByUserId = userId,
submittedAt = Instant.now(),
rawData = objectMapper.writeValueAsString(request) // Store original request
)
// Step 4: Persist to database (Panache persist() returns Unit, so keep using the entity itself)
submissionRepository.persist(submission)
val savedSubmission = submission
// Step 5: Cache for idempotency
idempotencyCache.put(request.submissionUuid, savedSubmission, Duration.ofHours(24))
// Step 6: Audit log (async with CDI events)
auditLogService.logSubmissionReceived(savedSubmission, userId)
// Step 7: Queue validation job using Emitter (Quarkus Reactive Messaging)
validationEmitter.send(ValidationJobMessage(submissionId = savedSubmission.id))
logger.info("Submission ingested successfully: ${savedSubmission.id} (UUID: ${request.submissionUuid})")
return SubmissionResponse.fromEntity(savedSubmission)
}
}
Repository Implementation
import io.quarkus.hibernate.orm.panache.kotlin.PanacheRepositoryBase
import jakarta.enterprise.context.ApplicationScoped
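import jakarta.persistence.LockModeType
import java.util.UUID
@ApplicationScoped
class SubmissionRepository : PanacheRepositoryBase<MetricSubmission, UUID> {
/**
* Find submission by tenant and UUID with pessimistic write lock.
* The lock serializes concurrent requests that touch an existing row.
* It cannot lock a row that does not exist yet, so two simultaneous
* first-time requests are resolved by the unique index on
* (tenant_id, submission_uuid) rather than by this lock.
*/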
fun findByTenantIdAndSubmissionUuidForUpdate(
tenantId: UUID,
submissionUuid: UUID
): MetricSubmission? {
return find(
"tenantId = ?1 AND submissionUuid = ?2",
tenantId,
submissionUuid
)
.withLock(LockModeType.PESSIMISTIC_WRITE)
.firstResult()
}
}
Idempotency Implementation
Why Two-Level Idempotency?
- Redis Cache (Fast Path):
  - O(1) lookup, ~1ms response time
  - Handles 99% of duplicate requests (client retries)
  - 24-hour TTL (covers typical retry window)
- Database Check (Slow Path):
  - O(log n) lookup via B-tree index, ~10ms response time
  - Handles cache misses (first request, cache eviction, cold start)
  - Permanent record (survives cache expiry)
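The interaction between the TTL and the slow path can be sketched without Redis. The example below is a simplified model, assuming a hypothetical `TtlCache` with an injectable clock and a plain map standing in for the database. It shows that an expired entry falls through to the permanent store and is then cached again.

```kotlin
import java.time.Duration
import java.time.Instant
import java.util.UUID

// Hypothetical in-memory stand-in for the Redis fast path; `now` is injectable for testing.
class TtlCache<V : Any>(
    private val ttl: Duration,
    private val now: () -> Instant = { Instant.now() }
) {
    private data class Entry<V>(val value: V, val expiresAt: Instant)
    private val entries = HashMap<UUID, Entry<V>>()

    fun put(key: UUID, value: V) {
        entries[key] = Entry(value, now().plus(ttl))
    }

    // Returns null on a miss or once the TTL has elapsed, which sends the caller to the slow path.
    fun get(key: UUID): V? {
        val entry = entries[key] ?: return null
        if (!now().isBefore(entry.expiresAt)) {
            entries.remove(key)
            return null
        }
        return entry.value
    }
}

// Two-level lookup: cache first, then the permanent store (a plain map here).
fun <V : Any> lookup(key: UUID, cache: TtlCache<V>, store: Map<UUID, V>): V? =
    cache.get(key) ?: store[key]?.also { cache.put(key, it) }

fun main() {
    var clock = Instant.parse("2024-01-01T00:00:00Z")
    val cache = TtlCache<String>(Duration.ofHours(24)) { clock }
    val id = UUID.randomUUID()
    val store = mapOf(id to "submission-row")

    println(lookup(id, cache, store))   // slow path, then cached
    println(cache.get(id))              // fast path
    clock = clock.plus(Duration.ofHours(25))
    println(cache.get(id))              // expired, so null
    println(lookup(id, cache, store))   // the permanent record still answers
}
```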
Idempotency Cache Service
import io.quarkus.redis.datasource.RedisDataSource
import io.quarkus.redis.datasource.value.ValueCommands
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import com.fasterxml.jackson.databind.ObjectMapper
import java.time.Duration
import java.util.UUID
@ApplicationScoped
class IdempotencyCache @Inject constructor(
private val redisDataSource: RedisDataSource,
private val objectMapper: ObjectMapper
) {
private val cacheTTL = Duration.ofHours(24)
private val valueCommands: ValueCommands<String, String> = redisDataSource.value(String::class.java)
fun get(submissionUuid: UUID): MetricSubmission? {
val key = "idempotency:submission:$submissionUuid"
val cached = valueCommands.get(key) ?: return null
return try {
objectMapper.readValue(cached, MetricSubmission::class.java)
} catch (e: Exception) {
logger.error("Failed to deserialize cached submission", e)
null
}
}
fun put(submissionUuid: UUID, submission: MetricSubmission, ttl: Duration = cacheTTL) {
val key = "idempotency:submission:$submissionUuid"
val value = objectMapper.writeValueAsString(submission)
valueCommands.setex(key, ttl.seconds, value)
}
fun invalidate(submissionUuid: UUID) {
val key = "idempotency:submission:$submissionUuid"
valueCommands.getdel(key)
}
}
Cache Invalidation Strategy
When to invalidate:
- Submission state changes (RECEIVED → VALIDATED → PROCESSED → APPROVED)
- Submission is rejected or resubmitted
- Admin manually modifies submission
Why invalidate?
- Ensure clients get latest state on retry
- Cache contains stale state after updates
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
@ApplicationScoped
class SubmissionStateService @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val idempotencyCache: IdempotencyCache
) {
@Transactional
fun updateState(submissionId: UUID, newState: SubmissionState, reason: String?) {
val submission = submissionRepository.findByIdOptional(submissionId)
.orElseThrow { NotFoundException("Submission not found: $submissionId") }
submission.state = newState
submission.updatedAt = Instant.now()
submissionRepository.persist(submission)
// Invalidate cache to force clients to fetch updated state
idempotencyCache.invalidate(submission.submissionUuid)
logger.info("Submission ${submission.id} state updated: $newState")
}
}
Pipeline Stages
Stage 1: HTTP Request Reception
Controller Endpoint:
import jakarta.annotation.security.RolesAllowed
import jakarta.inject.Inject
import jakarta.validation.Valid
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.core.Response
import io.quarkus.security.identity.SecurityIdentity
@Path("/api/v1/collector")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class CollectorSubmissionResource @Inject constructor(
private val ingestionService: SubmissionIngestionService,
private val securityIdentity: SecurityIdentity
) {
@POST
@Path("/submissions")
@RolesAllowed("collector.submissions.create")
fun submitMetric(
@Valid request: SubmissionRequest,
@HeaderParam("Idempotency-Key") idempotencyKey: String? // Optional header
): Response {
val principal = securityIdentity.principal as AuthenticatedUser
// Resolve tenant from JWT
val tenantId = principal.tenantId
val userId = principal.userId
// Ingest submission
val response = ingestionService.ingestSubmission(request, tenantId, userId)
// Return 201 Created for new submission, 200 OK for duplicate
val status = if (response.isNewSubmission) 201 else 200
return Response.status(status)
.header("X-Submission-Id", response.id.toString())
.header("X-Submission-UUID", response.submissionUuid.toString())
.entity(response)
.build()
}
}
Request Validation:
data class SubmissionRequest(
@field:NotNull(message = "submission_uuid is required")
val submissionUuid: UUID,
@field:NotNull(message = "reporting_period_id is required")
val reportingPeriodId: UUID,
@field:NotNull(message = "site_id is required")
val siteId: UUID,
@field:NotNull(message = "metric_definition_id is required")
val metricDefinitionId: UUID,
@field:NotNull(message = "value is required")
@field:DecimalMin(value = "0.0", message = "value must be non-negative")
val value: Double,
@field:NotBlank(message = "unit_of_measure is required")
val unitOfMeasure: String,
val metadata: Map<String, Any> = emptyMap(),
val evidenceFileIds: List<UUID> = emptyList()
)
Stage 2: Validation Job
Quarkus Reactive Messaging Consumer (SmallRye):
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
@ApplicationScoped
class ValidationJobListener @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val validationEngine: ValidationEngine,
// Outgoing channel; its name must differ from the incoming "processing-queue" channel
@Channel("processing-events") private val processingEmitter: Emitter<ProcessingJobMessage>
) {
@Incoming("validation-queue")
@Blocking // Process on worker thread, not event loop
@Transactional // Load and update the submission inside one transaction
fun handleValidationJob(message: ValidationJobMessage) {
logger.info("Processing validation job for submission: ${message.submissionId}")
val submission = submissionRepository.findByIdOptional(message.submissionId)
.orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }
try {
// Run all 6 validation types
val validationResult = validationEngine.validate(submission)
if (validationResult.isValid) {
// Update state to VALIDATED
submission.state = SubmissionState.VALIDATED
submission.validatedAt = Instant.now()
submissionRepository.persist(submission)
// Queue processing job using Emitter
processingEmitter.send(ProcessingJobMessage(submissionId = submission.id))
logger.info("Submission ${submission.id} validated successfully")
} else {
// Validation failed
submission.state = SubmissionState.VALIDATION_FAILED
submission.validationErrors = validationResult.errors
submissionRepository.persist(submission)
logger.warn("Submission ${submission.id} validation failed: ${validationResult.errors}")
}
} catch (e: Exception) {
logger.error("Validation job failed for submission ${submission.id}", e)
throw e // Re-throw to trigger retry
}
}
}
Retry Configuration (application.properties):
# Quarkus Reactive Messaging with RabbitMQ connector
# Validation queue configuration
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
mp.messaging.incoming.validation-queue.queue.durable=true
# Rejected (nacked) messages are dead-lettered by the broker. Attribute names follow the
# SmallRye RabbitMQ connector reference; check them against the connector version in use.
mp.messaging.incoming.validation-queue.failure-strategy=reject
mp.messaging.incoming.validation-queue.auto-bind-dlq=true
mp.messaging.incoming.validation-queue.dead-letter-queue-name=validation-dlq
mp.messaging.incoming.validation-queue.dead-letter-exchange=dlx-exchange
# Retry count and backoff (3 retries, 2 s initial delay, factor 2.0, 30 s cap) are not
# connector attributes; apply them on the consumer with fault-tolerance annotations.
# Processing queue configuration
mp.messaging.outgoing.processing-queue.connector=smallrye-rabbitmq
mp.messaging.outgoing.processing-queue.exchange.name=processing-exchange
mp.messaging.outgoing.processing-queue.exchange.type=topic
mp.messaging.outgoing.processing-queue.routing-keys=processing.submission.validated
mp.messaging.outgoing.processing-queue.default-routing-key=processing.submission.validated
mp.messaging.outgoing.processing-queue.durable=true
# RabbitMQ connection
rabbitmq-host=${RABBITMQ_HOST:localhost}
rabbitmq-port=${RABBITMQ_PORT:5672}
rabbitmq-username=${RABBITMQ_USERNAME:guest}
rabbitmq-password=${RABBITMQ_PASSWORD:guest}
rabbitmq-virtual-host=${RABBITMQ_VHOST:/}
# Profile-specific overrides
%dev.rabbitmq-host=localhost
%test.rabbitmq-host=localhost
SmallRye Fault Tolerance for Retry:
import org.eclipse.microprofile.faulttolerance.Retry
import org.eclipse.microprofile.faulttolerance.Fallback
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.util.UUID
@ApplicationScoped
class ValidationService @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val validationEngine: ValidationEngine
) {
@Retry(
maxRetries = 3,
delay = 2000, // 2 seconds between attempts
maxDuration = 30000, // 30 seconds max across all attempts
jitter = 200 // Random jitter to prevent thundering herd
)
@Fallback(fallbackMethod = "fallbackValidation")
fun validateSubmissionWithRetry(submissionId: UUID): ValidationResult {
return validationEngine.validate(submissionRepository.findByIdOptional(submissionId).orElseThrow())
}
private fun fallbackValidation(submissionId: UUID): ValidationResult {
logger.error("Validation failed after retries for submission: $submissionId")
return ValidationResult(isValid = false, errors = listOf("Validation service unavailable"))
}
}
Stage 3: Processing Job
Processing Job Listener (Quarkus Reactive Messaging):
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
@ApplicationScoped
class ProcessingJobListener @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val calculationService: CalculationService,
private val notificationService: NotificationService
) {
@Incoming("processing-queue")
@Blocking // Process on worker thread for blocking I/O operations
fun handleProcessingJob(message: ProcessingJobMessage) {
logger.info("Processing job for submission: ${message.submissionId}")
val submission = submissionRepository.findByIdOptional(message.submissionId)
.orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }
try {
// Perform calculations (e.g., carbon emission factors)
val calculatedMetrics = calculationService.calculate(submission)
// Update submission with calculated values
submission.state = SubmissionState.PROCESSED
submission.calculatedValues = calculatedMetrics
submission.processedAt = Instant.now()
submissionRepository.persist(submission)
// Notify mobile app (async with virtual threads)
notificationService.notifySubmissionProcessed(submission)
logger.info("Submission ${submission.id} processed successfully")
} catch (e: Exception) {
logger.error("Processing job failed for submission ${submission.id}", e)
throw e // Re-throw to trigger retry
}
}
}
/**
* Alternative: Non-blocking reactive processing with Mutiny.
* Assumes submissionRepository here is a Hibernate Reactive Panache repository
* (findById and persist return Uni) and that io.smallrye.mutiny.Uni is imported.
* Use it in place of ProcessingJobListener, not alongside it: both consume
* the same "processing-queue" channel.
*/
@ApplicationScoped
class ReactiveProcessingJobListener @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val calculationService: CalculationService,
private val notificationService: NotificationService
) {
@Incoming("processing-queue")
fun handleProcessingJobReactive(message: ProcessingJobMessage): Uni<Void> {
logger.info("Processing job for submission: ${message.submissionId}")
return submissionRepository.findById(message.submissionId)
.onItem().transform { found ->
found ?: throw NotFoundException("Submission not found: ${message.submissionId}")
}
.onItem().transformToUni { submission ->
// Perform calculations asynchronously
calculationService.calculateAsync(submission)
.onItem().transform { calculatedMetrics ->
submission.state = SubmissionState.PROCESSED
submission.calculatedValues = calculatedMetrics
submission.processedAt = Instant.now()
submission
}
}
.onItem().transformToUni { submission ->
submissionRepository.persist(submission)
.onItem().transformToUni {
// Notify mobile app asynchronously
notificationService.notifySubmissionProcessedAsync(submission)
}
}
.onItem().invoke { _ -> // Explicit parameter selects the Consumer overload of invoke
logger.info("Submission ${message.submissionId} processed successfully")
}
.onFailure().invoke { e ->
logger.error("Processing job failed for submission ${message.submissionId}", e)
}
.replaceWithVoid()
}
}
Error Recovery Procedures
Scenario 1: Network Timeout During Upload
Problem: Mobile app sends POST request, but network drops before response received.
Client Behavior:
// Mobile app (Kotlin)
suspend fun uploadSubmission(submission: SubmissionDraft): Result<SubmissionResponse> {
return withContext(Dispatchers.IO) {
retryWithExponentialBackoff(
maxRetries = 5,
initialDelay = 2000L,
maxDelay = 32000L
) {
try {
// Use SAME UUID for all retries
val response = apiService.submitMetric(submission)
// Mark as uploaded locally
database.submissionDao().updateState(
submissionUuid = submission.submissionUuid,
state = SubmissionState.RECEIVED,
serverId = response.id
)
Result.success(response)
} catch (e: SocketTimeoutException) {
// Network timeout - retry with same UUID
logger.warn("Network timeout for submission ${submission.submissionUuid}, retrying...")
throw e // Trigger retry
}
}
}
}
Server Behavior:
- First request: Creates submission, returns 201 Created
- Retry requests: Idempotency check returns existing submission, returns 200 OK
- Outcome: No duplicate submission created
Scenario 2: Validation Job Fails (Transient Error)
Problem: Validation job encounters transient error (e.g., database connection timeout).
Retry Strategy:
- Attempt 1: Immediate execution
- Attempt 2: After 2 seconds (exponential backoff)
- Attempt 3: After 4 seconds
- After 3 failures: Move to Dead Letter Queue
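The delay schedule in this strategy is simple arithmetic: each wait is the previous one multiplied by a factor, up to a cap. The helper below is a hypothetical sketch of that calculation, not part of the pipeline. The 30-second cap is an assumption borrowed from the 30000 ms figure used in the retry configuration.

```kotlin
// Delay before each retry: initialDelay * factor^(retry index), capped at maxDelay.
// All names and defaults here are illustrative assumptions.
fun backoffDelays(
    retries: Int,
    initialDelayMs: Long = 2_000,
    factor: Double = 2.0,
    maxDelayMs: Long = 30_000
): List<Long> {
    var delay = initialDelayMs
    return List(retries) {
        val current = delay
        delay = (delay * factor).toLong().coerceAtMost(maxDelayMs)
        current
    }
}

fun main() {
    println(backoffDelays(retries = 2))  // [2000, 4000]
    println(backoffDelays(retries = 6))  // [2000, 4000, 8000, 16000, 30000, 30000]
}
```

Two retries reproduce the 2-second and 4-second waits listed above. Longer schedules flatten out once the cap is reached.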
Code (Quarkus Fault Tolerance):
import org.eclipse.microprofile.faulttolerance.Retry
import org.eclipse.microprofile.faulttolerance.Fallback
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
@ApplicationScoped
class ValidationJobListener @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val validationEngine: ValidationEngine,
private val adminNotificationService: AdminNotificationService
) {
@Incoming("validation-queue")
@Blocking
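@Retry(
maxRetries = 3,
delay = 2000, // Initial delay: 2 seconds
maxDuration = 30000, // Max duration: 30 seconds
jitter = 200, // Random jitter: 200ms
retryOn = [TransientDataAccessException::class, TimeoutException::class]
)
// MicroProfile @Retry alone waits a fixed delay; SmallRye's @ExponentialBackoff doubles it per retry
@io.smallrye.faulttolerance.api.ExponentialBackoff(factor = 2)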
@Fallback(fallbackMethod = "recoverFromValidationFailure")
fun handleValidationJob(message: ValidationJobMessage) {
logger.info("Processing validation job for submission: ${message.submissionId}")
val submission = submissionRepository.findByIdOptional(message.submissionId)
.orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }
// Validation logic
val result = validationEngine.validate(submission)
if (result.isValid) {
submission.state = SubmissionState.VALIDATED
submission.validatedAt = Instant.now()
submissionRepository.persist(submission)
} else {
submission.state = SubmissionState.VALIDATION_FAILED
submission.validationErrors = result.errors
submissionRepository.persist(submission)
}
}
// Fallback method - called after all retries exhausted
private fun recoverFromValidationFailure(message: ValidationJobMessage) {
logger.error("Validation job failed after 3 attempts: ${message.submissionId}")
// Update submission state
val submission = submissionRepository.findByIdOptional(message.submissionId).orElse(null)
submission?.let {
it.state = SubmissionState.FAILED
it.errorMessage = "Validation failed after maximum retry attempts"
submissionRepository.persist(it)
}
// Notify admin
adminNotificationService.notifyValidationFailure(message.submissionId,
RuntimeException("Max retries exceeded"))
}
}
Scenario 3: Database Deadlock
Problem: Multiple concurrent requests cause database deadlock.
Detection:
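try {
// Flush inside the try so lock errors surface here instead of at commit
submissionRepository.persistAndFlush(submission)
} catch (e: jakarta.persistence.PersistenceException) {
// PostgreSQL reports a detected deadlock as SQLSTATE 40P01
val sqlState = generateSequence<Throwable>(e) { it.cause }
.filterIsInstance<java.sql.SQLException>()
.firstOrNull()?.sqlState
if (sqlState != "40P01") throw e
logger.warn("Deadlock detected for submission ${submission.submissionUuid}, rolling back")
throw ServiceUnavailableException(
code = "SYSTEM_DEADLOCK",
message = "Database is temporarily busy. Please retry in a few seconds.",
retryAfter = 5 // seconds
)
}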
Client Behavior:
// Mobile app handles 503 Service Unavailable
if (response.statusCode == 503) {
val retryAfter = response.headers["Retry-After"]?.toIntOrNull() ?: 5
delay(retryAfter * 1000L)
// Retry with SAME UUID
return uploadSubmission(submission)
}
Scenario 4: Duplicate Submission from Multiple Devices
Problem: User logs in on two devices, both upload same submission.
Prevention:
- Mobile app generates UUID when draft is created (not when submitted)
- Both devices use SAME UUID (stored in local database)
- Server idempotency check prevents duplicate
Flow:
Device A: POST /submissions (UUID: abc-123) → Server creates submission
Device B: POST /submissions (UUID: abc-123) → Server returns existing submission (200 OK)
Retry Mechanisms
Client-Side Retry (Mobile App)
Exponential Backoff Implementation:
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.delay
suspend fun <T> retryWithExponentialBackoff(
maxRetries: Int = 5,
initialDelay: Long = 2000L, // 2 seconds
maxDelay: Long = 32000L, // 32 seconds
factor: Double = 2.0,
block: suspend () -> T
): T {
var currentDelay = initialDelay
repeat(maxRetries) { attempt ->
try {
return block()
} catch (e: CancellationException) {
throw e // Never retry coroutine cancellation
} catch (e: Exception) {
if (attempt == maxRetries - 1) throw e
// Log retry attempt
logger.info("Retry attempt ${attempt + 1}/$maxRetries after ${currentDelay}ms")
// Wait before retry
delay(currentDelay)
// Increase delay exponentially
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
}
}
throw IllegalStateException("Retry logic failed")
}
Retry Decision Matrix:
| Error Type | HTTP Status | Retry? | Max Retries | Notes |
|---|---|---|---|---|
| Network timeout | N/A | ✅ Yes | 5 | Use exponential backoff |
| DNS failure | N/A | ✅ Yes | 3 | May indicate network issue |
| SSL error | N/A | ❌ No | 0 | Certificate problem |
| 400 Bad Request | 400 | ❌ No | 0 | Client error, fix request |
| 401 Unauthorized | 401 | ✅ Yes | 1 | Refresh token, retry once |
| 403 Forbidden | 403 | ❌ No | 0 | Permission denied |
| 404 Not Found | 404 | ❌ No | 0 | Resource doesn't exist |
| 409 Conflict | 409 | ❌ No | 0 | Conflict with existing data; same-UUID duplicates return 200 OK instead |
| 422 Validation | 422 | ❌ No | 0 | Invalid data |
| 429 Rate Limit | 429 | ✅ Yes | 3 | Respect Retry-After header |
| 500 Internal Error | 500 | ✅ Yes | 5 | Server error, retry |
| 503 Unavailable | 503 | ✅ Yes | 5 | Server overload, retry |
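The matrix translates naturally into a lookup function on the client. The sketch below is one possible encoding, with hypothetical names (`RetryPolicy`, `TransportError`, `policyFor`). Treating any status not listed in the table as non-retryable is an assumption made here, not something the matrix states.

```kotlin
// Hypothetical types for illustration; the names are not taken from the mobile codebase.
data class RetryPolicy(val retry: Boolean, val maxRetries: Int)

// Transport-level failures that carry no HTTP status.
enum class TransportError { NETWORK_TIMEOUT, DNS_FAILURE, SSL_ERROR }

fun policyFor(error: TransportError): RetryPolicy = when (error) {
    TransportError.NETWORK_TIMEOUT -> RetryPolicy(true, 5)
    TransportError.DNS_FAILURE -> RetryPolicy(true, 3)
    TransportError.SSL_ERROR -> RetryPolicy(false, 0)
}

fun policyFor(httpStatus: Int): RetryPolicy = when (httpStatus) {
    401 -> RetryPolicy(true, 1)       // refresh the token, then retry once
    429 -> RetryPolicy(true, 3)       // respect the Retry-After header
    500, 503 -> RetryPolicy(true, 5)  // server-side failure
    else -> RetryPolicy(false, 0)     // 400, 403, 404, 409, 422; unlisted codes by assumption
}

fun main() {
    println(policyFor(503))                            // RetryPolicy(retry=true, maxRetries=5)
    println(policyFor(422))                            // RetryPolicy(retry=false, maxRetries=0)
    println(policyFor(TransportError.DNS_FAILURE))     // RetryPolicy(retry=true, maxRetries=3)
}
```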
Server-Side Retry (Background Jobs)
Quarkus Reactive Messaging Configuration (application.properties):
# Validation queue with retry and DLQ
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
mp.messaging.incoming.validation-queue.queue.durable=true
mp.messaging.incoming.validation-queue.queue.ttl=300000
# Rejected messages are dead-lettered by the broker; attribute names follow the
# SmallRye RabbitMQ connector reference and should be checked against the version in use
mp.messaging.incoming.validation-queue.failure-strategy=reject
mp.messaging.incoming.validation-queue.auto-bind-dlq=true
mp.messaging.incoming.validation-queue.dead-letter-queue-name=validation-dlq
mp.messaging.incoming.validation-queue.dead-letter-exchange=dlx-exchange
mp.messaging.incoming.validation-queue.dead-letter-routing-key=validation.dlq
# Processing queue with retry and DLQ
mp.messaging.incoming.processing-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.processing-queue.queue.name=processing-queue
mp.messaging.incoming.processing-queue.queue.durable=true
mp.messaging.incoming.processing-queue.queue.ttl=600000
mp.messaging.incoming.processing-queue.failure-strategy=reject
mp.messaging.incoming.processing-queue.auto-bind-dlq=true
mp.messaging.incoming.processing-queue.dead-letter-queue-name=processing-dlq
mp.messaging.incoming.processing-queue.dead-letter-exchange=dlx-exchange
mp.messaging.incoming.processing-queue.dead-letter-routing-key=processing.dlq
Job Retry Logic with Acknowledgment Strategy:
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Message
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.util.UUID
import java.util.concurrent.CompletionStage
@ApplicationScoped
class ValidationJobProcessor @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val validationEngine: ValidationEngine
) {
@Incoming("validation-queue")
@Blocking // validateSubmission performs blocking database I/O
fun process(message: Message<ValidationJobMessage>): CompletionStage<Void> {
val payload = message.payload
return try {
// Process validation
validateSubmission(payload.submissionId)
// Acknowledge success
message.ack()
} catch (e: TransientDataAccessException) {
// Transient error - NACK and return the nack stage so the failure-strategy is applied
logger.warn("Transient error, nacking message: ${e.message}")
message.nack(e)
} catch (e: PermanentValidationException) {
// Permanent error - NACK so the broker dead-letters the message
logger.error("Permanent error, moving to DLQ: ${e.message}")
message.nack(e)
} catch (e: Exception) {
// Anything else (e.g. submission not found) must also be nacked, or the message stays unacknowledged
logger.error("Unexpected error, nacking message: ${e.message}", e)
message.nack(e)
}
}
private fun validateSubmission(submissionId: UUID) {
val submission = submissionRepository.findByIdOptional(submissionId)
.orElseThrow { NotFoundException("Submission not found: $submissionId") }
val result = validationEngine.validate(submission)
if (result.isValid) {
submission.state = SubmissionState.VALIDATED
submission.validatedAt = Instant.now()
} else {
submission.state = SubmissionState.VALIDATION_FAILED
submission.validationErrors = result.errors
}
submissionRepository.persist(submission)
}
}
Message Queue Architecture
Queue Topology
┌──────────────────┐
│ validation- │
│ exchange │──┐
│ (topic) │ │
└──────────────────┘ │
├──▶ validation-queue ──▶ ValidationWorker
│ (3 retries)
│
└──▶ validation-dlq ──▶ ManualReview
(no retry)
┌──────────────────┐
│ processing- │
│ exchange │──┐
│ (topic) │ │
└──────────────────┘ │
├──▶ processing-queue ──▶ ProcessingWorker
│ (3 retries)
│
└──▶ processing-dlq ──▶ ManualReview
(no retry)
Queue Configuration
Validation Queue (application.properties):
# Validation incoming channel (consumer)
mp.messaging.incoming.validation-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-queue.queue.name=validation-queue
mp.messaging.incoming.validation-queue.queue.durable=true
mp.messaging.incoming.validation-queue.queue.declare=true
mp.messaging.incoming.validation-queue.auto-bind-dlq=true
mp.messaging.incoming.validation-queue.dead-letter-queue-name=validation-dlq
mp.messaging.incoming.validation-queue.dead-letter-exchange=dlx-exchange
mp.messaging.incoming.validation-queue.dead-letter-routing-key=validation.dlq
mp.messaging.incoming.validation-queue.queue.ttl=300000
mp.messaging.incoming.validation-queue.queue.x-max-priority=10
mp.messaging.incoming.validation-queue.exchange.name=validation-exchange
mp.messaging.incoming.validation-queue.exchange.type=topic
mp.messaging.incoming.validation-queue.routing-keys=validation.#
# Validation outgoing channel (producer)
mp.messaging.outgoing.validation-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.validation-events.exchange.name=validation-exchange
mp.messaging.outgoing.validation-events.exchange.type=topic
mp.messaging.outgoing.validation-events.exchange.durable=true
mp.messaging.outgoing.validation-events.default-routing-key=validation.submission.received
Processing Queue (application.properties):
# Processing incoming channel
mp.messaging.incoming.processing-queue.connector=smallrye-rabbitmq
mp.messaging.incoming.processing-queue.queue.name=processing-queue
mp.messaging.incoming.processing-queue.queue.durable=true
mp.messaging.incoming.processing-queue.queue.declare=true
mp.messaging.incoming.processing-queue.auto-bind-dlq=true
mp.messaging.incoming.processing-queue.dead-letter-queue-name=processing-dlq
mp.messaging.incoming.processing-queue.dead-letter-exchange=dlx-exchange
mp.messaging.incoming.processing-queue.dead-letter-routing-key=processing.dlq
mp.messaging.incoming.processing-queue.queue.ttl=600000
mp.messaging.incoming.processing-queue.exchange.name=processing-exchange
mp.messaging.incoming.processing-queue.exchange.type=topic
mp.messaging.incoming.processing-queue.routing-keys=processing.#
# Processing outgoing channel
mp.messaging.outgoing.processing-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.processing-events.exchange.name=processing-exchange
mp.messaging.outgoing.processing-events.exchange.type=topic
mp.messaging.outgoing.processing-events.exchange.durable=true
mp.messaging.outgoing.processing-events.default-routing-key=processing.submission.validated
Priority Queuing
Use Case: Urgent submissions (e.g., near deadline) should process first.
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import org.eclipse.microprofile.reactive.messaging.Message
import io.smallrye.reactive.messaging.rabbitmq.OutgoingRabbitMQMetadata
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
@ApplicationScoped
class SubmissionQueueService @Inject constructor(
@Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>
) {
fun queueValidationJob(submission: MetricSubmission) {
val priority = if (submission.isUrgent) 9 else 5
// Create message with priority metadata
val metadata = OutgoingRabbitMQMetadata.builder()
.withPriority(priority)
.withRoutingKey("validation.submission.received")
.build()
val message = Message.of(ValidationJobMessage(submissionId = submission.id))
.addMetadata(metadata)
validationEmitter.send(message)
logger.info("Queued validation job for submission ${submission.id} with priority $priority")
}
}
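The service above reads `submission.isUrgent` without showing how urgency is decided. One possible derivation, sketched here under the assumption that each submission has a reporting deadline, maps the time remaining to a priority. The function name `priorityFor`, the 24-hour window, and the clamping to `maxPriority` are illustrative assumptions; only the values 9, 5, and the queue's `x-max-priority=10` come from this document.

```kotlin
import java.time.Duration
import java.time.Instant

// Hypothetical helper: maps time remaining before a deadline to a message priority.
// The 24-hour urgency window is an assumption, not a value taken from the pipeline.
fun priorityFor(
    deadline: Instant,
    now: Instant,
    urgentWindow: Duration = Duration.ofHours(24),
    maxPriority: Int = 10
): Int {
    val remaining = Duration.between(now, deadline)
    // Overdue submissions have a negative remaining duration and therefore count as urgent
    val priority = if (remaining <= urgentWindow) 9 else 5
    // Never exceed the queue's declared x-max-priority
    return priority.coerceIn(0, maxPriority)
}
```

Passing `now` explicitly keeps the function deterministic, which makes it easy to unit test without a clock abstraction.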
Dead Letter Queue Management
DLQ Purpose
Dead Letter Queue (DLQ) captures messages that:
- Failed after max retries (3 attempts)
- Expired (TTL exceeded)
- Rejected by consumer (permanent errors)
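The consumer-side part of these rules can be sketched as a small decision function. This is a minimal illustration, not the pipeline's actual code: the `Failure` types and the `dispose` function are hypothetical, and `attempt` is assumed to be the 1-based count of attempts made so far. TTL expiry is not modeled because the broker handles it without consumer involvement.

```kotlin
// Outcome of handling one delivery attempt
enum class Disposition { ACK, RETRY, DEAD_LETTER }

// Hypothetical error classification; the real exception types are not specified here
sealed interface Failure
object TransientFailure : Failure   // for example, a database timeout
object PermanentFailure : Failure   // for example, a malformed payload

// attempt is the 1-based number of attempts made so far, including the one that just failed
fun dispose(failure: Failure?, attempt: Int, maxAttempts: Int = 3): Disposition = when {
    failure == null -> Disposition.ACK
    failure is PermanentFailure -> Disposition.DEAD_LETTER   // retrying cannot help
    attempt >= maxAttempts -> Disposition.DEAD_LETTER        // retries exhausted
    else -> Disposition.RETRY
}
```

Keeping the decision in a pure function separates the retry policy from the messaging code, so the policy can be tested without a broker.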
DLQ Configuration
application.properties:
# Dead Letter Exchange
mp.messaging.outgoing.dlx-events.connector=smallrye-rabbitmq
mp.messaging.outgoing.dlx-events.exchange.name=dlx-exchange
mp.messaging.outgoing.dlx-events.exchange.type=direct
mp.messaging.outgoing.dlx-events.exchange.durable=true
# Validation DLQ
mp.messaging.incoming.validation-dlq.connector=smallrye-rabbitmq
mp.messaging.incoming.validation-dlq.queue.name=validation-dlq
mp.messaging.incoming.validation-dlq.queue.durable=true
mp.messaging.incoming.validation-dlq.queue.declare=true
mp.messaging.incoming.validation-dlq.queue.x-queue-mode=lazy
mp.messaging.incoming.validation-dlq.exchange.name=dlx-exchange
mp.messaging.incoming.validation-dlq.exchange.type=direct
mp.messaging.incoming.validation-dlq.routing-keys=validation.dlq
# Processing DLQ
mp.messaging.incoming.processing-dlq.connector=smallrye-rabbitmq
mp.messaging.incoming.processing-dlq.queue.name=processing-dlq
mp.messaging.incoming.processing-dlq.queue.durable=true
mp.messaging.incoming.processing-dlq.queue.declare=true
mp.messaging.incoming.processing-dlq.queue.x-queue-mode=lazy
mp.messaging.incoming.processing-dlq.exchange.name=dlx-exchange
mp.messaging.incoming.processing-dlq.exchange.type=direct
mp.messaging.incoming.processing-dlq.routing-keys=processing.dlq
DLQ Consumer (for monitoring/reprocessing):
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Message
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import java.time.Instant
import java.util.concurrent.CompletionStage
@ApplicationScoped
class ValidationDLQConsumer @Inject constructor(
private val alertService: AlertService,
private val auditLogService: AuditLogService
) {
@Incoming("validation-dlq")
@Blocking
fun handleDLQMessage(message: Message<ValidationJobMessage>): CompletionStage<Void> {
val payload = message.payload
logger.error("Message in validation DLQ: ${payload.submissionId}")
// Log to audit trail
auditLogService.logDLQMessage(
queueName = "validation-queue",
submissionId = payload.submissionId,
timestamp = Instant.now()
)
// Send alert
alertService.sendAlert(
severity = AlertSeverity.HIGH,
message = "Submission ${payload.submissionId} failed validation after max retries",
tags = listOf("dlq", "validation")
)
return message.ack()
}
}
DLQ Monitoring
Scheduled Job to Check DLQ Depth (Quarkus Scheduler):
import io.quarkus.scheduler.Scheduled
import com.rabbitmq.client.ConnectionFactory
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import org.eclipse.microprofile.config.inject.ConfigProperty
@ApplicationScoped
class DLQMonitor @Inject constructor(
private val alertService: AlertService,
@ConfigProperty(name = "rabbitmq-host") private val rabbitHost: String,
@ConfigProperty(name = "rabbitmq-port") private val rabbitPort: Int,
@ConfigProperty(name = "rabbitmq-username") private val rabbitUsername: String,
@ConfigProperty(name = "rabbitmq-password") private val rabbitPassword: String
) {
@Scheduled(every = "60s") // Every minute
fun checkDLQDepth() {
val factory = ConnectionFactory().apply {
host = rabbitHost
port = rabbitPort
username = rabbitUsername
password = rabbitPassword
}
factory.newConnection().use { connection ->
// Each lookup uses its own channel: a failed passive declare closes the channel it ran on
val validationDLQDepth = queueDepth(connection, "validation-dlq")
val processingDLQDepth = queueDepth(connection, "processing-dlq")
// Alert if DLQ depth exceeds threshold
if (validationDLQDepth > 10) {
alertService.sendAlert(
severity = AlertSeverity.HIGH,
message = "Validation DLQ depth is $validationDLQDepth (threshold: 10)",
tags = listOf("dlq", "validation")
)
}
if (processingDLQDepth > 10) {
alertService.sendAlert(
severity = AlertSeverity.HIGH,
message = "Processing DLQ depth is $processingDLQDepth (threshold: 10)",
tags = listOf("dlq", "processing")
)
}
logger.debug("DLQ depths - validation: $validationDLQDepth, processing: $processingDLQDepth")
}
}
// Returns the message count for a queue, or 0 if the lookup fails
private fun queueDepth(connection: com.rabbitmq.client.Connection, queueName: String): Int = try {
connection.createChannel().use { channel ->
channel.queueDeclarePassive(queueName).messageCount
}
} catch (e: Exception) {
logger.warn("Failed to check depth of $queueName", e)
0
}
}
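The monitor above reports a depth of 0 when a lookup fails, which is indistinguishable from a healthy, empty queue. A possible refinement, sketched here with hypothetical names (`DepthStatus`, `evaluateDepth`), models a failed read as a distinct state so that it can be alerted on separately. A `null` depth stands in for a failed `queueDeclarePassive` call.

```kotlin
// Result of comparing one queue-depth reading against a threshold
sealed interface DepthStatus
data class Ok(val depth: Int) : DepthStatus
data class OverThreshold(val depth: Int, val threshold: Int) : DepthStatus
object Unknown : DepthStatus   // the depth could not be read

// depth == null models a failed lookup; it is reported as Unknown rather than as 0
fun evaluateDepth(depth: Int?, threshold: Int = 10): DepthStatus = when {
    depth == null -> Unknown
    depth > threshold -> OverThreshold(depth, threshold)
    else -> Ok(depth)
}
```

With this shape, a scheduler could raise a lower-severity alert for `Unknown` instead of silently treating an unreachable broker as an empty DLQ.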
Manual DLQ Reprocessing
Admin Endpoint to Retry Failed Jobs (JAX-RS):
import jakarta.annotation.security.RolesAllowed
import jakarta.inject.Inject
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.core.Response
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import com.rabbitmq.client.ConnectionFactory
import com.fasterxml.jackson.databind.ObjectMapper
import org.eclipse.microprofile.config.inject.ConfigProperty
@Path("/api/v1/admin/dlq")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
class DLQManagementResource @Inject constructor(
@Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>,
private val objectMapper: ObjectMapper,
@ConfigProperty(name = "rabbitmq-host") private val rabbitHost: String,
@ConfigProperty(name = "rabbitmq-port") private val rabbitPort: Int,
@ConfigProperty(name = "rabbitmq-username") private val rabbitUsername: String,
@ConfigProperty(name = "rabbitmq-password") private val rabbitPassword: String
) {
@POST
@Path("/validation/retry/{submissionId}")
@RolesAllowed("admin.dlq.retry")
fun retryValidationJob(@PathParam("submissionId") submissionId: UUID): Response {
// Publish a new validation job for this submission.
// Note: this does not remove any message from the DLQ itself.
validationEmitter.send(ValidationJobMessage(submissionId = submissionId))
logger.info("Re-queued validation job for submission: $submissionId")
return Response.accepted().build()
}
@GET
@Path("/validation/list")
@RolesAllowed("admin.dlq.view")
fun listValidationDLQ(): Response {
val messages = mutableListOf<DLQMessage>()
val factory = ConnectionFactory().apply {
host = rabbitHost
port = rabbitPort
username = rabbitUsername
password = rabbitPassword
}
factory.newConnection().use { connection ->
connection.createChannel().use { channel ->
val queueDepth = channel.queueDeclarePassive("validation-dlq").messageCount
var lastDeliveryTag = 0L
// Hold fetched messages unacknowledged so each basicGet returns the next message;
// requeueing inside the loop could hand the same message back again
while (messages.size < minOf(queueDepth, 100)) {
val response = channel.basicGet("validation-dlq", false) ?: break
val message = objectMapper.readValue(
response.body,
ValidationJobMessage::class.java
)
messages.add(DLQMessage(
submissionId = message.submissionId,
deliveryTag = response.envelope.deliveryTag,
redelivered = response.envelope.isRedeliver
))
lastDeliveryTag = response.envelope.deliveryTag
}
// Return everything fetched to the queue in one call (multiple = true, requeue = true)
if (lastDeliveryTag > 0) {
channel.basicNack(lastDeliveryTag, true, true)
}
}
}
return Response.ok(messages).build()
}
}
data class DLQMessage(
val submissionId: UUID,
val deliveryTag: Long,
val redelivered: Boolean
)
Monitoring and Alerting
Key Metrics to Monitor
- Ingestion Rate: Submissions received per minute
- Idempotency Cache Hit Rate: % of requests served from cache
- Validation Success Rate: % of submissions passing validation
- Processing Success Rate: % of submissions processed successfully
- Queue Depth: Number of messages in validation/processing queues
- DLQ Depth: Number of messages in dead letter queues
- Average Processing Time: Time from RECEIVED to PROCESSED
- Error Rate: 4xx and 5xx responses per minute
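Several of these metrics are percentages derived from pairs of raw counters. The arithmetic can be sketched as follows; the `PipelineCounters` type and its field names are illustrative assumptions and do not correspond to metric names used elsewhere in this document.

```kotlin
// Snapshot of raw counters (illustrative names, not the pipeline's metric names)
data class PipelineCounters(
    val cacheHits: Long,
    val cacheMisses: Long,
    val validationPassed: Long,
    val validationFailed: Long
)

// Percentage of part out of (part + rest); null when there is no traffic to measure
fun percentage(part: Long, rest: Long): Double? {
    val total = part + rest
    return if (total == 0L) null else 100.0 * part / total
}

fun cacheHitRate(c: PipelineCounters): Double? = percentage(c.cacheHits, c.cacheMisses)

fun validationSuccessRate(c: PipelineCounters): Double? =
    percentage(c.validationPassed, c.validationFailed)
```

Returning `null` for an idle period avoids reporting a misleading 0% or dividing by zero; dashboards can then show "no data" instead.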
Metrics Implementation
Micrometer Metrics:
@ApplicationScoped
class SubmissionIngestionService(
private val meterRegistry: MeterRegistry
) {
private val ingestCounter = meterRegistry.counter("submissions.ingested", "status", "success")
private val duplicateCounter = meterRegistry.counter("submissions.duplicate")
private val errorCounter = meterRegistry.counter("submissions.ingested", "status", "error")
private val ingestionTimer = meterRegistry.timer("submissions.ingestion.duration")
@Transactional
fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
return ingestionTimer.recordCallable {
try {
val response = doIngest(request, tenantId, userId)
if (response.isNewSubmission) {
ingestCounter.increment()
} else {
duplicateCounter.increment()
}
response
} catch (e: Exception) {
errorCounter.increment()
throw e
}
}!!
}
}
Alerting Rules
Prometheus Alerts:
groups:
- name: ingestion_pipeline
interval: 30s
rules:
- alert: HighErrorRate
expr: rate(submissions_ingested_total{status="error"}[5m]) > 0.05
for: 5m
labels:
severity: critical
annotations:
summary: "High error rate in ingestion pipeline"
description: "Error rate is {{ $value }} errors/s (threshold: 0.05 errors/s)"
- alert: HighDLQDepth
expr: rabbitmq_queue_messages{queue=~".*-dlq"} > 10
for: 2m
labels:
severity: high
annotations:
summary: "Dead letter queue depth exceeds threshold"
description: "DLQ {{ $labels.queue }} has {{ $value }} messages"
- alert: LowValidationSuccessRate
expr: rate(validation_success_total[5m]) / rate(validation_total[5m]) < 0.90
for: 10m
labels:
severity: warning
annotations:
summary: "Validation success rate below 90%"
description: "Success rate is {{ $value }}"
- alert: SlowProcessing
expr: histogram_quantile(0.95, rate(submissions_ingestion_duration_seconds_bucket[5m])) > 5
for: 10m
labels:
severity: warning
annotations:
summary: "95th percentile ingestion time exceeds 5 seconds"
description: "P95 latency is {{ $value }}s"
Logging Strategy
Structured Logging:
@ApplicationScoped
class SubmissionIngestionService(
private val logger: KLogger = KotlinLogging.logger {}
) {
fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
logger.info {
mapOf(
"event" to "submission_ingestion_started",
"submissionUuid" to request.submissionUuid,
"tenantId" to tenantId,
"userId" to userId,
"metricDefinitionId" to request.metricDefinitionId
)
}
// ... ingestion logic ...
logger.info {
mapOf(
"event" to "submission_ingestion_completed",
"submissionUuid" to request.submissionUuid,
"submissionId" to response.id,
"isNewSubmission" to response.isNewSubmission,
"durationMs" to duration
)
}
return response
}
}
Performance Considerations
Database Optimization
Indexes:
-- Primary index for deduplication checks
CREATE UNIQUE INDEX idx_submissions_uuid
ON metric_submissions (tenant_id, submission_uuid);
-- Index for queue job lookups
CREATE INDEX idx_submissions_state
ON metric_submissions (tenant_id, state, created_at);
-- Index for audit log queries
CREATE INDEX idx_audit_logs_submission
ON submission_audit_logs (submission_id, created_at DESC);
-- Partial index for active queue
CREATE INDEX idx_submissions_queue
ON metric_submissions (tenant_id, created_at)
WHERE state IN ('RECEIVED', 'VALIDATED');
Connection Pooling:
# application.properties
# Quarkus uses Agroal connection pool by default
# Datasource configuration
quarkus.datasource.db-kind=postgresql
quarkus.datasource.username=${DB_USERNAME:esg_user}
quarkus.datasource.password=${DB_PASSWORD}
quarkus.datasource.jdbc.url=jdbc:postgresql://${DB_HOST:localhost}:${DB_PORT:5432}/${DB_NAME:esg_db}
# Agroal connection pool settings
quarkus.datasource.jdbc.max-size=20
quarkus.datasource.jdbc.min-size=5
quarkus.datasource.jdbc.acquisition-timeout=30s
quarkus.datasource.jdbc.idle-removal-interval=10m
quarkus.datasource.jdbc.max-lifetime=30m
# Profile-specific overrides
%dev.quarkus.datasource.jdbc.max-size=5
%test.quarkus.datasource.jdbc.max-size=10
Caching Strategy
Redis Configuration:
# Quarkus Redis Client (quarkus-redis-client extension)
quarkus.redis.hosts=redis://${REDIS_HOST:redis.internal}:${REDIS_PORT:6379}
quarkus.redis.password=${REDIS_PASSWORD}
quarkus.redis.timeout=2s
quarkus.redis.max-pool-size=20
quarkus.redis.max-waiting-handlers=10
# Profile-specific Redis configuration
%dev.quarkus.redis.hosts=redis://localhost:6379
%dev.quarkus.redis.password=
%test.quarkus.redis.hosts=redis://localhost:6379
Redis Client Usage:
@ApplicationScoped
class IdempotencyCacheService(
private val redisClient: RedisDataSource
) {
fun checkIdempotency(idempotencyKey: String): Boolean {
val commands = redisClient.value(String::class.java)
return commands.get(idempotencyKey) != null
}
fun cacheSubmission(idempotencyKey: String, submissionId: UUID, ttl: Duration) {
val commands = redisClient.value(String::class.java)
commands.setex(idempotencyKey, ttl.seconds, submissionId.toString())
}
}
Cache Hit Rate Target: > 95% for idempotency cache
Async Processing
All validation and processing should be async:
- HTTP response time: < 200ms (just save to database)
- Validation time: 1-5 seconds (async job)
- Processing time: 5-30 seconds (async job)
Quarkus Async Patterns:
1. Reactive Messaging (SmallRye) - Recommended for message-driven async:
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
@ApplicationScoped
class SubmissionIngestionService @Inject constructor(
private val submissionRepository: MetricSubmissionRepository,
@Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>
) {
@Transactional
fun ingestSubmission(request: SubmissionRequest, tenantId: UUID, userId: UUID): SubmissionResponse {
// Synchronous: Save to database (fast)
val submission = submissionRepository.persist(newSubmission)
// Asynchronous: Queue validation job using Emitter (don't wait)
validationEmitter.send(ValidationJobMessage(submissionId = submission.id))
// Return immediately
return SubmissionResponse.fromEntity(submission)
}
}
2. Virtual Threads (@RunOnVirtualThread) - For blocking I/O operations:
import jakarta.inject.Inject
import io.smallrye.common.annotation.RunOnVirtualThread
import jakarta.enterprise.context.ApplicationScoped
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
@Path("/api/v1/reports")
@Produces(MediaType.APPLICATION_JSON)
class ReportGenerationResource @Inject constructor(
private val reportService: ReportService
) {
@GET
@Path("/{reportId}/generate")
@RunOnVirtualThread // Runs on virtual thread - can block without issue
fun generateReport(@PathParam("reportId") reportId: UUID): Report {
// This can perform blocking I/O operations
// Virtual threads are lightweight and numerous (millions possible)
return reportService.generateBlockingReport(reportId)
}
}
@ApplicationScoped
class ReportService {
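// No annotation here: @RunOnVirtualThread applies to entry points such as the resource
// method above, and this call already runs on that virtual thread
fun generateBlockingReport(reportId: UUID): Report {
// Blocking operations are fine on virtual threads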
val data = fetchDataFromDatabase() // Blocks
val processed = processData(data) // Blocks
return createReport(processed) // Blocks
}
}
3. Mutiny (Reactive Streams) - For reactive, non-blocking operations:
import io.smallrye.mutiny.Uni
import io.smallrye.mutiny.Multi
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
@ApplicationScoped
class ReactiveSubmissionService @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val calculationService: CalculationService
) {
// Uni<T> - Single async result (like CompletableFuture)
fun processSubmissionAsync(submissionId: UUID): Uni<SubmissionResult> {
// findById yields a null item when nothing matches, which ifNull() below relies on
return submissionRepository.findById(submissionId)
.onItem().ifNull().failWith { NotFoundException("Submission not found") }
.onItem().transformToUni { submission ->
// Chain async operations
calculationService.calculateAsync(submission)
.onItem().transform { calculatedMetrics ->
submission.calculatedValues = calculatedMetrics
submission.state = SubmissionState.PROCESSED
submission
}
}
.onItem().transformToUni { submission ->
submissionRepository.persist(submission)
}
.onItem().transform { submission ->
SubmissionResult.fromEntity(submission)
}
}
// Multi<T> - Stream of async results
fun streamSubmissionsAsync(tenantId: UUID): Multi<Submission> {
return submissionRepository.streamByTenantId(tenantId)
.onItem().transform { submission ->
// Process each item asynchronously
enrichSubmissionData(submission)
}
}
}
4. CDI Events (@ObservesAsync) - For async event handling:
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.event.Event
import jakarta.enterprise.event.ObservesAsync
import jakarta.inject.Inject
// Event payload
data class SubmissionReceivedEvent(
val submissionId: UUID,
val tenantId: UUID,
val timestamp: Instant
)
@ApplicationScoped
class SubmissionEventPublisher @Inject constructor(
private val event: Event<SubmissionReceivedEvent>
) {
fun publishSubmissionReceived(submission: MetricSubmission) {
// Fire event asynchronously
event.fireAsync(SubmissionReceivedEvent(
submissionId = submission.id,
tenantId = submission.tenantId,
timestamp = Instant.now()
))
}
}
@ApplicationScoped
class SubmissionEventListener @Inject constructor(
private val notificationService: NotificationService,
private val analyticsService: AnalyticsService
) {
// Async observer - runs on different thread
fun onSubmissionReceived(@ObservesAsync event: SubmissionReceivedEvent) {
// Send notification
notificationService.notifyDataTeam(event.submissionId)
// Update analytics
analyticsService.trackSubmission(event.tenantId, event.timestamp)
logger.info("Processed submission received event: ${event.submissionId}")
}
}
5. ManagedExecutor - For custom thread pools:
import jakarta.enterprise.context.ApplicationScoped
import jakarta.enterprise.inject.Produces
import org.eclipse.microprofile.context.ManagedExecutor
import org.eclipse.microprofile.context.ThreadContext
import java.util.concurrent.CompletableFuture
@ApplicationScoped
class ExecutorConfiguration {
@Produces
@ApplicationScoped
fun createManagedExecutor(): ManagedExecutor {
return ManagedExecutor.builder()
.maxAsync(50) // Max concurrent tasks
.maxQueued(100) // Max queued tasks
.build()
}
}
@ApplicationScoped
class AsyncReportService @Inject constructor(
private val executor: ManagedExecutor
) {
fun generateReportAsync(reportId: UUID): CompletableFuture<Report> {
return executor.supplyAsync {
// Runs on managed thread pool
// Context propagation handled automatically
generateReport(reportId)
}
}
}
Pattern Selection Guide:
| Use Case | Pattern | When to Use |
|---|---|---|
| Message queues | Reactive Messaging | RabbitMQ, Kafka, AMQP |
| Blocking I/O | @RunOnVirtualThread | Database calls, file I/O, external APIs |
| Reactive pipelines | Mutiny Uni/Multi | Non-blocking chains, streams |
| Event bus | CDI @ObservesAsync | Internal application events |
| Custom threading | ManagedExecutor | Fine-grained thread pool control |
Security Requirements
Tenant Isolation
CRITICAL: Every query must include tenant_id filter.
Enforcement via Hibernate Filter:
@Entity
@Table(name = "metric_submissions")
// Hibernate 6 (used with the jakarta.* namespace) expects a class for the parameter type
@FilterDef(name = "tenantFilter", parameters = [ParamDef(name = "tenantId", type = UUID::class)])
@Filter(name = "tenantFilter", condition = "tenant_id = :tenantId")
class MetricSubmission(
@Id
val id: UUID,
@Column(name = "tenant_id", nullable = false)
val tenantId: UUID,
// ... other fields
)
Activate Filter on Every Request (JAX-RS ContainerRequestFilter):
import jakarta.ws.rs.container.ContainerRequestContext
import jakarta.ws.rs.container.ContainerRequestFilter
import jakarta.ws.rs.container.ContainerResponseContext
import jakarta.ws.rs.container.ContainerResponseFilter
import jakarta.ws.rs.ext.Provider
import jakarta.persistence.EntityManager
import jakarta.inject.Inject
import io.quarkus.security.identity.SecurityIdentity
@Provider
class TenantIsolationFilter @Inject constructor(
private val entityManager: EntityManager,
private val securityIdentity: SecurityIdentity
) : ContainerRequestFilter, ContainerResponseFilter {
override fun filter(requestContext: ContainerRequestContext) {
if (securityIdentity.isAnonymous) {
return
}
// Extract tenant ID from authenticated user
val tenantId = securityIdentity.getAttribute<UUID>("tenantId")
if (tenantId != null) {
// Enable Hibernate filter for tenant isolation
val session = entityManager.unwrap(org.hibernate.Session::class.java)
session.enableFilter("tenantFilter").setParameter("tenantId", tenantId)
logger.debug("Enabled tenant filter for tenant: $tenantId")
}
}
override fun filter(
requestContext: ContainerRequestContext,
responseContext: ContainerResponseContext
) {
// Disable filter after request completes
if (!securityIdentity.isAnonymous) {
val session = entityManager.unwrap(org.hibernate.Session::class.java)
session.disableFilter("tenantFilter")
}
}
}
Idempotency Cache Security
Prevent cross-tenant access:
fun getCacheKey(submissionUuid: UUID, tenantId: UUID): String {
return "idempotency:$tenantId:$submissionUuid"
}
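The effect of the tenant-scoped key can be illustrated with an in-memory stand-in for Redis. This is a sketch for illustration only: `TenantScopedCache` is a hypothetical class, and only the key format is taken from `getCacheKey` above.

```kotlin
import java.util.UUID

// Minimal in-memory stand-in for the Redis idempotency cache (illustration only)
class TenantScopedCache {
    private val entries = HashMap<String, UUID>()

    // Same key format as getCacheKey: the tenant ID is part of every key
    private fun key(tenantId: UUID, submissionUuid: UUID) =
        "idempotency:$tenantId:$submissionUuid"

    fun put(tenantId: UUID, submissionUuid: UUID, submissionId: UUID) {
        entries[key(tenantId, submissionUuid)] = submissionId
    }

    fun get(tenantId: UUID, submissionUuid: UUID): UUID? =
        entries[key(tenantId, submissionUuid)]
}

fun main() {
    val cache = TenantScopedCache()
    val tenantA = UUID.fromString("aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa")
    val tenantB = UUID.fromString("bbbbbbbb-bbbb-bbbb-bbbb-bbbbbbbbbbbb")
    val sharedUuid = UUID.fromString("11111111-1111-1111-1111-111111111111")
    val submissionIdA = UUID.randomUUID()

    cache.put(tenantA, sharedUuid, submissionIdA)
    println(cache.get(tenantB, sharedUuid))                  // null
    println(cache.get(tenantA, sharedUuid) == submissionIdA) // true
}
```

Because the tenant ID is embedded in the key, two tenants that happen to generate the same submission UUID never read each other's cached entries.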
Audit Logging
Log all ingestion events:
@ApplicationScoped
class AuditLogService {
fun logSubmissionReceived(submission: MetricSubmission, userId: UUID) {
auditLogRepository.save(SubmissionAuditLog(
id = UUID.randomUUID(),
submissionId = submission.id,
tenantId = submission.tenantId,
action = AuditAction.SUBMISSION_RECEIVED,
actorUserId = userId,
timestamp = Instant.now(),
ipAddress = getCurrentRequest().remoteAddr,
userAgent = getCurrentRequest().getHeader("User-Agent"),
changes = objectMapper.writeValueAsString(submission)
))
}
}
Testing Strategy
Unit Tests
Test Idempotency:
@Test
fun `test duplicate submission returns existing submission`() {
val submissionUuid = UUID.randomUUID()
val tenantId = UUID.randomUUID()
val userId = UUID.randomUUID()
// First submission
val request1 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
val response1 = ingestionService.ingestSubmission(request1, tenantId, userId)
assertThat(response1.isNewSubmission).isTrue()
// Duplicate submission (same UUID)
val request2 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
val response2 = ingestionService.ingestSubmission(request2, tenantId, userId)
assertThat(response2.isNewSubmission).isFalse()
assertThat(response2.id).isEqualTo(response1.id)
// Verify only one row in database
val count = submissionRepository.countByTenantIdAndSubmissionUuid(tenantId, submissionUuid)
assertThat(count).isEqualTo(1)
}
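The test above exercises duplicates that arrive one after another. The harder case is duplicates that arrive concurrently, where exactly one request must win. The following sketch models that property with an in-memory map standing in for the unique `(tenant_id, submission_uuid)` constraint; `InMemoryIngestion` and `StoredSubmission` are hypothetical names, and the real pipeline relies on the database rather than on this structure.

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger

data class StoredSubmission(val id: UUID, val isNewSubmission: Boolean)

// In-memory stand-in for the (tenant_id, submission_uuid) unique constraint
class InMemoryIngestion {
    private val rows = ConcurrentHashMap<Pair<UUID, UUID>, UUID>()

    fun ingest(tenantId: UUID, submissionUuid: UUID): StoredSubmission {
        val candidateId = UUID.randomUUID()
        // putIfAbsent is atomic: exactly one caller per key gets null back
        val existingId = rows.putIfAbsent(tenantId to submissionUuid, candidateId)
        return if (existingId == null) StoredSubmission(candidateId, true)
        else StoredSubmission(existingId, false)
    }

    fun rowCount(): Int = rows.size
}

fun main() {
    val ingestion = InMemoryIngestion()
    val tenantId = UUID.randomUUID()
    val submissionUuid = UUID.randomUUID()
    val newCount = AtomicInteger(0)
    val pool = Executors.newFixedThreadPool(8)
    val done = CountDownLatch(50)
    repeat(50) {
        pool.execute {
            try {
                if (ingestion.ingest(tenantId, submissionUuid).isNewSubmission) {
                    newCount.incrementAndGet()
                }
            } finally {
                done.countDown()
            }
        }
    }
    done.await(10, TimeUnit.SECONDS)
    pool.shutdown()
    println("rows=${ingestion.rowCount()}, new=${newCount.get()}") // rows=1, new=1
}
```

A concurrent variant of the idempotency test could follow the same shape against the real service: submit the same UUID from many threads and assert that one response is new and one row exists.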
Test Tenant Isolation:
@Test
fun `test tenant isolation prevents cross-tenant access`() {
val submissionUuid = UUID.randomUUID()
val tenant1 = UUID.randomUUID()
val tenant2 = UUID.randomUUID()
// Tenant 1 creates submission
val request1 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
ingestionService.ingestSubmission(request1, tenant1, UUID.randomUUID())
// Tenant 2 tries to create submission with same UUID (should succeed - different tenant)
val request2 = SubmissionRequest(submissionUuid = submissionUuid, /* ... */)
val response2 = ingestionService.ingestSubmission(request2, tenant2, UUID.randomUUID())
assertThat(response2.isNewSubmission).isTrue()
// Verify two rows exist (one per tenant)
val count = submissionRepository.countBySubmissionUuid(submissionUuid)
assertThat(count).isEqualTo(2)
}
Integration Tests
Test End-to-End Flow:
@QuarkusTest
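@TestProfile(IngestionTestProfile::class) // IngestionTestProfile: hypothetical QuarkusTestProfile implementation
class IngestionPipelineIntegrationTest {
@Inject
lateinit var submissionRepository: SubmissionRepository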
@Test
fun `test submission flows through entire pipeline`() {
// Step 1: Submit metric
val request = SubmissionRequest(submissionUuid = UUID.randomUUID(), /* ... */)
val response = given()
.contentType(ContentType.JSON)
.body(request)
.`when`()
.post("/api/v1/collector/submissions")
.then()
.statusCode(201)
.extract()
.`as`(SubmissionResponse::class.java)
val submissionId = response.id
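// Step 2: Wait for validation job to complete (async).
// Processing may already have run, so PROCESSED also counts as validated.
await().atMost(Duration.ofSeconds(10)).until {
val submission = submissionRepository.findById(submissionId).orElse(null)
submission?.state == SubmissionState.VALIDATED || submission?.state == SubmissionState.PROCESSED
}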
// Step 3: Wait for processing job to complete
await().atMost(Duration.ofSeconds(10)).until {
val submission = submissionRepository.findById(submissionId).orElse(null)
submission?.state == SubmissionState.PROCESSED
}
// Step 4: Verify final state
val finalSubmission = submissionRepository.findById(submissionId).orElseThrow()
assertThat(finalSubmission.state).isEqualTo(SubmissionState.PROCESSED)
assertThat(finalSubmission.calculatedValues).isNotNull()
}
}
Load Tests
Test Throughput:
@Test
fun `test ingestion handles 1000 concurrent submissions`() {
val tenantId = UUID.randomUUID()
val latch = CountDownLatch(1000)
val executor = Executors.newFixedThreadPool(100)
repeat(1000) { i ->
executor.submit {
try {
val request = SubmissionRequest(submissionUuid = UUID.randomUUID(), /* ... */)
ingestionService.ingestSubmission(request, tenantId, UUID.randomUUID())
} finally {
latch.countDown()
}
}
}
// Wait for all submissions to complete (max 60 seconds)
val completed = latch.await(60, TimeUnit.SECONDS)
executor.shutdown() // Release the worker threads once the run is over
assertThat(completed).isTrue()
// Verify all submissions were created
val count = submissionRepository.countByTenantId(tenantId)
assertThat(count).isEqualTo(1000)
}
Implementation Guide
Quarkus JAX-RS Resource
import jakarta.annotation.security.RolesAllowed
import jakarta.inject.Inject
import jakarta.validation.Valid
import jakarta.ws.rs.*
import jakarta.ws.rs.core.MediaType
import jakarta.ws.rs.core.Response
import io.quarkus.security.identity.SecurityIdentity
@Path("/api/v1/collector")
@Produces(MediaType.APPLICATION_JSON)
@Consumes(MediaType.APPLICATION_JSON)
@RolesAllowed("collector.submissions.create")
class CollectorSubmissionResource @Inject constructor(
private val ingestionService: SubmissionIngestionService,
private val securityIdentity: SecurityIdentity
) {
@POST
@Path("/submissions")
fun submitMetric(@Valid request: SubmissionRequest): Response {
// Extract tenant and user from security context
val tenantId = securityIdentity.getAttribute<UUID>("tenantId")
?: throw ForbiddenException("Tenant ID not found in security context")
val userId = securityIdentity.getAttribute<UUID>("userId")
?: throw ForbiddenException("User ID not found in security context")
val response = ingestionService.ingestSubmission(request, tenantId, userId)
val status = if (response.isNewSubmission) Response.Status.CREATED else Response.Status.OK
return Response.status(status)
.header("X-Submission-Id", response.id.toString())
.header("X-Submission-UUID", response.submissionUuid.toString())
.entity(response)
.build()
}
}
Service Layer
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import com.fasterxml.jackson.databind.ObjectMapper
import java.time.Duration
import java.time.Instant
import java.util.UUID
@ApplicationScoped
class SubmissionIngestionService @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val idempotencyCache: IdempotencyCache,
@Channel("validation-events") private val validationEmitter: Emitter<ValidationJobMessage>,
private val auditLogService: AuditLogService,
private val objectMapper: ObjectMapper
) {
@Transactional
fun ingestSubmission(
request: SubmissionRequest,
tenantId: UUID,
userId: UUID
): SubmissionResponse {
// Check idempotency cache. The lookup is scoped by tenant so that two tenants using the
// same submission UUID never share an entry (assumes IdempotencyCache builds its key
// from both values, as getCacheKey does)
idempotencyCache.get(tenantId, request.submissionUuid)?.let { return it.toResponse(false) }
// Check database with pessimistic lock
val existing = submissionRepository.findByTenantIdAndSubmissionUuidForUpdate(
tenantId, request.submissionUuid
)
if (existing != null) {
idempotencyCache.put(tenantId, request.submissionUuid, existing, Duration.ofHours(24))
return existing.toResponse(false)
}
// Create new submission
val submission = MetricSubmission(
id = UUID.randomUUID(),
tenantId = tenantId,
submissionUuid = request.submissionUuid,
reportingPeriodId = request.reportingPeriodId,
siteId = request.siteId,
metricDefinitionId = request.metricDefinitionId,
value = request.value,
unitOfMeasure = request.unitOfMeasure,
metadata = request.metadata,
state = SubmissionState.RECEIVED,
submittedByUserId = userId,
submittedAt = Instant.now(),
rawData = objectMapper.writeValueAsString(request)
)
val saved = submissionRepository.persist(submission)
// Cache for idempotency (tenant-scoped)
idempotencyCache.put(tenantId, request.submissionUuid, saved, Duration.ofHours(24))
// Audit log (async with CDI events)
auditLogService.logSubmissionReceived(saved, userId)
// Queue validation (Quarkus Reactive Messaging)
validationEmitter.send(ValidationJobMessage(submissionId = saved.id))
return saved.toResponse(true)
}
}
Background Jobs
import io.smallrye.reactive.messaging.annotations.Blocking
import org.eclipse.microprofile.reactive.messaging.Incoming
import org.eclipse.microprofile.reactive.messaging.Channel
import org.eclipse.microprofile.reactive.messaging.Emitter
import org.eclipse.microprofile.faulttolerance.Retry
import org.eclipse.microprofile.faulttolerance.Fallback
import jakarta.enterprise.context.ApplicationScoped
import jakarta.inject.Inject
import jakarta.transaction.Transactional
import java.time.Instant
@ApplicationScoped
class ValidationJobListener @Inject constructor(
private val submissionRepository: SubmissionRepository,
private val validationEngine: ValidationEngine,
@Channel("processing-events") private val processingEmitter: Emitter<ProcessingJobMessage>
) {
@Incoming("validation-queue")
@Blocking
@Transactional
@Retry(
maxRetries = 3,
delay = 2000,
maxDuration = 30000,
jitter = 200
)
@Fallback(fallbackMethod = "handleValidationFailure")
fun handleValidation(message: ValidationJobMessage) {
val submission = submissionRepository.findByIdOptional(message.submissionId)
.orElseThrow { NotFoundException("Submission not found: ${message.submissionId}") }
val result = validationEngine.validate(submission)
if (result.isValid) {
submission.state = SubmissionState.VALIDATED
submission.validatedAt = Instant.now()
submissionRepository.persist(submission)
// Queue processing (Quarkus Reactive Messaging)
processingEmitter.send(ProcessingJobMessage(submissionId = submission.id))
logger.info("Submission ${submission.id} validated successfully")
} else {
submission.state = SubmissionState.VALIDATION_FAILED
submission.validationErrors = result.errors
submissionRepository.persist(submission)
logger.warn("Submission ${submission.id} validation failed: ${result.errors}")
}
}
// Fallback method - called after all retries are exhausted.
// Not private: Quarkus ArC rejects interceptor bindings such as @Transactional on private methods.
@Transactional
fun handleValidationFailure(message: ValidationJobMessage) {
logger.error("Validation failed after all retries for submission: ${message.submissionId}")
val submission = submissionRepository.findByIdOptional(message.submissionId).orElse(null)
submission?.let {
it.state = SubmissionState.FAILED
it.errorMessage = "Validation failed after maximum retry attempts"
submissionRepository.persist(it)
}
}
}
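The `@Retry` settings above use a fixed 2000 ms delay with jitter, while the Purpose section describes exponential backoff. If exponential growth is wanted, the delay schedule would look like the sketch below. The function `backoffDelayMs`, the doubling factor, and the 30-second cap are assumptions for illustration; SmallRye Fault Tolerance also provides an `@ExponentialBackoff` annotation that may be usable alongside `@Retry`, depending on the version in use.

```kotlin
import kotlin.math.min

// Delay before retry number `attempt` (1-based): baseMs * factor^(attempt - 1), capped at capMs.
// The cap is applied at every step so the multiplication cannot overflow.
fun backoffDelayMs(
    attempt: Int,
    baseMs: Long = 2_000,
    factor: Long = 2,
    capMs: Long = 30_000
): Long {
    require(attempt >= 1) { "attempt is 1-based" }
    var delay = min(baseMs, capMs)
    repeat(attempt - 1) { delay = min(delay * factor, capMs) }
    return delay
}

fun main() {
    println((1..5).map { backoffDelayMs(it) }) // [2000, 4000, 8000, 16000, 30000]
}
```

With three retries, this schedule waits 2, 4, and 8 seconds, for 14 seconds of waiting in total, which fits within the `maxDuration` of 30000 ms configured above only if the validation attempts themselves are short.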
Cross-References
- Collector Workflow - Mobile app submission flow
- Validation Engine - 6 validation types
- Collector API - HTTP endpoint specifications
- Error Handling - Error codes and responses
- Data Model - Database schema
Change Log
| Version | Date | Author | Changes |
|---|---|---|---|
| 1.0 | 2026-01-03 | Senior Product Architect | Initial ingestion pipeline specification |
| 2.0 | 2026-01-11 | Ralph Agent | Comprehensive expansion with UUID deduplication, idempotency implementation, retry mechanisms, message queue architecture, DLQ management, monitoring, security, testing, and complete Kotlin/Quarkus examples |