reese_john | 2 years ago

Using only the event ID to track events processed by the subscription is unreliable and can result in lost events. The ID column of the ES_EVENT table is of type BIGSERIAL.

It's a notational convenience for creating ID columns having their default values assigned from a SEQUENCE generator. PostgreSQL sequences can't be rolled back.

SELECT nextval('ES_EVENT_ID_SEQ') increments and returns the sequence value. Even if the transaction is not yet committed, the new sequence value becomes visible to other transactions.

If transaction #2 started after transaction #1 but committed first, the event subscription processor can read the events created by transaction #2, update the last processed event ID, and thus lose the events created by transaction #1.
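The interleaving described above can be replayed with a small in-memory model. This is only a sketch in Python, not PostgreSQL: `EventStore`, `poll`, and the payload strings are hypothetical names made up for illustration, and the "sequence" is just a counter that is never rolled back.

```python
from dataclasses import dataclass, field


@dataclass
class EventStore:
    """Toy model of a table with a BIGSERIAL id (an assumption, not PostgreSQL)."""
    next_id: int = 1
    committed: dict = field(default_factory=dict)  # id -> payload, visible to readers

    def nextval(self) -> int:
        # Like a sequence: the increment takes effect immediately.
        value = self.next_id
        self.next_id += 1
        return value

    def commit(self, event_id: int, payload: str) -> None:
        # Only committed rows become visible to the subscription.
        self.committed[event_id] = payload


def poll(store: EventStore, last_processed_id: int):
    """Naive subscription: read committed events with id > last_processed_id."""
    new_ids = sorted(i for i in store.committed if i > last_processed_id)
    events = [store.committed[i] for i in new_ids]
    new_last = new_ids[-1] if new_ids else last_processed_id
    return events, new_last


store = EventStore()
id_tx1 = store.nextval()  # transaction #1 starts first and gets id 1
id_tx2 = store.nextval()  # transaction #2 starts later and gets id 2

store.commit(id_tx2, "event from tx2")  # tx2 commits first
seen, last_id = poll(store, 0)          # reads id 2, checkpoint moves to 2

store.commit(id_tx1, "event from tx1")  # tx1 commits late with id 1
seen_later, last_id_later = poll(store, last_id)  # id 1 is below the checkpoint

print(seen, last_id)
print(seen_later, last_id_later)
```

The second poll returns nothing even though the event with id 1 is now committed, which is the lost event the write-up warns about.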

Very interesting write-up, thanks!

Could you comment on this approach by the folks at Citus? It uses pg_sequence_last_value() to get the last value of the sequence, then does this "one weird trick" to make sure there are no more uncommitted writes with an identifier lower than or equal to $LAST_EVENT_ID. I haven't tried it in production, since the table lock is likely to raise a few eyebrows.

    SELECT event_table_name, last_aggregated_id+1, pg_sequence_last_value(event_id_sequence_name)
    INTO table_to_lock, window_start, window_end
    FROM rollups
    WHERE name = rollup_name FOR UPDATE;

    IF NOT FOUND THEN
        RAISE 'rollup ''%'' is not in the rollups table', rollup_name;
    END IF;
    
    IF window_end IS NULL THEN
        /* sequence was never used */
        window_end := 0;
        RETURN;
    END IF;

    /*
     * Play a little trick: We very briefly lock the table for writes in order to
     * wait for all pending writes to finish. That way, we are sure that there are
     * no more uncommitted writes with an identifier lower than or equal to window_end.
     * By throwing an exception, we release the lock immediately after obtaining it
     * such that writes can resume.
     */
    BEGIN
        EXECUTE format('LOCK %s IN SHARE ROW EXCLUSIVE MODE', table_to_lock);
        RAISE 'release table lock' USING ERRCODE = 'RLTBL';
    EXCEPTION WHEN SQLSTATE 'RLTBL' THEN
    END;
    
    UPDATE rollups SET last_aggregated_id = window_end WHERE name = rollup_name;

https://gist.github.com/marcocitus/1ac72e7533dbb01801973ee51...
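To see why the order "read the sequence value, then wait for pending writers" closes the gap, here is a single-threaded toy model in Python. It is a sketch under loud assumptions: `EventTable`, `wait_for_pending_writers`, and `next_window` are hypothetical names, and the real table lock is replaced by simply finishing every in-flight write (in PostgreSQL the lock would block until those transactions commit or roll back).

```python
class EventTable:
    """Toy stand-in for the events table and its id sequence (not PostgreSQL)."""

    def __init__(self):
        self.last_value = None  # None until the sequence is first used
        self.pending = {}       # id -> payload, written but not yet committed
        self.committed = {}     # id -> payload, visible to readers

    def insert(self, payload):
        # The id is taken at write time, before the transaction commits.
        self.last_value = 1 if self.last_value is None else self.last_value + 1
        self.pending[self.last_value] = payload
        return self.last_value

    def commit(self, event_id):
        self.committed[event_id] = self.pending.pop(event_id)

    def wait_for_pending_writers(self):
        # Assumption: models the brief table lock by letting every
        # in-flight writer finish before the caller continues.
        for event_id in sorted(self.pending):
            self.commit(event_id)


def next_window(table, last_aggregated_id):
    """Return (window_start, window_end), or None if the sequence is unused."""
    window_end = table.last_value      # read the sequence value first
    if window_end is None:
        return None
    table.wait_for_pending_writers()   # then wait out writers with ids <= window_end
    return last_aggregated_id + 1, window_end


table = EventTable()
id_tx1 = table.insert("event from tx1")  # id 1, still pending
id_tx2 = table.insert("event from tx2")  # id 2
table.commit(id_tx2)                     # tx2 commits first

window = next_window(table, 0)
start, end = window
events = [table.committed[i] for i in range(start, end + 1) if i in table.committed]
print(window, events)
```

In this model the window (1, 2) is only handed out after the write with id 1 has finished, so reading the range afterwards sees both events.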

eugene-khyst|2 years ago

The "little trick" in the Citus approach is very inventive. SHARE ROW EXCLUSIVE mode protects a table against concurrent data changes, and is self-exclusive so that only one session can hold it at a time. Thus, once such a lock is obtained, we can be sure that there are no more pending transactions with uncommitted changes to that table. It protects against losing the data of pending transactions. Throwing the exception immediately releases the lock, so the table lock is held for only milliseconds. I like the general idea, but I don't want to add plpgsql functions/procedures. I'll see if this can be elegantly implemented in Java+SQL (without plpgsql) and perhaps add it as an alternative approach to my project. Such an approach may be even more effective because it focuses on a single table rather than on all transactions, as the approach described in my project does; thus, locks on irrelevant tables have no effect on event handlers. Thanks for sharing.
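One possible way to do this without plpgsql is to replace the exception block with a savepoint: PostgreSQL releases a lock acquired after a savepoint as soon as that savepoint is rolled back to. The sketch below uses Python rather than Java, and rests on assumptions: `advance_window` and the savepoint name are hypothetical, `cursor` is any DB-API cursor inside an open transaction (autocommit off), the `%s` placeholder style is the one psycopg uses and may differ for other drivers, and the `rollups` table and its columns are taken from the quoted snippet.

```python
def advance_window(cursor, rollup_name):
    """Sketch of the lock trick with plain SQL statements (no plpgsql).

    Returns (window_start, window_end), or None if the sequence was never used.
    """
    cursor.execute(
        "SELECT event_table_name, last_aggregated_id + 1, "
        "pg_sequence_last_value(event_id_sequence_name) "
        "FROM rollups WHERE name = %s FOR UPDATE",
        (rollup_name,),
    )
    row = cursor.fetchone()
    if row is None:
        raise LookupError(f"rollup {rollup_name!r} is not in the rollups table")

    table_to_lock, window_start, window_end = row
    if window_end is None:
        return None  # sequence was never used

    # Take the lock after a savepoint, then roll back to the savepoint so the
    # lock is released right away instead of at the end of the transaction.
    cursor.execute("SAVEPOINT wait_for_writers")
    # Assumption: table_to_lock comes from the trusted rollups table, so it is
    # interpolated directly, as format('%s') does in the quoted snippet.
    cursor.execute(f"LOCK TABLE {table_to_lock} IN SHARE ROW EXCLUSIVE MODE")
    cursor.execute("ROLLBACK TO SAVEPOINT wait_for_writers")

    cursor.execute(
        "UPDATE rollups SET last_aggregated_id = %s WHERE name = %s",
        (window_end, rollup_name),
    )
    return window_start, window_end
```

The same statement sequence should translate to JDBC with `Connection.setSavepoint()` and `Connection.rollback(Savepoint)`, though I have not tested that variant.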

nextaccountic|2 years ago

Why no plpgsql? Is it because the language is bad? If so, what about something like pl/rust (https://plrust.io/), or another language?