CQRS Flow with LiteBus

Every write to the Formation API flows through the command side of a CQRS pipeline backed by LiteBus. Commands encode intent, handlers own the transaction, and events fan out to side-effects (query-view rebuilds, notifications, search re-indexing) after the main transaction commits.

This page walks the whole flow end-to-end using Create Scheme as the worked example, and explains the architectural choices that shape it.

                ┌──────────────┐
HTTP POST ────▶ │ WriteCtrl    │
                │ (thin shell) │
                └──────┬───────┘
                       │ CommandMediator.SendAsync(CreateSchemeCommand)
                       ▼
                ┌───────────────────────────────────────┐
                │ InstrumentedCommandMediator           │◀── OpenTelemetry span
                │   → inner LiteBus mediator            │    duration histogram
                └──────┬────────────────────────────────┘
                       │ dispatches by Command type
                       ▼
                ┌───────────────────────────────────────┐
                │ CreateSchemeCommandHandler            │
                │  1. Build entity from JSON            │
                │  2. Validate (TryValidate + domain)   │
                │  3. BEGIN TRANSACTION                 │
                │  4. Save + apply collection ops       │
                │  5. COMMIT                            │
                │  6. Publish SchemeCreatedEvent        │◀── outside the TX
                │  7. Re-read for representation        │
                └──────┬────────────────────────────────┘
                       │ returns CommandResult<Scheme>
                       ▼
                WriteController
                       ▼ if HasErrors → ToErrorResult() → ProblemDetails
                HTTP response

Once the event is published, a separate fan-out runs:

┌───────────────────────────────────────┐
│ InstrumentedEventMediator             │◀── one span per subscriber
│   → one handler per IEventHandler<T>  │
└──────┬────────────────────────────────┘
┌──────▼────────────────────┐
│ SchemeCreatedEventHandler │── upserts [query].SchemeList row
│                           │── upserts [query].AddressList row
│                           │── upserts [query].CompanyList rows
└───────────────────────────┘

LiteBus is wired up in Program.cs by scanning the API assembly for handler implementations:

// src/services/api/app/app.api/Program.cs:298-307
builder.Services.AddLiteBus(liteBus =>
{
    // Use a type from the API assembly to ensure handlers are found when running tests
    var appAssembly = typeof(FormationDbContext).Assembly;
    liteBus.AddCommandModule(module => module.RegisterFromAssembly(appAssembly));
    liteBus.AddQueryModule  (module => module.RegisterFromAssembly(appAssembly));
    liteBus.AddEventModule  (module => module.RegisterFromAssembly(appAssembly));
});

Any class implementing ICommandHandler<TCommand, TResult>, IQueryHandler<TQuery, TResult>, or IEventHandler<TEvent> under the API assembly is picked up automatically. There is no manual registration step per handler.
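To make "picked up automatically" concrete, here is a rough sketch of what an assembly scan amounts to. It is not LiteBus's implementation: `ICommandHandler<,>` below is a local stand-in for the real interface, and `HandlerScanner`, `PingCommand` and `PingCommandHandler` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;

// Local stand-in for the handler contract; LiteBus ships its own interface.
public interface ICommandHandler<TCommand, TResult> { }

public sealed record PingCommand(string Message);

// A concrete handler: never registered by hand, only discovered by the scan.
public sealed class PingCommandHandler : ICommandHandler<PingCommand, string> { }

public static class HandlerScanner
{
    // Returns every concrete class in the assembly that implements
    // ICommandHandler<,>, paired with the closed interface it implements.
    public static IReadOnlyList<(Type Service, Type Implementation)> Scan(Assembly assembly) =>
        assembly.GetTypes()
            .Where(t => t.IsClass && !t.IsAbstract)
            .SelectMany(t => t.GetInterfaces()
                .Where(i => i.IsGenericType &&
                            i.GetGenericTypeDefinition() == typeof(ICommandHandler<,>))
                .Select(i => (Service: i, Implementation: t)))
            .ToList();
}
```

Calling `HandlerScanner.Scan(typeof(PingCommandHandler).Assembly)` would return the pair `(ICommandHandler<PingCommand, string>, PingCommandHandler)`, which a container could then register. This is why adding a new handler class is enough on its own.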

Immediately after the AddLiteBus block, Formation decorates the mediators with OpenTelemetry wrappers. LiteBus registers ICommandMediator and IEventMediator directly on the service collection; Formation replaces those registrations with factory functions that construct the real LiteBus mediator and wrap it in an instrumented decorator:

// src/services/api/app/app.api/Program.cs:312-334
var cmdDescriptor = builder.Services.First(d => d.ServiceType == typeof(ICommandMediator));
builder.Services.Remove(cmdDescriptor);
builder.Services.Add(ServiceDescriptor.Describe(
    typeof(ICommandMediator),
    sp =>
    {
        var inner = (ICommandMediator)ActivatorUtilities.CreateInstance(
            sp, cmdDescriptor.ImplementationType!);
        return new InstrumentedCommandMediator(inner);
    },
    cmdDescriptor.Lifetime));
// (same pattern for IEventMediator → InstrumentedEventMediator)

Every ICommandMediator / IEventMediator dependency resolves to the instrumented wrapper. Handlers themselves are oblivious to this.

A command is an immutable record representing intent. It carries whatever payload the handler needs and declares its return type via the ICommand<TResult> interface:

// src/common/models/Commands/Schemes/CreateSchemeCommand.cs
public sealed record CreateSchemeCommand(
    JsonElement Body,
    bool ReturnRepresentation
) : ICommand<CommandResult<Scheme>>;

Commands live in src/common/models/Commands/, grouped by entity. Keeping them in the common/models assembly (rather than the API assembly) means other services — the load job, the completion-score job — can construct and dispatch the same commands without taking a dependency on the API project.

The pattern is:

| Verb | Command shape |
|------|---------------|
| Create | CreateFooCommand(JsonElement body, bool returnRepresentation) |
| Patch | PatchFooCommand(int id, JsonElement patchDoc, bool returnRepresentation) |
| Replace (PUT) | ReplaceFooCommand(int id, Foo entity, bool returnRepresentation) or ReplaceFooCommand(int id, JsonElement body, bool returnRepresentation) |
| Delete | DeleteFooCommand(int id) |

Replace has two variants because some entities go through a typed shape (e.g. Scheme) and others accept raw JSON (where the controller doesn’t need the entity projected upfront).
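Written out as records, the four shapes might look like the sketch below. `Foo` is a hypothetical entity, `ICommand<TResult>` is a local stand-in for the LiteBus marker interface, and the two `CommandResult` types are reduced to placeholders so the block compiles on its own.

```csharp
using System.Text.Json;

// Local stand-in for LiteBus's ICommand<TResult> marker interface.
public interface ICommand<TResult> { }

// Hypothetical entity and placeholder result types, only so the records compile.
public sealed class Foo { public int FooId { get; set; } }
public sealed record CommandResult<T>(T Data);
public sealed record CommandResult;

// Create: raw JSON body plus the Prefer-header flag.
public sealed record CreateFooCommand(JsonElement Body, bool ReturnRepresentation)
    : ICommand<CommandResult<Foo>>;

// Patch: target id, the patch document, and the flag.
public sealed record PatchFooCommand(int Id, JsonElement PatchDoc, bool ReturnRepresentation)
    : ICommand<CommandResult<Foo>>;

// Replace (typed variant): the controller has already projected the entity.
public sealed record ReplaceFooCommand(int Id, Foo Entity, bool ReturnRepresentation)
    : ICommand<CommandResult<Foo>>;

// Delete: no payload comes back, so the non-generic result is used.
public sealed record DeleteFooCommand(int Id) : ICommand<CommandResult>;
```

Because records are immutable, a handler or test that needs a variation uses a `with` expression (for example `cmd with { Id = 8 }`), which copies the command rather than mutating it.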

A command handler implements ICommandHandler<TCommand, TResult> and has a single method, HandleAsync:

// src/services/api/app/app.api/Handlers/Commands/Schemes/CreateSchemeCommandHandler.cs:17-37
public sealed class CreateSchemeCommandHandler
    : ICommandHandler<CreateSchemeCommand, CommandResult<Scheme>>
{
    private readonly FormationDbContext _context;
    private readonly IPatchCreateFactory _createFactory;
    private readonly IPatchRewriter _patchRewriter;
    private readonly IEventMediator _eventMediator;
    private readonly ISchemeMarketBoundaryService _schemeMarketService;

    public CreateSchemeCommandHandler(
        FormationDbContext context,
        IPatchCreateFactory createFactory,
        IPatchRewriter patchRewriter,
        IEventMediator eventMediator,
        ISchemeMarketBoundaryService schemeMarketService)
    {
        _context = context;
        _createFactory = createFactory;
        _patchRewriter = patchRewriter;
        _eventMediator = eventMediator;
        _schemeMarketService = schemeMarketService;
    }

    public async Task<CommandResult<Scheme>> HandleAsync(
        CreateSchemeCommand command,
        CancellationToken cancellationToken = default)
    { … }
}

Dependencies: the DbContext for persistence, services that rewrite JSON into EF-friendly shapes (IPatchCreateFactory, IPatchRewriter), the event mediator for post-commit fan-out, and any domain services specific to this entity (here, ISchemeMarketBoundaryService for spatial sync).

Entity Construction via IPatchCreateFactory

Create handlers don’t deserialise the body themselves. They delegate to IPatchCreateFactory.CreateFromBodyAsync<T>, which uses the same patch machinery as PATCH to convert the inbound JSON into a fully-hydrated entity with navigation properties attached or foreign keys set correctly:

// CreateSchemeCommandHandler.cs:46-50
var scheme = await _createFactory.CreateFromBodyAsync<Scheme>(
    _context,
    _patchRewriter,
    command.Body
);

This step turns, e.g., "Address": { "Id": "AD03KwA" } into scheme.AddressId = 42. See the JSON Patch guide for the full translation rules.
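As a loose illustration of that translation (not the factory's actual algorithm), the sketch below flattens nested `{ "Id": … }` references into `<Name>Id` keys. `ReferenceRewriter` is a hypothetical helper, and `decodeId` is a placeholder for however public IDs such as "AD03KwA" are mapped to integer keys, which this page does not describe.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Text.Json;

public static class ReferenceRewriter
{
    // Flattens { "Address": { "Id": "AD03KwA" } } into { "AddressId": 42 }.
    // decodeId is a hypothetical stand-in for the public-ID-to-key mapping.
    public static Dictionary<string, object?> Rewrite(
        JsonElement body, Func<string, int> decodeId)
    {
        var result = new Dictionary<string, object?>();
        foreach (var property in body.EnumerateObject())
        {
            var value = property.Value;
            if (value.ValueKind == JsonValueKind.Object &&
                value.TryGetProperty("Id", out var id) &&
                id.ValueKind == JsonValueKind.String)
            {
                // Nested reference: keep only the foreign key.
                result[property.Name + "Id"] = decodeId(id.GetString()!);
            }
            else
            {
                // Everything else passes through untouched.
                result[property.Name] = value.Clone();
            }
        }
        return result;
    }
}
```

The real factory also attaches existing related entities and handles collections; this sketch covers only the single-reference case named in the text.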

After construction, the entity is validated against its data annotations:

// CreateSchemeCommandHandler.cs:52-62
var errors = new List<string>();
if (!scheme.TryValidate(out var validationResults, "Scheme"))
{
    errors.AddRange(validationResults);
}
if (errors.Count > 0)
{
    return CommandResult<Scheme>.ValidationError(errors);
}

TryValidate (defined in EntityValidation.cs) wraps System.ComponentModel.DataAnnotations.Validator.TryValidateObject with recursive validation across navigation properties and collections. The second argument is a path prefix — every error is emitted with that prefix so the frontend can map errors back to fields. You get strings like:

Scheme.SchemeName: The SchemeName field is required.
Scheme.Companies[0].PercentageShare: Value must be between 0 and 1
Scheme.Developments[2].Type: The Type field is required.

On failure, the handler returns CommandResult<T>.ValidationError(errors) before opening the transaction — there’s no point starting a DB transaction for a request we’re going to reject. The write controller’s base class converts this into a 400 ProblemDetails response via ToErrorResult().
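The path-prefix behaviour can be approximated with a small recursive wrapper around `Validator.TryValidateObject`. This is a sketch under assumptions, not the contents of EntityValidation.cs: `PathPrefixedValidation`, `SketchScheme` and `SketchCompany` are hypothetical, and the traversal rules (skip strings and value types, index into collections) are a guess at reasonable behaviour.

```csharp
#nullable enable
using System.Collections;
using System.Collections.Generic;
using System.ComponentModel.DataAnnotations;
using System.Linq;

public sealed class SketchCompany
{
    [Range(0.0, 1.0)]
    public double PercentageShare { get; set; }
}

public sealed class SketchScheme
{
    [Required]
    public string? SchemeName { get; set; }
    public List<SketchCompany> Companies { get; } = new();
}

public static class PathPrefixedValidation
{
    // Validates the entity and everything reachable from it, prefixing each
    // message with its property path. Like the validator described above,
    // this sketch does not guard against circular references.
    public static bool TryValidateWithPaths(object entity, string path, out List<string> errors)
    {
        errors = new List<string>();
        Collect(entity, path, errors);
        return errors.Count == 0;
    }

    private static void Collect(object entity, string path, List<string> errors)
    {
        var results = new List<ValidationResult>();
        Validator.TryValidateObject(
            entity, new ValidationContext(entity), results, validateAllProperties: true);
        foreach (var result in results)
        {
            var member = result.MemberNames.FirstOrDefault();
            var location = member is null ? path : path + "." + member;
            errors.Add(location + ": " + result.ErrorMessage);
        }

        foreach (var property in entity.GetType().GetProperties())
        {
            // Skip indexers, write-only properties, and leaf types.
            if (!property.CanRead || property.GetIndexParameters().Length > 0) continue;
            if (IsLeaf(property.PropertyType)) continue;
            var value = property.GetValue(entity);
            if (value is null) continue;

            if (value is IEnumerable items)
            {
                var index = 0;
                foreach (var item in items)
                {
                    if (item is not null && !IsLeaf(item.GetType()))
                        Collect(item, $"{path}.{property.Name}[{index}]", errors);
                    index++;
                }
            }
            else
            {
                Collect(value, $"{path}.{property.Name}", errors);
            }
        }
    }

    private static bool IsLeaf(System.Type type) =>
        type.IsValueType || type == typeof(string);
}
```

Validating a `SketchScheme` with no name and one company whose share is 1.5, using the prefix "Scheme", yields one error starting with `Scheme.SchemeName:` and one starting with `Scheme.Companies[0].PercentageShare:`.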

After validation, the handler opens an explicit database transaction, saves, commits, and only then publishes events:

// CreateSchemeCommandHandler.cs:64-95
await using var tx = await _context.Database.BeginTransactionAsync(cancellationToken);
await _context.CreateAsync(scheme);

// Apply collection ops (Tags, Notes) from the request body
var bodyPatch = ObjectToPatch.FromBody(command.Body);
var rewrittenBodyPatch = await _patchRewriter.RewriteAsync<Scheme>(_context, bodyPatch);
await _context.PatchWithCollectionReplaceAsync(scheme, rewrittenBodyPatch);

// … domain rule checks (see next section) …

await _schemeMarketService.SynchroniseSchemeMarketBoundariesAsync(
    scheme.SchemeId, cancellationToken);
await _context.SaveChangesAsync(cancellationToken);
await tx.CommitAsync(cancellationToken);

The transaction covers:

  • The primary entity insert (CreateAsync stages it; SaveChangesAsync flushes).
  • Collection replacements — Tags, Notes, ExternalLinks — applied by PatchWithCollectionReplaceAsync.
  • Any domain sync that has to be atomic with the main entity (e.g. SynchroniseSchemeMarketBoundariesAsync reconciles SchemeMarketBoundary join rows).

It deliberately does not cover event publishing or the representation re-read. Those happen after the commit.

Data-annotation validation covers field-level rules (required, range, regex, etc). Handlers add the cross-field and cross-row rules that annotations can’t express:

// CreateSchemeCommandHandler.cs:74-90
// Validate: at most one unknown company per role
var duplicateUnknownRoles = scheme.Companies
    .Where(c => c.IsUnknown)
    .GroupBy(c => c.SchemeCompanyRoleId)
    .Where(g => g.Count() > 1)
    .ToList();
if (duplicateUnknownRoles.Count > 0)
    return CommandResult<Scheme>.ValidationError(
        ["Only one unknown company is allowed per role."]);

// Validate: total percentage share per role cannot exceed 100%
var overAllocatedRoles = scheme.Companies
    .GroupBy(c => c.SchemeCompanyRoleId)
    .Where(g => g.Sum(c => c.PercentageShare ?? 0) > 1m)
    .ToList();
if (overAllocatedRoles.Count > 0)
    return CommandResult<Scheme>.ValidationError(
        ["Total percentage share per role cannot exceed 100%."]);

These rules live inside the transaction scope but after the entity is staged. That’s deliberate — they need the post-creation state (collection counts, FK resolution) that wouldn’t be visible pre-save.
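For a worked example of the arithmetic, the two rules can be exercised against in-memory data. `CompanyLink` and `SchemeCompanyRules` are hypothetical, trimmed-down stand-ins; shares are fractions, so 1m means 100%.

```csharp
#nullable enable
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for a scheme/company join row.
public sealed record CompanyLink(int RoleId, bool IsUnknown, decimal? PercentageShare);

public static class SchemeCompanyRules
{
    // Returns the first rule violation, or null when both rules pass.
    public static string? Check(IEnumerable<CompanyLink> companies)
    {
        var links = companies.ToList();

        // Rule 1: at most one unknown company per role.
        var duplicateUnknown = links
            .Where(c => c.IsUnknown)
            .GroupBy(c => c.RoleId)
            .Any(g => g.Count() > 1);
        if (duplicateUnknown)
            return "Only one unknown company is allowed per role.";

        // Rule 2: shares within a role sum to at most 1 (100%); null counts as 0.
        var overAllocated = links
            .GroupBy(c => c.RoleId)
            .Any(g => g.Sum(c => c.PercentageShare ?? 0m) > 1m);
        if (overAllocated)
            return "Total percentage share per role cannot exceed 100%.";

        return null;
    }
}
```

Two companies in the same role with shares 0.6 and 0.5 sum to 1.1 and are rejected; 0.6 and 0.4 sum to exactly 1.0 and pass, because the comparison is strictly greater than 1.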

Returning a CommandResult<T>.ValidationError(…) from inside a transaction scope aborts cleanly: tx.CommitAsync isn’t reached, so the await using disposal rolls the transaction back automatically. The entity changes are discarded; the DB is untouched.
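The control flow is easy to see with a fake transaction in place of the database one. In this sketch `FakeTransaction` and `EarlyReturnDemo` are hypothetical; the fake simply records a rollback when it is disposed without a prior commit, which is the behaviour the paragraph above relies on.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a database transaction: disposing without a
// prior commit counts as a rollback.
public sealed class FakeTransaction : IAsyncDisposable
{
    public bool Committed { get; private set; }
    public bool RolledBack { get; private set; }

    public Task CommitAsync()
    {
        Committed = true;
        return Task.CompletedTask;
    }

    public ValueTask DisposeAsync()
    {
        if (!Committed) RolledBack = true;
        return default;
    }
}

public static class EarlyReturnDemo
{
    // Mirrors the handler shape: an early return skips CommitAsync, and the
    // await using disposal performs the rollback on the way out.
    public static async Task<string> SaveAsync(FakeTransaction tx, bool domainRulesPass)
    {
        await using var scope = tx;
        if (!domainRulesPass)
            return "ValidationError";

        await tx.CommitAsync();
        return "Ok";
    }
}
```

No explicit rollback call appears anywhere: the early return is the only thing the failing path has to do.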

Events are published outside the transaction:

// CreateSchemeCommandHandler.cs:97-100
await _eventMediator.PublishAsync(
    new SchemeCreatedEvent(scheme.SchemeId),
    cancellationToken
);

The rationale is reliability asymmetry:

  • An event published inside the transaction that succeeds but whose parent transaction later aborts would produce a phantom event — subscribers would react to a creation that didn’t happen.
  • An event published after commit means the worst case is that the event fires for a real row but one or more of its subscribers fails. That’s recoverable: query-view rebuilds are idempotent, and a later RebuildQueryViewsWorker run will re-sync anyway.

The trade-off: if the process crashes between CommitAsync and PublishAsync, the event is lost. For Formation’s workload (query-view denormalisation, audit logging) this is acceptable because all downstream state can be rebuilt from the source tables. Use cases that need guaranteed event delivery (at-least-once, paired with idempotent subscribers) would need outbox-pattern machinery, which Formation does not currently have.

When the caller asked for Prefer: return=representation, the handler re-queries the entity with a heavy Include graph to produce the response shape:

// CreateSchemeCommandHandler.cs:102-133
var result = await _context.Schemes
    .AsNoTracking()
    .AsSplitQuery()
    .Include(s => s.Address)
    .Include(s => s.BuildingType)
        .ThenInclude(bt => bt!.ParentBuildingType)
    .Include(s => s.Developments)
        .ThenInclude(d => d.Type)
            .ThenInclude(dt => dt!.ParentDevelopmentType)
    // … ~15 more Includes …
    .FirstAsync(s => s.SchemeId == scheme.SchemeId, cancellationToken);

await _context.LoadPolymorphicCollectionsAsync(
    [result],
    LinkTableType.Scheme,
    s => s.SchemeId,
    (s, notes) => s.Notes = notes,
    (s, tags) => s.Tags = tags,
    (s, externalLinks) => s.ExternalLinks = externalLinks
);
return CommandResult<Scheme>.Ok(result);

This is expensive: it’s a second full read of the just-saved graph. Callers who don’t need the body should send Prefer: return=minimal (or omit the header) so the handler can return a lean CommandResult<Scheme>.Ok(scheme) without the re-query. The handler only enters this branch when command.ReturnRepresentation == true.

Events are immutable records in src/common/models/Events/. They carry just enough information to look up the affected entity — typically a primary key plus any denormalised data the subscribers need:

// src/common/models/Events/Schemes/SchemeCreatedEvent.cs
public sealed record SchemeCreatedEvent(int SchemeId) : IEvent;

Events deliberately do not carry the entity itself. Subscribers that need the full state re-read it from the DB. This keeps the event payload small, avoids stale-snapshot bugs, and decouples the event shape from entity evolution.

Event handlers implement IEventHandler<TEvent>. They do the post-write fan-out work:

// src/services/api/app/app.api/Handlers/Events/Schemes/SchemeCreatedEventHandler.cs
public class SchemeCreatedEventHandler : IEventHandler<SchemeCreatedEvent>
{
    // Constructor-injected fields (_logger, _context, and the three
    // list-view services) are not shown in this excerpt.

    public async Task HandleAsync(
        SchemeCreatedEvent @event,
        CancellationToken cancellationToken = default)
    {
        _logger.LogInformation("Scheme created with ID: {SchemeId}", @event.SchemeId);
        await _schemeListViewService.UpsertSchemeAsync(@event.SchemeId, cancellationToken);

        // Update address list view (counts and sectors changed)
        var addressId = await _context.Set<Scheme>()
            .Where(s => s.SchemeId == @event.SchemeId)
            .Select(s => s.AddressId)
            .FirstOrDefaultAsync(cancellationToken);
        if (addressId > 0)
            await _addressListViewService.UpsertAddressAsync(addressId, cancellationToken);

        // Update company list views for all companies linked to this scheme
        var companyIds = await _context.Set<SchemeCompany>()
            .Where(sc => sc.SchemeId == @event.SchemeId && sc.CompanyId != null)
            .Select(sc => sc.CompanyId!.Value)
            .Distinct()
            .ToListAsync(cancellationToken);
        foreach (var companyId in companyIds)
            await _companyListViewService.UpsertCompanyAsync(companyId, cancellationToken);
    }
}

The typical shape is “update every query-view row that this change might have invalidated”. A new scheme changes the scheme list, the parent address’s scheme count, and the linked companies’ portfolio counts — so three query views get upserted. See Query Views for how those services write to [query].*List.

Other handler responsibilities (less common but pattern-compatible): audit logging, cache invalidation, notification dispatch. Each kind of side-effect is its own handler class; events can have multiple subscribers.

LiteBus invokes every registered IEventHandler<T> for an event in sequence (per the current LiteBus default). Each handler runs in its own instrumented span. Key properties:

  • Handlers don’t share a transaction. Each handler manages its own DB work. One handler’s failure does not roll back another handler’s successful upserts.
  • Exceptions bubble up. If a handler throws, LiteBus surfaces the exception to the caller (PublishAsync). The command handler has already committed its transaction by this point, so an event-handler failure does not roll back the main entity save, even though the exception still propagates out of the command handler unless it is caught there.
  • Retries are the caller’s problem. Formation does not retry failed event handlers. Recovery is via the query-view rebuild job, which reconstructs every row from source tables and can be run on demand.
  • Ordering is not guaranteed. Don’t depend on handler A running before handler B. Write each handler as if it’s the only one.
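These properties can be demonstrated with a toy publisher. The sketch below is not LiteBus: `ISketchEventHandler<T>`, `SequentialPublisher<T>` and `RecordingHandler` are hypothetical, and stopping at the first failure is an assumption of the sketch, since this page only states that exceptions reach the caller.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical minimal contract; LiteBus provides the real IEventHandler<T>.
public interface ISketchEventHandler<TEvent>
{
    Task HandleAsync(TEvent @event, CancellationToken cancellationToken = default);
}

public sealed class SequentialPublisher<TEvent>
{
    private readonly IReadOnlyList<ISketchEventHandler<TEvent>> _handlers;

    public SequentialPublisher(IReadOnlyList<ISketchEventHandler<TEvent>> handlers) =>
        _handlers = handlers;

    // Runs handlers one after another with no shared transaction. The first
    // exception propagates to the caller; work done by earlier handlers stays.
    public async Task PublishAsync(TEvent @event, CancellationToken cancellationToken = default)
    {
        foreach (var handler in _handlers)
            await handler.HandleAsync(@event, cancellationToken);
    }
}

// Test double: appends to a shared log, or throws when told to fail.
public sealed class RecordingHandler : ISketchEventHandler<int>
{
    private readonly string _name;
    private readonly List<string> _log;
    private readonly bool _fail;

    public RecordingHandler(string name, List<string> log, bool fail = false)
    {
        _name = name;
        _log = log;
        _fail = fail;
    }

    public Task HandleAsync(int schemeId, CancellationToken cancellationToken = default)
    {
        if (_fail) throw new InvalidOperationException(_name + " failed");
        _log.Add(_name + " handled " + schemeId);
        return Task.CompletedTask;
    }
}
```

With handlers A, B (failing) and C registered in that order, publishing `42` leaves "A handled 42" in the log and throws from `PublishAsync`. Nothing undoes A's work, which is why recovery goes through the rebuild job rather than a rollback.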

Every command dispatch is wrapped in an Activity span and a duration histogram:

// src/services/api/app/app.api/Services/Telemetry/InstrumentedCommandMediator.cs:20-49
public async Task<TCommandResult> SendAsync<TCommandResult>(
    ICommand<TCommandResult> command,
    CommandMediationSettings? settings = null,
    CancellationToken cancellationToken = default)
{
    var commandName = command.GetType().Name;
    using var activity = FormationTelemetry.Handlers.StartActivity(commandName);
    activity?.SetTag("command.type", commandName);
    var sw = Stopwatch.StartNew();
    try
    {
        var result = await _inner.SendAsync(command, settings, cancellationToken);
        activity?.SetTag("command.success", true);
        return result;
    }
    catch (Exception ex)
    {
        activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
        activity?.SetTag("command.success", false);
        throw;
    }
    finally
    {
        sw.Stop();
        FormationTelemetry.HandlerDuration.Record(
            sw.Elapsed.TotalMilliseconds,
            new KeyValuePair<string, object?>("command", commandName));
    }
}

Every command gets a span named after the command type (CreateSchemeCommand, PatchAddressCommand, …), tagged with success/failure and surfaced through Azure Monitor in deployed environments. The duration histogram is labelled by command name, so you can chart the p50/p95/p99 for each command separately.

InstrumentedEventMediator does the same for PublishAsync with activity names like "Event:SchemeCreatedEvent".
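The event-side wrapper is not reproduced on this page, so the following is only a sketch of the same span-plus-histogram pattern applied to a publish call. `SketchTelemetry`, `SketchCreatedEvent`, the source and meter names, and the `event.*` tag names are all assumptions; the inner mediator is represented by a plain delegate.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.Metrics;
using System.Threading.Tasks;

public sealed record SketchCreatedEvent(int SchemeId);

public static class SketchTelemetry
{
    // Placeholder names, not Formation's real source or meter.
    public static readonly ActivitySource Source = new("Sketch.Handlers");
    public static readonly Meter ApiMeter = new("Sketch.API");
    public static readonly Histogram<double> EventDuration =
        ApiMeter.CreateHistogram<double>("sketch.event.duration", "ms");

    // Wraps a publish delegate in a span named "Event:<TypeName>" and
    // records its duration whether it succeeds or throws.
    public static async Task PublishInstrumentedAsync<TEvent>(
        TEvent @event, Func<TEvent, Task> inner)
    {
        var eventName = typeof(TEvent).Name;
        using var activity = Source.StartActivity("Event:" + eventName);
        activity?.SetTag("event.type", eventName);
        var sw = Stopwatch.StartNew();
        try
        {
            await inner(@event);
            activity?.SetTag("event.success", true);
        }
        catch (Exception ex)
        {
            activity?.SetStatus(ActivityStatusCode.Error, ex.Message);
            activity?.SetTag("event.success", false);
            throw;
        }
        finally
        {
            sw.Stop();
            EventDuration.Record(
                sw.Elapsed.TotalMilliseconds,
                new KeyValuePair<string, object?>("event", eventName));
        }
    }
}
```

`StartActivity` returns null when nothing is listening to the source, which is why every call on `activity` is null-conditional; the histogram is recorded either way.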

Central registration lives in FormationTelemetry.cs:

public static readonly ActivitySource Handlers = new("Formation.Handlers");
public static readonly Meter ApiMeter = new("Formation.API");
public static readonly Histogram<double> HandlerDuration =
    ApiMeter.CreateHistogram<double>("formation.handler.duration", "ms",
        "Duration of command handler execution");
public static readonly Histogram<double> EventDuration =
    ApiMeter.CreateHistogram<double>("formation.event.duration", "ms",
        "Duration of event handler execution");

Formation.Handlers is one of the tracing sources added to OpenTelemetry:

// Program.cs:147-162 (paraphrased)
builder.Services.AddOpenTelemetry()
    .WithTracing(tracing => tracing
        .AddSource("Formation.Browser")
        .AddSource("Formation.Handlers")
        .AddSource("Formation.Search"))
    .WithMetrics(metrics => metrics.AddMeter("Formation.API"));

In production, these roll into Azure Monitor via UseAzureMonitor. In development, metrics go to the console. See Deployment Topology for the full observability pipeline.

Putting it all together, here is every step that runs when a client creates a scheme:

1. HTTP POST /Schemes
   Headers: Authorization: Bearer …, Prefer: return=representation
   Body: { "SchemeName": "…", "Address": { "Id": "AD03KwA" }, … }
2. Authentication middleware validates the JWT and attaches claims.
3. AuthorizeFilter checks the default policy (authenticated + User/Admin role).
4. MVC routes POST /Schemes to SchemesWriteController.Post (inherited from
   EntityWriteControllerBase).
5. Post() reads the Prefer header and calls CreateCommand(body, true)
   to build a CreateSchemeCommand(body, returnRepresentation: true).
6. CommandMediator.SendAsync is called.
   → resolves to InstrumentedCommandMediator
   → starts span "CreateSchemeCommand", tags command.type
   → delegates to the inner LiteBus mediator
7. LiteBus dispatches to CreateSchemeCommandHandler by command type.
8. CreateSchemeCommandHandler.HandleAsync:
   a. _createFactory.CreateFromBodyAsync builds a Scheme entity,
      rewriting /Address/Id → AddressId, /BuildingType/Id → BuildingTypeId.
   b. scheme.TryValidate(out errors, "Scheme") runs data-annotation
      validation recursively. Any failures return CommandResult
      .ValidationError(errors) immediately.
   c. _context.Database.BeginTransactionAsync opens a SQL transaction.
   d. _context.CreateAsync(scheme) stages the insert.
   e. PatchWithCollectionReplaceAsync applies Tags/Notes from the body.
   f. Domain rule checks (unique unknown companies, percentage share
      ≤ 100%). Failure → ValidationError → tx auto-rolls back.
   g. SchemeMarketBoundaryService synchronises spatial joins.
   h. _context.SaveChangesAsync flushes inserts to SQL.
   i. tx.CommitAsync commits.
9. _eventMediator.PublishAsync(new SchemeCreatedEvent(scheme.SchemeId))
   fires OUTSIDE the transaction.
   → InstrumentedEventMediator starts span "Event:SchemeCreatedEvent"
   → LiteBus invokes SchemeCreatedEventHandler
10. SchemeCreatedEventHandler:
    - Upserts [query].SchemeList row for the new scheme.
    - Upserts [query].AddressList row for the parent address (sector
      counts changed).
    - Upserts [query].CompanyList rows for every linked company
      (portfolio counts changed).
11. Back in CreateSchemeCommandHandler, the handler re-reads the scheme
    with a heavy .Include graph and calls LoadPolymorphicCollectionsAsync
    for Notes/Tags/ExternalLinks.
12. Handler returns CommandResult<Scheme>.Ok(result) to the mediator.
    InstrumentedCommandMediator tags command.success=true and records
    the duration histogram.
13. Controller checks result.HasErrors (false), calls ApplyPreferenceHeader
    (Preference-Applied: return=representation), returns
    CreatedAtAction(nameof(Get), new { key = entity.Id }, entity).
14. HTTP response:
    Status: 201 Created
    Location: /Schemes/SC1b2Cd
    Preference-Applied: return=representation
    Content-Type: application/json
    Body: { "Id": "SC1b2Cd", "SchemeId": 42, "SchemeName": "…", … }

On an error path, step 8b or 8f fails and the handler returns CommandResult<Scheme>.ValidationError(errors). A failure at 8b happens before any transaction is opened; a failure at 8f rolls the open transaction back via await using. Step 13 then becomes return result.ToErrorResult(), producing a 400 ProblemDetails body with the error list in extensions.errors. The event fan-out (steps 9-10) does not run on failure.

Every command handler returns a CommandResult<T> (or CommandResult for operations with no payload, like delete). The shape lives in src/common/models/Models/CommandResult.cs:

public class CommandResult<T>
{
    public T? Data { get; init; }
    public IReadOnlyList<string> Errors { get; init; }
    public ErrorType ErrorType { get; init; }
    public bool HasErrors => Errors.Any();

    // Factory method bodies are elided in this excerpt.
    public static CommandResult<T> Ok(T data) => …;
    public static CommandResult<T> NotFound(string message) => …;
    public static CommandResult<T> ValidationError(IEnumerable<string> errors) => …;
    public static CommandResult<T> Conflict(IEnumerable<string> errors) => …;
}

Factory methods cover the standard outcomes. Controllers never branch on ErrorType directly — ToErrorResult() handles the mapping to ProblemDetails (see Controller Pattern).
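As an illustration of the kind of mapping ToErrorResult() performs, the sketch below maps an error type to an HTTP status code. Everything here is hypothetical: the `SketchErrorType` members, the non-generic `SketchResult`, and the 404 and 409 choices are assumptions by analogy; only the 400 for validation errors is stated on this page.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical enum members; the real ErrorType may differ.
public enum SketchErrorType { None, NotFound, Validation, Conflict }

// Simplified, non-generic stand-in for CommandResult.
public sealed class SketchResult
{
    public IReadOnlyList<string> Errors { get; init; } = Array.Empty<string>();
    public SketchErrorType ErrorType { get; init; }
    public bool HasErrors => Errors.Count > 0;

    public static SketchResult Ok() => new();

    public static SketchResult NotFound(string message) =>
        new() { Errors = new[] { message }, ErrorType = SketchErrorType.NotFound };

    public static SketchResult ValidationError(IEnumerable<string> errors) =>
        new() { Errors = errors.ToList(), ErrorType = SketchErrorType.Validation };

    public static SketchResult Conflict(IEnumerable<string> errors) =>
        new() { Errors = errors.ToList(), ErrorType = SketchErrorType.Conflict };
}

public static class SketchResultMapping
{
    // One switch in one place, so controllers never branch on the error type.
    public static int ToStatusCode(this SketchResult result) => result.ErrorType switch
    {
        SketchErrorType.None => 200,
        SketchErrorType.Validation => 400, // stated in the text
        SketchErrorType.NotFound => 404,   // assumed
        SketchErrorType.Conflict => 409,   // assumed
        _ => 500,
    };
}
```

Keeping the switch in a single extension method is what lets a controller reduce its error path to one line.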

  1. Events are published outside the transaction. Handlers that publish inside await using var tx = … will emit phantom events on rollback. The pattern is always: save, commit, publish.

  2. Don’t run data-annotation validation after BeginTransactionAsync. The resource cost of an open transaction is non-trivial; reject invalid requests before opening one. Only domain rules that need the staged, post-creation state belong inside the transaction. Handlers follow the pattern construct → validate → open tx → save → commit → publish.

  3. Returning ValidationError from inside a transaction rolls it back. The await using disposal pattern handles this automatically. Don’t call RollbackAsync explicitly — it’ll race with the dispose.

  4. Event handlers are not transactional. If one handler fails, others may have already succeeded. Recovery is via the rebuild job, not retry.

  5. Event payload must be a key, not an entity. Passing the entity by value produces stale-snapshot bugs and couples the event shape to entity evolution. Pass SchemeCreatedEvent(int SchemeId) and let handlers re-read.

  6. Prefer: return=representation doubles DB cost. The representation re-read is a separate query with the full Include graph. Clients that don’t need the body should omit the header.

  7. _createFactory.CreateFromBodyAsync is not the same as JsonSerializer.Deserialize. It runs the patch rewriter, translates nested-object references to FKs, and attaches existing related entities. Bypassing it with a manual deserialise will produce entities that EF treats as brand-new (double-inserting related rows).

  8. TryValidate recurses into navigation properties. That’s intentional — scheme.Companies[0].PercentageShare is validated as part of scheme.TryValidate(). Beware of circular references in test fixtures: unlike EF, the validator doesn’t track visited objects.

  9. Event handler ordering is undefined. Don’t write handler A on the assumption that handler B has already run. If you need ordered side-effects, do them sequentially in a single handler.

  10. Global authorization doesn’t apply to background jobs. The job service (src/services/job/…) dispatches commands through its own LiteBus mediator without the HTTP auth pipeline. Commands must not assume a ClaimsPrincipal is present — if audit stamping needs a user, it must come through the command payload.