Monday, November 2, 2015

Event Sourcing - events de-duplication and order of events


One of the challenges a team has to tackle when implementing event sourcing is event de-duplication and the processing order of events while updating the read model. Once events are stored in the event store, we usually publish them so that all interested parties can update their models, including our own projections. If you are using a service like AWS SQS/SNS, or any other queue that does not provide FIFO ordering and exactly-once delivery, you will have to implement a solution yourself.

To illustrate the problem, let's assume we have seven events of six different types coming out of our domain model, consumed by our event handlers concurrently in this order:

1) EventOne {"id":"aggregate-id-1", "fieldA":"value-a6", "version":6}
2) EventTwo {"id":"aggregate-id-1", "fieldA":"value-a3", "fieldB":"value-b3", "version":3}
3) EventThree {"id":"aggregate-id-1", "fieldB":"value-b5", "version":5}
4) EventFour {"id":"aggregate-id-1", "fieldC":"value-c2", "version":2}
5) EventFive {"id":"aggregate-id-1", "fieldC":"value-c4", "fieldD":"value-d4", "version":4}
6) EventSix {"id":"aggregate-id-1", "fieldD":"value-d1", "version":1}
7) EventOne {"id":"aggregate-id-1", "fieldA":"value-x7", "version":7}

The "version" field in each event tells us the sequence in which the events were created by the domain, but once we put those events in a queue the order of messages is lost, and on the other end we receive them in mixed order.

Now let's say we want to build a projection that would have all 4 fields in it:

- "fieldA"
- "fieldB"
- "fieldC"
- "fieldD"

The problem is that if events arrive in the order specified above, "EventTwo" will override the "fieldA" value with a stale one, "EventSix" will override "fieldD", and so on. You might say that we can use "version" to reject the update, but then you will lose the "fieldB" value from "EventTwo". "EventTwo" must not override "fieldA", but at that point nothing newer has set "fieldB", so its "fieldB" value should still be applied.
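To make both failure modes concrete, here is a minimal Python sketch that replays the seven events above in arrival order. The function names (apply_naive, apply_with_row_version) are made up for this illustration, and a plain dict stands in for the projection row.

```python
# The seven events from the example, in the order the handlers receive them.
EVENTS = [
    {"id": "aggregate-id-1", "fieldA": "value-a6", "version": 6},
    {"id": "aggregate-id-1", "fieldA": "value-a3", "fieldB": "value-b3", "version": 3},
    {"id": "aggregate-id-1", "fieldB": "value-b5", "version": 5},
    {"id": "aggregate-id-1", "fieldC": "value-c2", "version": 2},
    {"id": "aggregate-id-1", "fieldC": "value-c4", "fieldD": "value-d4", "version": 4},
    {"id": "aggregate-id-1", "fieldD": "value-d1", "version": 1},
    {"id": "aggregate-id-1", "fieldA": "value-x7", "version": 7},
]

META = ("id", "version")  # keys that are not projection fields


def payload(event):
    """Return only the projection fields carried by an event."""
    return {k: v for k, v in event.items() if k not in META}


def apply_naive(events):
    """Last write wins: every event overwrites whatever fields it carries."""
    row = {}
    for event in events:
        row.update(payload(event))
    return row


def apply_with_row_version(events):
    """Keep one version per row and reject every event that is not newer."""
    row, row_version = {}, 0
    for event in events:
        if event["version"] <= row_version:
            continue  # the whole event is dropped, including its fresh fields
        row.update(payload(event))
        row_version = event["version"]
    return row


# What the projection should look like: events replayed in version order.
expected = apply_naive(sorted(EVENTS, key=lambda e: e["version"]))

print(expected)                        # fieldA=value-x7, fieldB=value-b5, fieldC=value-c4, fieldD=value-d4
print(apply_naive(EVENTS))             # fieldD ends up as the stale value-d1
print(apply_with_row_version(EVENTS))  # only fieldA survives; B, C and D are never set
```

With a single version per row, the first event to arrive (version 6) blocks versions 1 to 5, so "fieldB", "fieldC" and "fieldD" are never written at all.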

Add to this the fact that the same message can arrive multiple times, and things get even messier.


The simplest solution I have come up with for this problem is to add a version field for each projection field. In our case, for those 4 fields I would add another 4 fields. All my event handlers would then build this generic update query for the required storage engine (MySQL, AWS RDS, DynamoDB, it does not matter):

"INSERT the fields from the consumed event under this unique aggregate id; IF the id already exists, UPDATE the record with this id and SET each field to the value from the consumed event IF the version of the event is higher than the one stored in the DB for that field, otherwise reject the change"

Basically we apply versioning per field, storing a "version" value next to each field. I agree that doubling the number of fields is not so "clean", but this solution gives you idempotent event handlers: they can apply the same "UPDATE" statement multiple times and the result will always be the same. It also does not care about the order of events. You can process them starting from the beginning, the middle or the end; it makes no difference, as only the latest version of every field will be kept.
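As a rough sketch of what such a handler could look like, the Python snippet below builds the upsert for SQLite through the standard sqlite3 module. Everything in it is an assumption made for illustration: the table name "projection", the "<field>_version" column naming, the helper names, and the choice of SQLite itself (its ON CONFLICT ... DO UPDATE clause needs SQLite 3.24 or newer). It also assumes versions are unique per aggregate. Other engines have their own upsert syntax, so treat this as one possible shape rather than the definitive query.

```python
import random
import sqlite3

FIELDS = ("fieldA", "fieldB", "fieldC", "fieldD")  # whitelist of projection columns


def create_projection(conn):
    """Create the table: one value column and one version column per field."""
    columns = ", ".join(f"{f} TEXT, {f}_version INTEGER" for f in FIELDS)
    conn.execute(f"CREATE TABLE projection (id TEXT PRIMARY KEY, {columns})")


def apply_event(conn, event):
    """Upsert only the fields the event carries, each guarded by its own version."""
    present = [f for f in FIELDS if f in event]
    if not present:
        return
    columns, params, assignments = ["id"], [event["id"]], []
    for f in present:
        columns += [f, f"{f}_version"]
        params += [event[f], event["version"]]
        # True when nothing is stored yet or the incoming version is newer.
        newer = (f"(projection.{f}_version IS NULL "
                 f"OR excluded.{f}_version > projection.{f}_version)")
        assignments.append(
            f"{f} = CASE WHEN {newer} THEN excluded.{f} ELSE projection.{f} END")
        assignments.append(
            f"{f}_version = CASE WHEN {newer} THEN excluded.{f}_version "
            f"ELSE projection.{f}_version END")
    placeholders = ", ".join("?" for _ in columns)
    conn.execute(
        f"INSERT INTO projection ({', '.join(columns)}) VALUES ({placeholders}) "
        f"ON CONFLICT(id) DO UPDATE SET {', '.join(assignments)}",
        params,
    )


def project(events, aggregate_id):
    """Apply events to a fresh in-memory database and return the projected row."""
    conn = sqlite3.connect(":memory:")
    create_projection(conn)
    for event in events:
        apply_event(conn, event)
    row = conn.execute(
        f"SELECT {', '.join(FIELDS)} FROM projection WHERE id = ?", (aggregate_id,)
    ).fetchone()
    conn.close()
    return row


def survives_shuffles_and_redelivery(events, aggregate_id, rounds=50, seed=0):
    """Check that random orderings with duplicated messages all give the same row."""
    rng = random.Random(seed)
    expected = project(sorted(events, key=lambda e: e["version"]), aggregate_id)
    for _ in range(rounds):
        delivery = events + rng.choices(events, k=len(events))  # simulated redeliveries
        rng.shuffle(delivery)
        if project(delivery, aggregate_id) != expected:
            return False
    return True
```

Calling project with the seven example events (as a list of dicts) and "aggregate-id-1" should return the latest value of each field regardless of arrival order, and survives_shuffles_and_redelivery replays shuffled deliveries with duplicates to check exactly that. One caveat when porting: SQLite evaluates the right-hand sides of SET against the stored row, so the value and version assignments do not interfere with each other. MySQL documents that UPDATE assignments are evaluated left to right, so there the version column would likely have to be assigned after the value column.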

Here is a sqlfiddle to illustrate the problem and the solution.