diff --git a/src/Services/Products/Distribt.Services.Products.Api.Write/Program.cs b/src/Services/Products/Distribt.Services.Products.Api.Write/Program.cs index ff2e18a..2b087b0 100644 --- a/src/Services/Products/Distribt.Services.Products.Api.Write/Program.cs +++ b/src/Services/Products/Distribt.Services.Products.Api.Write/Program.cs @@ -1,5 +1,7 @@ using Distribt.Services.Products.BusinessLogic.DataAccess; using Distribt.Services.Products.BusinessLogic.UseCases; +using Distribt.Services.Products.BusinessLogic.Outbox; +using Distribt.Services.Products.BusinessLogic.Services; WebApplication app = DefaultDistribtWebApplication.Create(args, builder => { @@ -9,6 +11,8 @@ .AddScoped() .AddScoped() //testing purposes .AddScoped() //testing purposes + .AddScoped() + .AddHostedService() .AddServiceBusDomainPublisher(builder.Configuration); }); diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/OutboxMessage.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/OutboxMessage.cs new file mode 100644 index 0000000..b80ce9a --- /dev/null +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/OutboxMessage.cs @@ -0,0 +1,29 @@ +using System.ComponentModel.DataAnnotations; + +namespace Distribt.Services.Products.BusinessLogic.DataAccess; + +public class OutboxMessage +{ + public int Id { get; set; } + + [Required] + [MaxLength(500)] + public string EventType { get; set; } = null!; + + [Required] + public string EventData { get; set; } = null!; + + [Required] + [MaxLength(50)] + public string RoutingKey { get; set; } = null!; + + public DateTime CreatedAt { get; set; } + + public DateTime? ProcessedAt { get; set; } + + public bool IsProcessed { get; set; } + + public string? ErrorMessage { get; set; } + + public int RetryCount { get; set; } +} \ No newline at end of file diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/ProductsWriteStore.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/ProductsWriteStore.cs index 313cc07..46c9fc4 100644 --- a/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/ProductsWriteStore.cs +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/DataAccess/ProductsWriteStore.cs @@ -1,5 +1,6 @@ using Distribt.Services.Products.Dtos; using Microsoft.EntityFrameworkCore; +using System.Text.Json; namespace Distribt.Services.Products.BusinessLogic.DataAccess; @@ -8,11 +9,24 @@ public interface IProductsWriteStore { Task UpdateProduct(int id, ProductDetails details); Task CreateRecord(ProductDetails details); + Task CreateRecordWithOutboxMessage(ProductDetails details, string eventType, object eventData, string routingKey); + Task UpdateProductWithOutboxMessage(int id, ProductDetails details, string eventType, object eventData, string routingKey); + Task> GetUnprocessedMessages(int batchSize = 100); + Task MarkAsProcessed(int messageId); + Task MarkAsFailed(int messageId, string errorMessage); +} + +public class ProductDetailEntity +{ + public int? Id { get; set; } + public string? Name { get; set; } + public string? Description { get; set; } } public class ProductsWriteStore : DbContext, IProductsWriteStore { private DbSet Products { get; set; } = null!; + private DbSet OutboxMessages { get; set; } = null!; public ProductsWriteStore(DbContextOptions options) : base(options) { @@ -39,13 +53,186 @@ public async Task CreateRecord(ProductDetails details) return result.Entity.Id ?? throw new ApplicationException("the record has not been inserted in the db"); } - - - private class ProductDetailEntity + + public async Task> GetUnprocessedMessages(int batchSize = 100) + { + return await OutboxMessages + .Where(m => !m.IsProcessed && m.RetryCount < 3) + .OrderBy(m => m.CreatedAt) + .Take(batchSize) + .ToListAsync(); + } + + public async Task MarkAsProcessed(int messageId) + { + var message = await OutboxMessages.FindAsync(messageId); + if (message != null) + { + message.IsProcessed = true; + message.ProcessedAt = DateTime.UtcNow; + await SaveChangesAsync(); + } + } + + public async Task MarkAsFailed(int messageId, string errorMessage) + { + var message = await OutboxMessages.FindAsync(messageId); + if (message != null) + { + message.RetryCount++; + message.ErrorMessage = errorMessage; + if (message.RetryCount >= 3) + { + message.IsProcessed = true; // Mark as processed to stop retrying + message.ProcessedAt = DateTime.UtcNow; + } + await SaveChangesAsync(); + } + } + + public async Task CreateRecordWithOutboxMessage(ProductDetails details, string eventType, object eventData, string routingKey) { - public int? Id { get; set; } - public string? Name { get; set; } - public string? Description { get; set; } + // Check if we're using in-memory database (for tests) + var isInMemoryDatabase = Database.ProviderName == "Microsoft.EntityFrameworkCore.InMemory"; + + if (isInMemoryDatabase) + { + // For in-memory database, we don't need transactions + ProductDetailEntity newProduct = new ProductDetailEntity() + { + Description = details.Description, + Name = details.Name + }; + + var result = await Products.AddAsync(newProduct); + await SaveChangesAsync(); + + var productId = result.Entity.Id ?? throw new ApplicationException("the record has not been inserted in the db"); + + // Update the event data with the actual product ID if it's a ProductCreated event + if (eventData is ProductCreated productCreated) + { + eventData = new ProductCreated(productId, productCreated.ProductRequest); + } + + var outboxMessage = new OutboxMessage + { + EventType = eventType, + EventData = JsonSerializer.Serialize(eventData), + RoutingKey = routingKey, + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await OutboxMessages.AddAsync(outboxMessage); + await SaveChangesAsync(); + + return productId; + } + else + { + // For real databases, use transactions + using var transaction = await Database.BeginTransactionAsync(); + try + { + ProductDetailEntity newProduct = new ProductDetailEntity() + { + Description = details.Description, + Name = details.Name + }; + + var result = await Products.AddAsync(newProduct); + await SaveChangesAsync(); + + var productId = result.Entity.Id ?? throw new ApplicationException("the record has not been inserted in the db"); + + // Update the event data with the actual product ID if it's a ProductCreated event + if (eventData is ProductCreated productCreated) + { + eventData = new ProductCreated(productId, productCreated.ProductRequest); + } + + var outboxMessage = new OutboxMessage + { + EventType = eventType, + EventData = JsonSerializer.Serialize(eventData), + RoutingKey = routingKey, + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await OutboxMessages.AddAsync(outboxMessage); + await SaveChangesAsync(); + + await transaction.CommitAsync(); + return productId; + } + catch + { + await transaction.RollbackAsync(); + throw; + } + } + } + + public async Task UpdateProductWithOutboxMessage(int id, ProductDetails details, string eventType, object eventData, string routingKey) + { + // Check if we're using in-memory database (for tests) + var isInMemoryDatabase = Database.ProviderName == "Microsoft.EntityFrameworkCore.InMemory"; + + if (isInMemoryDatabase) + { + // For in-memory database, we don't need transactions + var product = await Products.SingleAsync(a => a.Id == id); + product.Description = details.Description; + product.Name = details.Name; + + var outboxMessage = new OutboxMessage + { + EventType = eventType, + EventData = JsonSerializer.Serialize(eventData), + RoutingKey = routingKey, + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await OutboxMessages.AddAsync(outboxMessage); + await SaveChangesAsync(); + } + else + { + // For real databases, use transactions + using var transaction = await Database.BeginTransactionAsync(); + try + { + var product = await Products.SingleAsync(a => a.Id == id); + product.Description = details.Description; + product.Name = details.Name; + + var outboxMessage = new OutboxMessage + { + EventType = eventType, + EventData = JsonSerializer.Serialize(eventData), + RoutingKey = routingKey, + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await OutboxMessages.AddAsync(outboxMessage); + await SaveChangesAsync(); + + await transaction.CommitAsync(); + } + catch + { + await transaction.RollbackAsync(); + throw; + } + } } } diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/Outbox/IOutboxProcessor.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/Outbox/IOutboxProcessor.cs new file mode 100644 index 0000000..27f835c --- /dev/null +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/Outbox/IOutboxProcessor.cs @@ -0,0 +1,6 @@ +namespace Distribt.Services.Products.BusinessLogic.Outbox; + +public interface IOutboxProcessor +{ + Task ProcessPendingMessages(CancellationToken cancellationToken = default); +} \ No newline at end of file diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/Outbox/OutboxProcessor.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/Outbox/OutboxProcessor.cs new file mode 100644 index 0000000..2d34175 --- /dev/null +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/Outbox/OutboxProcessor.cs @@ -0,0 +1,70 @@ +using Distribt.Services.Products.BusinessLogic.DataAccess; +using Distribt.Shared.Communication.Publisher.Domain; +using Microsoft.Extensions.Logging; + +namespace Distribt.Services.Products.BusinessLogic.Outbox; + +public class OutboxProcessor : IOutboxProcessor +{ + private readonly IProductsWriteStore _writeStore; + private readonly IDomainMessagePublisher _domainMessagePublisher; + private readonly ILogger _logger; + + public OutboxProcessor( + IProductsWriteStore writeStore, + IDomainMessagePublisher domainMessagePublisher, + ILogger logger) + { + _writeStore = writeStore; + _domainMessagePublisher = domainMessagePublisher; + _logger = logger; + } + + public async Task ProcessPendingMessages(CancellationToken cancellationToken = default) + { + try + { + var messages = await _writeStore.GetUnprocessedMessages(); + + foreach (var message in messages) + { + if (cancellationToken.IsCancellationRequested) + break; + + try + { + // Deserialize and publish the message + var eventType = Type.GetType(message.EventType); + if (eventType == null) + { + _logger.LogError("Unknown event type: {EventType}", message.EventType); + await _writeStore.MarkAsFailed(message.Id, $"Unknown event type: {message.EventType}"); + continue; + } + + var eventData = System.Text.Json.JsonSerializer.Deserialize(message.EventData, eventType); + if (eventData == null) + { + _logger.LogError("Failed to deserialize event data for message {MessageId}", message.Id); + await _writeStore.MarkAsFailed(message.Id, "Failed to deserialize event data"); + continue; + } + + await _domainMessagePublisher.Publish(eventData, routingKey: message.RoutingKey); + await _writeStore.MarkAsProcessed(message.Id); + + _logger.LogDebug("Successfully processed outbox message {MessageId}", message.Id); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing outbox message {MessageId}", message.Id); + await _writeStore.MarkAsFailed(message.Id, ex.Message); + } + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Error processing outbox messages"); + } + } +} \ No newline at end of file diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/Services/OutboxBackgroundService.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/Services/OutboxBackgroundService.cs new file mode 100644 index 0000000..993884f --- /dev/null +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/Services/OutboxBackgroundService.cs @@ -0,0 +1,45 @@ +using Distribt.Services.Products.BusinessLogic.Outbox; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace Distribt.Services.Products.BusinessLogic.Services; + +public class OutboxBackgroundService : BackgroundService +{ + private readonly ILogger _logger; + private readonly IServiceProvider _serviceProvider; + private readonly TimeSpan _processingInterval = TimeSpan.FromSeconds(10); + + public OutboxBackgroundService( + ILogger logger, + IServiceProvider serviceProvider) + { + _logger = logger; + _serviceProvider = serviceProvider; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + _logger.LogInformation("Outbox background service started"); + + while (!stoppingToken.IsCancellationRequested) + { + try + { + using var scope = _serviceProvider.CreateScope(); + var outboxProcessor = scope.ServiceProvider.GetRequiredService(); + + await outboxProcessor.ProcessPendingMessages(stoppingToken); + } + catch (Exception ex) + { + _logger.LogError(ex, "Error in outbox background service"); + } + + await Task.Delay(_processingInterval, stoppingToken); + } + + _logger.LogInformation("Outbox background service stopped"); + } +} \ No newline at end of file diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/CreateProductDetails.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/CreateProductDetails.cs index 5b69040..247a6d0 100644 --- a/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/CreateProductDetails.cs +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/CreateProductDetails.cs @@ -1,6 +1,5 @@ using Distribt.Services.Products.BusinessLogic.DataAccess; using Distribt.Services.Products.Dtos; -using Distribt.Shared.Communication.Publisher.Domain; using Distribt.Shared.Discovery; namespace Distribt.Services.Products.BusinessLogic.UseCases; @@ -14,15 +13,13 @@ public interface ICreateProductDetails public class CreateProductDetails : ICreateProductDetails { private readonly IProductsWriteStore _writeStore; - private readonly IDomainMessagePublisher _domainMessagePublisher; private readonly IServiceDiscovery _discovery; private readonly IStockApi _stockApi; private readonly IWarehouseApi _warehouseApi; - public CreateProductDetails(IProductsWriteStore writeStore, IDomainMessagePublisher domainMessagePublisher, IServiceDiscovery discovery, IStockApi stockApi, IWarehouseApi warehouseApi) + public CreateProductDetails(IProductsWriteStore writeStore, IServiceDiscovery discovery, IStockApi stockApi, IWarehouseApi warehouseApi) { _writeStore = writeStore; - _domainMessagePublisher = domainMessagePublisher; _discovery = discovery; _stockApi = stockApi; _warehouseApi = warehouseApi; @@ -31,14 +28,16 @@ public CreateProductDetails(IProductsWriteStore writeStore, IDomainMessagePublis public async Task Execute(CreateProductRequest productRequest) { - int productId = await _writeStore.CreateRecord(productRequest.Details); + int productId = await _writeStore.CreateRecordWithOutboxMessage( + productRequest.Details, + typeof(ProductCreated).AssemblyQualifiedName!, + new ProductCreated(0, productRequest), // productId will be set correctly in the actual event + "internal"); await _stockApi.AddStockToProduct(productId, productRequest.Stock); await _warehouseApi.ModifySalesPrice(productId, productRequest.Price); - await _domainMessagePublisher.Publish(new ProductCreated(productId, productRequest), routingKey: "internal"); - string getUrl = await _discovery.GetFullAddress(DiscoveryServices.Microservices.ProductsApi.ApiRead); return new CreateProductResponse($"{getUrl}/product/{productId}"); diff --git a/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/UpdateProductDetails.cs b/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/UpdateProductDetails.cs index 278b26f..dc7bd95 100644 --- a/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/UpdateProductDetails.cs +++ b/src/Services/Products/Distribt.Services.Products.BusinessLogic/UseCases/UpdateProductDetails.cs @@ -1,6 +1,5 @@ using Distribt.Services.Products.BusinessLogic.DataAccess; using Distribt.Services.Products.Dtos; -using Distribt.Shared.Communication.Publisher.Domain; namespace Distribt.Services.Products.BusinessLogic.UseCases; @@ -12,19 +11,20 @@ public interface IUpdateProductDetails public class UpdateProductDetails : IUpdateProductDetails { private readonly IProductsWriteStore _writeStore; - private readonly IDomainMessagePublisher _domainMessagePublisher; - public UpdateProductDetails(IProductsWriteStore writeStore, IDomainMessagePublisher domainMessagePublisher) + public UpdateProductDetails(IProductsWriteStore writeStore) { _writeStore = writeStore; - _domainMessagePublisher = domainMessagePublisher; } public async Task Execute(int id, ProductDetails productDetails) { - await _writeStore.UpdateProduct(id, productDetails); - - await _domainMessagePublisher.Publish(new ProductUpdated(id, productDetails), routingKey: "internal"); + await _writeStore.UpdateProductWithOutboxMessage( + id, + productDetails, + typeof(ProductUpdated).AssemblyQualifiedName!, + new ProductUpdated(id, productDetails), + "internal"); return true; } diff --git a/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/Distribt.Tests.Services.Products.BusinessLogicTests.csproj b/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/Distribt.Tests.Services.Products.BusinessLogicTests.csproj new file mode 100644 index 0000000..a6e043d --- /dev/null +++ b/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/Distribt.Tests.Services.Products.BusinessLogicTests.csproj @@ -0,0 +1,30 @@ + + + + net8.0 + enable + enable + false + Distribt.Tests.Services.Products.BusinessLogic + + + + + + + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + runtime; build; native; contentfiles; analyzers; buildtransitive + all + + + + + + + + \ No newline at end of file diff --git a/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/Outbox/OutboxProcessorTests.cs b/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/Outbox/OutboxProcessorTests.cs new file mode 100644 index 0000000..4231733 --- /dev/null +++ b/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/Outbox/OutboxProcessorTests.cs @@ -0,0 +1,172 @@ +using Distribt.Services.Products.BusinessLogic.DataAccess; +using Distribt.Services.Products.BusinessLogic.Outbox; +using Distribt.Services.Products.Dtos; +using Distribt.Shared.Communication.Publisher.Domain; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Microsoft.Extensions.Logging; +using Moq; +using System.Text.Json; +using Xunit; + +namespace Distribt.Tests.Services.Products.BusinessLogic.Outbox; + +public class OutboxProcessorTests +{ + private ProductsWriteStore CreateInMemoryDbContext() + { + var options = new DbContextOptionsBuilder() + .UseInMemoryDatabase(databaseName: Guid.NewGuid().ToString()) + .Options; + return new ProductsWriteStore(options); + } + + [Fact] + public async Task CreateRecord_ShouldCreateProduct() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + var productDetails = new ProductDetails("Test Product", "Test Description"); + + // Act + var productId = await dbContext.CreateRecord(productDetails); + + // Assert + Assert.True(productId > 0); + + var product = await dbContext.Set().FirstAsync(); + Assert.Equal("Test Product", product.Name); + Assert.Equal("Test Description", product.Description); + } + + [Fact] + public async Task CreateRecordWithOutboxMessage_ShouldCreateProductAndOutboxMessage() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + var productDetails = new ProductDetails("Test Product", "Test Description"); + var productRequest = new CreateProductRequest(productDetails, 10, 100m); + + // Act + var productId = await dbContext.CreateRecordWithOutboxMessage( + productDetails, + typeof(ProductCreated).AssemblyQualifiedName!, + new ProductCreated(0, productRequest), // Will be updated with actual ID + "internal"); + + // Assert + Assert.True(productId > 0); + + var product = await dbContext.Set().FirstAsync(); + Assert.Equal("Test Product", product.Name); + Assert.Equal("Test Description", product.Description); + + var outboxMessage = await dbContext.Set().FirstAsync(); + Assert.Equal(typeof(ProductCreated).AssemblyQualifiedName, outboxMessage.EventType); + Assert.Equal("internal", outboxMessage.RoutingKey); + Assert.False(outboxMessage.IsProcessed); + + var deserializedEvent = JsonSerializer.Deserialize(outboxMessage.EventData); + Assert.Equal(productId, deserializedEvent!.Id); + } + + [Fact] + public async Task OutboxProcessor_ShouldProcessUnprocessedMessages() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + var mockPublisher = new Mock(); + var mockLogger = new Mock>(); + + var productCreated = new ProductCreated(1, new CreateProductRequest(new ProductDetails("Test", "Desc"), 10, 100m)); + var outboxMessage = new OutboxMessage + { + EventType = typeof(ProductCreated).AssemblyQualifiedName!, + EventData = JsonSerializer.Serialize(productCreated), + RoutingKey = "internal", + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await dbContext.Set().AddAsync(outboxMessage); + await dbContext.SaveChangesAsync(); + + var processor = new OutboxProcessor(dbContext, mockPublisher.Object, mockLogger.Object); + + // Act + await processor.ProcessPendingMessages(); + + // Assert + mockPublisher.Verify(p => p.Publish(It.IsAny(), null, "internal", default), Times.Once); + + var processedMessage = await dbContext.Set().FirstAsync(); + Assert.True(processedMessage.IsProcessed); + Assert.NotNull(processedMessage.ProcessedAt); + } + + [Fact] + public async Task GetUnprocessedMessages_ShouldReturnOnlyUnprocessedMessages() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + + var processedMessage = new OutboxMessage + { + EventType = "Test", + EventData = "{}", + RoutingKey = "test", + CreatedAt = DateTime.UtcNow, + IsProcessed = true, + RetryCount = 0 + }; + + var unprocessedMessage = new OutboxMessage + { + EventType = "Test", + EventData = "{}", + RoutingKey = "test", + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await dbContext.Set().AddRangeAsync(processedMessage, unprocessedMessage); + await dbContext.SaveChangesAsync(); + + // Act + var result = await dbContext.GetUnprocessedMessages(); + + // Assert + Assert.Single(result); + Assert.False(result[0].IsProcessed); + } + + [Fact] + public async Task MarkAsProcessed_ShouldUpdateMessage() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + + var message = new OutboxMessage + { + EventType = "Test", + EventData = "{}", + RoutingKey = "test", + CreatedAt = DateTime.UtcNow, + IsProcessed = false, + RetryCount = 0 + }; + + await dbContext.Set().AddAsync(message); + await dbContext.SaveChangesAsync(); + + // Act + await dbContext.MarkAsProcessed(message.Id); + + // Assert + var updatedMessage = await dbContext.Set().FirstAsync(); + Assert.True(updatedMessage.IsProcessed); + Assert.NotNull(updatedMessage.ProcessedAt); + } +} \ No newline at end of file diff --git a/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/UseCases/ProductUseCasesIntegrationTests.cs b/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/UseCases/ProductUseCasesIntegrationTests.cs new file mode 100644 index 0000000..819ddea --- /dev/null +++ b/src/Tests/Services/Products/Distribt.Tests.Services.Products.BusinessLogicTests/UseCases/ProductUseCasesIntegrationTests.cs @@ -0,0 +1,101 @@ +using Distribt.Services.Products.BusinessLogic.DataAccess; +using Distribt.Services.Products.BusinessLogic.UseCases; +using Distribt.Services.Products.Dtos; +using Microsoft.EntityFrameworkCore; +using Microsoft.EntityFrameworkCore.Diagnostics; +using Xunit; + +namespace Distribt.Tests.Services.Products.BusinessLogic.UseCases; + +public class ProductUseCasesIntegrationTests +{ + private ProductsWriteStore CreateInMemoryDbContext() + { + var options = new DbContextOptionsBuilder() + .UseInMemoryDatabase(databaseName: Guid.NewGuid().ToString()) + .Options; + return new ProductsWriteStore(options); + } + + [Fact] + public async Task CreateProductDetails_ShouldCreateProductAndOutboxMessage() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + var mockStockApi = new ProductsDependencyFakeType(); + var mockWarehouseApi = new ProductsDependencyFakeType(); + var mockDiscovery = new MockServiceDiscovery(); + + var useCase = new CreateProductDetails(dbContext, mockDiscovery, mockStockApi, mockWarehouseApi); + + var request = new CreateProductRequest( + new ProductDetails("Test Product", "Test Description"), + 10, + 99.99m); + + // Act + var result = await useCase.Execute(request); + + // Assert + Assert.NotNull(result); + Assert.Contains("product/", result.Url); + + // Verify product was created + var product = await dbContext.Set().FirstAsync(); + Assert.Equal("Test Product", product.Name); + Assert.Equal("Test Description", product.Description); + + // Verify outbox message was created + var outboxMessage = await dbContext.Set().FirstAsync(); + Assert.Equal(typeof(ProductCreated).AssemblyQualifiedName, outboxMessage.EventType); + Assert.Equal("internal", outboxMessage.RoutingKey); + Assert.False(outboxMessage.IsProcessed); + Assert.Contains("Test Product", outboxMessage.EventData); + } + + [Fact] + public async Task UpdateProductDetails_ShouldUpdateProductAndCreateOutboxMessage() + { + // Arrange + using var dbContext = CreateInMemoryDbContext(); + + // Create initial product + var initialProduct = new ProductDetailEntity { Name = "Original", Description = "Original Desc" }; + await dbContext.Set().AddAsync(initialProduct); + await dbContext.SaveChangesAsync(); + + var useCase = new UpdateProductDetails(dbContext); + var updatedDetails = new ProductDetails("Updated Product", "Updated Description"); + + // Act + var result = await useCase.Execute(initialProduct.Id!.Value, updatedDetails); + + // Assert + Assert.True(result); + + // Verify product was updated + var product = await dbContext.Set().FirstAsync(); + Assert.Equal("Updated Product", product.Name); + Assert.Equal("Updated Description", product.Description); + + // Verify outbox message was created + var outboxMessage = await dbContext.Set().FirstAsync(); + Assert.Equal(typeof(ProductUpdated).AssemblyQualifiedName, outboxMessage.EventType); + Assert.Equal("internal", outboxMessage.RoutingKey); + Assert.False(outboxMessage.IsProcessed); + Assert.Contains("Updated Product", outboxMessage.EventData); + } +} + +public class MockServiceDiscovery : Distribt.Shared.Discovery.IServiceDiscovery +{ + public Task GetFullAddress(string serviceKey, CancellationToken cancellationToken = default) + { + return Task.FromResult("http://localhost:5000"); + } + + public Task GetDiscoveryData(string serviceKey, CancellationToken cancellationToken = default) + { + return Task.FromResult(new Distribt.Shared.Discovery.DiscoveryData("localhost", 5000)); + } +} \ No newline at end of file diff --git a/tools/mysql/init.sql b/tools/mysql/init.sql index b990be2..9b37946 100644 --- a/tools/mysql/init.sql +++ b/tools/mysql/init.sql @@ -7,6 +7,20 @@ CREATE TABLE `Products` ( PRIMARY KEY (`Id`) ) AUTO_INCREMENT = 1; +CREATE TABLE `OutboxMessages` ( + `Id` int NOT NULL AUTO_INCREMENT, + `EventType` VARCHAR(500) NOT NULL, + `EventData` TEXT NOT NULL, + `RoutingKey` VARCHAR(50) NOT NULL, + `CreatedAt` DATETIME NOT NULL, + `ProcessedAt` DATETIME NULL, + `IsProcessed` BOOLEAN NOT NULL DEFAULT FALSE, + `ErrorMessage` TEXT NULL, + `RetryCount` int NOT NULL DEFAULT 0, + PRIMARY KEY (`Id`), + INDEX `IX_OutboxMessages_IsProcessed_CreatedAt` (`IsProcessed`, `CreatedAt`) +) AUTO_INCREMENT = 1; + INSERT INTO `distribt`.`Products` (`Id`, `Name`, `Description`) VALUES ('1', 'Producto 1', 'La descripción dice qu es el primer producto'); INSERT INTO `distribt`.`Products` (`Id`, `Name`, `Description`) VALUES ('2', 'Segundo producto', 'Este es el producto numero 2'); INSERT INTO `distribt`.`Products` (`Id`, `Name`, `Description`) VALUES ('3', 'Tercer', 'Terceras Partes nunca fueron buenas');