CQRS Flow with LiteBus
Every write to the Formation API flows through the command side of a CQRS pipeline backed by LiteBus. Commands encode intent, handlers own the transaction, and events fan out to side-effects (query-view rebuilds, notifications, search re-indexing) after the main transaction commits.
This page walks the whole flow end-to-end using Create Scheme as the worked example, and explains the architectural choices that shape it.
Table of Contents
- Shape of the Flow
- Registration
- Commands
- Handlers
- Events
- Telemetry
- End-to-End Trace: POST /Schemes
- CommandResult<T>
- Gotchas
Shape of the Flow

                         ┌──────────────┐
    HTTP POST ────▶  │  WriteCtrl   │
                         │ (thin shell) │
                         └──────┬───────┘
                                │ CommandMediator.SendAsync(CreateSchemeCommand)
                                ▼
                         ┌───────────────────────────────────────┐
                         │  InstrumentedCommandMediator          │◀── OpenTelemetry span
                         │  → inner LiteBus mediator             │    duration histogram
                         └──────┬────────────────────────────────┘
                                │ dispatches by Command type
                                ▼
                         ┌───────────────────────────────────────┐
                         │  CreateSchemeCommandHandler           │
                         │  1. Build entity from JSON            │
                         │  2. Validate (TryValidate + domain)   │
                         │  3. BEGIN TRANSACTION                 │
                         │  4. Save + apply collection ops       │
                         │  5. COMMIT                            │
                         │  6. Publish SchemeCreatedEvent        │◀── outside the TX
                         │  7. Re-read for representation        │
                         └──────┬────────────────────────────────┘
                                │ returns CommandResult<Scheme>
                                ▼
                         WriteController
                                │
                                ▼
                         if HasErrors → ToErrorResult() → ProblemDetails
                         HTTP response

Once the event is published, a separate fan-out runs:

                         ┌───────────────────────────────────────┐
                         │  InstrumentedEventMediator            │◀── one span per subscriber
                         │  → one handler per IEventHandler<T>   │
                         └──────┬────────────────────────────────┘
                                │
                         ┌──────▼────────────────────┐
                         │ SchemeCreatedEventHandler │── upserts [query].SchemeList row
                         │                           │── upserts [query].AddressList row
                         │                           │── upserts [query].CompanyList rows
                         └───────────────────────────┘

Registration
LiteBus is wired up in Program.cs by scanning the API assembly for handler implementations:

    // src/services/api/app/app.api/Program.cs:298-307
    builder.Services.AddLiteBus(liteBus =>
    {
        // Use a type from the API assembly to ensure handlers are found when running tests
        var appAssembly = typeof(FormationDbContext).Assembly;

        liteBus.AddCommandModule(module => module.RegisterFromAssembly(appAssembly));
        liteBus.AddQueryModule  (module => module.RegisterFromAssembly(appAssembly));
        liteBus.AddEventModule  (module => module.RegisterFromAssembly(appAssembly));
    });

Any class in the API assembly that implements ICommandHandler<TCommand, TResult>, IQueryHandler<TQuery, TResult>, or IEventHandler<TEvent> is picked up automatically. There is no manual registration step per handler.
Immediately after the AddLiteBus block, Formation decorates the mediators with OpenTelemetry wrappers. LiteBus registers ICommandMediator and IEventMediator directly on the service collection; Formation replaces those registrations with factory functions that construct the real LiteBus mediator and wrap it in an instrumented decorator:

    // src/services/api/app/app.api/Program.cs:312-334
    var cmdDescriptor = builder.Services.First(d => d.ServiceType == typeof(ICommandMediator));
    builder.Services.Remove(cmdDescriptor);
    builder.Services.Add(ServiceDescriptor.Describe(
        typeof(ICommandMediator),
        sp =>
        {
            var inner = (ICommandMediator)ActivatorUtilities.CreateInstance(
                sp, cmdDescriptor.ImplementationType!);
            return new InstrumentedCommandMediator(inner);
        },
        cmdDescriptor.Lifetime));

    // (same pattern for IEventMediator → InstrumentedEventMediator)

Every ICommandMediator / IEventMediator dependency resolves to the instrumented wrapper. Handlers themselves are oblivious to this.
Commands
A command is an immutable record representing intent. It carries whatever payload the handler needs and declares its return type via the ICommand<TResult> interface:

    public sealed record CreateSchemeCommand(
        JsonElement Body,
        bool ReturnRepresentation) : ICommand<CommandResult<Scheme>>;

Commands live in src/common/models/Commands/, grouped by entity. Keeping them in the common/models assembly (rather than the API assembly) means other services, such as the load job and the completion-score job, can construct and dispatch the same commands without taking a dependency on the API project.
The pattern is:
| Verb | Command shape |
|---|---|
| Create | CreateFooCommand(JsonElement body, bool returnRepresentation) |
| Patch | PatchFooCommand(int id, JsonElement patchDoc, bool returnRepresentation) |
| Replace (PUT) | ReplaceFooCommand(int id, Foo entity, bool returnRepresentation) or ReplaceFooCommand(int id, JsonElement body, bool returnRepresentation) |
| Delete | DeleteFooCommand(int id) |
Replace has two variants because some entities go through a typed shape (e.g. Scheme) and others accept raw JSON (where the controller doesn’t need the entity projected upfront).
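As a rough illustration of the table above, the four command shapes for one entity might be declared as follows. This is only a sketch: Widget is a hypothetical entity, and ICommand<TResult>, CommandResult, and CommandResult<T> are minimal stand-ins declared here so the snippet compiles on its own (the real types come from LiteBus and src/common/models).

```csharp
#nullable enable
using System.Text.Json;

// Stand-ins so the sketch is self-contained; not the real LiteBus/Formation types.
public interface ICommand<TResult> { }
public sealed record CommandResult;
public sealed record CommandResult<T>(T? Data);

// Hypothetical entity used only for illustration.
public sealed record Widget(int WidgetId, string Name);

// Create: raw JSON body plus the Prefer-header flag.
public sealed record CreateWidgetCommand(JsonElement Body, bool ReturnRepresentation)
    : ICommand<CommandResult<Widget>>;

// Patch: target id plus the patch document.
public sealed record PatchWidgetCommand(int Id, JsonElement PatchDoc, bool ReturnRepresentation)
    : ICommand<CommandResult<Widget>>;

// Replace (PUT), typed variant: the controller has already projected the entity.
public sealed record ReplaceWidgetCommand(int Id, Widget Entity, bool ReturnRepresentation)
    : ICommand<CommandResult<Widget>>;

// Delete: no payload comes back, so the non-generic result is used.
public sealed record DeleteWidgetCommand(int Id) : ICommand<CommandResult>;
```

Because these are records, two commands built from the same values compare equal, which can be convenient when asserting on dispatched commands in tests.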
Handlers
The Handler Skeleton
A command handler implements ICommandHandler<TCommand, TResult> and has a single method, HandleAsync:

    // src/services/api/app/app.api/Handlers/Commands/Schemes/CreateSchemeCommandHandler.cs:17-37
    public sealed class CreateSchemeCommandHandler
        : ICommandHandler<CreateSchemeCommand, CommandResult<Scheme>>
    {
        private readonly FormationDbContext _context;
        private readonly IPatchCreateFactory _createFactory;
        private readonly IPatchRewriter _patchRewriter;
        private readonly IEventMediator _eventMediator;
        private readonly ISchemeMarketBoundaryService _schemeMarketService;

        public CreateSchemeCommandHandler(
            FormationDbContext context,
            IPatchCreateFactory createFactory,
            IPatchRewriter patchRewriter,
            IEventMediator eventMediator,
            ISchemeMarketBoundaryService schemeMarketService)
        {
            _context = context;
            _createFactory = createFactory;
            _patchRewriter = patchRewriter;
            _eventMediator = eventMediator;
            _schemeMarketService = schemeMarketService;
        }

        public async Task<CommandResult<Scheme>> HandleAsync(
            CreateSchemeCommand command,
            CancellationToken cancellationToken = default)
        { … }
    }

Dependencies: the DbContext for persistence, services that rewrite JSON into EF-friendly shapes (IPatchCreateFactory, IPatchRewriter), the event mediator for post-commit fan-out, and any domain services specific to this entity (here, ISchemeMarketBoundaryService for spatial sync).
Entity Construction via IPatchCreateFactory
Create handlers don’t deserialise the body themselves. They delegate to IPatchCreateFactory.CreateFromBodyAsync<T>, which uses the same patch machinery as PATCH to convert the inbound JSON into a fully-hydrated entity with navigation properties attached or foreign keys set correctly:

    // CreateSchemeCommandHandler.cs:46-50
    var scheme = await _createFactory.CreateFromBodyAsync<Scheme>(
        _context, _patchRewriter, command.Body);

This step turns, e.g., "Address": { "Id": "AD03KwA" } into scheme.AddressId = 42. See the JSON Patch guide for the full translation rules.
Validation with TryValidate
After construction, the entity is validated against its data annotations:

    // CreateSchemeCommandHandler.cs:52-62
    var errors = new List<string>();

    if (!scheme.TryValidate(out var validationResults, "Scheme"))
    {
        errors.AddRange(validationResults);
    }

    if (errors.Count > 0)
    {
        return CommandResult<Scheme>.ValidationError(errors);
    }

TryValidate (defined in EntityValidation.cs) wraps System.ComponentModel.DataAnnotations.Validator.TryValidateObject with recursive validation across navigation properties and collections. The second argument is a path prefix: every error is emitted with that prefix so the frontend can map errors back to fields. You get strings like:

    Scheme.SchemeName: The SchemeName field is required.
    Scheme.Companies[0].PercentageShare: Value must be between 0 and 1
    Scheme.Developments[2].Type: The Type field is required.

On failure, the handler returns CommandResult<T>.ValidationError(errors) before opening the transaction, because there’s no point starting a DB transaction for a request we’re going to reject. The write controller’s base class converts this into a 400 ProblemDetails response via ToErrorResult().
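To make the path-prefix idea concrete, here is a minimal sketch of recursive data-annotation validation. It is not the code in EntityValidation.cs: TryValidateSketch, SchemeSketch, and CompanySketch are hypothetical names, and the visited-object set is an addition of this sketch (the real validator is described elsewhere on this page as not tracking visited objects). It is meant for plain entity classes only.

```csharp
#nullable enable
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;
using System.Reflection;

public static class ValidationSketch
{
    // Validates target and every reference-typed object reachable from it,
    // prefixing each message with the property path.
    public static bool TryValidateSketch(this object target, out List<string> errors, string prefix)
    {
        errors = new List<string>();
        Walk(target, prefix, errors, new HashSet<object>(ReferenceEqualityComparer.Instance));
        return errors.Count == 0;
    }

    private static void Walk(object node, string path, List<string> errors, HashSet<object> seen)
    {
        if (!seen.Add(node)) return; // cycle guard (an addition in this sketch)

        var results = new List<ValidationResult>();
        Validator.TryValidateObject(node, new ValidationContext(node), results, validateAllProperties: true);
        foreach (var r in results)
        {
            var member = r.MemberNames.FirstOrDefault();
            errors.Add(member is null ? $"{path}: {r.ErrorMessage}" : $"{path}.{member}: {r.ErrorMessage}");
        }

        foreach (var prop in node.GetType().GetProperties(BindingFlags.Public | BindingFlags.Instance))
        {
            if (prop.GetIndexParameters().Length > 0) continue;
            if (prop.PropertyType.IsValueType || prop.PropertyType == typeof(string)) continue;

            var value = prop.GetValue(node);
            if (value is null) continue;

            if (value is IEnumerable items)
            {
                // Collections: recurse into each element with an indexed path.
                var i = 0;
                foreach (var item in items)
                {
                    if (item is not null && item is not string && !item.GetType().IsValueType)
                        Walk(item, $"{path}.{prop.Name}[{i}]", errors, seen);
                    i++;
                }
            }
            else
            {
                Walk(value, $"{path}.{prop.Name}", errors, seen);
            }
        }
    }
}

// Hypothetical entities for illustration only.
public sealed class CompanySketch
{
    [Range(0.0, 1.0)] public decimal? PercentageShare { get; set; }
}

public sealed class SchemeSketch
{
    [Required] public string? SchemeName { get; set; }
    public List<CompanySketch> Companies { get; } = new();
}
```

Calling new SchemeSketch().TryValidateSketch(out var errors, "Scheme") on an instance with no name and one company whose PercentageShare is 1.5 returns false, with one error keyed under Scheme.SchemeName and another under Scheme.Companies[0].PercentageShare.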
The Transaction Boundary
After validation, the handler opens an explicit database transaction, saves, commits, and only then publishes events:

    // CreateSchemeCommandHandler.cs:64-95
    await using var tx = await _context.Database.BeginTransactionAsync(cancellationToken);

    await _context.CreateAsync(scheme);

    // Apply collection ops (Tags, Notes) from the request body
    var bodyPatch = ObjectToPatch.FromBody(command.Body);
    var rewrittenBodyPatch = await _patchRewriter.RewriteAsync<Scheme>(_context, bodyPatch);
    await _context.PatchWithCollectionReplaceAsync(scheme, rewrittenBodyPatch);

    // … domain rule checks (see next section) …

    await _schemeMarketService.SynchroniseSchemeMarketBoundariesAsync(
        scheme.SchemeId, cancellationToken);
    await _context.SaveChangesAsync(cancellationToken);
    await tx.CommitAsync(cancellationToken);

The transaction covers:
- The primary entity insert (CreateAsync stages it; SaveChangesAsync flushes).
- Collection replacements (Tags, Notes, ExternalLinks) applied by PatchWithCollectionReplaceAsync.
- Any domain sync that has to be atomic with the main entity (e.g. SynchroniseSchemeMarketBoundariesAsync reconciles SchemeMarketBoundary join rows).
It deliberately does not cover event publishing or the representation re-read. Those happen after the commit.
Domain Rules
Data-annotation validation covers field-level rules (required, range, regex, etc). Handlers add the cross-field and cross-row rules that annotations can’t express:

    // CreateSchemeCommandHandler.cs:74-90
    // Validate: at most one unknown company per role
    var duplicateUnknownRoles = scheme.Companies
        .Where(c => c.IsUnknown)
        .GroupBy(c => c.SchemeCompanyRoleId)
        .Where(g => g.Count() > 1)
        .ToList();
    if (duplicateUnknownRoles.Count > 0)
        return CommandResult<Scheme>.ValidationError(
            ["Only one unknown company is allowed per role."]);

    // Validate: total percentage share per role cannot exceed 100%
    var overAllocatedRoles = scheme.Companies
        .GroupBy(c => c.SchemeCompanyRoleId)
        .Where(g => g.Sum(c => c.PercentageShare ?? 0) > 1m)
        .ToList();
    if (overAllocatedRoles.Count > 0)
        return CommandResult<Scheme>.ValidationError(
            ["Total percentage share per role cannot exceed 100%."]);

These rules live inside the transaction scope but after the entity is staged. That’s deliberate: they need the post-creation state (collection counts, FK resolution) that wouldn’t be visible pre-save.
Returning a CommandResult<T>.ValidationError(…) from inside a transaction scope aborts cleanly: tx.CommitAsync isn’t reached, so the await using disposal rolls the transaction back automatically. The entity changes are discarded; the DB is untouched.
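The control flow behind that automatic rollback can be sketched without a database. FakeTransaction below is a hypothetical stand-in for the EF Core transaction object; it only records what happened, and its roll-back-on-dispose behaviour mirrors what the paragraph above describes rather than calling any real EF Core API.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for a database transaction; it only logs what happens.
public sealed class FakeTransaction : IAsyncDisposable
{
    private bool _committed;
    public List<string> Log { get; } = new();

    public Task CommitAsync()
    {
        _committed = true;
        Log.Add("commit");
        return Task.CompletedTask;
    }

    public ValueTask DisposeAsync()
    {
        // Disposing an uncommitted transaction is treated as a rollback.
        if (!_committed) Log.Add("rollback");
        return ValueTask.CompletedTask;
    }
}

public static class TransactionSketch
{
    public static async Task<string> HandleAsync(FakeTransaction transaction, bool domainRulesPass)
    {
        await using var tx = transaction;

        if (!domainRulesPass)
            return "validation-error"; // early return: CommitAsync is never reached

        await tx.CommitAsync();
        return "ok";
    } // DisposeAsync runs here on every exit path
}
```

With domainRulesPass set to false the log ends up containing only "rollback"; with true it contains only "commit". The handler never has to name the rollback path explicitly.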
Event Publishing — After the Commit
Events are published outside the transaction:

    // CreateSchemeCommandHandler.cs:97-100
    await _eventMediator.PublishAsync(
        new SchemeCreatedEvent(scheme.SchemeId), cancellationToken);

The rationale is reliability asymmetry:
- An event published inside the transaction that succeeds but whose parent transaction later aborts would produce a phantom event: subscribers would react to a creation that didn’t happen.
- An event published after commit means the worst case is that the event fires for a real row but one or more of its subscribers fails. That’s recoverable: query-view rebuilds are idempotent, and a later RebuildQueryViewsWorker run will re-sync anyway.
The trade-off: if the process crashes between CommitAsync and PublishAsync, the event is lost. For Formation’s workload (query-view denormalisation, audit logging) this is acceptable because all downstream state can be rebuilt from the source tables. Use cases that need guaranteed event delivery would need outbox-pattern machinery, which Formation does not currently have. Note that an outbox gives at-least-once rather than exactly-once delivery, so subscribers would still need to be idempotent.
Re-reading for Representation
When the caller asked for Prefer: return=representation, the handler re-queries the entity with a heavy Include graph to produce the response shape:

    // CreateSchemeCommandHandler.cs:102-133
    var result = await _context.Schemes
        .AsNoTracking()
        .AsSplitQuery()
        .Include(s => s.Address)
        .Include(s => s.BuildingType)
            .ThenInclude(bt => bt!.ParentBuildingType)
        .Include(s => s.Developments)
            .ThenInclude(d => d.Type)
                .ThenInclude(dt => dt!.ParentDevelopmentType)
        // … ~15 more Includes …
        .FirstAsync(s => s.SchemeId == scheme.SchemeId, cancellationToken);

    await _context.LoadPolymorphicCollectionsAsync(
        [result],
        LinkTableType.Scheme,
        s => s.SchemeId,
        (s, notes) => s.Notes = notes,
        (s, tags) => s.Tags = tags,
        (s, externalLinks) => s.ExternalLinks = externalLinks);

    return CommandResult<Scheme>.Ok(result);

This is expensive: it’s a second full read of the just-saved graph. Callers who don’t need the body should send Prefer: return=minimal (or omit the header) so the handler can return a lean CommandResult<Scheme>.Ok(scheme) without the re-query. The handler only enters this branch when command.ReturnRepresentation == true.
Events
Event Shape
Events are immutable records in src/common/models/Events/. They carry just enough information to look up the affected entity, typically a primary key plus any denormalised data the subscribers need:

    public sealed record SchemeCreatedEvent(int SchemeId) : IEvent;

Events deliberately do not carry the entity itself. Subscribers that need the full state re-read it from the DB. This keeps the event payload small, avoids stale-snapshot bugs, and decouples the event shape from entity evolution.
Event Handler Responsibilities
Event handlers implement IEventHandler<TEvent>. They do the post-write fan-out work:

    public class SchemeCreatedEventHandler : IEventHandler<SchemeCreatedEvent>
    {
        …

        public async Task HandleAsync(
            SchemeCreatedEvent @event,
            CancellationToken cancellationToken = default)
        {
            _logger.LogInformation("Scheme created with ID: {SchemeId}", @event.SchemeId);
            await _schemeListViewService.UpsertSchemeAsync(@event.SchemeId, cancellationToken);

            // Update address list view (counts and sectors changed)
            var addressId = await _context.Set<Scheme>()
                .Where(s => s.SchemeId == @event.SchemeId)
                .Select(s => s.AddressId)
                .FirstOrDefaultAsync(cancellationToken);

            if (addressId > 0)
                await _addressListViewService.UpsertAddressAsync(addressId, cancellationToken);

            // Update company list views for all companies linked to this scheme
            var companyIds = await _context.Set<SchemeCompany>()
                .Where(sc => sc.SchemeId == @event.SchemeId && sc.CompanyId != null)
                .Select(sc => sc.CompanyId!.Value)
                .Distinct()
                .ToListAsync(cancellationToken);

            foreach (var companyId in companyIds)
                await _companyListViewService.UpsertCompanyAsync(companyId, cancellationToken);
        }
    }

The typical shape is “update every query-view row that this change might have invalidated”. A new scheme changes the scheme list, the parent address’s scheme count, and the linked companies’ portfolio counts, so three query views get upserted. See Query Views for how those services write to [query].*List.
Other handler responsibilities (less common but pattern-compatible): audit logging, cache invalidation, notification dispatch. Each kind of side-effect is its own handler class; events can have multiple subscribers.
Fan-out and Failure Modes
LiteBus invokes every registered IEventHandler<T> for an event in sequence (per the current LiteBus default). Each handler runs in its own instrumented span. Key properties:
- Handlers don’t share a transaction. Each handler manages its own DB work. One handler’s failure does not roll back another handler’s successful upserts.
- Exceptions bubble up. If a handler throws, LiteBus surfaces the exception to the caller (PublishAsync). The command handler has already completed and committed by this point, so an event-handler failure does not roll back the main entity save.
- Retries are the caller’s problem. Formation does not retry failed event handlers. Recovery is via the query-view rebuild job, which reconstructs every row from source tables and can be run on demand.
- Ordering is not guaranteed. Don’t depend on handler A running before handler B. Write each handler as if it’s the only one.
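The failure properties in the list above can be illustrated with a toy sequential publisher. This is a sketch, not LiteBus: FanOutSketch and its delegate-based subscribers are hypothetical, and the choice to stop at the first exception is an assumption of the sketch (the text above only says that the exception reaches the caller).

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class FanOutSketch
{
    // Runs subscribers one after another. The first exception propagates to the
    // caller; work already finished by earlier subscribers is not undone.
    public static async Task PublishAsync<TEvent>(
        TEvent @event, IEnumerable<Func<TEvent, Task>> subscribers)
    {
        foreach (var subscriber in subscribers)
            await subscriber(@event);
    }

    public static async Task<List<string>> DemoAsync()
    {
        var upserts = new List<string>();
        var subscribers = new List<Func<int, Task>>
        {
            id => { upserts.Add($"SchemeList:{id}"); return Task.CompletedTask; },
            id => throw new InvalidOperationException("AddressList upsert failed"),
            id => { upserts.Add($"CompanyList:{id}"); return Task.CompletedTask; },
        };

        try
        {
            await PublishAsync(42, subscribers);
        }
        catch (InvalidOperationException)
        {
            // The command has already committed; nothing is rolled back here.
        }

        return upserts; // only the first subscriber's work is present
    }
}
```

After DemoAsync the list holds the scheme-list upsert but not the company-list one, which is the kind of partial state the rebuild job exists to repair.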
Telemetry
InstrumentedCommandMediator
Every command dispatch is wrapped in an Activity span and a duration histogram:

    // src/services/api/app/app.api/Services/Telemetry/InstrumentedCommandMediator.cs:20-49
    public async Task<TCommandResult> SendAsync<TCommandResult>(
        ICommand<TCommandResult> command,
        CommandMediationSettings? settings = null,
        CancellationToken cancellationToken = default)
    {
        var commandName = command.GetType().Name;
        using var activity = FormationTelemetry.Handlers.StartActivity(commandName);
        activity?.SetTag("command.type", commandName);

        var sw = Stopwatch.StartNew();
        try
        {
            var result = await _inner.SendAsync(command, settings, cancellationToken);
            activity?.SetTag("command.success", true);
            return result;
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.SetTag("command.success", false);
            throw;
        }
        finally
        {
            sw.Stop();
            FormationTelemetry.HandlerDuration.Record(
                sw.Elapsed.TotalMilliseconds,
                new KeyValuePair<string, object?>("command", commandName));
        }
    }

Every command gets a span named after the command type (CreateSchemeCommand, PatchAddressCommand, …), tagged with success/failure and surfaced through Azure Monitor in deployed environments. The duration histogram is labelled by command name, so you can chart the p50/p95/p99 for each command separately.
InstrumentedEventMediator does the same for PublishAsync with activity names like "Event:SchemeCreatedEvent".
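For readers who want to see what the event-side equivalent could look like, here is a minimal sketch using only System.Diagnostics types. It is not the InstrumentedEventMediator source: the class name, the source and meter names, and the event.type / event.success tag keys are all assumptions made for illustration; only the "Event:" activity-name prefix comes from the text above.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

public static class TelemetrySketch
{
    // Hypothetical names; the real sources and meters live in FormationTelemetry.
    private static readonly ActivitySource Source = new("Sketch.Handlers");
    private static readonly Meter SketchMeter = new("Sketch.API");
    private static readonly Histogram<double> Duration =
        SketchMeter.CreateHistogram<double>("sketch.event.duration", "ms");

    // Wraps one publish call in a span and records its duration, success or not.
    public static async Task PublishInstrumentedAsync<TEvent>(
        TEvent @event, Func<TEvent, Task> inner) where TEvent : notnull
    {
        var eventName = @event.GetType().Name;
        using var activity = Source.StartActivity($"Event:{eventName}");
        activity?.SetTag("event.type", eventName);

        var sw = Stopwatch.StartNew();
        try
        {
            await inner(@event);
            activity?.SetTag("event.success", true);
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.SetTag("event.success", false);
            throw; // telemetry must not swallow the failure
        }
        finally
        {
            sw.Stop();
            Duration.Record(
                sw.Elapsed.TotalMilliseconds,
                new KeyValuePair<string, object?>("event", eventName));
        }
    }
}
```

StartActivity returns null when nothing is listening to the source, which is why every call on activity uses the null-conditional operator; the wrapper then costs little more than a stopwatch.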
FormationTelemetry Sources and Meters
Central registration lives in FormationTelemetry.cs:

    public static readonly ActivitySource Handlers = new("Formation.Handlers");
    public static readonly Meter ApiMeter = new("Formation.API");

    public static readonly Histogram<double> HandlerDuration =
        ApiMeter.CreateHistogram<double>("formation.handler.duration", "ms",
            "Duration of command handler execution");

    public static readonly Histogram<double> EventDuration =
        ApiMeter.CreateHistogram<double>("formation.event.duration", "ms",
            "Duration of event handler execution");

Formation.Handlers is one of the tracing sources added to OpenTelemetry:

    // Program.cs:147-162 (paraphrased)
    builder.Services.AddOpenTelemetry()
        .WithTracing(tracing => tracing
            .AddSource("Formation.Browser")
            .AddSource("Formation.Handlers")
            .AddSource("Formation.Search"))
        .WithMetrics(metrics => metrics.AddMeter("Formation.API"));

In production, these roll into Azure Monitor via UseAzureMonitor. In development, metrics go to the console. See Deployment Topology for the full observability pipeline.
End-to-End Trace: POST /Schemes
Putting it all together, here is every step that runs when a client creates a scheme:

    1. HTTP POST /Schemes
       Headers: Authorization: Bearer …, Prefer: return=representation
       Body: { "SchemeName": "…", "Address": { "Id": "AD03KwA" }, … }

    2. Authentication middleware validates the JWT and attaches claims.

    3. AuthorizeFilter checks the default policy (authenticated + User/Admin role).

    4. MVC routes POST /Schemes to SchemesWriteController.Post
       (inherited from EntityWriteControllerBase).

    5. Post() reads the Prefer header and calls CreateCommand(body, true)
       to build a CreateSchemeCommand(body, ReturnRepresentation: true).

    6. CommandMediator.SendAsync is called.
       → resolves to InstrumentedCommandMediator
       → starts span "CreateSchemeCommand", tags command.type
       → delegates to the inner LiteBus mediator

    7. LiteBus dispatches to CreateSchemeCommandHandler by command type.

    8. CreateSchemeCommandHandler.HandleAsync:
       a. _createFactory.CreateFromBodyAsync builds a Scheme entity,
          rewriting /Address/Id → AddressId, /BuildingType/Id → BuildingTypeId.
       b. scheme.TryValidate(out errors, "Scheme") runs data-annotation
          validation recursively. Any failures return
          CommandResult<Scheme>.ValidationError(errors) immediately.
       c. _context.Database.BeginTransactionAsync opens a SQL transaction.
       d. _context.CreateAsync(scheme) stages the insert.
       e. PatchWithCollectionReplaceAsync applies Tags/Notes from the body.
       f. Domain rule checks (unique unknown companies, percentage share ≤ 100%).
          Failure → ValidationError → tx auto-rolls back.
       g. SchemeMarketBoundaryService synchronises spatial joins.
       h. _context.SaveChangesAsync flushes inserts to SQL.
       i. tx.CommitAsync commits.

    9. _eventMediator.PublishAsync(new SchemeCreatedEvent(scheme.SchemeId))
       fires OUTSIDE the transaction.
       → InstrumentedEventMediator starts span "Event:SchemeCreatedEvent"
       → LiteBus invokes SchemeCreatedEventHandler

    10. SchemeCreatedEventHandler:
        - Upserts [query].SchemeList row for the new scheme.
        - Upserts [query].AddressList row for the parent address
          (sector counts changed).
        - Upserts [query].CompanyList rows for every linked company
          (portfolio counts changed).

    11. Back in CreateSchemeCommandHandler, the handler re-reads the scheme
        with a heavy .Include graph and calls LoadPolymorphicCollectionsAsync
        for Notes/Tags/ExternalLinks.

    12. Handler returns CommandResult<Scheme>.Ok(result) to the mediator.
        InstrumentedCommandMediator tags command.success=true and records
        the duration histogram.

    13. Controller checks result.HasErrors (false), calls ApplyPreferenceHeader
        (Preference-Applied: return=representation), returns
        CreatedAtAction(nameof(Get), new { key = entity.Id }, entity).

    14. HTTP response:
        Status: 201 Created
        Location: /Schemes/SC1b2Cd
        Preference-Applied: return=representation
        Content-Type: application/json
        Body: { "Id": "SC1b2Cd", "SchemeId": 42, "SchemeName": "…", … }

On an error path, step 8b or 8f fails and the handler returns CommandResult<Scheme>.ValidationError(errors). A failure at 8b happens before any transaction is opened; a failure at 8f rolls the open transaction back via await using. Step 13 then becomes return result.ToErrorResult(), producing a 400 ProblemDetails body with the error list in extensions.errors. The event fan-out (steps 9-10) does not run on failure.
CommandResult<T>
Every command handler returns a CommandResult<T> (or CommandResult for operations with no payload, like delete). The shape lives in src/common/models/Models/CommandResult.cs:

    public class CommandResult<T>
    {
        public T? Data { get; init; }
        public IReadOnlyList<string> Errors { get; init; }
        public ErrorType ErrorType { get; init; }
        public bool HasErrors => Errors.Any();

        public static CommandResult<T> Ok(T data) => …
        public static CommandResult<T> NotFound(string message) => …
        public static CommandResult<T> ValidationError(IEnumerable<string> errors) => …
        public static CommandResult<T> Conflict(IEnumerable<string> errors) => …
    }

Factory methods cover the standard outcomes. Controllers never branch on ErrorType directly; ToErrorResult() handles the mapping to ProblemDetails (see Controller Pattern).
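To show how the factory methods and a central error mapping can fit together, here is a self-contained sketch. It is an illustration under assumptions, not the contents of CommandResult.cs or ToErrorResult(): the ErrorType member names, the CommandResultSketch and ErrorMappingSketch names, and the 404 and 409 mappings are all hypothetical. Only the 400 for validation errors is stated on this page.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Linq;

// Member names are assumptions; the page only shows that an ErrorType exists.
public enum ErrorType { None, Validation, NotFound, Conflict }

public sealed class CommandResultSketch<T>
{
    public T? Data { get; init; }
    public IReadOnlyList<string> Errors { get; init; } = Array.Empty<string>();
    public ErrorType ErrorType { get; init; }
    public bool HasErrors => Errors.Any();

    public static CommandResultSketch<T> Ok(T data) => new() { Data = data };

    public static CommandResultSketch<T> NotFound(string message) =>
        new() { Errors = new[] { message }, ErrorType = ErrorType.NotFound };

    public static CommandResultSketch<T> ValidationError(IEnumerable<string> errors) =>
        new() { Errors = errors.ToList(), ErrorType = ErrorType.Validation };

    public static CommandResultSketch<T> Conflict(IEnumerable<string> errors) =>
        new() { Errors = errors.ToList(), ErrorType = ErrorType.Conflict };
}

public static class ErrorMappingSketch
{
    // One place decides the status code, so controllers never switch on ErrorType.
    public static int ToStatusCode<T>(CommandResultSketch<T> result) => result.ErrorType switch
    {
        ErrorType.Validation => 400, // matches the 400 ProblemDetails described above
        ErrorType.NotFound => 404,   // assumed mapping
        ErrorType.Conflict => 409,   // assumed mapping
        _ => 200,                    // success; a real controller picks its own 2xx code
    };
}
```

A controller following this shape would check HasErrors once and delegate everything else to the mapping, which keeps new error kinds a one-line change.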
Gotchas
- Events are published outside the transaction. Handlers that publish inside await using var tx = … will emit phantom events on rollback. The pattern is always: save, commit, publish.
- Don’t run data-annotation validation after BeginTransactionAsync. The resource cost of an open transaction is non-trivial; reject invalid requests before opening one. Handlers follow the pattern construct → validate → open tx → save → commit → publish. Only the domain rules that need the staged entity run inside the transaction.
- Returning ValidationError from inside a transaction rolls it back. The await using disposal pattern handles this automatically. Don’t call RollbackAsync explicitly; disposal already rolls back an uncommitted transaction, so the extra call is redundant.
- Event handlers are not transactional. If one handler fails, others may have already succeeded. Recovery is via the rebuild job, not retry.
- Event payload must be a key, not an entity. Passing the entity by value produces stale-snapshot bugs and couples the event shape to entity evolution. Pass SchemeCreatedEvent(int SchemeId) and let handlers re-read.
- Prefer: return=representation doubles DB cost. The representation re-read is a separate query with the full Include graph. Clients that don’t need the body should omit the header.
- _createFactory.CreateFromBodyAsync is not the same as JsonSerializer.Deserialize. It runs the patch rewriter, translates nested-object references to FKs, and attaches existing related entities. Bypassing it with a manual deserialise will produce entities that EF treats as brand-new (double-inserting related rows).
- TryValidate recurses into navigation properties. That’s intentional: scheme.Companies[0].PercentageShare is validated as part of scheme.TryValidate(). Beware of circular references in test fixtures: unlike EF, the validator doesn’t track visited objects.
- Event handler ordering is undefined. Don’t write handler A on the assumption that handler B has already run. If you need ordered side-effects, do them sequentially in a single handler.
- Global authorization doesn’t apply to background jobs. The job service (src/services/job/…) dispatches commands through its own LiteBus mediator without the HTTP auth pipeline. Commands must not assume a ClaimsPrincipal is present; if audit stamping needs a user, it must come through the command payload.
See also
- Dual controller pattern: how HTTP routes into commands
- Query views: what event handlers write to
- JSON Patch: how the inbound JSON gets translated to FK paths
- EF Core interceptors: audit and soft-delete logic that runs inside SaveChanges
- Deployment topology: where telemetry lands in Azure Monitor