AI Prompt: Forma3D.Connect - Phase 5c: Critical Technical Debt Resolution
Purpose: This prompt instructs an AI to resolve critical technical debt items identified in the technical debt register
Estimated Effort: 2-3 days (~10-15 hours)
Prerequisites: Phase 5b completed (Domain Boundary Separation)
Output: Database-backed webhook idempotency with automatic cleanup, updated documentation, passing tests
Status: 🟡 PENDING
🎯 Mission
You are continuing development of Forma3D.Connect, building on the Phase 5b foundation. Your task is to implement Phase 5c: Critical Technical Debt Resolution — specifically addressing TD-001 (In-Memory Webhook Idempotency Cache) from the technical debt register.
Why This Matters:
The current implementation uses an in-memory Set<string> for webhook idempotency tracking:
// apps/api/src/shopify/shopify.service.ts:12
private readonly processedWebhooks = new Set<string>();
Critical Problems:
- Horizontal Scaling Failure: In a multi-instance deployment, each API instance has its own cache. Webhooks may be processed multiple times across instances.
- Memory Leak: The Set grows unbounded as webhooks are processed, causing memory pressure in long-running instances.
- Restart Data Loss: All idempotency data is lost on application restart, allowing duplicate processing during restarts.
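The horizontal-scaling problem can be reproduced without any infrastructure. The sketch below is a simplified, hypothetical stand-in for the in-memory approach (the class `InMemoryDedup` and its method are illustrative names, not taken from the codebase): two instances each hold their own Set, so the same webhook ID is accepted once per instance.

```typescript
// Hypothetical stand-in for one API instance that deduplicates with a Set.
class InMemoryDedup {
  private readonly processed = new Set<string>();

  /** Returns true if this instance has not seen the webhook ID before. */
  shouldProcess(webhookId: string): boolean {
    if (this.processed.has(webhookId)) return false;
    this.processed.add(webhookId);
    return true;
  }
}

// Two instances behind a load balancer do not share state.
const instanceA = new InMemoryDedup();
const instanceB = new InMemoryDedup();

const firstDelivery = instanceA.shouldProcess('webhook-123');
const retryOnOtherInstance = instanceB.shouldProcess('webhook-123');
const retryOnSameInstance = instanceA.shouldProcess('webhook-123');

console.log({ firstDelivery, retryOnOtherInstance, retryOnSameInstance });
```

Only the retry that happens to land on the same instance is rejected; the retry routed to the other instance is processed again. Creating a fresh `InMemoryDedup` models a restart in the same way.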
Phase 5c delivers:
- Database-backed webhook idempotency (no Redis required)
- Automatic cleanup of expired webhook records
- Scheduled cleanup job with configurable retention
- Updated documentation
- All existing tests continue to pass
📋 Context: Technical Debt Item
TD-001: In-Memory Webhook Idempotency Cache
| Attribute | Value |
|---|---|
| Type | Architecture Debt |
| Priority | Critical |
| Location | apps/api/src/shopify/shopify.service.ts:12 |
| Interest Rate | High (risk of duplicate order processing) |
| Principal (Effort) | 2-3 days |
Chosen Solution: Database Table
Per project requirements, we implement the Database Table solution instead of Redis:
| Option | Pros | Cons | Selected |
|---|---|---|---|
| Redis Cache | Distributed, TTL support, fast | Additional infrastructure | No |
| Database Table | Simple, no new infra, transactional | Requires cleanup job | Yes |
| Distributed Lock | Works with existing DB | More complex implementation | No |
🛠️ Implementation Phases
Phase 1: Database Schema (1 hour)
Priority: Critical | Impact: Critical | Dependencies: None
1. Update Prisma Schema
Add the ProcessedWebhook model to prisma/schema.prisma:
// ============================================================================
// WEBHOOK IDEMPOTENCY (Phase 5c: Critical Technical Debt)
// ============================================================================
model ProcessedWebhook {
id String @id @default(uuid())
webhookId String @unique // The Shopify webhook ID (X-Shopify-Webhook-Id)
webhookType String // e.g., "orders/create", "orders/updated"
processedAt DateTime @default(now())
expiresAt DateTime // When this record can be cleaned up
// Optional metadata for debugging
orderId String? // Associated order if applicable
@@index([expiresAt]) // For cleanup job queries
@@index([processedAt]) // For monitoring/debugging
}
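To make the relationship between `processedAt`, `expiresAt`, and the cleanup query concrete, here is a minimal sketch. The helper names `computeExpiresAt` and `isExpired` are hypothetical and exist only for illustration; the arithmetic mirrors the TTL calculation and the strict "less than now" comparison used later in this prompt.

```typescript
const MS_PER_HOUR = 60 * 60 * 1000;

/** Expiry timestamp for a record processed at `processedAt` (hypothetical helper). */
function computeExpiresAt(processedAt: Date, ttlHours: number): Date {
  return new Date(processedAt.getTime() + ttlHours * MS_PER_HOUR);
}

/** Mirrors the cleanup predicate: a record is removable once expiresAt < now. */
function isExpired(expiresAt: Date, now: Date): boolean {
  return expiresAt.getTime() < now.getTime();
}

const processedAt = new Date('2026-01-17T12:00:00Z');
const expiresAt = computeExpiresAt(processedAt, 24);

console.log(expiresAt.toISOString()); // 2026-01-18T12:00:00.000Z
console.log(isExpired(expiresAt, new Date('2026-01-18T12:00:00.000Z'))); // false
console.log(isExpired(expiresAt, new Date('2026-01-18T12:00:00.001Z'))); // true
```

Because the comparison is strict, a record is kept at the exact expiry instant and becomes eligible for deletion one millisecond later.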
2. Create Migration
Run the Prisma migration:
pnpm prisma migrate dev --name add_processed_webhook
Expected migration SQL:
-- CreateTable
CREATE TABLE "ProcessedWebhook" (
"id" TEXT NOT NULL,
"webhookId" TEXT NOT NULL,
"webhookType" TEXT NOT NULL,
"processedAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP,
"expiresAt" TIMESTAMP(3) NOT NULL,
"orderId" TEXT,
CONSTRAINT "ProcessedWebhook_pkey" PRIMARY KEY ("id")
);
-- CreateIndex
CREATE UNIQUE INDEX "ProcessedWebhook_webhookId_key" ON "ProcessedWebhook"("webhookId");
-- CreateIndex
CREATE INDEX "ProcessedWebhook_expiresAt_idx" ON "ProcessedWebhook"("expiresAt");
-- CreateIndex
CREATE INDEX "ProcessedWebhook_processedAt_idx" ON "ProcessedWebhook"("processedAt");
Phase 2: Webhook Idempotency Repository (1 hour)
Priority: Critical | Impact: Critical | Dependencies: Phase 1
1. Create Repository
Create apps/api/src/shopify/webhook-idempotency.repository.ts:
import { Injectable, Logger } from '@nestjs/common';
import { PrismaService } from '../database/prisma.service';
export interface WebhookIdempotencyRecord {
webhookId: string;
webhookType: string;
processedAt: Date;
expiresAt: Date;
orderId?: string;
}
@Injectable()
export class WebhookIdempotencyRepository {
private readonly logger = new Logger(WebhookIdempotencyRepository.name);
constructor(private readonly prisma: PrismaService) {}
/**
* Atomically check whether a webhook has already been processed and, if not, mark it as processed.
* Relies on the unique constraint on webhookId, so concurrent deliveries cannot both be treated as new.
*
* @param webhookId - The Shopify webhook ID from X-Shopify-Webhook-Id header
* @param webhookType - The webhook type (e.g., "orders/create")
* @param ttlHours - Hours until the record expires (default: 24)
* @param orderId - Optional order ID for debugging
* @returns true if webhook was already processed, false if this is first processing
*/
async isProcessedOrMark(
webhookId: string,
webhookType: string,
ttlHours = 24,
orderId?: string,
): Promise<boolean> {
const expiresAt = new Date(Date.now() + ttlHours * 60 * 60 * 1000);
try {
// Try to insert - if it fails due to unique constraint, webhook was already processed
await this.prisma.processedWebhook.create({
data: {
webhookId,
webhookType,
expiresAt,
orderId,
},
});
this.logger.debug(`Marked webhook ${webhookId} as processed`);
return false; // Not previously processed, now marked
} catch (error) {
// Prisma unique constraint violation code
if ((error as { code?: string }).code === 'P2002') {
this.logger.debug(`Webhook ${webhookId} already processed, skipping`);
return true; // Already processed
}
// Re-throw unexpected errors
throw error;
}
}
/**
* Check if a webhook has been processed without marking it.
* Useful for read-only checks.
*
* @param webhookId - The Shopify webhook ID
* @returns true if processed, false otherwise
*/
async isProcessed(webhookId: string): Promise<boolean> {
const record = await this.prisma.processedWebhook.findUnique({
where: { webhookId },
select: { id: true },
});
return record !== null;
}
/**
* Delete expired webhook records.
* Called by the cleanup scheduled job.
*
* @returns Number of records deleted
*/
async deleteExpired(): Promise<number> {
const result = await this.prisma.processedWebhook.deleteMany({
where: {
expiresAt: {
lt: new Date(),
},
},
});
if (result.count > 0) {
this.logger.log(`Cleaned up ${result.count} expired webhook records`);
}
return result.count;
}
/**
* Get statistics about processed webhooks.
* Useful for monitoring and debugging.
*/
async getStats(): Promise<{
total: number;
last24Hours: number;
expiringSoon: number;
}> {
const now = new Date();
const last24Hours = new Date(now.getTime() - 24 * 60 * 60 * 1000);
const next1Hour = new Date(now.getTime() + 60 * 60 * 1000);
const [total, recentCount, expiringCount] = await Promise.all([
this.prisma.processedWebhook.count(),
this.prisma.processedWebhook.count({
where: { processedAt: { gte: last24Hours } },
}),
this.prisma.processedWebhook.count({
where: { expiresAt: { lt: next1Hour, gte: now } },
}),
]);
return {
total,
last24Hours: recentCount,
expiringSoon: expiringCount,
};
}
}
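The insert-first design of `isProcessedOrMark` is what makes it safe under concurrency: there is no gap between "check" and "mark" in application code, because the database's unique index decides the winner. The sketch below illustrates this with an in-memory stand-in. `FakeProcessedWebhookTable` and `simulateConcurrentDeliveries` are hypothetical names; the fake only imitates the `P2002` error code that the repository checks for and is not a substitute for testing against a real database.

```typescript
// In-memory stand-in for a table with a unique constraint on webhookId.
// Hypothetical: it only mimics the error shape the repository checks for.
class FakeProcessedWebhookTable {
  private readonly ids = new Set<string>();

  async create(webhookId: string): Promise<void> {
    // Yield first so that concurrent callers interleave before the insert.
    await Promise.resolve();
    if (this.ids.has(webhookId)) {
      throw Object.assign(new Error('Unique constraint failed'), { code: 'P2002' });
    }
    this.ids.add(webhookId);
  }
}

// Same insert-first shape as the repository method above.
async function isProcessedOrMark(
  table: FakeProcessedWebhookTable,
  webhookId: string,
): Promise<boolean> {
  try {
    await table.create(webhookId);
    return false; // newly marked
  } catch (error) {
    if ((error as { code?: string }).code === 'P2002') return true;
    throw error;
  }
}

// Three deliveries of the same webhook arrive at the same time.
async function simulateConcurrentDeliveries(): Promise<boolean[]> {
  const table = new FakeProcessedWebhookTable();
  return Promise.all([
    isProcessedOrMark(table, 'webhook-123'),
    isProcessedOrMark(table, 'webhook-123'),
    isProcessedOrMark(table, 'webhook-123'),
  ]);
}

simulateConcurrentDeliveries().then((results) => {
  console.log(results); // exactly one entry is false
});
```

A check-then-insert variant (calling `isProcessed` first and `create` afterwards) would let all three callers pass the check before any of them inserts, which is why the repository does not use that order.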
2. Create Repository Tests
Create apps/api/src/shopify/__tests__/webhook-idempotency.repository.spec.ts:
import { Test, TestingModule } from '@nestjs/testing';
import { WebhookIdempotencyRepository } from '../webhook-idempotency.repository';
import { PrismaService } from '../../database/prisma.service';
import { createMock, DeepMocked } from '@golevelup/ts-jest';
describe('WebhookIdempotencyRepository', () => {
let repository: WebhookIdempotencyRepository;
let prisma: DeepMocked<PrismaService>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
WebhookIdempotencyRepository,
{
provide: PrismaService,
useValue: createMock<PrismaService>(),
},
],
}).compile();
repository = module.get<WebhookIdempotencyRepository>(
WebhookIdempotencyRepository,
);
prisma = module.get(PrismaService);
// Mock the processedWebhook property
prisma.processedWebhook = {
create: jest.fn(),
findUnique: jest.fn(),
deleteMany: jest.fn(),
count: jest.fn(),
} as unknown as typeof prisma.processedWebhook;
});
afterEach(() => {
jest.clearAllMocks();
});
describe('isProcessedOrMark', () => {
it('should return false and create record for new webhook', async () => {
(prisma.processedWebhook.create as jest.Mock).mockResolvedValue({
id: 'uuid',
webhookId: 'webhook-123',
webhookType: 'orders/create',
processedAt: new Date(),
expiresAt: new Date(),
});
const result = await repository.isProcessedOrMark(
'webhook-123',
'orders/create',
);
expect(result).toBe(false);
expect(prisma.processedWebhook.create).toHaveBeenCalledWith({
data: expect.objectContaining({
webhookId: 'webhook-123',
webhookType: 'orders/create',
}),
});
});
it('should return true for already processed webhook', async () => {
const uniqueError = new Error('Unique constraint violation');
(uniqueError as Error & { code: string }).code = 'P2002';
(prisma.processedWebhook.create as jest.Mock).mockRejectedValue(
uniqueError,
);
const result = await repository.isProcessedOrMark(
'webhook-123',
'orders/create',
);
expect(result).toBe(true);
});
it('should re-throw unexpected errors', async () => {
const unexpectedError = new Error('Database connection failed');
(prisma.processedWebhook.create as jest.Mock).mockRejectedValue(
unexpectedError,
);
await expect(
repository.isProcessedOrMark('webhook-123', 'orders/create'),
).rejects.toThrow('Database connection failed');
});
it('should include orderId when provided', async () => {
(prisma.processedWebhook.create as jest.Mock).mockResolvedValue({});
await repository.isProcessedOrMark(
'webhook-123',
'orders/create',
24,
'order-456',
);
expect(prisma.processedWebhook.create).toHaveBeenCalledWith({
data: expect.objectContaining({
orderId: 'order-456',
}),
});
});
it('should calculate expiry time based on TTL', async () => {
(prisma.processedWebhook.create as jest.Mock).mockResolvedValue({});
const beforeCall = Date.now();
await repository.isProcessedOrMark('webhook-123', 'orders/create', 48);
const afterCall = Date.now();
const createCall = (prisma.processedWebhook.create as jest.Mock).mock
.calls[0][0];
const expiresAt = createCall.data.expiresAt as Date;
// Expiry should be ~48 hours from now
const expectedMinExpiry = beforeCall + 48 * 60 * 60 * 1000;
const expectedMaxExpiry = afterCall + 48 * 60 * 60 * 1000;
expect(expiresAt.getTime()).toBeGreaterThanOrEqual(expectedMinExpiry);
expect(expiresAt.getTime()).toBeLessThanOrEqual(expectedMaxExpiry);
});
});
describe('isProcessed', () => {
it('should return true when webhook exists', async () => {
(prisma.processedWebhook.findUnique as jest.Mock).mockResolvedValue({
id: 'uuid',
});
const result = await repository.isProcessed('webhook-123');
expect(result).toBe(true);
expect(prisma.processedWebhook.findUnique).toHaveBeenCalledWith({
where: { webhookId: 'webhook-123' },
select: { id: true },
});
});
it('should return false when webhook does not exist', async () => {
(prisma.processedWebhook.findUnique as jest.Mock).mockResolvedValue(null);
const result = await repository.isProcessed('webhook-123');
expect(result).toBe(false);
});
});
describe('deleteExpired', () => {
it('should delete expired records and return count', async () => {
(prisma.processedWebhook.deleteMany as jest.Mock).mockResolvedValue({
count: 5,
});
const result = await repository.deleteExpired();
expect(result).toBe(5);
expect(prisma.processedWebhook.deleteMany).toHaveBeenCalledWith({
where: {
expiresAt: {
lt: expect.any(Date),
},
},
});
});
it('should return 0 when no records expired', async () => {
(prisma.processedWebhook.deleteMany as jest.Mock).mockResolvedValue({
count: 0,
});
const result = await repository.deleteExpired();
expect(result).toBe(0);
});
});
describe('getStats', () => {
it('should return webhook statistics', async () => {
(prisma.processedWebhook.count as jest.Mock)
.mockResolvedValueOnce(100) // total
.mockResolvedValueOnce(25) // last24Hours
.mockResolvedValueOnce(10); // expiringSoon
const stats = await repository.getStats();
expect(stats).toEqual({
total: 100,
last24Hours: 25,
expiringSoon: 10,
});
expect(prisma.processedWebhook.count).toHaveBeenCalledTimes(3);
});
});
});
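The tests above build the unique-constraint error by attaching a `code` property to a plain `Error`, which matches how the repository detects it (a cast followed by a comparison with `'P2002'`). If a stricter check is preferred, a small type guard can replace the cast. This is an optional sketch; `hasErrorCode` is a hypothetical helper, not part of Prisma or of the existing codebase.

```typescript
/** Narrowing helper: true when `error` is an object carrying the given code. */
function hasErrorCode(error: unknown, code: string): error is { code: string } {
  return (
    typeof error === 'object' &&
    error !== null &&
    'code' in error &&
    (error as { code: unknown }).code === code
  );
}

const uniqueViolation = Object.assign(new Error('Unique constraint failed'), {
  code: 'P2002',
});

console.log(hasErrorCode(uniqueViolation, 'P2002')); // true
console.log(hasErrorCode(new Error('Database connection failed'), 'P2002')); // false
console.log(hasErrorCode(null, 'P2002')); // false
console.log(hasErrorCode('P2002', 'P2002')); // false
```

The guard also handles non-object throwables such as `null` or strings, which the bare cast would only survive by accident.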
Phase 3: Webhook Cleanup Service (1 hour)
Priority: High | Impact: High | Dependencies: Phase 2
1. Create Cleanup Service
Create apps/api/src/shopify/webhook-cleanup.service.ts:
import { Injectable, Logger, OnModuleInit } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { WebhookIdempotencyRepository } from './webhook-idempotency.repository';
import { EventLogService } from '../event-log/event-log.service';
@Injectable()
export class WebhookCleanupService implements OnModuleInit {
private readonly logger = new Logger(WebhookCleanupService.name);
constructor(
private readonly webhookIdempotencyRepository: WebhookIdempotencyRepository,
private readonly eventLogService: EventLogService,
) {}
/**
* Run initial cleanup on module initialization.
* This removes records that expired while the application was not running.
*/
async onModuleInit(): Promise<void> {
this.logger.log('Running initial webhook cleanup on startup');
await this.cleanupExpiredWebhooks();
}
/**
* Scheduled cleanup job - runs every hour.
* Removes webhook records that have passed their expiry time.
*
* The hourly schedule balances:
* - Timely cleanup (records removed within ~1 hour of expiry)
* - Database load (one cleanup query per hour)
* - Storage usage (prevents unbounded table growth)
*/
@Cron(CronExpression.EVERY_HOUR)
async cleanupExpiredWebhooks(): Promise<void> {
try {
const deletedCount = await this.webhookIdempotencyRepository.deleteExpired();
if (deletedCount > 0) {
await this.eventLogService.log({
eventType: 'system.webhook_cleanup',
severity: 'INFO',
message: `Cleaned up ${deletedCount} expired webhook idempotency records`,
metadata: {
deletedCount,
timestamp: new Date().toISOString(),
},
});
}
} catch (error) {
this.logger.error('Failed to cleanup expired webhooks', error);
await this.eventLogService.log({
eventType: 'system.webhook_cleanup_failed',
severity: 'ERROR',
message: 'Failed to cleanup expired webhook records',
metadata: {
error: (error as Error).message,
},
});
}
}
/**
* Get current cleanup statistics.
* Useful for health checks and monitoring dashboards.
*/
async getCleanupStats(): Promise<{
total: number;
last24Hours: number;
expiringSoon: number;
status: 'healthy' | 'warning' | 'critical';
}> {
const stats = await this.webhookIdempotencyRepository.getStats();
// Determine health status based on total records
// Warning if > 10,000 records, critical if > 50,000
let status: 'healthy' | 'warning' | 'critical' = 'healthy';
if (stats.total > 50000) {
status = 'critical';
} else if (stats.total > 10000) {
status = 'warning';
}
return {
...stats,
status,
};
}
}
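The health thresholds in `getCleanupStats` use strict comparisons, so the boundary values themselves fall into the lower band. The sketch below extracts that rule into a pure function to make the boundaries easy to see and test. `classifyTableHealth` and the two constant names are hypothetical; the threshold values are the ones used in the service above.

```typescript
type TableHealth = 'healthy' | 'warning' | 'critical';

// Threshold values taken from getCleanupStats; the constant names are illustrative.
const WARNING_THRESHOLD = 10_000;
const CRITICAL_THRESHOLD = 50_000;

/** Classifies the table by total record count using strict "greater than". */
function classifyTableHealth(total: number): TableHealth {
  if (total > CRITICAL_THRESHOLD) return 'critical';
  if (total > WARNING_THRESHOLD) return 'warning';
  return 'healthy';
}

for (const total of [500, 10_000, 10_001, 50_000, 50_001]) {
  console.log(total, classifyTableHealth(total));
}
```

With these rules, exactly 10,000 records is still healthy and exactly 50,000 is still a warning.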
2. Create Cleanup Service Tests
Create apps/api/src/shopify/__tests__/webhook-cleanup.service.spec.ts:
import { Test, TestingModule } from '@nestjs/testing';
import { WebhookCleanupService } from '../webhook-cleanup.service';
import { WebhookIdempotencyRepository } from '../webhook-idempotency.repository';
import { EventLogService } from '../../event-log/event-log.service';
import { createMock, DeepMocked } from '@golevelup/ts-jest';
describe('WebhookCleanupService', () => {
let service: WebhookCleanupService;
let repository: DeepMocked<WebhookIdempotencyRepository>;
let eventLogService: DeepMocked<EventLogService>;
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
WebhookCleanupService,
{
provide: WebhookIdempotencyRepository,
useValue: createMock<WebhookIdempotencyRepository>(),
},
{
provide: EventLogService,
useValue: createMock<EventLogService>(),
},
],
}).compile();
service = module.get<WebhookCleanupService>(WebhookCleanupService);
repository = module.get(WebhookIdempotencyRepository);
eventLogService = module.get(EventLogService);
});
afterEach(() => {
jest.clearAllMocks();
});
describe('onModuleInit', () => {
it('should run cleanup on startup', async () => {
repository.deleteExpired.mockResolvedValue(0);
await service.onModuleInit();
expect(repository.deleteExpired).toHaveBeenCalled();
});
});
describe('cleanupExpiredWebhooks', () => {
it('should log when records are deleted', async () => {
repository.deleteExpired.mockResolvedValue(5);
await service.cleanupExpiredWebhooks();
expect(eventLogService.log).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'system.webhook_cleanup',
severity: 'INFO',
metadata: expect.objectContaining({
deletedCount: 5,
}),
}),
);
});
it('should not log when no records deleted', async () => {
repository.deleteExpired.mockResolvedValue(0);
await service.cleanupExpiredWebhooks();
expect(eventLogService.log).not.toHaveBeenCalled();
});
it('should log error on failure', async () => {
repository.deleteExpired.mockRejectedValue(new Error('DB error'));
await service.cleanupExpiredWebhooks();
expect(eventLogService.log).toHaveBeenCalledWith(
expect.objectContaining({
eventType: 'system.webhook_cleanup_failed',
severity: 'ERROR',
}),
);
});
});
describe('getCleanupStats', () => {
it('should return healthy status for low record count', async () => {
repository.getStats.mockResolvedValue({
total: 500,
last24Hours: 50,
expiringSoon: 10,
});
const stats = await service.getCleanupStats();
expect(stats.status).toBe('healthy');
});
it('should return warning status for medium record count', async () => {
repository.getStats.mockResolvedValue({
total: 15000,
last24Hours: 1000,
expiringSoon: 500,
});
const stats = await service.getCleanupStats();
expect(stats.status).toBe('warning');
});
it('should return critical status for high record count', async () => {
repository.getStats.mockResolvedValue({
total: 60000,
last24Hours: 5000,
expiringSoon: 2000,
});
const stats = await service.getCleanupStats();
expect(stats.status).toBe('critical');
});
});
});
Phase 4: Update Shopify Service (1 hour)
Priority: Critical | Impact: Critical | Dependencies: Phase 2
1. Refactor ShopifyService
Update apps/api/src/shopify/shopify.service.ts:
import { Injectable, Logger } from '@nestjs/common';
import { OrdersService } from '../orders/orders.service';
import { ProductMappingsService } from '../product-mappings/product-mappings.service';
import { EventLogService } from '../event-log/event-log.service';
import { WebhookIdempotencyRepository } from './webhook-idempotency.repository';
import { OrderWebhookPayload } from './dto/shopify-webhook.dto';
import { ShopifyOrder, ShopifyLineItem } from '@forma3d/domain';
import { Decimal } from '@prisma/client/runtime/library';
/**
* Default TTL for webhook idempotency records in hours.
* 24 hours provides sufficient protection against Shopify's retry behavior
* while keeping storage requirements reasonable.
*/
const WEBHOOK_IDEMPOTENCY_TTL_HOURS = 24;
@Injectable()
export class ShopifyService {
private readonly logger = new Logger(ShopifyService.name);
constructor(
private readonly ordersService: OrdersService,
private readonly productMappingsService: ProductMappingsService,
private readonly eventLogService: EventLogService,
private readonly webhookIdempotencyRepository: WebhookIdempotencyRepository,
) {}
async handleOrderCreated(
payload: OrderWebhookPayload,
webhookId: string,
): Promise<void> {
// Database-backed idempotency check (race-condition safe)
const alreadyProcessed = await this.webhookIdempotencyRepository.isProcessedOrMark(
webhookId,
'orders/create',
WEBHOOK_IDEMPOTENCY_TTL_HOURS,
String(payload.id),
);
if (alreadyProcessed) {
this.logger.debug(`Webhook ${webhookId} already processed, skipping`);
return;
}
// Skip Shopify test orders (the check is unconditional, not production-only)
if (payload.test) {
this.logger.debug(`Skipping test order ${payload.name}`);
return;
}
// Extract SKUs and check for unmapped products
const skus = payload.line_items
.map((item) => item.sku)
.filter((sku): sku is string => sku !== null);
const unmappedSkus =
await this.productMappingsService.findUnmappedSkus(skus);
if (unmappedSkus.length > 0) {
this.logger.warn(
`Order ${payload.name} contains unmapped SKUs: ${unmappedSkus.join(', ')}`,
);
await this.eventLogService.log({
eventType: 'order.unmapped_products',
severity: 'WARNING',
message: `Order ${payload.name} contains unmapped products`,
metadata: {
shopifyOrderId: String(payload.id),
unmappedSkus,
},
});
}
// Create order
const order = await this.ordersService.createFromShopify(
{
shopifyOrderId: String(payload.id),
shopifyOrderNumber: payload.name,
customerName: this.extractCustomerName(payload),
customerEmail: payload.email || undefined,
shippingAddress: JSON.parse(
JSON.stringify(payload.shipping_address || {}),
),
totalPrice: new Decimal(payload.total_price),
currency: payload.currency,
lineItems: payload.line_items.map((item) => this.mapLineItem(item)),
},
webhookId,
);
this.logger.log(
`Processed order created webhook for ${order.shopifyOrderNumber}`,
);
}
async handleOrderUpdated(
payload: OrderWebhookPayload,
webhookId: string,
): Promise<void> {
// Idempotency check for update webhooks
const alreadyProcessed = await this.webhookIdempotencyRepository.isProcessedOrMark(
webhookId,
'orders/updated',
WEBHOOK_IDEMPOTENCY_TTL_HOURS,
String(payload.id),
);
if (alreadyProcessed) {
this.logger.debug(`Webhook ${webhookId} already processed, skipping`);
return;
}
const existingOrder = await this.ordersService.findByShopifyOrderId(
String(payload.id),
);
if (!existingOrder) {
// Order doesn't exist yet, treat as create
this.logger.debug(
`Order ${payload.id} not found, treating update as create`,
);
// Note: the create handler writes its own idempotency record under the derived key `${webhookId}-as-create` with type 'orders/create'
return this.handleOrderCreated(payload, `${webhookId}-as-create`);
}
// Log the update
await this.eventLogService.log({
orderId: existingOrder.id,
eventType: 'order.updated',
severity: 'INFO',
message: `Order updated in Shopify`,
metadata: {
webhookId,
financialStatus: payload.financial_status,
fulfillmentStatus: payload.fulfillment_status,
},
});
this.logger.log(
`Processed order update webhook for ${existingOrder.shopifyOrderNumber}`,
);
}
async handleOrderCancelled(
payload: OrderWebhookPayload,
webhookId: string,
): Promise<void> {
// Idempotency check for cancellation webhooks
const alreadyProcessed = await this.webhookIdempotencyRepository.isProcessedOrMark(
webhookId,
'orders/cancelled',
WEBHOOK_IDEMPOTENCY_TTL_HOURS,
String(payload.id),
);
if (alreadyProcessed) {
this.logger.debug(`Webhook ${webhookId} already processed, skipping`);
return;
}
const existingOrder = await this.ordersService.findByShopifyOrderId(
String(payload.id),
);
if (!existingOrder) {
this.logger.warn(
`Cannot cancel order ${payload.id} - not found in database`,
);
return;
}
await this.ordersService.cancelOrder(
existingOrder.id,
`Cancelled in Shopify at ${payload.cancelled_at}`,
);
this.logger.log(
`Processed order cancellation webhook for ${existingOrder.shopifyOrderNumber}`,
);
}
async handleOrderFulfilled(
payload: OrderWebhookPayload,
webhookId: string,
): Promise<void> {
// Idempotency check for fulfillment webhooks
const alreadyProcessed = await this.webhookIdempotencyRepository.isProcessedOrMark(
webhookId,
'orders/fulfilled',
WEBHOOK_IDEMPOTENCY_TTL_HOURS,
String(payload.id),
);
if (alreadyProcessed) {
this.logger.debug(`Webhook ${webhookId} already processed, skipping`);
return;
}
const existingOrder = await this.ordersService.findByShopifyOrderId(
String(payload.id),
);
if (!existingOrder) {
this.logger.warn(
`Cannot process fulfillment for order ${payload.id} - not found`,
);
return;
}
// This handles external fulfillments (not through our system)
await this.eventLogService.log({
orderId: existingOrder.id,
eventType: 'order.fulfilled_externally',
severity: 'INFO',
message: `Order fulfilled externally in Shopify`,
metadata: {
webhookId,
fulfillments: payload.fulfillments,
},
});
this.logger.log(
`Processed external fulfillment webhook for ${existingOrder.shopifyOrderNumber}`,
);
}
private extractCustomerName(order: ShopifyOrder): string {
if (order.customer) {
const firstName = order.customer.first_name || '';
const lastName = order.customer.last_name || '';
const fullName = `${firstName} ${lastName}`.trim();
if (fullName) return fullName;
}
if (order.shipping_address) {
const firstName = order.shipping_address.first_name || '';
const lastName = order.shipping_address.last_name || '';
const fullName = `${firstName} ${lastName}`.trim();
if (fullName) return fullName;
}
return 'Unknown Customer';
}
private mapLineItem(item: ShopifyLineItem) {
return {
shopifyLineItemId: String(item.id),
productSku: item.sku || `NOSKU-${item.id}`,
productName: item.title,
variantTitle: item.variant_title || undefined,
quantity: item.quantity,
unitPrice: new Decimal(item.price),
};
}
}
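One consequence of the handlers above is worth keeping in mind: `isProcessedOrMark` writes the idempotency record before the order is created, so if a later step throws, a redelivery carrying the same webhook ID would be treated as already processed. A possible mitigation, sketched below under stated assumptions, is to release the mark when processing fails. The names `IdempotencyStore`, `markIfNew`, `unmark`, and `processOnce` are hypothetical; the repository defined earlier has no delete-by-ID method, so this illustrates the idea rather than a drop-in change.

```typescript
// Hypothetical store interface: `unmark` does not exist in the repository above.
interface IdempotencyStore {
  markIfNew(webhookId: string): Promise<boolean>; // true when newly marked
  unmark(webhookId: string): Promise<void>;
}

// In-memory implementation used only to make the sketch runnable.
class InMemoryStore implements IdempotencyStore {
  private readonly ids = new Set<string>();

  async markIfNew(webhookId: string): Promise<boolean> {
    if (this.ids.has(webhookId)) return false;
    this.ids.add(webhookId);
    return true;
  }

  async unmark(webhookId: string): Promise<void> {
    this.ids.delete(webhookId);
  }
}

type Outcome = 'processed' | 'skipped';

async function processOnce(
  store: IdempotencyStore,
  webhookId: string,
  handler: () => Promise<void>,
): Promise<Outcome> {
  if (!(await store.markIfNew(webhookId))) return 'skipped';
  try {
    await handler();
    return 'processed';
  } catch (error) {
    // Release the mark so a redelivery is not silently dropped.
    await store.unmark(webhookId);
    throw error;
  }
}

async function demo(): Promise<string[]> {
  const store = new InMemoryStore();
  const log: string[] = [];
  try {
    await processOnce(store, 'webhook-123', async () => {
      throw new Error('order creation failed');
    });
  } catch {
    log.push('failed');
  }
  log.push(await processOnce(store, 'webhook-123', async () => {}));
  log.push(await processOnce(store, 'webhook-123', async () => {}));
  return log;
}

demo().then((log) => console.log(log)); // [ 'failed', 'processed', 'skipped' ]
```

Another option, if the surrounding services support it, would be to write the idempotency record and the order in a single database transaction so that both commit or neither does. Which approach fits depends on how the order creation handles partial failures.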
2. Update Shopify Module
Update apps/api/src/shopify/shopify.module.ts to include the new providers:
import { Module } from '@nestjs/common';
import { ShopifyService } from './shopify.service';
import { ShopifyController } from './shopify.controller';
import { OrdersModule } from '../orders/orders.module';
import { ProductMappingsModule } from '../product-mappings/product-mappings.module';
import { EventLogModule } from '../event-log/event-log.module';
import { DatabaseModule } from '../database/database.module';
import { WebhookIdempotencyRepository } from './webhook-idempotency.repository';
import { WebhookCleanupService } from './webhook-cleanup.service';
@Module({
imports: [
// Note: ScheduleModule.forRoot() is already registered in AppModule
DatabaseModule,
OrdersModule,
ProductMappingsModule,
EventLogModule,
],
controllers: [ShopifyController],
providers: [
ShopifyService,
WebhookIdempotencyRepository,
WebhookCleanupService,
],
exports: [ShopifyService],
})
export class ShopifyModule {}
Note: ScheduleModule.forRoot() is already registered globally in AppModule, so the @Cron decorator in WebhookCleanupService will work automatically.
Phase 5: Update Shopify Service Tests (1-2 hours)
Priority: Critical | Impact: Critical | Dependencies: Phase 4
1. Update Existing Tests
Update apps/api/src/shopify/__tests__/shopify.service.spec.ts to mock the new repository:
import { Test, TestingModule } from '@nestjs/testing';
import { ShopifyService } from '../shopify.service';
import { OrdersService } from '../../orders/orders.service';
import { ProductMappingsService } from '../../product-mappings/product-mappings.service';
import { EventLogService } from '../../event-log/event-log.service';
import { WebhookIdempotencyRepository } from '../webhook-idempotency.repository';
import { createMock, DeepMocked } from '@golevelup/ts-jest';
import { OrderWebhookPayload } from '../dto/shopify-webhook.dto';
describe('ShopifyService', () => {
let service: ShopifyService;
let ordersService: DeepMocked<OrdersService>;
let productMappingsService: DeepMocked<ProductMappingsService>;
let eventLogService: DeepMocked<EventLogService>;
let webhookIdempotencyRepository: DeepMocked<WebhookIdempotencyRepository>;
const createMockPayload = (
overrides: Partial<OrderWebhookPayload> = {},
): OrderWebhookPayload => ({
id: 123456789,
name: '#1001',
email: 'customer@example.com',
total_price: '99.99',
currency: 'EUR',
financial_status: 'paid',
fulfillment_status: null,
cancelled_at: null,
test: false,
customer: {
id: 1,
first_name: 'John',
last_name: 'Doe',
email: 'customer@example.com',
},
shipping_address: {
first_name: 'John',
last_name: 'Doe',
address1: '123 Main St',
city: 'Amsterdam',
province: 'NH',
country: 'Netherlands',
zip: '1012',
phone: '+31612345678',
},
line_items: [
{
id: 1,
title: 'Test Product',
sku: 'TEST-SKU',
quantity: 1,
price: '99.99',
variant_title: null,
},
],
fulfillments: [],
...overrides,
});
beforeEach(async () => {
const module: TestingModule = await Test.createTestingModule({
providers: [
ShopifyService,
{
provide: OrdersService,
useValue: createMock<OrdersService>(),
},
{
provide: ProductMappingsService,
useValue: createMock<ProductMappingsService>(),
},
{
provide: EventLogService,
useValue: createMock<EventLogService>(),
},
{
provide: WebhookIdempotencyRepository,
useValue: createMock<WebhookIdempotencyRepository>(),
},
],
}).compile();
service = module.get<ShopifyService>(ShopifyService);
ordersService = module.get(OrdersService);
productMappingsService = module.get(ProductMappingsService);
eventLogService = module.get(EventLogService);
webhookIdempotencyRepository = module.get(WebhookIdempotencyRepository);
// Default behavior: webhooks not previously processed
webhookIdempotencyRepository.isProcessedOrMark.mockResolvedValue(false);
});
afterEach(() => {
jest.clearAllMocks();
});
describe('handleOrderCreated', () => {
it('should skip already processed webhooks', async () => {
webhookIdempotencyRepository.isProcessedOrMark.mockResolvedValue(true);
await service.handleOrderCreated(createMockPayload(), 'webhook-123');
expect(ordersService.createFromShopify).not.toHaveBeenCalled();
});
it('should mark webhook as processed and create order', async () => {
const payload = createMockPayload();
productMappingsService.findUnmappedSkus.mockResolvedValue([]);
ordersService.createFromShopify.mockResolvedValue({
id: 'order-uuid',
shopifyOrderNumber: '#1001',
} as Awaited<ReturnType<typeof ordersService.createFromShopify>>);
await service.handleOrderCreated(payload, 'webhook-123');
expect(webhookIdempotencyRepository.isProcessedOrMark).toHaveBeenCalledWith(
'webhook-123',
'orders/create',
24,
'123456789',
);
expect(ordersService.createFromShopify).toHaveBeenCalled();
});
it('should skip test orders', async () => {
const payload = createMockPayload({ test: true });
await service.handleOrderCreated(payload, 'webhook-123');
expect(ordersService.createFromShopify).not.toHaveBeenCalled();
});
it('should log warning for unmapped SKUs', async () => {
const payload = createMockPayload();
      productMappingsService.findUnmappedSkus.mockResolvedValue(['UNKNOWN-SKU']);
      ordersService.createFromShopify.mockResolvedValue({
        id: 'order-uuid',
        shopifyOrderNumber: '#1001',
      } as Awaited<ReturnType<typeof ordersService.createFromShopify>>);

      await service.handleOrderCreated(payload, 'webhook-123');

      expect(eventLogService.log).toHaveBeenCalledWith(
        expect.objectContaining({
          eventType: 'order.unmapped_products',
          severity: 'WARNING',
        }),
      );
    });
  });

  describe('handleOrderUpdated', () => {
    it('should skip already processed webhooks', async () => {
      webhookIdempotencyRepository.isProcessedOrMark.mockResolvedValue(true);

      await service.handleOrderUpdated(createMockPayload(), 'webhook-123');

      expect(ordersService.findByShopifyOrderId).not.toHaveBeenCalled();
    });

    it('should treat update as create if order not found', async () => {
      ordersService.findByShopifyOrderId.mockResolvedValue(null);
      productMappingsService.findUnmappedSkus.mockResolvedValue([]);
      ordersService.createFromShopify.mockResolvedValue({
        id: 'order-uuid',
        shopifyOrderNumber: '#1001',
      } as Awaited<ReturnType<typeof ordersService.createFromShopify>>);

      await service.handleOrderUpdated(createMockPayload(), 'webhook-123');

      expect(ordersService.createFromShopify).toHaveBeenCalled();
    });

    it('should log update for existing order', async () => {
      ordersService.findByShopifyOrderId.mockResolvedValue({
        id: 'order-uuid',
        shopifyOrderNumber: '#1001',
      } as Awaited<ReturnType<typeof ordersService.findByShopifyOrderId>>);

      await service.handleOrderUpdated(createMockPayload(), 'webhook-123');

      expect(eventLogService.log).toHaveBeenCalledWith(
        expect.objectContaining({
          eventType: 'order.updated',
        }),
      );
    });
  });

  describe('handleOrderCancelled', () => {
    it('should skip already processed webhooks', async () => {
      webhookIdempotencyRepository.isProcessedOrMark.mockResolvedValue(true);

      await service.handleOrderCancelled(createMockPayload(), 'webhook-123');

      expect(ordersService.findByShopifyOrderId).not.toHaveBeenCalled();
    });

    it('should cancel existing order', async () => {
      ordersService.findByShopifyOrderId.mockResolvedValue({
        id: 'order-uuid',
        shopifyOrderNumber: '#1001',
      } as Awaited<ReturnType<typeof ordersService.findByShopifyOrderId>>);

      await service.handleOrderCancelled(
        createMockPayload({ cancelled_at: '2026-01-17T12:00:00Z' }),
        'webhook-123',
      );

      expect(ordersService.cancelOrder).toHaveBeenCalledWith(
        'order-uuid',
        expect.stringContaining('Cancelled in Shopify'),
      );
    });

    it('should warn if order not found', async () => {
      ordersService.findByShopifyOrderId.mockResolvedValue(null);

      await service.handleOrderCancelled(createMockPayload(), 'webhook-123');

      expect(ordersService.cancelOrder).not.toHaveBeenCalled();
    });
  });

  describe('handleOrderFulfilled', () => {
    it('should skip already processed webhooks', async () => {
      webhookIdempotencyRepository.isProcessedOrMark.mockResolvedValue(true);

      await service.handleOrderFulfilled(createMockPayload(), 'webhook-123');

      expect(ordersService.findByShopifyOrderId).not.toHaveBeenCalled();
    });

    it('should log external fulfillment for existing order', async () => {
      ordersService.findByShopifyOrderId.mockResolvedValue({
        id: 'order-uuid',
        shopifyOrderNumber: '#1001',
      } as Awaited<ReturnType<typeof ordersService.findByShopifyOrderId>>);

      await service.handleOrderFulfilled(createMockPayload(), 'webhook-123');

      expect(eventLogService.log).toHaveBeenCalledWith(
        expect.objectContaining({
          eventType: 'order.fulfilled_externally',
        }),
      );
    });
  });
});
Phase 6: Documentation Updates (30 minutes)¶
Priority: Medium | Impact: Medium | Dependencies: Phase 5
1. Update Technical Debt Register¶
Update docs/04-development/techdebt/technical-debt-register.md:
In the Summary Dashboard section, update the Critical count:
## Summary Dashboard
| Category | Critical | High | Medium | Low | Total |
|----------|----------|------|--------|-----|-------|
| Architecture Debt | 0 | 3 | 2 | 1 | 6 |
| Code Debt | 0 | 4 | 5 | 3 | 12 |
| Test Debt | 1 | 2 | 1 | 0 | 4 |
| Documentation Debt | 0 | 1 | 2 | 1 | 4 |
| Infrastructure Debt | 0 | 1 | 2 | 1 | 4 |
| **Total** | **1** | **11** | **12** | **6** | **30** |
Update the TD-001 section to mark it as resolved:
### ~~TD-001: In-Memory Webhook Idempotency Cache~~ ✅ RESOLVED
**Type:** Architecture Debt
**Status:** ✅ **Resolved in Phase 5c**
**Resolution Date:** 2026-XX-XX
#### Resolution
Implemented database-backed webhook idempotency using the `ProcessedWebhook` table:
- **Storage:** PostgreSQL table with unique constraint on `webhookId`
- **TTL:** 24-hour expiry for idempotency records
- **Cleanup:** Hourly scheduled job removes expired records
- **Race-condition safe:** Uses database unique constraint for atomic check-and-mark
**Files Changed:**
- `prisma/schema.prisma` - Added `ProcessedWebhook` model
- `apps/api/src/shopify/webhook-idempotency.repository.ts` - New repository
- `apps/api/src/shopify/webhook-cleanup.service.ts` - Cleanup scheduled job
- `apps/api/src/shopify/shopify.service.ts` - Refactored to use repository
- `apps/api/src/shopify/shopify.module.ts` - Updated providers
---
Update the Debt Remediation Roadmap section:
## Debt Remediation Roadmap
### Phase 1: Critical Items (Next Sprint) ✅ PARTIALLY COMPLETE
1. ~~**TD-001**: Implement database-backed webhook idempotency~~ ✅ DONE
2. **TD-002**: Set up Vitest for frontend, add hook tests (PENDING)
2. Add ADR Entry¶
Add the following to docs/03-architecture/adr/ADR.md after ADR-032:
---
## ADR-033: Database-Backed Webhook Idempotency
| Attribute | Value |
| ----------- | --------------------------------------------------------- |
| **ID** | ADR-033 |
| **Status** | Accepted |
| **Date** | 2026-XX-XX |
| **Context** | In-memory webhook idempotency cache doesn't work in multi-instance deployments |
### Decision
Use a **PostgreSQL table** (`ProcessedWebhook`) for webhook idempotency instead of Redis or in-memory caching.
### Rationale
- **No additional infrastructure**: Uses existing PostgreSQL database
- **Transactional safety**: Database unique constraint ensures race-condition-safe idempotency
- **Simple cleanup**: Scheduled job removes expired records hourly
- **Debugging support**: Records include metadata (webhook type, order ID, timestamps)
- **Horizontal scaling**: Works correctly across multiple API instances
### Implementation
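```typescript
// Atomic check-and-mark using unique constraint
// (assumes `import { Prisma } from '@prisma/client'` in the repository file)
async isProcessedOrMark(webhookId: string, type: string): Promise<boolean> {
  const expiresAt = new Date(Date.now() + 24 * 60 * 60 * 1000); // 24-hour TTL
  try {
    await this.prisma.processedWebhook.create({ data: { webhookId, webhookType: type, expiresAt } });
    return false; // First time processing
  } catch (error) {
    // P2002 = unique constraint violation, so the webhook was already recorded
    if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === 'P2002') return true;
    throw error;
  }
}
```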
### Alternatives Considered
| Alternative | Pros | Cons | Decision |
|---|---|---|---|
| Redis | TTL support, fast | Additional infrastructure | Rejected |
| Distributed Lock | Works with DB | Complex, race conditions | Rejected |
| Database Table | Simple, no new infra | Needs cleanup job | Selected |
### Consequences
- ✅ Works correctly in multi-instance deployments
- ✅ Survives application restarts
- ✅ No memory leaks
- ✅ Auditable (can query processed webhooks)
- ⚠️ Slightly higher latency than in-memory (< 10ms)
- ⚠️ Requires cleanup job (runs hourly)
---
## 📁 Files to Create/Modify
### New Files
prisma/migrations/YYYYMMDDHHMMSS_add_processed_webhook/migration.sql
### Modified Files
---
## 🧪 Testing Requirements
### Unit Tests
All new components must have comprehensive unit tests:
| Component | Test File | Coverage Target |
|-----------|-----------|-----------------|
| `WebhookIdempotencyRepository` | `webhook-idempotency.repository.spec.ts` | 100% |
| `WebhookCleanupService` | `webhook-cleanup.service.spec.ts` | 100% |
| `ShopifyService` (updated) | `shopify.service.spec.ts` | Maintain existing + new tests |
### Integration Tests
Verify the full webhook flow works with database idempotency:
```bash
# Run all Shopify-related tests
pnpm nx test api --testPathPattern=shopify

# Run all API tests
pnpm nx test api
```
E2E Tests¶
Existing acceptance tests should continue to pass:
pnpm nx run acceptance-tests:test
✅ Validation Checklist¶
Phase 1: Database Schema¶
- ProcessedWebhook model added to prisma/schema.prisma
- Migration created and applied
- Database has ProcessedWebhook table with correct indexes
Phase 2: Repository¶
- WebhookIdempotencyRepository created with all methods
- Repository tests pass with 100% coverage
- Race-condition safety verified (unique constraint handling)
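The race-condition check in the list above could be exercised along the following lines. This is a minimal sketch, not the project's repository: `FakeProcessedWebhookTable`, `markOrDetect`, and `countFirstTimeCallers` are hypothetical names, and an in-memory `Map` stands in for the PostgreSQL unique constraint on `webhookId`. It only illustrates the property to verify: when the insert is atomic, exactly one of many concurrent callers is told "first time".

```typescript
// Hypothetical stand-in for a table with a unique constraint on webhookId.
class FakeProcessedWebhookTable {
  private readonly rows = new Map<string, { webhookType: string; expiresAt: Date }>();

  // Simulates an async INSERT that fails with code P2002 on a duplicate key.
  async create(webhookId: string, webhookType: string, expiresAt: Date): Promise<void> {
    await Promise.resolve(); // yield so that concurrent callers interleave
    // The check and the write run in one synchronous step, like an atomic insert.
    if (this.rows.has(webhookId)) {
      throw Object.assign(new Error(`duplicate webhookId ${webhookId}`), { code: 'P2002' });
    }
    this.rows.set(webhookId, { webhookType, expiresAt });
  }
}

function isUniqueViolation(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { code?: unknown }).code === 'P2002'
  );
}

const TTL_MS = 24 * 60 * 60 * 1000; // 24-hour TTL, as required by this phase

// Returns true if the webhook was already recorded, false if this call recorded it.
async function markOrDetect(
  table: FakeProcessedWebhookTable,
  webhookId: string,
  webhookType: string,
): Promise<boolean> {
  try {
    await table.create(webhookId, webhookType, new Date(Date.now() + TTL_MS));
    return false;
  } catch (error) {
    if (isUniqueViolation(error)) return true;
    throw error;
  }
}

// Fires `callers` concurrent attempts for the same webhook and counts
// how many of them were treated as the first delivery.
async function countFirstTimeCallers(callers: number): Promise<number> {
  const table = new FakeProcessedWebhookTable();
  const results = await Promise.all(
    Array.from({ length: callers }, () => markOrDetect(table, 'webhook-123', 'orders/create')),
  );
  return results.filter((alreadyProcessed) => !alreadyProcessed).length;
}
```

A repository test against a real database would follow the same shape: issue several concurrent calls for one webhook ID and assert that only a single call reports first-time processing.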
Phase 3: Cleanup Service¶
- WebhookCleanupService created with scheduled job
- Cleanup runs on module init
- Cleanup runs hourly via cron
- Cleanup logs via EventLogService
- Cleanup service tests pass
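The cleanup behaviour in this checklist reduces to deleting every record whose expiry has passed and reporting how many were removed. The sketch below shows that core logic under stated assumptions: `ProcessedWebhookRecord` and `cleanupExpired` are illustrative names, and a `Map` replaces the database table. In the actual service the same filter would presumably be a single delete query on `expiresAt`, triggered on module init and by the hourly cron, with the returned count passed to EventLogService.

```typescript
// Illustrative record shape; the real model lives in prisma/schema.prisma.
interface ProcessedWebhookRecord {
  webhookId: string;
  webhookType: string;
  expiresAt: Date;
}

// Removes every record that expired strictly before `now`
// and returns the number of deleted records.
function cleanupExpired(
  records: Map<string, ProcessedWebhookRecord>,
  now: Date,
): number {
  let deleted = 0;
  // Deleting the current entry while iterating a Map with forEach is safe.
  records.forEach((record, webhookId) => {
    if (record.expiresAt.getTime() < now.getTime()) {
      records.delete(webhookId);
      deleted += 1;
    }
  });
  return deleted;
}
```

Running the function twice with the same `now` should delete nothing the second time, which is a convenient property to assert in the cleanup service tests.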
Phase 4: Shopify Service¶
- In-memory Set<string> removed
- WebhookIdempotencyRepository injected
- All webhook handlers use repository for idempotency
- Existing functionality preserved
Phase 5: Tests¶
- All existing tests updated with repository mocks
- New repository tests added
- New cleanup service tests added
- All tests pass: pnpm nx test api
Phase 6: Documentation¶
- Technical debt register updated (TD-001 marked resolved)
- ADR-033 added to ADR document
- Summary dashboard counts updated
Final Verification¶
# Build passes
pnpm nx build api
# All API tests pass
pnpm nx test api
# E2E tests pass
pnpm nx run api-e2e:e2e
# Acceptance tests pass
pnpm nx run acceptance-tests:test
# Lint passes
pnpm nx lint api
# No in-memory Set usage
rg "processedWebhooks = new Set" apps/api/src/shopify/
# Should return 0 results
🚫 Constraints and Rules¶
MUST DO¶
- Use database unique constraint for race-condition safety
- Implement cleanup job to prevent unbounded table growth
- Set 24-hour TTL for webhook idempotency records
- Log cleanup activities via EventLogService
- Update all existing tests to use repository mocks
- Mark TD-001 as resolved in technical debt register
- Add ADR-033 documenting the decision
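The 24-hour TTL and the configurable retention mentioned for this phase can be kept in one place. The following is a sketch under assumptions rather than the project's configuration code: the variable name `WEBHOOK_RETENTION_HOURS` and both function names are hypothetical, and the environment is passed in as a plain object so the logic stays testable.

```typescript
const DEFAULT_RETENTION_HOURS = 24; // the TTL required by this phase

// Reads the retention period from an environment-like object.
// WEBHOOK_RETENTION_HOURS is a hypothetical variable name.
// Missing, empty, non-numeric, or non-positive values fall back to the default.
function resolveRetentionHours(env: Record<string, string | undefined>): number {
  const raw = env['WEBHOOK_RETENTION_HOURS'];
  if (raw === undefined || raw.trim() === '') return DEFAULT_RETENTION_HOURS;
  const parsed = Number(raw);
  if (!Number.isFinite(parsed) || parsed <= 0) return DEFAULT_RETENTION_HOURS;
  return parsed;
}

// Computes the expiry timestamp stored alongside a processed webhook.
function computeExpiresAt(processedAt: Date, retentionHours: number): Date {
  return new Date(processedAt.getTime() + retentionHours * 60 * 60 * 1000);
}
```

Falling back to 24 hours on invalid input keeps a misconfigured deployment within the documented TTL instead of disabling expiry.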
MUST NOT¶
- Use Redis (per project requirements)
- Use in-memory caching (the problem we're solving)
- Skip cleanup job (causes unbounded table growth)
- Break existing Shopify webhook functionality
- Remove or modify existing API contracts
- Leave failing tests
📊 Success Metrics¶
Before Phase 5c¶
| Issue | Status |
|---|---|
| Duplicate webhook processing in multi-instance | ❌ Possible |
| Memory leak from unbounded Set | ❌ Present |
| Data loss on restart | ❌ Present |
| Critical tech debt items | 2 |
After Phase 5c¶
| Issue | Status |
|---|---|
| Duplicate webhook processing in multi-instance | ✅ Prevented |
| Memory leak from unbounded Set | ✅ Eliminated |
| Data loss on restart | ✅ Prevented |
| Critical tech debt items | 1 (TD-002 remaining) |
🔮 Future Considerations¶
Once Phase 5c is complete:
- Monitoring: Add Prometheus metrics for webhook processing rate and cleanup stats
- Alerting: Alert on high webhook record counts (> 10,000)
- TD-002: Address missing frontend test coverage in next phase
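The alerting idea above amounts to comparing the current row count against a threshold. A minimal sketch, assuming the count is obtained elsewhere (for example from a count query on the ProcessedWebhook table); `checkWebhookRecordCount` and `RecordCountCheck` are hypothetical names, and only the 10,000 threshold comes from the list above.

```typescript
const WEBHOOK_RECORD_ALERT_THRESHOLD = 10000;

interface RecordCountCheck {
  count: number;
  threshold: number;
  shouldAlert: boolean;
}

// Flags an alert only when the count is strictly above the threshold.
function checkWebhookRecordCount(
  count: number,
  threshold: number = WEBHOOK_RECORD_ALERT_THRESHOLD,
): RecordCountCheck {
  return { count, threshold, shouldAlert: count > threshold };
}
```

A sustained count above the threshold would suggest that the cleanup job is not running or that the retention period is too long for the webhook volume.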
END OF PROMPT
This prompt resolves TD-001 from the technical debt register using a database-backed solution. The implementation is horizontally scalable, handles race conditions correctly, and includes automatic cleanup to prevent unbounded growth.