Advertisement
Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- using System;
- using System.Reflection;
- using CommonDomain;
- using CommonDomain.Core;
- using CommonDomain.Persistence;
- using CommonDomain.Persistence.EventStore;
- using EventStore;
- using EventStore.Dispatcher;
- using MemBus;
- using MemBus.Configurators;
- using MemBus.Subscribing;
- using Shouldly;
- using Xunit;
- //These are the nuget packages I'm using
- /*<?xml version="1.0" encoding="utf-8"?>
- <packages>
- <package id="CommonDomain" version="1.4.0" targetFramework="net40" />
- <package id="EventStore" version="3.0.11326.44" targetFramework="net40" />
- <package id="MemBus" version="1.5.4" targetFramework="net40" />
- <package id="Shouldly" version="1.1.1.1" targetFramework="net40" />
- <package id="xunit" version="1.9.1" targetFramework="net40" />
- </packages>*/
- //HUGE CAVEATE <<<<< I am not an expert on CQRS. Second, there is no "one way" to do CQRS. Third, CQRS isn't
- //the best fit for every app, or every part of every app. That said, what I have below should provide a decent sketch.
- //Overall, here are my thoughts.... first, using some of the available tools out there, it's pretty straightforward to implement.
- //
- //Second, it's easy to grow this up to your needs. More work at the low-end but less work at scale. Once synch with MemBus is too slow,
- //we can take it out of band and make it asynch. Once MemBus isn't scaling in-proc for us, we can replace it with NServiceBus or our own implementation
- //on top of ZeroMQ. We're storing denormalized data in Raven. Once Raven is the bottleneck (work with me and pretend it doesn't have sharding for a minute) then
- //we could denormalize to multiple instances... or we could choose to put some things in both Raven and in to, say, a ring database like RRDTool or a
- //bucket aggregator like StatsD. That's where things, in my opinion, get easier when you're message-based and passing commands around.
- //
- //Third, this is a lot more code than the CRUD-style I write. Part of that is based on how I've broken things out here.. .Part of it is based on
- //just the overhead of CQRS on C# (at least my understanding of CQRS as expressed below). Rinat Abdullin has an approach of using a custom dialect + T4 parser
- //to cull away the boilerplate stuff.
- //
- //At the end of the day, I believe this overall approach has a lot of merit and I'll be discussing adoption pros and cons with my coworker before we proceed.
- //After I can hash things out with his brilliant mind, I'll write up my refined thoughts and a proper example.
- //This is all xunit.net right now. It should be redone in mspec as it'd make following along much, much easier
- //I'll probably do that later. Maybe.
- //xUnit.net calls the constructor and dispose (if implemented) on your test class before each test. Consider that
- //the test setup/teardown section.
- namespace Tests.PropertyManagement
- {
- //This is my mock UI. It'd be something like an MVC controller (well, FubuMVC in my case because I roll like that)
- public class SomeAwesomeUI
- {
- private readonly IBus _bus;
- public SomeAwesomeUI(IBus bus)
- {
- _bus = bus;
- }
- //Simulate the client kicking something off
- public Guid CreateNewAccount()
- {
- var createCommand = new CreateAccountCommand(Guid.NewGuid(), "Testy", new Address("2501 AStreet", "Denver", "CO"));
- _bus.Publish(createCommand);
- return createCommand.Id;
- }
- public void CloseAccount(Guid accountId)
- {
- var closeCommand = new CloseAccountCommand(accountId);
- _bus.Publish(closeCommand);
- }
- }
- //Commands to do things are sent to your domain
- //For a great discussion on validation with commands, check out http://ingebrigtsen.info/2012/07/28/cqrs-in-asp-net-mvc-with-bifrost/
- public class CreateAccountCommand
- {
- public readonly Guid Id;
- public readonly string Name;
- public Address Address { get; set; }
- public CreateAccountCommand(Guid id, string accountName, Address address)
- {
- Id = id;
- Name = accountName;
- Address = address;
- }
- }
- //A command doesn't need to carry state if you don't want it to... Here, we're just telling it the account id to close.
- public class CloseAccountCommand
- {
- public readonly Guid AccountId;
- public CloseAccountCommand(Guid accountId)
- {
- AccountId = accountId;
- }
- }
- //By convention, I mark my command handlers with this interface. It's partially to handle wiring and partially
- //so I can toss around things like contravariant
- public interface IHandleCommand<in T>
- {
- void Handle(T command);
- }
- //This is the handler that will apply commands to my domain. I could choose
- //another round of some sort of non-business rule validation here. I could
- //log stuff. Whatever. There's also no reason that you need one CommandHandler
- //per command. I'm just doing this because I think this is how our real impl will
- //shape out.
- //IRepository comes from CommonDomain and is a facade over EventStore (both by Jonathan Oliver)
- public class CreateAccountCommandHandler : IHandleCommand<CreateAccountCommand>
- {
- private readonly IRepository _repository;
- public CreateAccountCommandHandler(IRepository repository)
- {
- _repository = repository;
- }
- public void Handle(CreateAccountCommand command)
- {
- var account = new Account(command.Id, command.Name, command.Address);
- _repository.Save(account, Guid.NewGuid());
- }
- }
- //This may _look_ like a normal "load this by id and then mutate state and then save it" repository
- //However, it's actually loading the entire event stream for that object and re-applying the events to
- //it. Don't worry, though, it's not re-publishing events to the bus.. it's just raising events
- //internally to the object. Your domain object, at the end, will be the culmination of all of those
- //applied events. This would be much simpler in F# of we thought about domain state as a left fold
- //of immutable events causing state change.
- //
- //One neat thing about EventStore and, by extension, CommonDomain, is that you can load versions of your
- //object. Check out the overloads on _repository.GetById some time.
- public class DeactivateAccountCommandHandler : IHandleCommand<CloseAccountCommand>
- {
- private readonly IRepository _repository;
- public DeactivateAccountCommandHandler(IRepository repository)
- {
- _repository = repository;
- }
- public void Handle(CloseAccountCommand command)
- {
- var account = _repository.GetById<Account>(command.AccountId);
- account.Close();
- _repository.Save(account, Guid.NewGuid());
- }
- }
- //By convention, I want to provide two means for creating domain objects. To the public, I want
- //to provide an always-valid constructor. This explicitly shows what needs to be provided to the domain
- //to create a valid instance of that object (eg, Person needs a twitter handle to be valid if I were doing twitter stream analysis)
- //Internally, to EventStore, I want it to be able to create my object via a private ctor and I'm going to pass in the
- //objects id.
- //This method is pretty simplistic but my current domain suits it just fine.
- public class AggregateFactory : IConstructAggregates
- {
- public IAggregate Build(Type type, Guid id, IMemento snapshot)
- {
- ConstructorInfo constructor = type.GetConstructor(
- BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(Guid) }, null);
- return constructor.Invoke(new object[] { id }) as IAggregate;
- }
- }
- //Ok, so this is where things get a little.... interesting
- //EventStore is sort of my coordinator for everything
- //When I create a new domain object, I issue commands to it. It, in turn, raises events to change its internal state.
- //
- //Again, thing of the current state of a domain object as what you get after all events that built it are applied.
- //new Person("@benhyr") might raise a PersonCreatedEvent. Then person.UpdateTwitterId("@hyrmn") raises a PersonUpdatedEvent
- //When I load that Person from the EventStore, rather than getting Person.TwitterId from a db field, I'm getting PersonCreatedEvent
- //(which sets the TwitterId initially) and then PersonUpdatedEvent (which updates the TwitterId to the new value)
- //
- //Now, back to this class. When I raise events, they're uncommitted until I persist them back to the EventStore.
- //By default, we assume others might be interested in these uncommitted events. Of course, it's EventStore not EventStoreAndMessageBus
- //(although EventStore could do some basic stuff for us). So, we're telling EventStore to publish to our MemBus bus... at some point,
- //we might put NSB or MassTransit or EasyNetQ or whatever in place.
- public static class DelegateDispatcher
- {
- public static void DispatchCommit(IPublisher bus, Commit commit)
- {
- // This is where we'd hook into our messaging infrastructure, such as NServiceBus,
- // MassTransit, WCF, or some other communications infrastructure.
- // This can be a class as well--just implement IDispatchCommits.
- foreach (var @event in commit.Events)
- bus.Publish(@event.Body);
- }
- }
- //On to the concrete part of the spike... we have accounts, accounts are an aggregate in my domain.
- //For this spike, accounts have a name, are active or inactive (in the real world, they're deactivated for many reasons, but not here)
- //and an account has an address (in the real world, they actually have a couple addresses. Again, not germane to this spike)
- //This is just a value object in DDD parlance. It has no identity itself because it's always owned by an entity object.
- public class Address
- {
- public Address(string line1, string city, string state)
- {
- this.Line1 = line1;
- this.City = city;
- this.State = state;
- }
- public string Line1 { get; private set; }
- public string City { get; private set; }
- public string State { get; private set; }
- }
- //This is my old buddy Account. It inherits from AggregateBase, which comes from CommonDomain.
- //There's no real need to bring CommonDomain in if you don't want. It provides a couple simple mechanisms for me.
- //First, it gives me the IRepository wrapper around EventStore which I use above in my CommandHandlers
- //Second, it gives me a base that tracks all of my uncommitted changes for me.
- //Third, it wires up, by convention, my event handlers (the private void Apply(SomeEvent @event) methods
- public class Account : AggregateBase
- {
- public string Name { get; set; }
- public bool IsActive { get; set; }
- public Address Address { get; set; }
- private Account(Guid id)
- {
- this.Id = id;
- }
- public Account(Guid id, string name, Address address)
- {
- this.Id = id;
- // the event will be routed by convention to a method called ApplyEvent(type of event)
- RaiseEvent(new AccountCreatedEvent
- {
- Id = this.Id,
- Name = name,
- Address = address,
- IsActive = true
- });
- }
- public void Close()
- {
- this.RaiseEvent(new AccountClosedEvent());
- }
- private void Apply(AccountCreatedEvent @event)
- {
- this.Id = @event.Id;
- this.Name = @event.Name;
- this.Address = @event.Address;
- }
- private void Apply(AccountClosedEvent @event)
- {
- this.IsActive = false;
- }
- }
- //A marker interface I have for my own sanity. Useful for convention-based
- //analysis and verification
- public interface IDomainEvent
- {
- }
- //This is going to seem a bit conflated so bear with me. When we create a new Account,
- //we raise an AccountCreatedEvent. We then apply that AccountCreatedEvent to ourselves.
- //Once we save our uncommitted events to EventStore, then that AccountCreatedEvent is also
- //sent out to our bus for other interested parties.
- [Serializable]
- public class AccountCreatedEvent : IDomainEvent
- {
- public Guid Id { get; set; }
- public string Name { get; set; }
- public bool IsActive { get; set; }
- public Address Address { get; set; }
- }
- [Serializable]
- public class AccountClosedEvent : IDomainEvent
- {
- }
- //Again, another convention interface so I can tell my bus how to resolve my handlers.
- //Any party that wants to know about a particular event will mark itself as such.
- public interface IHandleEvent<in T>
- {
- void Handle(T @event);
- }
- //Normally this class would do something awesome like update Raven
- //There's no reason for this to be a single denormalizer
- //However, there's no reason for this to be multiple denormalizers. Design decision!
- //Our use-case in production is that our denormalizers will update our flattened models in RavenDB
- //although, honestly, it could be SQL Server, Mongo, Raik, whatever.
- public class AccountDenormalizer : IHandleEvent<AccountCreatedEvent>,
- IHandleEvent<AccountClosedEvent>
- {
- public string AccountName { get; set; }
- public bool IsActive { get; set; }
- public void Handle(AccountCreatedEvent @event)
- {
- AccountName = @event.Name;
- }
- public void Handle(AccountClosedEvent @event)
- {
- IsActive = false;
- }
- }
- //And, to show multiple event handlers in action, here's a handler that might
- //do something like send an email welcoming the person that just registered
- //or maybe a cool SignalR tie-in that goes to the sales dashboard
- //or a web service endpoint that has a Netduino polling it and ringing a gong when
- //someone signs up. You know, whatever.
- public class KaChingNotifier : IHandleEvent<AccountCreatedEvent>
- {
- public void Handle(AccountCreatedEvent @event)
- {
- Console.WriteLine("Dude, we got a customer, we're gonna be rich!");
- }
- }
- public class OmgSadnessNotifier : IHandleEvent<AccountClosedEvent>
- {
- public void Handle(AccountClosedEvent @event)
- {
- Console.WriteLine("Dude, we lost a customer... start the layoffs :(");
- }
- }
- //and, now we're into the meat of the spike. This is the xUnit.net class under test
- //a lot of this looks repetitive because, well, it was written as I was increasing my
- //understanding of things.
- public class EndToEndTest
- {
- private readonly SomeAwesomeUI _client;
- private readonly IBus _bus;
- //Here, I'm wiring up my MemBus instance and telling it how to resolve my subscribers
- //MemBus also has an awesome way to resolve subscribers from an IoC container. In prod,
- //I'll wire my subscribers into StructureMap and have MemBus resolve them from there.
- //I'm also initializing my awesome test client UI which, if you'll recall from way back at the start
- //simply publishes commands to my MemBus instance (and, again, it could be whatever)
- public EndToEndTest()
- {
- _bus = BusSetup.StartWith<Conservative>()
- .Apply<FlexibleSubscribeAdapter>(a =>
- {
- a.ByInterface(typeof(IHandleEvent<>));
- a.ByInterface(typeof(IHandleCommand<>));
- })
- .Construct();
- _client = new SomeAwesomeUI(_bus);
- }
- [Fact]
- public void CanPublishCreateAccountCommand()
- {
- Should.NotThrow(() => _client.CreateNewAccount());
- }
- [Fact]
- public void CanReceiveCreateAccountCommand()
- {
- var store = Wireup.Init().UsingInMemoryPersistence().Build();
- var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
- _bus.Subscribe(handler);
- Should.NotThrow(() => _client.CreateNewAccount());
- }
- [Fact]
- public void CreateAccountEventIsStored()
- {
- var store = Wireup.Init().UsingInMemoryPersistence().Build();
- var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector());
- var handler = new CreateAccountCommandHandler(repository);
- _bus.Subscribe(handler);
- var accountId = _client.CreateNewAccount();
- store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBeGreaterThan(0);
- }
- [Fact]
- public void CanLoadAccountFromEventStore()
- {
- var store = Wireup.Init().UsingInMemoryPersistence().Build();
- var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector());
- var handler = new CreateAccountCommandHandler(repository);
- _bus.Subscribe(handler);
- var accountId = _client.CreateNewAccount();
- var account = repository.GetById<Account>(accountId);
- account.ShouldNotBe(null);
- account.Name.ShouldBe("Testy");
- }
- [Fact]
- public void CreateAccountEventIsPublishedToBus()
- {
- var store = Wireup.Init().UsingInMemoryPersistence()
- .UsingSynchronousDispatchScheduler()
- .DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
- .Build();
- var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
- var denormalizer = new AccountDenormalizer();
- _bus.Subscribe(handler);
- _bus.Subscribe(denormalizer);
- _client.CreateNewAccount();
- denormalizer.AccountName.ShouldBe("Testy");
- }
- [Fact]
- public void DeactivingAccountDoesntRetriggerInitialCreate()
- {
- var store = Wireup.Init().UsingInMemoryPersistence()
- .UsingSynchronousDispatchScheduler()
- .DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
- .Build();
- var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
- var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
- var denormalizer = new AccountDenormalizer();
- _bus.Subscribe(createHandler);
- _bus.Subscribe(deactivateHandler);
- _bus.Subscribe(denormalizer);
- var accountId = _client.CreateNewAccount();
- _client.CloseAccount(accountId);
- denormalizer.AccountName.ShouldBe("Testy");
- denormalizer.IsActive.ShouldBe(false);
- store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2);
- }
- //For fun, run this with the Debugger (eg, if using TDD.NET then right click on this method and select Test With -> Debugger.
- //Put break points in various spots of the code above and see what happens.
- [Fact]
- public void TyingtTogether()
- {
- var store = Wireup.Init().UsingInMemoryPersistence()
- .UsingSynchronousDispatchScheduler()
- .DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
- .Build();
- var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
- var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
- var denormalizer = new AccountDenormalizer();
- _bus.Subscribe(createHandler);
- _bus.Subscribe(deactivateHandler);
- _bus.Subscribe(denormalizer);
- _bus.Subscribe(new KaChingNotifier());
- _bus.Subscribe(new OmgSadnessNotifier());
- var accountId = _client.CreateNewAccount();
- _client.CloseAccount(accountId);
- denormalizer.AccountName.ShouldBe("Testy");
- denormalizer.IsActive.ShouldBe(false);
- store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2);
- }
- }
- }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement