F# event sourcing with Marten

December 10, 2022

tl;dr: In this post we are going to

  • Configure Marten in an F# project with the default serializer Newtonsoft.Json and use the document store as well as the event store
  • Check what F# types are supported out of the box
  • Use System.Text.Json for JSON serialization and customize some types with the help of FSharp.SystemTextJson
  • Refactor the code to be a bit more functional by using a Discriminated Union as our event type

Sample code is available on GitHub: https://github.com/jannikbuschke/blog-samples/tree/main/marten-and-fsharp-getting-started


Getting started

We need the .NET SDK and a PostgreSQL database.


mkdir marten-fs-sample
cd marten-fs-sample
dotnet new xunit --language f#
dotnet add package Marten

Let's start by creating a Marten store object in order to persist and load a record:


open Xunit
open Marten
open Weasel.Core // AutoCreate lives here in Marten 5; the namespace may differ in other versions

type Du = | Case1 of int | Case2
type SampleType = { Id: string; Hello: string; Option: int option; Du: Du }

[<Fact>]
let ``A first simple test`` () =
    task {
        let store =
            DocumentStore.For(fun options ->
                options.Connection("xxx")
                // Let Marten create missing tables; with AutoCreate.None the table shown below would not be generated
                options.AutoCreateSchemaObjects <- AutoCreate.All
            )
        // Open an `IDocumentSession` to start working with documents and queries.
        use session = store.OpenSession()
        // The following will instruct Marten to do an upsert for the given `SampleType`
        session.Store({ Id = "1"; Hello = "World"; Option = Some 5; Du = Du.Case2 })
        let! _ = session.SaveChangesAsync()
        // Create a new session and try to load the saved object from the database
        use session = store.OpenSession()
        let! sample = session.LoadAsync<SampleType>("1")
        // Check that the loaded object equals the one we saved (expected value first)
        Assert.Equal({ Id = "1"; Hello = "World"; Option = Some 5; Du = Du.Case2 }, sample)
        return ()
    }

Executing the above test will cause Marten to generate a table mt_doc_tests_sampletype with the following columns: id, data, mt_last_modified, mt_version, and mt_dotnet_type:


id: 1 // document id
mt_last_modified: 2022-12-09 16:02:34.219722+01
mt_version: 0184f768-d700-4969-b01a-c809eb24d4f0
mt_dotnet_type: Tests+SampleType
data:


{
"Du": {
"Case": "Case2"
},
"Id": "1",
"Hello": "World",
"Option": {
"Case": "Some",
"Fields": [5]
}
}

We will customize the serializer later to store our data in a different format. Storing and retrieving documents was easy, so let's move on to event sourcing.

Event sourcing

In addition to the document store, Marten provides an event store and a feature for building projections. I will use chess as an example domain. First we will create some types representing the domain, an event, and an aggregate for our event stream:


type Color =
    | Black
    | White

type PieceType =
    | King
    | Queen
    | Bishop
    | Knight
    | Rook
    | Pawn

type ChessPiece =
    { Type: PieceType
      Color: Color
      Position: int * int }

type State =
    | Initialized
    | Running
    | Ended

// First Event
type GameInitialized = { Pieces: ChessPiece list }

// Aggregate
// As Marten needs a default constructor, apply the CLIMutable attribute
[<CLIMutable>]
type ChessGame =
    { Id: Guid
      Pieces: ChessPiece list
      State: State }
    member this.Apply(e: GameInitialized, meta: Marten.Events.IEvent) : ChessGame =
        // return new ChessGame record
        { Id = meta.StreamId
          Pieces = e.Pieces
          State = State.Running }

Now let's write another test in which we first save an event and then load the aggregate:


[<Fact>]
let ``Event store with default options should save events and build a projection`` () =
    task {
        let store =
            DocumentStore.For (fun options ->
                options.Connection ("xxx")
                // Marten uses a code generator for the event projections
                options.GeneratedCodeMode <- LamarCodeGeneration.TypeLoadMode.Auto
                // Marten can automatically create the database schema for us
                options.AutoCreateSchemaObjects <- AutoCreate.All
                // Register `ChessGame` as a self-aggregating projection (`SelfAggregate`).
                // The second argument tells Marten to apply each event to the projection
                // immediately, as part of the transaction that saves the event.
                options.Projections.SelfAggregate<ChessGame> (ProjectionLifecycle.Inline)
                |> ignore)
        // Open a session to get access to the database
        use session = store.LightweightSession ()
        // Create an id for our aggregate/stream
        let gameId = Guid.NewGuid ()
        // Append our first event (GameInitialized); this will not yet save the event
        session.Events.Append (
            gameId,
            { Pieces =
                [ { Color = Black
                    Position = (0, 0)
                    Type = Pawn } ] }
        )
        |> ignore
        // Marten will now save this event to the event table and update the inline projection.
        let! _ = session.SaveChangesAsync ()
        let! game = session.LoadAsync<ChessGame> (gameId)
        // Expected value first, loaded aggregate second
        Assert.Equal (
            { Id = gameId
              State = Running
              Pieces =
                [ { Color = Black
                    Position = (0, 0)
                    Type = Pawn } ] },
            game
        )
        return ()
    }

Nice, now when we execute the code, Marten will save our event to the table mt_events, apply it to our ChessGame projection, and save the result to the document store in the table mt_doc_chess_chessgame! This way Marten provides both an event store and a store for our read model.

Entry in mt_events:


seq_id: 1 // global sequence id
id: 0184f5b5-b110-4dab-bb0e-43c68257e7a9 // event id
stream_id: 716c0877-5de2-4451-8590-8713b1e8014a // stream id
version: 1
type: game_initialized
timestamp: 2022-12-09 08:13:45.835311+01
tenant_id: *DEFAULT*
mt_dotnet_type: EventStoreTest.Chess+GameInitialized, blog-samples
is_archived: false
data:


{
"Pieces": [
{
"Type": {
"Case": "Pawn"
},
"Color": {
"Case": "Black"
},
"Position": {
"Item1": 0,
"Item2": 0
}
}
]
}

The seq_id is a global counter that increments for every event, the id is a unique identifier for each event, and the stream_id is the ID of the associated stream/aggregate.

Let's have a look at our projection table mt_doc_chess_chessgame:


id: d09d093e-95c0-4575-99d1-089d74ed95d1 // aggregate/stream id
mt_last_modified: 2022-12-09 08:13:45.816568+01
mt_version: 0184f5bb-a6b8-4eb0-9742-0de8bb4d6dc0
mt_dotnet_type: EventStoreTest.Chess+ChessGame
data:


{
"Id": "d09d093e-95c0-4575-99d1-089d74ed95d1",
"State": { "Case": "Running" },
"Pieces": [
{
"Type": { "Case": "Pawn" },
"Color": { "Case": "Black" },
"Position": { "Item1": 0, "Item2": 0 }
}
]
}

Ok, let's add another event type and refactor the domain a little bit:


type Color =
    | Black
    | White

type PieceType =
    | King
    | Queen
    | Bishop
    | Knight
    | Rook
    | Pawn

type ChessPiece = { Type: PieceType; Color: Color }

type State =
    | NotStarted
    | Running
    | Ended of Result

type GameInitialized = { Pieces: Map<int * int, ChessPiece> }

// new event
type PieceMoved = { From: int * int; To: int * int }

[<CLIMutable>]
type ChessGame =
    { Id: Guid
      Pieces: Map<int * int, ChessPiece>
      State: State }
    member this.Apply(e: GameInitialized, meta: Marten.Events.IEvent) : ChessGame =
        { Id = meta.StreamId
          Pieces = e.Pieces
          State = State.NotStarted }
    member this.Apply(e: PieceMoved) : ChessGame =
        { this with Pieces = this.Pieces.Add(e.To, this.Pieces.[e.From]).Remove (e.From) }

We store the pieces in a Map where the key is the position and the value is the piece. This should make it easier to move pieces around. We've also added a second Apply overload to handle the PieceMoved event type.
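To get a feeling for the Map-based board without a database, here is a minimal standalone sketch. The `movePiece` helper is hypothetical and not part of the sample repository; it assumes we want to leave the board unchanged when the source square is empty, whereas the indexer used in Apply would throw in that case.

```fsharp
// Simplified copies of the domain types from above, so the snippet runs on its own.
type Color = Black | White
type PieceType = King | Queen | Bishop | Knight | Rook | Pawn
type ChessPiece = { Type: PieceType; Color: Color }

// Hypothetical helper: move a piece, or return the board unchanged
// when there is no piece on the source square.
let movePiece (src: int * int) (dst: int * int) (pieces: Map<int * int, ChessPiece>) =
    match Map.tryFind src pieces with
    | Some piece ->
        // Remove first, then add, so that a "move" onto the same square keeps the piece.
        pieces |> Map.remove src |> Map.add dst piece
    | None -> pieces

let board = Map.ofList [ (0, 0), { Type = Pawn; Color = Black } ]
let moved = movePiece (0, 0) (0, 1) board
```

Removing before adding is a small design choice: with Add followed by Remove, an event whose From and To are equal would delete the piece.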

Let's adjust the test:


// ...
let gameId = Guid.NewGuid ()
// append first event
session.Events.Append (gameId, { Pieces = [ ((0, 0), { Color = Black; Type = Pawn }) ] |> Map.ofSeq })
|> ignore
// append second event
session.Events.Append (gameId, { From = (0, 0); To = (0, 1) }) |> ignore
let! _ = session.SaveChangesAsync ()
let! game = session.LoadAsync<ChessGame> (gameId)
Assert.Equal (
    { Id = gameId
      State = NotStarted
      Pieces = [ ((0, 1), { Color = Black; Type = Pawn }) ] |> Map.ofSeq },
    game
)

Now run the test aaand we get an error:


Newtonsoft.Json.JsonSerializationException:
Could not convert string '(0, 1)' to dictionary key type 'System.Tuple`2[System.Int32,System.Int32]'.
Create a TypeConverter to convert from the string to the key type object. Path 'Pieces['(0, 1)']', line 1, position 97.

The default serializer, Newtonsoft.Json, worked well for most of our F# types, but not for tuples used as Map keys. The data was saved successfully, but Newtonsoft.Json could not deserialize it again. Writing a custom TypeConverter may help, but there is a wide range of JSON libraries that can be used from F#, such as FSharpLu.Json, Thoth.Json, Json.NET (Newtonsoft.Json), and System.Text.Json. For this example, we will use System.Text.Json. The store configuration changes as follows:


DocumentStore.For (fun options ->
    options.Connection ("xxx")
    options.GeneratedCodeMode <- LamarCodeGeneration.TypeLoadMode.Auto
    options.AutoCreateSchemaObjects <- AutoCreate.All
    options.Projections.SelfAggregate<ChessGame> (ProjectionLifecycle.Inline) |> ignore
    // Configure Marten to use System.Text.Json for json serialization
    let serializer = Marten.Services.SystemTextJsonSerializer ()
    options.Serializer (serializer)
)

Run the test aaand we get the following error:


System.NotSupportedException:
F# discriminated union serialization is not supported. Consider authoring a custom converter for the type.

Well, it seems that System.Text.Json (STJ) does not yet fully support F# types. Fortunately, there is a solution! Thanks to FSharp.SystemTextJson, we can use DUs, Maps, and other F# types. To take advantage of this, let's add a package reference and register the JsonFSharpConverter with our serializer:


dotnet add package FSharp.SystemTextJson


//...
// JsonFSharpConverter needs: open System.Text.Json.Serialization
let serializer = Marten.Services.SystemTextJsonSerializer ()
serializer.Customize (fun v -> v.Converters.Add (JsonFSharpConverter ()))
options.Serializer (serializer)

Now the test works again! After clearing the database and running the test, the database looks like this:

mt_events table:


// First row
seq_id: 1 // global sequence id
id: 0184f604-9000-4928-a8e8-efc9a4678922 // event id
stream_id: 7fc00dff-a666-4be2-bbb9-68bbf1762bfd // stream id
version: 1
type: game_initialized
timestamp: 2022-12-09 09:33:25.059613+01
tenant_id: *DEFAULT*
mt_dotnet_type: EventStoreTest.Chess+GameInitialized, blog-samples
is_archived: false
data:


{
"Pieces": [
[[0, 0], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } }]
]
}


// Second row
seq_id: 2 // global sequence id
id: 0184f604-9002-46f6-94c6-682d3a4df3ca // event id
stream_id: 7fc00dff-a666-4be2-bbb9-68bbf1762bfd // stream id
version: 2
type: piece_moved
timestamp: 2022-12-09 09:33:25.059613+01
tenant_id: *DEFAULT*
mt_dotnet_type: EventStoreTest.Chess+PieceMoved, blog-samples
is_archived: false
data:


{ "To": [0, 1], "From": [0, 0] }

mt_doc_chess_chessgame table:


id: 7fc00dff-a666-4be2-bbb9-68bbf1762bfd // aggregate/stream id
mt_last_modified: 2022-12-09 09:33:25.043814+01
mt_version: 0184f604-92fb-4d0b-ad8f-0c5923c808c3
mt_dotnet_type: EventStoreTest.Chess+ChessGame
data:


{
"Id": "7fc00dff-a666-4be2-bbb9-68bbf1762bfd",
"State": { "Case": "NotStarted" },
"Pieces": [
[[0, 1], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } }]
]
}

Cool! Marten successfully applied events 1 and 2 to our aggregate and also updated the projection state to its latest state in the document store.

Replay events

Instead of using the persisted projection from table mt_doc_chess_chessgame, we can also tell Marten to do a live aggregation of the stream up to any version of the aggregate:


// rebuild aggregate state at version 1 (one event applied)
let! gameV1 = session.Events.AggregateStreamAsync<ChessGame> (gameId, 1)
Assert.Equal (
    { Id = gameId
      State = NotStarted
      Pieces = [ ((0, 0), { Color = Black; Type = Pawn }) ] |> Map.ofSeq },
    gameV1
)
// rebuild aggregate state at version 2
let! gameV2 = session.Events.AggregateStreamAsync<ChessGame> (gameId, 2)
Assert.Equal (
    { Id = gameId
      // still NotStarted: the PieceMoved Apply overload above does not change the state
      State = NotStarted
      Pieces = [ ((0, 1), { Color = Black; Type = Pawn }) ] |> Map.ofSeq },
    gameV2
)
// rebuild latest aggregate state (same as version 2)
let! gameV3 = session.Events.AggregateStreamAsync<ChessGame> (gameId)
Assert.Equal (gameV2, gameV3)
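Conceptually, a live aggregation up to a version is a left fold over the first n events of the stream. The following is only a sketch of that idea in plain F#, not how Marten implements it. The `SketchEvent` union, the `Piece` alias, and the `aggregateAt` function are hypothetical names introduced for this illustration.

```fsharp
open System

type Position = int * int
type Piece = string // stand-in for ChessPiece to keep the sketch short

// Hypothetical union just for this sketch; at this point in the post
// the real events are still separate record types.
type SketchEvent =
    | Initialized of Map<Position, Piece>
    | Moved of src: Position * dst: Position

type Game = { Id: Guid; Pieces: Map<Position, Piece> }

// Apply a single event to the current state.
let apply (game: Game) (ev: SketchEvent) : Game =
    match ev with
    | Initialized pieces -> { game with Pieces = pieces }
    | Moved (src, dst) ->
        match Map.tryFind src game.Pieces with
        | Some piece -> { game with Pieces = game.Pieces |> Map.remove src |> Map.add dst piece }
        | None -> game

// State at `version` = fold over the first `version` events of the stream.
let aggregateAt (version: int) (id: Guid) (events: SketchEvent list) : Game =
    events
    |> List.truncate version
    |> List.fold apply { Id = id; Pieces = Map.empty }

let streamId = Guid.NewGuid ()

let events =
    [ Initialized (Map.ofList [ (0, 0), "black pawn" ])
      Moved ((0, 0), (0, 1)) ]

let v1 = aggregateAt 1 streamId events // pawn still on (0, 0)
let v2 = aggregateAt 2 streamId events // pawn moved to (0, 1)
```

Because every version is derived from the events alone, no earlier state has to be stored to answer "what did the game look like after event n".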

FSharp.SystemTextJson default serialization format

FSharp.SystemTextJson uses some interesting defaults for serializing F# types.

Options are rendered as the value in the Some case and null in the None case


Some 5
None


5 // <- Some
null // <- None

Single Case unions are unwrapped


type Id = | Id of System.Guid


"7817a4bd-c7c3-460f-9679-aa10afd4ac89"

Multi case unions have a discriminator with the case name and a Fields array:


type Du =
| Case1
| Case2 of int
| Case3 of int * string
| Case4 of name: string
| Case5 of name: string * payload: MyRecord
| Case6 of MyRecord


{"Case":"Case1"}
{"Case":"Case2","Fields":[2]}
{"Case":"Case3","Fields":[5,"hello"]}
{"Case":"Case4","Fields":["hello"]}
{"Case":"Case5","Fields":["Name",{"Foo":"Hello"}]}
{"Case":"Case6","Fields":[{"Foo":"Hello"}]}

Lists will be rendered as an array:


[1;2;3]


[1, 2, 3]

One weird thing which I don't yet fully grasp: records with a property set to null can be serialized but not deserialized.


type MyRecord = { Foo: string }
JsonSerializer.Serialize({ Foo = null }, defaultOptions)

Produces


{ "Foo": null }

but the following throws:


JsonSerializer.Deserialize<MyRecord>("""{"Foo":null}""", defaultOptions)

The docs state that

By default, FSharp.SystemTextJson throws an exception when the following conditions are met:

  • it is deserializing a record or a union;
  • a field's JSON value is null or the field is unspecified;
  • that field's type isn't an explicitly nullable F# type (like option, voption and Skippable)

So I think this is by design, but it is strange that we can serialize a record with a string property set to null and then cannot deserialize it. It would be more consistent to reject such a record at serialization time as well. If we need to model a missing value, the explicit Option type is the way to go. I'll update this section if I get more clarity on this. If anyone has ideas on how to improve this behavior, it would be awesome if you drop a comment here or here.
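As a small illustration of the Option route, here is a sketch that round-trips a record with an optional field through the default JsonFSharpConverter. It is written as an F# script (.fsx), so the `#r "nuget: ..."` line is an assumption about how you run it; in a project, the package reference from earlier is enough. `MyRecordOpt` is a hypothetical variant of MyRecord introduced only for this example.

```fsharp
#r "nuget: FSharp.SystemTextJson"

open System.Text.Json
open System.Text.Json.Serialization

// Hypothetical variant of MyRecord that makes the missing value explicit.
type MyRecordOpt = { Foo: string option }

let options = JsonSerializerOptions ()
options.Converters.Add (JsonFSharpConverter ())

// None is an explicitly nullable F# value, so both directions are allowed.
let json = JsonSerializer.Serialize ({ Foo = None }, options)
let roundTripped = JsonSerializer.Deserialize<MyRecordOpt> (json, options)
```

With the option type the null case is part of the record's type, so serializing and deserializing agree on what is allowed.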

Customize FSharp.SystemTextJson Discriminated Union format

I think the defaults are pretty good. Rendering an array for discriminated union fields, however, seems a bit off to me. Other options exist, so let's customize the default options. In the following example we mostly initialize the JsonFSharpConverter with the default options, but for union fields we use an object with named properties instead of an array, and we unwrap a record field if it is the only field:


let customOptions = System.Text.Json.JsonSerializerOptions ()

customOptions.Converters.Add (
    JsonFSharpConverter (
        // Encode unions as a 2-valued object: unionTagName (defaults to "Case") contains the union tag, and unionFieldsName (defaults to "Fields") contains the union fields. If the case doesn't have fields, "Fields": [] is omitted. This flag is included in Default.
        JsonUnionEncoding.AdjacentTag
        // If unset, union fields are encoded as an array. If set, union fields are encoded as an object using their field names.
        ||| JsonUnionEncoding.NamedFields
        // Implicitly sets NamedFields. If set, when a union case has a single field which is a record, the fields of this record are encoded directly as fields of the object representing the union.
        ||| JsonUnionEncoding.UnwrapRecordCases
        // If set, None is represented as null, and Some x is represented the same as x. This flag is included in Default.
        ||| JsonUnionEncoding.UnwrapOption
        // If set, single-case single-field unions are serialized as the single field's value. This flag is included in Default.
        ||| JsonUnionEncoding.UnwrapSingleCaseUnions
        // In AdjacentTag and InternalTag mode, allow deserializing unions where the tag is not the first field in the JSON object.
        ||| JsonUnionEncoding.AllowUnorderedTag,
        // Also the default. Throw when deserializing a record or union where a field is null but not an explicit nullable type (option, voption, skippable) https://github.com/Tarmil/FSharp.SystemTextJson/blob/master/docs/Customizing.md#allownullfields
        allowNullFields = false
    )
)


type Du =
| Case1
| Case2 of int
| Case3 of int * string
| Case4 of name: string
| Case5 of name: string * payload: MyRecord
| Case6 of MyRecord
| Case7 of Foo: string


{"Case":"Case1"}
{"Case":"Case2","Fields":{"Item":2}} // implicitly named 'Item'
{"Case":"Case3","Fields":{"Item1":5,"Item2":"hello"}} // implicitly named 'Item1' and 'Item2'
{"Case":"Case4","Fields":{"name":"Hello"}} // explicitly named fields
{"Case":"Case5","Fields":{"name":"Hello","payload":{"Foo":"Hello"}}} // explicitly named fields
{"Case":"Case6","Fields":{"Foo":"Hello"}} // unwrapped single value record
{"Case":"Case7","Fields":{"Foo":"Hello"}} // explicitly named field

This format is a little bit more explicit, as our fields get names. I think this should make querying with PostgreSQL simpler.
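To check that a named-fields encoding still reads back what it wrote, a round trip is a cheap test. The sketch below is an F# script (.fsx) and uses only a subset of the flags and union cases from above; the `roundTrip` helper and the `samples` list are hypothetical names for this illustration, and the exact JSON text is deliberately not asserted.

```fsharp
#r "nuget: FSharp.SystemTextJson"

open System.Text.Json
open System.Text.Json.Serialization

type MyRecord = { Foo: string }

// A subset of the union from above.
type Du =
    | Case1
    | Case3 of int * string
    | Case5 of name: string * payload: MyRecord

let options = JsonSerializerOptions ()

options.Converters.Add (
    JsonFSharpConverter (
        JsonUnionEncoding.AdjacentTag
        ||| JsonUnionEncoding.NamedFields
        ||| JsonUnionEncoding.UnwrapOption
        ||| JsonUnionEncoding.UnwrapSingleCaseUnions
        ||| JsonUnionEncoding.AllowUnorderedTag
    )
)

// Serialize a value and read it back with the same options.
let roundTrip (value: Du) : Du =
    let json = JsonSerializer.Serialize (value, options)
    JsonSerializer.Deserialize<Du> (json, options)

let samples = [ Case1; Case3 (5, "hello"); Case5 ("Hello", { Foo = "Hello" }) ]
let allEqual = samples |> List.forall (fun v -> roundTrip v = v)
```

A test like this is worth keeping next to the serializer configuration, since events written with one encoding have to stay readable for as long as the stream exists.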

Last refactor of our domain

Let's refactor our domain one last time and use the above options to render unions with named fields instead of arrays and to unwrap single-field record cases. We will also refactor our events into a single GameEvent union and use one single Apply method with pattern matching:


//... previous types as before
// new: use single type GameEvent with different union cases
type GameEvent =
    | GameInitialized of GameInitialized
    | PieceMoved of PieceMoved
    // new, left out for brevity
    | PieceCaptured of PieceCaptured
    // new, left out for brevity
    | GameEnded of GameEnded

[<CLIMutable>]
type ChessGame =
    { Id: Guid
      Pieces: Map<int * int, ChessPiece>
      State: State }
    // refactored to one Apply method with pattern matching
    member this.Apply(e: GameEvent, meta: Marten.Events.IEvent) : ChessGame =
        match e with
        | GameInitialized e ->
            { Id = meta.StreamId
              Pieces = e.Pieces
              State = State.NotStarted }
        | PieceMoved e ->
            let piece = this.Pieces.[e.From]
            { this with Pieces = this.Pieces.Add(e.To, piece).Remove (e.From); State = State.Running }
        // other cases left out for brevity
        | _ -> this


// ... other code as before
// when appending events we now need to save the `GameEvent`:
session.Events.Append (gameId, GameEvent.GameInitialized { Pieces = [ ((0, 0), { Color = Black; Type = Pawn }) ] |> Map.ofSeq })
|> ignore
session.Events.Append (gameId, GameEvent.PieceMoved { From = (0, 0); To = (0, 1) }) |> ignore

The mt_events table now looks like this:


seq_id: 1
id: 0184f705-88c0-41f0-9ab9-59af825152f6
stream_id: d6a8e070-4547-4401-97b4-baff45e88b50
version: 1
type: game_initialized
timestamp: 2022-12-09 14:14:05.94232+01
tenant_id: *DEFAULT*
mt_dotnet_type: EventStoreTest.Chess+GameInitialized, blog-samples
is_archived: false
data:


{
"Case": "GameInitialized",
"Fields": {
"Pieces": [
[[0, 0], { "Type": { "Case": "Pawn" }, "Color": { "Case": "Black" } }]
]
}
}


seq_id: 2
id: 0184f705-88c4-4162-8cf8-83e80d48d20b
stream_id: d6a8e070-4547-4401-97b4-baff45e88b50
version: 2
type: piece_moved
timestamp: 2022-12-09 14:14:05.94232+01
tenant_id: *DEFAULT*
mt_dotnet_type: EventStoreTest.Chess+GameEvent+PieceMoved, blog-samples
is_archived: false
data:


{ "Case": "PieceMoved", "Fields": { "To": [0, 1], "From": [0, 0] } }

And the current projection state in mt_doc_chess_chessgame:


{
"Id": "661ab7db-b3a7-4930-aa46-0e4cc806507c",
"State": {
"Case": "Running"
},
"Pieces": [
[
[0, 1],
{
"Type": {
"Case": "Pawn"
},
"Color": {
"Case": "Black"
}
}
]
]
}

Thanks for reading!

You can browse the full code at https://github.com/jannikbuschke/blog-samples/tree/main/marten-and-fsharp-getting-started

If you have feedback please comment here or here