Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using Distribt.Services.Products.BusinessLogic.DataAccess;
using Distribt.Services.Products.BusinessLogic.UseCases;
using Distribt.Services.Products.BusinessLogic.Outbox;
using Distribt.Services.Products.BusinessLogic.Services;

WebApplication app = DefaultDistribtWebApplication.Create(args, builder =>
{
Expand All @@ -9,6 +11,8 @@
.AddScoped<ICreateProductDetails, CreateProductDetails>()
.AddScoped<IStockApi,ProductsDependencyFakeType>() //testing purposes
.AddScoped<IWarehouseApi, ProductsDependencyFakeType>() //testing purposes
.AddScoped<IOutboxProcessor, OutboxProcessor>()
.AddHostedService<OutboxBackgroundService>()
.AddServiceBusDomainPublisher(builder.Configuration);
});

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
using System.ComponentModel.DataAnnotations;

namespace Distribt.Services.Products.BusinessLogic.DataAccess;

public class OutboxMessage
{
public int Id { get; set; }

[Required]
[MaxLength(500)]
public string EventType { get; set; } = null!;

[Required]
public string EventData { get; set; } = null!;

[Required]
[MaxLength(50)]
public string RoutingKey { get; set; } = null!;

public DateTime CreatedAt { get; set; }

public DateTime? ProcessedAt { get; set; }

public bool IsProcessed { get; set; }

public string? ErrorMessage { get; set; }

public int RetryCount { get; set; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using Distribt.Services.Products.Dtos;
using Microsoft.EntityFrameworkCore;
using System.Text.Json;

namespace Distribt.Services.Products.BusinessLogic.DataAccess;

Expand All @@ -8,11 +9,24 @@ public interface IProductsWriteStore
{
Task UpdateProduct(int id, ProductDetails details);
Task<int> CreateRecord(ProductDetails details);
Task<int> CreateRecordWithOutboxMessage(ProductDetails details, string eventType, object eventData, string routingKey);
Task UpdateProductWithOutboxMessage(int id, ProductDetails details, string eventType, object eventData, string routingKey);
Task<List<OutboxMessage>> GetUnprocessedMessages(int batchSize = 100);
Task MarkAsProcessed(int messageId);
Task MarkAsFailed(int messageId, string errorMessage);
}

public class ProductDetailEntity
{
public int? Id { get; set; }
public string? Name { get; set; }
public string? Description { get; set; }
}

public class ProductsWriteStore : DbContext, IProductsWriteStore
{
private DbSet<ProductDetailEntity> Products { get; set; } = null!;
private DbSet<OutboxMessage> OutboxMessages { get; set; } = null!;

public ProductsWriteStore(DbContextOptions<ProductsWriteStore> options) : base(options)
{
Expand All @@ -39,13 +53,186 @@ public async Task<int> CreateRecord(ProductDetails details)

return result.Entity.Id ?? throw new ApplicationException("the record has not been inserted in the db");
}


private class ProductDetailEntity

public async Task<List<OutboxMessage>> GetUnprocessedMessages(int batchSize = 100)
{
return await OutboxMessages
.Where(m => !m.IsProcessed && m.RetryCount < 3)
.OrderBy(m => m.CreatedAt)
.Take(batchSize)
.ToListAsync();
}

public async Task MarkAsProcessed(int messageId)
{
var message = await OutboxMessages.FindAsync(messageId);
if (message != null)
{
message.IsProcessed = true;
message.ProcessedAt = DateTime.UtcNow;
await SaveChangesAsync();
}
}

public async Task MarkAsFailed(int messageId, string errorMessage)
{
var message = await OutboxMessages.FindAsync(messageId);
if (message != null)
{
message.RetryCount++;
message.ErrorMessage = errorMessage;
if (message.RetryCount >= 3)
{
message.IsProcessed = true; // Mark as processed to stop retrying
message.ProcessedAt = DateTime.UtcNow;
}
await SaveChangesAsync();
}
}

public async Task<int> CreateRecordWithOutboxMessage(ProductDetails details, string eventType, object eventData, string routingKey)
{
public int? Id { get; set; }
public string? Name { get; set; }
public string? Description { get; set; }
// Check if we're using in-memory database (for tests)
var isInMemoryDatabase = Database.ProviderName == "Microsoft.EntityFrameworkCore.InMemory";

if (isInMemoryDatabase)
{
// For in-memory database, we don't need transactions
ProductDetailEntity newProduct = new ProductDetailEntity()
{
Description = details.Description,
Name = details.Name
};

var result = await Products.AddAsync(newProduct);
await SaveChangesAsync();

var productId = result.Entity.Id ?? throw new ApplicationException("the record has not been inserted in the db");

// Update the event data with the actual product ID if it's a ProductCreated event
if (eventData is ProductCreated productCreated)
{
eventData = new ProductCreated(productId, productCreated.ProductRequest);
}

var outboxMessage = new OutboxMessage
{
EventType = eventType,
EventData = JsonSerializer.Serialize(eventData),
RoutingKey = routingKey,
CreatedAt = DateTime.UtcNow,
IsProcessed = false,
RetryCount = 0
};

await OutboxMessages.AddAsync(outboxMessage);
await SaveChangesAsync();

return productId;
}
else
{
// For real databases, use transactions
using var transaction = await Database.BeginTransactionAsync();
try
{
ProductDetailEntity newProduct = new ProductDetailEntity()
{
Description = details.Description,
Name = details.Name
};

var result = await Products.AddAsync(newProduct);
await SaveChangesAsync();

var productId = result.Entity.Id ?? throw new ApplicationException("the record has not been inserted in the db");

// Update the event data with the actual product ID if it's a ProductCreated event
if (eventData is ProductCreated productCreated)
{
eventData = new ProductCreated(productId, productCreated.ProductRequest);
}

var outboxMessage = new OutboxMessage
{
EventType = eventType,
EventData = JsonSerializer.Serialize(eventData),
RoutingKey = routingKey,
CreatedAt = DateTime.UtcNow,
IsProcessed = false,
RetryCount = 0
};

await OutboxMessages.AddAsync(outboxMessage);
await SaveChangesAsync();

await transaction.CommitAsync();
return productId;
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}

public async Task UpdateProductWithOutboxMessage(int id, ProductDetails details, string eventType, object eventData, string routingKey)
{
// Check if we're using in-memory database (for tests)
var isInMemoryDatabase = Database.ProviderName == "Microsoft.EntityFrameworkCore.InMemory";

if (isInMemoryDatabase)
{
// For in-memory database, we don't need transactions
var product = await Products.SingleAsync(a => a.Id == id);
product.Description = details.Description;
product.Name = details.Name;

var outboxMessage = new OutboxMessage
{
EventType = eventType,
EventData = JsonSerializer.Serialize(eventData),
RoutingKey = routingKey,
CreatedAt = DateTime.UtcNow,
IsProcessed = false,
RetryCount = 0
};

await OutboxMessages.AddAsync(outboxMessage);
await SaveChangesAsync();
}
else
{
// For real databases, use transactions
using var transaction = await Database.BeginTransactionAsync();
try
{
var product = await Products.SingleAsync(a => a.Id == id);
product.Description = details.Description;
product.Name = details.Name;

var outboxMessage = new OutboxMessage
{
EventType = eventType,
EventData = JsonSerializer.Serialize(eventData),
RoutingKey = routingKey,
CreatedAt = DateTime.UtcNow,
IsProcessed = false,
RetryCount = 0
};

await OutboxMessages.AddAsync(outboxMessage);
await SaveChangesAsync();

await transaction.CommitAsync();
}
catch
{
await transaction.RollbackAsync();
throw;
}
}
}

}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
namespace Distribt.Services.Products.BusinessLogic.Outbox;

public interface IOutboxProcessor
{
Task ProcessPendingMessages(CancellationToken cancellationToken = default);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using Distribt.Services.Products.BusinessLogic.DataAccess;
using Distribt.Shared.Communication.Publisher.Domain;
using Microsoft.Extensions.Logging;

namespace Distribt.Services.Products.BusinessLogic.Outbox;

public class OutboxProcessor : IOutboxProcessor
{
private readonly IProductsWriteStore _writeStore;
private readonly IDomainMessagePublisher _domainMessagePublisher;
private readonly ILogger<OutboxProcessor> _logger;

public OutboxProcessor(
IProductsWriteStore writeStore,
IDomainMessagePublisher domainMessagePublisher,
ILogger<OutboxProcessor> logger)
{
_writeStore = writeStore;
_domainMessagePublisher = domainMessagePublisher;
_logger = logger;
}

public async Task ProcessPendingMessages(CancellationToken cancellationToken = default)
{
try
{
var messages = await _writeStore.GetUnprocessedMessages();

foreach (var message in messages)
{
if (cancellationToken.IsCancellationRequested)
break;

try
{
// Deserialize and publish the message
var eventType = Type.GetType(message.EventType);
if (eventType == null)
{
_logger.LogError("Unknown event type: {EventType}", message.EventType);
await _writeStore.MarkAsFailed(message.Id, $"Unknown event type: {message.EventType}");
continue;
}

var eventData = System.Text.Json.JsonSerializer.Deserialize(message.EventData, eventType);
if (eventData == null)
{
_logger.LogError("Failed to deserialize event data for message {MessageId}", message.Id);
await _writeStore.MarkAsFailed(message.Id, "Failed to deserialize event data");
continue;
}

await _domainMessagePublisher.Publish(eventData, routingKey: message.RoutingKey);
await _writeStore.MarkAsProcessed(message.Id);

_logger.LogDebug("Successfully processed outbox message {MessageId}", message.Id);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox message {MessageId}", message.Id);
await _writeStore.MarkAsFailed(message.Id, ex.Message);
}
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error processing outbox messages");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
using Distribt.Services.Products.BusinessLogic.Outbox;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace Distribt.Services.Products.BusinessLogic.Services;

public class OutboxBackgroundService : BackgroundService
{
private readonly ILogger<OutboxBackgroundService> _logger;
private readonly IServiceProvider _serviceProvider;
private readonly TimeSpan _processingInterval = TimeSpan.FromSeconds(10);

public OutboxBackgroundService(
ILogger<OutboxBackgroundService> logger,
IServiceProvider serviceProvider)
{
_logger = logger;
_serviceProvider = serviceProvider;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("Outbox background service started");

while (!stoppingToken.IsCancellationRequested)
{
try
{
using var scope = _serviceProvider.CreateScope();
var outboxProcessor = scope.ServiceProvider.GetRequiredService<IOutboxProcessor>();

await outboxProcessor.ProcessPendingMessages(stoppingToken);
}
catch (Exception ex)
{
_logger.LogError(ex, "Error in outbox background service");
}

await Task.Delay(_processingInterval, stoppingToken);
}

_logger.LogInformation("Outbox background service stopped");
}
}
Loading
Loading