Advertisement
Guest User

Untitled

a guest
Sep 17th, 2012
701
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 18.39 KB | None | 0 0
  1. using System;
  2. using System.Reflection;
  3.  
  4. using CommonDomain;
  5. using CommonDomain.Core;
  6. using CommonDomain.Persistence;
  7. using CommonDomain.Persistence.EventStore;
  8.  
  9. using EventStore;
  10. using EventStore.Dispatcher;
  11.  
  12. using MemBus;
  13. using MemBus.Configurators;
  14. using MemBus.Subscribing;
  15.  
  16. using Shouldly;
  17.  
  18. using Xunit;
  19.  
  20. //These are the nuget packages I'm using
  21. /*<?xml version="1.0" encoding="utf-8"?>
  22. <packages>
  23. <package id="CommonDomain" version="1.4.0" targetFramework="net40" />
  24. <package id="EventStore" version="3.0.11326.44" targetFramework="net40" />
  25. <package id="MemBus" version="1.5.4" targetFramework="net40" />
  26. <package id="Shouldly" version="1.1.1.1" targetFramework="net40" />
  27. <package id="xunit" version="1.9.1" targetFramework="net40" />
  28. </packages>*/
  29.  
  30. //HUGE CAVEATE <<<<< I am not an expert on CQRS. Second, there is no "one way" to do CQRS. Third, CQRS isn't
  31. //the best fit for every app, or every part of every app. That said, what I have below should provide a decent sketch.
  32.  
  33. //Overall, here are my thoughts.... first, using some of the available tools out there, it's pretty straightforward to implement.
  34. //
  35. //Second, it's easy to grow this up to your needs. More work at the low-end but less work at scale. Once synch with MemBus is too slow,
  36. //we can take it out of band and make it asynch. Once MemBus isn't scaling in-proc for us, we can replace it with NServiceBus or our own implementation
  37. //on top of ZeroMQ. We're storing denormalized data in Raven. Once Raven is the bottleneck (work with me and pretend it doesn't have sharding for a minute) then
  38. //we could denormalize to multiple instances... or we could choose to put some things in both Raven and in to, say, a ring database like RRDTool or a
  39. //bucket aggregator like StatsD. That's where things, in my opinion, get easier when you're message-based and passing commands around.
  40. //
  41. //Third, this is a lot more code than the CRUD-style I write. Part of that is based on how I've broken things out here.. .Part of it is based on
  42. //just the overhead of CQRS on C# (at least my understanding of CQRS as expressed below). Rinat Abdullin has an approach of using a custom dialect + T4 parser
  43. //to cull away the boilerplate stuff.
  44. //
  45. //At the end of the day, I believe this overall approach has a lot of merit and I'll be discussing adoption pros and cons with my coworker before we proceed.
  46. //After I can hash things out with his brilliant mind, I'll write up my refined thoughts and a proper example.
  47.  
  48.  
  49. //This is all xunit.net right now. It should be redone in mspec as it'd make following along much, much easier
  50. //I'll probably do that later. Maybe.
  51. //xUnit.net calls the constructor and dispose (if implemented) on your test class before each test. Consider that
  52. //the test setup/teardown section.
  53. namespace Tests.PropertyManagement
  54. {
  55. //This is my mock UI. It'd be something like an MVC controller (well, FubuMVC in my case because I roll like that)
  56. public class SomeAwesomeUI
  57. {
  58. private readonly IBus _bus;
  59.  
  60. public SomeAwesomeUI(IBus bus)
  61. {
  62. _bus = bus;
  63. }
  64.  
  65. //Simulate the client kicking something off
  66. public Guid CreateNewAccount()
  67. {
  68. var createCommand = new CreateAccountCommand(Guid.NewGuid(), "Testy", new Address("2501 AStreet", "Denver", "CO"));
  69. _bus.Publish(createCommand);
  70.  
  71. return createCommand.Id;
  72. }
  73.  
  74. public void CloseAccount(Guid accountId)
  75. {
  76. var closeCommand = new CloseAccountCommand(accountId);
  77. _bus.Publish(closeCommand);
  78. }
  79. }
  80.  
  81. //Commands to do things are sent to your domain
  82. //For a great discussion on validation with commands, check out http://ingebrigtsen.info/2012/07/28/cqrs-in-asp-net-mvc-with-bifrost/
  83. public class CreateAccountCommand
  84. {
  85. public readonly Guid Id;
  86. public readonly string Name;
  87. public Address Address { get; set; }
  88.  
  89. public CreateAccountCommand(Guid id, string accountName, Address address)
  90. {
  91. Id = id;
  92. Name = accountName;
  93. Address = address;
  94. }
  95. }
  96.  
  97. //A command doesn't need to carry state if you don't want it to... Here, we're just telling it the account id to close.
  98. public class CloseAccountCommand
  99. {
  100. public readonly Guid AccountId;
  101.  
  102. public CloseAccountCommand(Guid accountId)
  103. {
  104. AccountId = accountId;
  105. }
  106. }
  107.  
  108. //By convention, I mark my command handlers with this interface. It's partially to handle wiring and partially
  109. //so I can toss around things like contravariant
  110. public interface IHandleCommand<in T>
  111. {
  112. void Handle(T command);
  113. }
  114.  
  115. //This is the handler that will apply commands to my domain. I could choose
  116. //another round of some sort of non-business rule validation here. I could
  117. //log stuff. Whatever. There's also no reason that you need one CommandHandler
  118. //per command. I'm just doing this because I think this is how our real impl will
  119. //shape out.
  120. //IRepository comes from CommonDomain and is a facade over EventStore (both by Jonathan Oliver)
  121. public class CreateAccountCommandHandler : IHandleCommand<CreateAccountCommand>
  122. {
  123. private readonly IRepository _repository;
  124.  
  125. public CreateAccountCommandHandler(IRepository repository)
  126. {
  127. _repository = repository;
  128. }
  129.  
  130. public void Handle(CreateAccountCommand command)
  131. {
  132. var account = new Account(command.Id, command.Name, command.Address);
  133.  
  134. _repository.Save(account, Guid.NewGuid());
  135. }
  136. }
  137.  
  138. //This may _look_ like a normal "load this by id and then mutate state and then save it" repository
  139. //However, it's actually loading the entire event stream for that object and re-applying the events to
  140. //it. Don't worry, though, it's not re-publishing events to the bus.. it's just raising events
  141. //internally to the object. Your domain object, at the end, will be the culmination of all of those
  142. //applied events. This would be much simpler in F# of we thought about domain state as a left fold
  143. //of immutable events causing state change.
  144. //
  145. //One neat thing about EventStore and, by extension, CommonDomain, is that you can load versions of your
  146. //object. Check out the overloads on _repository.GetById some time.
  147. public class DeactivateAccountCommandHandler : IHandleCommand<CloseAccountCommand>
  148. {
  149. private readonly IRepository _repository;
  150.  
  151. public DeactivateAccountCommandHandler(IRepository repository)
  152. {
  153. _repository = repository;
  154. }
  155.  
  156. public void Handle(CloseAccountCommand command)
  157. {
  158. var account = _repository.GetById<Account>(command.AccountId);
  159. account.Close();
  160.  
  161. _repository.Save(account, Guid.NewGuid());
  162. }
  163. }
  164.  
  165. //By convention, I want to provide two means for creating domain objects. To the public, I want
  166. //to provide an always-valid constructor. This explicitly shows what needs to be provided to the domain
  167. //to create a valid instance of that object (eg, Person needs a twitter handle to be valid if I were doing twitter stream analysis)
  168. //Internally, to EventStore, I want it to be able to create my object via a private ctor and I'm going to pass in the
  169. //objects id.
  170. //This method is pretty simplistic but my current domain suits it just fine.
  171. public class AggregateFactory : IConstructAggregates
  172. {
  173. public IAggregate Build(Type type, Guid id, IMemento snapshot)
  174. {
  175. ConstructorInfo constructor = type.GetConstructor(
  176. BindingFlags.NonPublic | BindingFlags.Instance, null, new Type[] { typeof(Guid) }, null);
  177.  
  178. return constructor.Invoke(new object[] { id }) as IAggregate;
  179. }
  180. }
  181.  
  182. //Ok, so this is where things get a little.... interesting
  183. //EventStore is sort of my coordinator for everything
  184. //When I create a new domain object, I issue commands to it. It, in turn, raises events to change its internal state.
  185. //
  186. //Again, thing of the current state of a domain object as what you get after all events that built it are applied.
  187. //new Person("@benhyr") might raise a PersonCreatedEvent. Then person.UpdateTwitterId("@hyrmn") raises a PersonUpdatedEvent
  188. //When I load that Person from the EventStore, rather than getting Person.TwitterId from a db field, I'm getting PersonCreatedEvent
  189. //(which sets the TwitterId initially) and then PersonUpdatedEvent (which updates the TwitterId to the new value)
  190. //
  191. //Now, back to this class. When I raise events, they're uncommitted until I persist them back to the EventStore.
  192. //By default, we assume others might be interested in these uncommitted events. Of course, it's EventStore not EventStoreAndMessageBus
  193. //(although EventStore could do some basic stuff for us). So, we're telling EventStore to publish to our MemBus bus... at some point,
  194. //we might put NSB or MassTransit or EasyNetQ or whatever in place.
  195. public static class DelegateDispatcher
  196. {
  197. public static void DispatchCommit(IPublisher bus, Commit commit)
  198. {
  199. // This is where we'd hook into our messaging infrastructure, such as NServiceBus,
  200. // MassTransit, WCF, or some other communications infrastructure.
  201. // This can be a class as well--just implement IDispatchCommits.
  202.  
  203. foreach (var @event in commit.Events)
  204. bus.Publish(@event.Body);
  205. }
  206. }
  207.  
  208. //On to the concrete part of the spike... we have accounts, accounts are an aggregate in my domain.
  209. //For this spike, accounts have a name, are active or inactive (in the real world, they're deactivated for many reasons, but not here)
  210. //and an account has an address (in the real world, they actually have a couple addresses. Again, not germane to this spike)
  211.  
  212.  
  213. //This is just a value object in DDD parlance. It has no identity itself because it's always owned by an entity object.
  214. public class Address
  215. {
  216. public Address(string line1, string city, string state)
  217. {
  218. this.Line1 = line1;
  219. this.City = city;
  220. this.State = state;
  221. }
  222.  
  223. public string Line1 { get; private set; }
  224. public string City { get; private set; }
  225. public string State { get; private set; }
  226. }
  227.  
  228.  
  229. //This is my old buddy Account. It inherits from AggregateBase, which comes from CommonDomain.
  230. //There's no real need to bring CommonDomain in if you don't want. It provides a couple simple mechanisms for me.
  231. //First, it gives me the IRepository wrapper around EventStore which I use above in my CommandHandlers
  232. //Second, it gives me a base that tracks all of my uncommitted changes for me.
  233. //Third, it wires up, by convention, my event handlers (the private void Apply(SomeEvent @event) methods
  234. public class Account : AggregateBase
  235. {
  236. public string Name { get; set; }
  237. public bool IsActive { get; set; }
  238. public Address Address { get; set; }
  239.  
  240. private Account(Guid id)
  241. {
  242. this.Id = id;
  243. }
  244.  
  245. public Account(Guid id, string name, Address address)
  246. {
  247. this.Id = id;
  248. // the event will be routed by convention to a method called ApplyEvent(type of event)
  249. RaiseEvent(new AccountCreatedEvent
  250. {
  251. Id = this.Id,
  252. Name = name,
  253. Address = address,
  254. IsActive = true
  255. });
  256. }
  257.  
  258. public void Close()
  259. {
  260. this.RaiseEvent(new AccountClosedEvent());
  261. }
  262.  
  263. private void Apply(AccountCreatedEvent @event)
  264. {
  265. this.Id = @event.Id;
  266. this.Name = @event.Name;
  267. this.Address = @event.Address;
  268. }
  269.  
  270. private void Apply(AccountClosedEvent @event)
  271. {
  272. this.IsActive = false;
  273. }
  274. }
  275.  
  276. //A marker interface I have for my own sanity. Useful for convention-based
  277. //analysis and verification
  278. public interface IDomainEvent
  279. {
  280. }
  281.  
  282. //This is going to seem a bit conflated so bear with me. When we create a new Account,
  283. //we raise an AccountCreatedEvent. We then apply that AccountCreatedEvent to ourselves.
  284. //Once we save our uncommitted events to EventStore, then that AccountCreatedEvent is also
  285. //sent out to our bus for other interested parties.
  286. [Serializable]
  287. public class AccountCreatedEvent : IDomainEvent
  288. {
  289. public Guid Id { get; set; }
  290. public string Name { get; set; }
  291. public bool IsActive { get; set; }
  292. public Address Address { get; set; }
  293. }
  294.  
  295. [Serializable]
  296. public class AccountClosedEvent : IDomainEvent
  297. {
  298. }
  299.  
  300. //Again, another convention interface so I can tell my bus how to resolve my handlers.
  301. //Any party that wants to know about a particular event will mark itself as such.
  302. public interface IHandleEvent<in T>
  303. {
  304. void Handle(T @event);
  305. }
  306.  
  307. //Normally this class would do something awesome like update Raven
  308. //There's no reason for this to be a single denormalizer
  309. //However, there's no reason for this to be multiple denormalizers. Design decision!
  310. //Our use-case in production is that our denormalizers will update our flattened models in RavenDB
  311. //although, honestly, it could be SQL Server, Mongo, Raik, whatever.
  312. public class AccountDenormalizer : IHandleEvent<AccountCreatedEvent>,
  313. IHandleEvent<AccountClosedEvent>
  314. {
  315. public string AccountName { get; set; }
  316. public bool IsActive { get; set; }
  317.  
  318. public void Handle(AccountCreatedEvent @event)
  319. {
  320. AccountName = @event.Name;
  321. }
  322.  
  323. public void Handle(AccountClosedEvent @event)
  324. {
  325. IsActive = false;
  326. }
  327. }
  328.  
  329. //And, to show multiple event handlers in action, here's a handler that might
  330. //do something like send an email welcoming the person that just registered
  331. //or maybe a cool SignalR tie-in that goes to the sales dashboard
  332. //or a web service endpoint that has a Netduino polling it and ringing a gong when
  333. //someone signs up. You know, whatever.
  334. public class KaChingNotifier : IHandleEvent<AccountCreatedEvent>
  335. {
  336. public void Handle(AccountCreatedEvent @event)
  337. {
  338. Console.WriteLine("Dude, we got a customer, we're gonna be rich!");
  339. }
  340. }
  341.  
  342. public class OmgSadnessNotifier : IHandleEvent<AccountClosedEvent>
  343. {
  344. public void Handle(AccountClosedEvent @event)
  345. {
  346. Console.WriteLine("Dude, we lost a customer... start the layoffs :(");
  347. }
  348. }
  349.  
  350. //and, now we're into the meat of the spike. This is the xUnit.net class under test
  351. //a lot of this looks repetitive because, well, it was written as I was increasing my
  352. //understanding of things.
  353. public class EndToEndTest
  354. {
  355. private readonly SomeAwesomeUI _client;
  356. private readonly IBus _bus;
  357.  
  358. //Here, I'm wiring up my MemBus instance and telling it how to resolve my subscribers
  359. //MemBus also has an awesome way to resolve subscribers from an IoC container. In prod,
  360. //I'll wire my subscribers into StructureMap and have MemBus resolve them from there.
  361. //I'm also initializing my awesome test client UI which, if you'll recall from way back at the start
  362. //simply publishes commands to my MemBus instance (and, again, it could be whatever)
  363. public EndToEndTest()
  364. {
  365. _bus = BusSetup.StartWith<Conservative>()
  366. .Apply<FlexibleSubscribeAdapter>(a =>
  367. {
  368. a.ByInterface(typeof(IHandleEvent<>));
  369. a.ByInterface(typeof(IHandleCommand<>));
  370. })
  371. .Construct();
  372.  
  373. _client = new SomeAwesomeUI(_bus);
  374. }
  375.  
  376. [Fact]
  377. public void CanPublishCreateAccountCommand()
  378. {
  379. Should.NotThrow(() => _client.CreateNewAccount());
  380. }
  381.  
  382. [Fact]
  383. public void CanReceiveCreateAccountCommand()
  384. {
  385. var store = Wireup.Init().UsingInMemoryPersistence().Build();
  386. var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
  387.  
  388. _bus.Subscribe(handler);
  389.  
  390. Should.NotThrow(() => _client.CreateNewAccount());
  391. }
  392.  
  393. [Fact]
  394. public void CreateAccountEventIsStored()
  395. {
  396. var store = Wireup.Init().UsingInMemoryPersistence().Build();
  397. var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector());
  398. var handler = new CreateAccountCommandHandler(repository);
  399.  
  400. _bus.Subscribe(handler);
  401. var accountId = _client.CreateNewAccount();
  402.  
  403. store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBeGreaterThan(0);
  404. }
  405.  
  406. [Fact]
  407. public void CanLoadAccountFromEventStore()
  408. {
  409. var store = Wireup.Init().UsingInMemoryPersistence().Build();
  410. var repository = new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector());
  411. var handler = new CreateAccountCommandHandler(repository);
  412.  
  413. _bus.Subscribe(handler);
  414. var accountId = _client.CreateNewAccount();
  415.  
  416. var account = repository.GetById<Account>(accountId);
  417.  
  418. account.ShouldNotBe(null);
  419. account.Name.ShouldBe("Testy");
  420. }
  421.  
  422. [Fact]
  423. public void CreateAccountEventIsPublishedToBus()
  424. {
  425. var store = Wireup.Init().UsingInMemoryPersistence()
  426. .UsingSynchronousDispatchScheduler()
  427. .DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
  428. .Build();
  429.  
  430. var handler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
  431. var denormalizer = new AccountDenormalizer();
  432.  
  433. _bus.Subscribe(handler);
  434. _bus.Subscribe(denormalizer);
  435.  
  436. _client.CreateNewAccount();
  437.  
  438. denormalizer.AccountName.ShouldBe("Testy");
  439. }
  440.  
  441. [Fact]
  442. public void DeactivingAccountDoesntRetriggerInitialCreate()
  443. {
  444. var store = Wireup.Init().UsingInMemoryPersistence()
  445. .UsingSynchronousDispatchScheduler()
  446. .DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
  447. .Build();
  448.  
  449. var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
  450. var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
  451. var denormalizer = new AccountDenormalizer();
  452.  
  453. _bus.Subscribe(createHandler);
  454. _bus.Subscribe(deactivateHandler);
  455. _bus.Subscribe(denormalizer);
  456.  
  457. var accountId = _client.CreateNewAccount();
  458. _client.CloseAccount(accountId);
  459.  
  460. denormalizer.AccountName.ShouldBe("Testy");
  461. denormalizer.IsActive.ShouldBe(false);
  462. store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2);
  463. }
  464.  
  465.  
  466. //For fun, run this with the Debugger (eg, if using TDD.NET then right click on this method and select Test With -> Debugger.
  467. //Put break points in various spots of the code above and see what happens.
  468. [Fact]
  469. public void TyingtTogether()
  470. {
  471. var store = Wireup.Init().UsingInMemoryPersistence()
  472. .UsingSynchronousDispatchScheduler()
  473. .DispatchTo(new DelegateMessageDispatcher(c => DelegateDispatcher.DispatchCommit(_bus, c)))
  474. .Build();
  475.  
  476. var createHandler = new CreateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
  477. var deactivateHandler = new DeactivateAccountCommandHandler(new EventStoreRepository(store, new AggregateFactory(), new ConflictDetector()));
  478. var denormalizer = new AccountDenormalizer();
  479.  
  480. _bus.Subscribe(createHandler);
  481. _bus.Subscribe(deactivateHandler);
  482.  
  483. _bus.Subscribe(denormalizer);
  484. _bus.Subscribe(new KaChingNotifier());
  485. _bus.Subscribe(new OmgSadnessNotifier());
  486.  
  487. var accountId = _client.CreateNewAccount();
  488. _client.CloseAccount(accountId);
  489.  
  490. denormalizer.AccountName.ShouldBe("Testy");
  491. denormalizer.IsActive.ShouldBe(false);
  492. store.OpenStream(accountId, 0, int.MaxValue).CommittedEvents.Count.ShouldBe(2);
  493. }
  494. }
  495. }
Advertisement
Add Comment
Please, Sign In to add comment
Advertisement