Show HN: Light implementation of Event Sourcing using PostgreSQL as event store (github.com/eugene-khyst)
231 points by eugene-khyst on Oct 31, 2023 | 47 comments
Hi everyone,

If you have a Java Spring Boot application with a PostgreSQL database, you can implement Event Sourcing without introducing new specialized databases or frameworks.

If you have an application dealing with an entity called Order, you should adopt Event Sourcing to keep track of all changes, and know how the Order got into the current state.

Event Sourcing gives you:

1. the true history of the system (audit and traceability),

2. the ability to put the system in any prior state (debugging),

3. the ability to create read projections from events as needed to respond to new demands.

There are several well-known specialized frameworks and databases for Event Sourcing: EventStoreDB, Marten, Eventuate, to name a few. But adopting a new framework or database you are not familiar with may stop you from trying the Event Sourcing pattern in your project. In fact, you can implement Event Sourcing with a few classes and use PostgreSQL as an event store.

The "postgresql-event-sourcing" project is a reference implementation of an event-sourced system, built with Spring Boot, that uses PostgreSQL as an event store. Fork the repository and use it as a template for your projects. Or clone the repository and run end-to-end tests to see how everything works together.

The project describes in detail:

- database model for storing events,

- synchronous and asynchronous event handlers,

- CQRS,

- Transactional Outbox pattern,

- Polling Publisher pattern,

- optimized publisher that uses PostgreSQL LISTEN/NOTIFY capabilities,

- and more.

This project can be easily extended to comply with your domain model.

The source code is available on GitHub <https://github.com/eugene-khyst/postgresql-event-sourcing>.



Using audit tables is another approach that avoids rolling up the data periodically, and you avoid all the needless complexity event sourcing can introduce, particularly if you're not 100% sure you need it.

You gain temporal querying with a history/audit table that mirrors the main one (or not -- if you prefer not to, and instead want to cram everything into a jsonb column).

Combine it with TSTZRANGE and you can better express the bounds of when the data was last amended and valid. A "period" column has a range `[start, null)` indicating it's current; `[start, end)` indicating an older record. Your main table is always current, with the history table recording every change made. The benefit of this approach is that you can use a GiST index and postgres' datetime ranges to find rows that intersect a point or range in time. If it gets too big, then think of ways you can roll up or prune old records, as needed.
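To make the `[start, end)` convention concrete, here is a minimal sketch in plain Python rather than SQL. The `Version` class, the `as_of` helper and the sample rows are hypothetical names invented for illustration; in Postgres the same check would be a range containment query over the "period" column.

```python
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import Optional


@dataclass(frozen=True)
class Version:
    """One row version with a half-open validity period [start, end)."""
    row_id: int
    status: str
    start: datetime
    end: Optional[datetime]  # None stands in for an unbounded upper bound (current row)

    def contains(self, at: datetime) -> bool:
        # Half-open range: start is inclusive, end is exclusive.
        return self.start <= at and (self.end is None or at < self.end)


def as_of(versions: list[Version], row_id: int, at: datetime) -> Optional[Version]:
    """Return the version of row_id that was valid at the given instant, if any."""
    for v in versions:
        if v.row_id == row_id and v.contains(at):
            return v
    return None


def utc(day: int) -> datetime:
    # Helper for the sample data only: a day in October 2023, UTC.
    return datetime(2023, 10, day, tzinfo=timezone.utc)


history = [
    Version(1, "pending", utc(1), utc(5)),
    Version(1, "in-progress", utc(5), utc(9)),
    Version(1, "completed", utc(9), None),  # open-ended: the current row
]
```

Because the upper bound is exclusive, an instant that falls exactly on a boundary belongs to the newer version only, so periods never overlap.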

And you can have all of this without compromising on your table structure. Use an ORM and you can have it mirror your audit tables as needed, or of course you can use any number of other methods, such as, ugh, table inheritance or a bit of plpgsql + create table as magic.

Audit tables are useful, and they can approximate a lot of this event system stuff but without the downsides.


No doubt audit tables are a popular alternative to event sourcing. But if the current state and the change history of the entity are stored in different tables, someone may say: "Prove to me that your audit log is correct". Because you are not using the audit table for the business logic, you may not immediately notice a problem that corrupts the audit log. Event Sourcing provides other advantages besides the audit log. For example, a service command typically needs to create/update/delete aggregates in the DB (JDBC/R2DBC) and send messages to Kafka. Without two-phase commit (2PC), sending a message in the middle of a transaction is not reliable: there is no guarantee that the transaction will commit. With Event Sourcing you subscribe to the event and send the message to Kafka from a listener. The delivery guarantee is "at least once". Anyway, there is a demand for Event Sourcing on the market.


Regarding publishing to a message broker, the transactional outbox pattern (mentioned in TFA, and something that can be used on its own) provides similar capabilities if you don't want to fully buy into event sourcing.

https://microservices.io/patterns/data/transactional-outbox....
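A minimal sketch of the outbox idea, using Python's built-in sqlite3 in place of PostgreSQL and a plain callback in place of a real broker client. The table names, `place_order` and `publish_pending` are hypothetical and not taken from the linked project.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT NOT NULL);
    CREATE TABLE outbox (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        payload TEXT NOT NULL,
        sent INTEGER NOT NULL DEFAULT 0
    );
""")


def place_order(order_id: int) -> None:
    # The business row and the outbox row commit or roll back together,
    # so a message exists if and only if the state change was committed.
    with conn:
        conn.execute("INSERT INTO orders (id, status) VALUES (?, 'PLACED')", (order_id,))
        conn.execute(
            "INSERT INTO outbox (payload) VALUES (?)",
            (json.dumps({"type": "OrderPlaced", "order_id": order_id}),),
        )


def publish_pending(send) -> int:
    """Poll unsent outbox rows, hand each to `send`, then mark it as sent.

    If the process dies between `send` and the UPDATE, the row is sent again
    on the next poll, which is why the guarantee is at-least-once.
    """
    rows = conn.execute(
        "SELECT id, payload FROM outbox WHERE sent = 0 ORDER BY id"
    ).fetchall()
    for row_id, payload in rows:
        send(json.loads(payload))
        with conn:
            conn.execute("UPDATE outbox SET sent = 1 WHERE id = ?", (row_id,))
    return len(rows)


published = []  # stands in for the message broker
place_order(1)
place_order(2)
publish_pending(published.append)
```

Consumers have to tolerate duplicates, since a crash at the wrong moment re-sends a message that was already delivered.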


I was thinking about this yesterday and your tip about TSTZRANGE is a good one. My main concern with an audit table is that now every write becomes a transaction (if you are concerned with consistency between your master table and your audit table) and you have to decide on what content goes in your master table and what goes in your audit table. When I tried to sketch this out for my use case it turned out to be non-trivial.

I was thinking about this specifically for a jobs table where the jobs go through states (e.g. pending, in-progress, completed, failed). Having an audit of each of those states alongside relevant details might be useful. However, once I start hammering out the details I find myself reaching for pl/pgsql and worrying about putting too much business logic into my db. It starts to feel like I'm building a state machine on top of SQL.

I actually think this dual use-case is something we can really improve upon. In some fever-dream what I think I want is some combination of Kafka (append-only event stream) and Postgres (durable/reliable snapshot of current state).


> I actually think this dual use-case is something we can really improve upon. In some fever-dream what I think I want is some combination of Kafka (append-only event stream) and Postgres (durable/reliable snapshot of current state).

Debezium will capture SQL changes to Kafka for you.


Kafka is amazing for what it is made for, but it doesn't seem to solve the query problem for past states. For example, if there are some intermediary details related to the "in-progress" state that will get overwritten once the job transitions into the "complete" state (or "error" state) then that is non-trivial to query from Kafka.

Even if you decide to keep those intermediary states in the main table then there are other niggles, like retries. If a job gets picked up and fails then I might write to an `error_details` column in the main table. However, if I have retries and the job fails a couple of times then only the latest error details are in the main table. If I want to reconstruct the history of the job I have to somehow retrieve each error event for that job from my append only log. And now I'm querying across systems and combining the data in the application tier.

I'm not saying these aren't solvable problems or that there aren't tools already that can achieve what I'm talking about. Engineers love to say "why don't you just ..." for almost any conceivable problem. What I mean to say is that we seem to be separating things into different systems (append only logs vs. rdbms) which feel like they might be more tightly related. rdbms are like one half and append only logs are the other half. Maybe one day those halves will be combined.


Yes, Debezium is an implementation of the Transaction log tailing pattern, an alternative to the Transactional outbox pattern.


> You gain temporal querying with a history/audit table that mirrors the main one (or not -- if you prefer not to, and instead want to cram everything into a jsonb column).

I rather liked that MariaDB has temporal tables out of the box: https://mariadb.com/kb/en/temporal-tables/

I think for PostgreSQL there was an extension last I checked: https://pgxn.org/dist/temporal_tables/ (edit: this doesn't appear very popular)

Pretty nice, when the database does the heavy lifting and you have to do a bit less work yourself.


I love temporal tables. I built an implementation in Postgres [1] in a few hundred lines of PLPGSQL. We've run the implementation for several years across a few hundred tables for an ERP product.

The core ideas of the implementation:

- The current table is a typical Postgres table. The past table's primary key is (curr_tbl.pk, asr), where the asr column is the valid period of the row using a domain on tstzrange.

- Create statement-based triggers on the current table to copy the after-image of modified rows to the past table. Statement-based triggers amortize function call overhead compared to row-based triggers (I think depesz compared the performance and found about a 10x difference, but I can't find the link).

- Copying the row after-image on update and delete to the past table is advantageous because inserting a new row has no overhead. The disadvantage is that it's harder to figure out when a row was deleted (requires checking for gaps in the past table).

Some pointers if you dive into uni-temporal tables in Postgres:

- Separate the past table and current table into different schemas. Most database users shouldn't modify the past tables. It's easier to grant permission by schema and it makes autocomplete nicer. By convention, we use erp.invoice and erppast.invoice.

- Use a domain type instead of tstzrange to blunt the sharp edges of tstzrange. See the temporal.period domain type in [1].

- Resist the temptation to query against the past table. Temporal joins are fabulously complex without first-class database support (like Oracle's AS OF).

- Optimize for the typical case: querying the current table. Our first temporal implementation used table inheritance consisting of three tables: a parent table, the current table, and a past table. Theoretically, the parent table lets you query data transparently across the parent and child tables. In practice, we didn't use the parent query capability at all. Having a parent table made code review harder by requiring the reviewer to check that the code queried the current table and not the parent table. It's easy enough and rare enough to query all versions of a row by using:

    SELECT * FROM current_table
    UNION ALL
    SELECT * FROM past_table

- Track the current and past tables in a metadata table. Use tests to check that columns don't drift between the current and past tables. See misaligned_cols.sql in [1] for an example.

- Unfortunately, you can't use declarative partitioning because the primary key of the past table differs from the current table. The past table must include the valid period column to differentiate it from other versions of the same row.

[1]: https://gist.github.com/jschaf/cd1c1a3c2a5897282929ee5e16f94...


How do you manage schema changes?


Table renames don’t break anything.

Changing columns must be done to both tables in a transaction. Unit tests verify that the table columns match.


While I haven't kept up to date with Event Sourcing, the README referenced is a surprisingly in-depth tour of Postgresql functionality and internals, and typical relational database gotchas related to sequences, queues, transaction reliability and using notify. Worth a read.


Nice job, eugene-khyst. Looks very comprehensive from an initial skim.

I've worked on something in the same space, with a focus on reliable but flexible synchronization to many consumers, where logical replication gets impractical.

I have a mind to do a proper writeup, but at least there is code at https://github.com/cognitedata/txid-syncing (MIT-licensed) and a presentation at https://vimeo.com/747697698

The README mentions …

> A long-running transaction in the same database will effectively "pause" all event handlers.

… as the approach is based on the xmin-horizon.

My linked code works by involving the MVCC snapshot's xip_list as well, to avoid this gotcha.

Also, note that when doing a logical restore of a database, you're working with different physical txids, which complicates recovery. (So my approach relies on offsetting the txid and making sure the offset is properly maintained)


Thanks for sharing.

> My linked code works with involving the MVCC snapshot's xip_list as well, to avoid this gotcha.

I will definitely take a look. It would be great to fix this problem. This problem really concerns me, although in most cases it is not critical.


Agreed - perhaps the most comprehensive README I've ever seen.


I'm working on an append-only (immutable) (bi)temporal DBS[1] in my spare time, which transforms CRUD operations into an event store, automatically providing an audit log for each stored node, while the nodes are stored with immutable node-IDs, which never change. Because the stored contents are based on a custom binary JSON format, a rolling hash can also optionally be built, to check if a whole subtree has changed or not.

You can also add commit comments, revert to a specific revision (while preserving all revisions in-between)...

The system uses persistent index data structures to share unchanged pages between revisions.

The intermittent snapshots are omitted. Rather the snapshot is spread over several revisions, applying a sliding snapshot algorithm on the data pages (thus, avoiding write peaks, while at max a predefined number of page fragments has to be read in parallel to reconstruct a page in-memory).

[1] https://github.com/sirixdb/sirix | https://sirix.io | https://sirix.io/docs/concepts.html


In addition to the alternatives mentioned, here’s another Postgres-only ES implementation: https://github.com/message-db/message-db


This reminds me of Commanded[0] for elixir which also uses Postgresql by default.

[0]https://github.com/commanded/commanded


Check out Marten for a fully fleshed out implementation https://github.com/JasperFx/marten


Have to say this is a great resource.

I've been using it to discuss how my team could understand and build a similar implementation to this using Golang+Postgres. It provided a very strong reference point and helped us avoid a large number of possible pitfalls. Thank you eugene-khyst.

Currently only processing a few hundred thousand events/commands a day and still baselining the architecture but performance and reliability of our implementation is looking very promising.


This seems as good a place to ask as any. I've become interested in event sourcing because of a problem it might help with.

Essentially, we have a system where users can log events against assets in the business. These events change the state of the assets and people want to query the current state of the assets. But the problem is people sometimes forget to log events. My thinking is it doesn't matter what order the events get logged, as long as the date is recorded correctly. But none of the event sourcing implementations I've seen seem to talk about logging events out of order.

I'm not sure if I'm barking up the wrong tree here. It seems like it would be a fairly standard thing in lots of systems but it's difficult to know what to search for.


"retroactive events" is probably the thing to look for, e.g. https://www.infoq.com/news/2018/02/retroactive-future-event-...


You can totally do that. As you read the events, you can store and sort them in date order, then produce the state from the sorted order of events when you've finished reading the stream. There's nothing wrong with storing intermediary state before producing your final aggregate.

It might mean you can't do snapshotting unless you add additional logic though - checking for the date of the last seen event and triggering a new snapshot due to the out-of-orderness of the event entry.
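A minimal sketch of that read-then-sort approach, assuming each event carries the date it actually happened. `AssetEvent`, `current_state` and the sample data are hypothetical names made up for illustration.

```python
from dataclasses import dataclass
from datetime import date


@dataclass(frozen=True)
class AssetEvent:
    asset_id: str
    occurred_on: date  # when it happened in the real world, not when it was logged
    new_status: str


def current_state(events: list[AssetEvent]) -> dict[str, str]:
    """Fold events in occurred_on order, regardless of the order they were logged in."""
    state: dict[str, str] = {}
    # sorted() is stable, so events sharing a date keep their logged order.
    for event in sorted(events, key=lambda e: e.occurred_on):
        state[event.asset_id] = event.new_status
    return state


logged = [
    AssetEvent("pump-1", date(2023, 10, 1), "installed"),
    AssetEvent("pump-1", date(2023, 10, 20), "repaired"),
    # Logged late: someone forgot to record the breakdown on the 10th.
    AssetEvent("pump-1", date(2023, 10, 10), "broken"),
]
```

Here `current_state(logged)` ends on "repaired" even though "broken" was appended last, because the fold follows the recorded dates rather than the append order.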


This is what I was thinking. Thanks for confirming it makes sense. I don't know why it seems like the kind of thing I'm sure there must be a ton of existing work and knowledge, but it's quite disconcerting when I can't find any of it.

I did think the same with snapshotting. I was thinking in the system the addition of an event would have to invalidate all subsequent snapshots (can be done quickly), then asynchronously recalculate those snapshots again using the new history. Or perhaps using the transaction time of events and snapshots to invalidate the snapshots (ie. if a snapshot was created before the most recently recorded event, according to transaction time, then the snapshot is invalid).


>I don't know why it seems like the kind of thing I'm sure there must be a ton of existing work and knowledge, but it's quite disconcerting when I can't find any of it.

Yeah, I hate to say it, but a lot of the writing about ES is trying to steer you toward paying consultants to think these things up for you. The truth is that everyone is doing it their own way - there isn't a correct way to do it, only trade offs.

The nice thing is that you always have your event log and so you can optimize projection/state building.

>I did think the same with snapshotting. I was thinking in the system the addition of an event would have to invalidate all subsequent snapshots (can be done quickly), then asynchronously recalculate those snapshots again using the new history. Or perhaps using the transaction time of events and snapshots to invalidate the snapshots (ie. if a snapshot was created before the most recently recorded event, according to transaction time, then the snapshot is invalid).

Yes, well, you can mark a snapshot as invalid if it was built after the decision time. What you can do is jump back to an earlier snapshot and start processing events as of that snapshot's version. This way you can do something like

(regular dates used for ease of reading)

    Snapshot(stream_vsn=90,  date=Date(2023,10,1), latest_decision_date=Date(2023,10,1))
    Snapshot(stream_vsn=100, date=Date(2023,10,10), latest_decision_date=Date(2023,10,7))
    Snapshot(stream_vsn=110, date=Date(2023,10,21), latest_decision_date=Date(2023,10,21))

So you get a new event with a decision date of 2023-10-08. You can invalidate the last snapshot, build from the second snapshot (then invalidate it), and leave the first snapshot as is. You can do build_snapshot(Snapshot(stream_vsn=100), all_events_after_vsn_100) as an optimization since no events before version 100 affect the state.
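
One way to read the rule above, as a rough sketch: a snapshot stays usable if every decision it covers is on or before the retroactive event's decision date, and the newest such snapshot becomes the base for the rebuild. `plan_rebuild` is a hypothetical helper, and treating an equal date as still usable is an assumption.

```python
import datetime as dt
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Snapshot:
    stream_vsn: int
    date: dt.date
    latest_decision_date: dt.date


def plan_rebuild(
    snapshots: list[Snapshot], decision_date: dt.date
) -> tuple[Optional[Snapshot], list[Snapshot]]:
    """Return (base, invalidated) for a retroactive event with the given decision date.

    base: newest snapshot covering only decisions on or before decision_date.
    invalidated: snapshots that already include later decisions.
    """
    usable = [s for s in snapshots if s.latest_decision_date <= decision_date]
    invalidated = [s for s in snapshots if s.latest_decision_date > decision_date]
    base = max(usable, key=lambda s: s.stream_vsn, default=None)
    return base, invalidated


snapshots = [
    Snapshot(90, dt.date(2023, 10, 1), dt.date(2023, 10, 1)),
    Snapshot(100, dt.date(2023, 10, 10), dt.date(2023, 10, 7)),
    Snapshot(110, dt.date(2023, 10, 21), dt.date(2023, 10, 21)),
]
base, invalidated = plan_rebuild(snapshots, dt.date(2023, 10, 8))
```

With the three snapshots from the comment and a decision date of 2023-10-08, the base is the version 100 snapshot and only the version 110 snapshot is invalidated. If no snapshot qualifies, `base` is `None` and the state has to be rebuilt from the start of the stream.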


do you mean "bitemporal"?

adding the info now (txtime) but as of backward timestamp (validtime) ?

This got in focus again recently. (the full thing is 3-temporal) see wikipedia, xtdb, etc


Thanks, those are some very useful keywords. Yes, what I'm talking about is a bi-temporal database. At the moment it's uni-temporal and really doesn't work!


The illustrations are outstanding.


The illustrations are made with PlantUML.


Thanks!


Using only the event ID to track events processed by the subscription is unreliable and can result in lost events. The ID column of the ES_EVENT table is of type BIGSERIAL.

It's a notational convenience for creating ID columns having their default values assigned from a SEQUENCE generator. PostgreSQL sequences can't be rolled back.

SELECT nextval('ES_EVENT_ID_SEQ') increments and returns the sequence value. Even if the transaction is not yet committed, the new sequence value becomes visible to other transactions.

If transaction #2 started after transaction #1 but committed first, the event subscription processor can read the events created by transaction #2, update the last processed event ID, and thus lose the events created by transaction #1.
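
The race can be reproduced with a toy simulation in plain Python, no database involved. The `poll` function and the `committed` set are hypothetical stand-ins for the subscription query and for the rows visible to it.

```python
def poll(committed_ids: set[int], last_processed: int) -> tuple[list[int], int]:
    """Naive subscription: read committed events with id > last_processed."""
    batch = sorted(i for i in committed_ids if i > last_processed)
    new_checkpoint = batch[-1] if batch else last_processed
    return batch, new_checkpoint


committed: set[int] = set()

# Transaction #1 takes id 1 from the sequence, transaction #2 takes id 2.
# Transaction #2 commits first, so only id 2 is visible to the poller.
committed.add(2)
first_batch, checkpoint = poll(committed, last_processed=0)

# Transaction #1 commits late; its id is already below the checkpoint.
committed.add(1)
second_batch, checkpoint = poll(committed, checkpoint)
```

After the second poll the checkpoint is 2 and event 1 was never returned by either poll, which is the lost-event case described above.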

Very interesting write-up, thanks!

Could you comment on this approach by the folks at Citus? It uses pg_sequence_last_value() to get the last value of the sequence, then does this "one weird trick" to make sure there are no more uncommitted writes with an identifier lower or equal to $LAST_EVENT_ID. I haven't tried it in production, since the table lock is poised to raise a few eyebrows.

    SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;
    
    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with a identifier lower or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        EXECUTE format('LOCK %s IN SHARE ROW EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock' USING ERRCODE = 'RLTBL';
    EXCEPTION WHEN SQLSTATE 'RLTBL' THEN
    END;
    
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;

https://gist.github.com/marcocitus/1ac72e7533dbb01801973ee51...


The "little trick" in the Citus approach is very inventive. SHARE ROW EXCLUSIVE mode protects a table against concurrent data changes, and is self-exclusive so that only one session can hold it at a time. Thus, when such a lock is obtained, we can be sure that there are no more pending transactions with uncommitted changes. It protects against losing the data of pending transactions. Throwing the exception immediately releases the lock. Thus, the exclusive table lock is held for milliseconds. I like the general idea, but I don't want to add plpgsql functions/procedures. I'll see if this can be elegantly implemented in Java+SQL (without plpgsql) and perhaps add it as an alternative approach to my project. Such an approach may be even more effective because it focuses on a single table and not on all transactions like the one described in my project; thus, locks on irrelevant tables have no effect on event handlers. Thanks for sharing.


Why no plpgsql? Is it because the language is bad? If so, what about something like pl/rust https://plrust.io/ ? (Or other language)


plpgsql is a good language. But in my experience Java and .NET developers tend to choose solutions that do not use plpgsql, PL/SQL, T-SQL. And these developers are the main audience for the project.


very interesting.

There is some few-years-old inhouse eventsourcing system that starts to choke on a few dozen million events, because of.. many reasons. And there was a plan to scrap it and rewrite. Though the new one may or may not be pure eventsourcing - as long as it does the job (tracking claims and all their follow-up, like communications etc, with full audit i.e. "why-this-is-so").

So... for about a year i have been toying with the idea to use a bitemporal database (xtdb) instead of eventsourcing, using the particular-state-of-db for any business processing as-of-its-timestamp. Parallelizing the sagas as much as needed/allowed-as-of-business - as long as they can get their as-of time-cut (and also replicating the database (== indexes == "reading-aspect") as much as needed).

Although, as that is somewhat higher level than just what eventsourcing gives, that means maybe rewriting a bit more than just the data-storage/access, i.e. some layer of the saga's business logic as well - e.g. the notion of readmodel disappears, being replaced by a query to the bitemporal-db as-of-time-x.

Then a few weeks ago i stumbled upon EventStoreDb, and thought that... maybe it can be used to replace just the data-storage/access, and not having to rethink the business logic above that (still may need some rewrite, but hopefully not completely reverse-engineer + splitting notions).

Now i see even more alternatives :)

The funny thing is.. i have not tried either approach for real, yet. But going to.. very soon. There is now a project and resources for it..

e-mail is in the profile


I can definitely recommend EventStoreDB. I used it in production and most colleagues like this DB. I have a sample Java Spring Boot + EventStoreDB project <https://github.com/eugene-khyst/eventstoredb-event-sourcing>.


How do you roll the history up into the current state? I get the idea, but where is the actual code or query that does this? Especially how do you make an up to date read model with postgres?



I noticed that you use the @Transactional annotation on class definition. This will create a write transaction for every public method of the annotated class, including read only methods. You should consider using readOnly=true for read methods.

Additionally, I would consider using two data sources, one for write queries and a read only ds for the Q part of CQRS.


Thanks for the suggestions. I will add the @Transactional(readOnly = true) annotation. I will mention in the README the possibility of using two data sources.


alright thanks. this java stuff is pretty hard for me to follow. it looks like java is doing the aggregating, but maybe this is some kind of ORM


As long as you're able to assert event version on write (and fail if the expected stream version is wrong), you're already ahead of Kafka on event sourcing capability.
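
A minimal in-memory sketch of that expected-version check. `InMemoryEventStore` and `ConcurrencyError` are hypothetical names; in a PostgreSQL-backed store the same check would have to be enforced atomically by the database (for example with a conditional update or a unique constraint), which this toy version does not attempt.

```python
class ConcurrencyError(Exception):
    """Raised when the stream has moved on since the caller last read it."""


class InMemoryEventStore:
    def __init__(self) -> None:
        self._streams: dict[str, list[dict]] = {}

    def version(self, stream_id: str) -> int:
        # The stream version is simply the number of events appended so far.
        return len(self._streams.get(stream_id, []))

    def append(self, stream_id: str, expected_version: int, event: dict) -> int:
        """Append the event only if the stream is still at expected_version."""
        actual = self.version(stream_id)
        if actual != expected_version:
            raise ConcurrencyError(
                f"{stream_id}: expected version {expected_version}, found {actual}"
            )
        self._streams.setdefault(stream_id, []).append(event)
        return actual + 1


store = InMemoryEventStore()
v1 = store.append("order-1", 0, {"type": "OrderPlaced"})

# A second writer that read the stream at version 0 is rejected.
try:
    store.append("order-1", 0, {"type": "OrderCancelled"})
    conflict = False
except ConcurrencyError:
    conflict = True
```

The rejected writer is expected to reload the stream, re-run its command against the new state, and retry with the new version.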


I tried to evaluate Kafka usage for event sourcing: <https://github.com/eugene-khyst/ksqldb-event-souring>. More out of curiosity. But never tried it in production.


Kafka doesn't have a way to assert stream version on event write, which is critical for CQRS. Without it, you can't guarantee stream state when processing a command without resorting to a singleton/locks which does not scale at all. Why Apache doesn't wish to support such a critical feature is beyond me though.

https://issues.apache.org/jira/browse/KAFKA-2260


It appears to be missing any licensing information


Thanks for noticing. I will add Apache License 2.0.


This is very well written!



