eventsourcing

Types

pub type AggregateId =
  String
pub type Apply(entity, event) =
  fn(entity, event) -> entity

An EventEnvelop is a wrapper around your domain events used by the Event Stores. You can use this type constructor if the event store provides a load_events function.

pub type EventEnvelop(event) {
  MemoryStoreEventEnvelop(
    aggregate_id: AggregateId,
    sequence: Int,
    payload: event,
    metadata: List(#(String, String)),
  )
  SerializedEventEnvelop(
    aggregate_id: AggregateId,
    sequence: Int,
    payload: event,
    metadata: List(#(String, String)),
    event_type: String,
    event_version: String,
    aggregate_type: String,
  )
}

Constructors

  • MemoryStoreEventEnvelop(
      aggregate_id: AggregateId,
      sequence: Int,
      payload: event,
      metadata: List(#(String, String)),
    )
  • SerializedEventEnvelop(
      aggregate_id: AggregateId,
      sequence: Int,
      payload: event,
      metadata: List(#(String, String)),
      event_type: String,
      event_version: String,
      aggregate_type: String,
    )

The main record of the library. It holds everything together and serves as a reference point for other functions such as execute, load_aggregate_entity, and load_events.

pub opaque type EventSourcing(
  eventstore,
  entity,
  command,
  event,
  error,
  aggregatecontext,
)

Wrapper around the event store implementations

pub type EventStore(eventstore, entity, command, event, error) {
  EventStore(
    eventstore: eventstore,
    load_aggregate: fn(eventstore, AggregateId) ->
      AggregateContext(entity, command, event, error),
    load_events: fn(eventstore, AggregateId) ->
      List(EventEnvelop(event)),
    commit: fn(
      eventstore,
      AggregateContext(entity, command, event, error),
      List(event),
      List(#(String, String)),
    ) ->
      List(EventEnvelop(event)),
  )
}

Constructors

  • EventStore(
      eventstore: eventstore,
      load_aggregate: fn(eventstore, AggregateId) ->
        AggregateContext(entity, command, event, error),
      load_events: fn(eventstore, AggregateId) ->
        List(EventEnvelop(event)),
      commit: fn(
        eventstore,
        AggregateContext(entity, command, event, error),
        List(event),
        List(#(String, String)),
      ) ->
        List(EventEnvelop(event)),
    )
pub type Handle(entity, command, event, error) =
  fn(entity, command) -> Result(List(event), error)
pub type Query(event) =
  fn(AggregateId, List(EventEnvelop(event))) -> Nil

Functions

pub fn add_query(
  eventsourcing: EventSourcing(a, b, c, d, e, f),
  query: fn(String, List(EventEnvelop(d))) -> Nil,
) -> EventSourcing(a, b, c, d, e, g)
pub fn execute(
  event_sourcing: EventSourcing(a, b, c, d, e, f),
  aggregate_id aggregate_id: String,
  command command: c,
) -> Result(Nil, e)

Execute The main function of the package. Run execute with your event_sourcing instance and the command you want to apply. It will return a Result with Ok(Nil) or Error(your domain error) if the command failed.

pub fn execute_with_metadata(
  event_sourcing event_sourcing: EventSourcing(a, b, c, d, e, f),
  aggregate_id aggregate_id: String,
  command command: c,
  metadata metadata: List(#(String, String)),
) -> Result(Nil, e)
pub fn load_events(
  eventsourcing: EventSourcing(a, b, c, d, e, f),
  aggregate_id: String,
) -> List(EventEnvelop(d))
pub fn new(
  event_store: EventStore(a, b, c, d, e),
  queries: List(fn(String, List(EventEnvelop(d))) -> Nil),
) -> EventSourcing(a, b, c, d, e, f)

Create a new EventSourcing instance providing an Event Store and a list of queries you want run whenever events are commited.

Examples

pub type BankAccount {
  BankAccount(opened: Bool, balance: Float)
}

pub type BankAccountCommand {
  OpenAccount(account_id: String)
  DepositMoney(amount: Float)
  WithDrawMoney(amount: Float)
}

pub type BankAccountEvent {
  AccountOpened(account_id: String)
  CustomerDepositedCash(amount: Float, balance: Float)
  CustomerWithdrewCash(amount: Float, balance: Float)
}

pub fn handle(
  bank_account: BankAccount,
  command: BankAccountCommand,
) -> Result(List(BankAccountEvent), Nil) {
  case command {
    OpenAccount(account_id) -> Ok([AccountOpened(account_id)])
    DepositMoney(amount) -> {
      let balance = bank_account.balance +. amount
      case amount >. 0.0 {
        True -> Ok([CustomerDepositedCash(amount:, balance:)])
        False -> Error(Nil)
      }
    }
    WithDrawMoney(amount) -> {
      let balance = bank_account.balance -. amount
      case amount >. 0.0 && balance >. 0.0 {
        True -> Ok([CustomerWithdrewCash(amount:, balance:)])
        False -> Error(Nil)
      }
    }
  }
}

pub fn apply(bank_account: BankAccount, event: BankAccountEvent) {
  case event {
    AccountOpened(_) -> BankAccount(..bank_account, opened: True)
    CustomerDepositedCash(_, balance) -> BankAccount(..bank_account, balance:)
    CustomerWithdrewCash(_, balance) -> BankAccount(..bank_account, balance:)
  }
}
fn main() {
  let mem_store =
    memory_store.new(BankAccount(opened: False, balance: 0.0), handle, apply)
  let query = fn(
    aggregate_id: String,
    events: List(eventsourcing.EventEnvelop(BankAccountEvent)),
  ) {
    io.println(
      "Aggregate Bank Account with ID: "
      <> aggregate_id
      <> " commited "
      <> events |> list.length |> int.to_string
      <> " events.",
    )
  }
  let event_sourcing = eventsourcing.new(mem_store, [query])
}
Search Document