feat(search-sync-worker): add spotlight + user-room sync via INBOX#109
Add two Collection implementations in search-sync-worker that consume
member_added/member_removed events from the INBOX stream and maintain
the spotlight (room typeahead) and user-room (access control)
Elasticsearch indexes.
Index naming:
- spotlight-{site}-v1-chat (overridable via SPOTLIGHT_INDEX)
- user-room-{site} (overridable via USER_ROOM_INDEX)
Stream naming:
- pkg/stream.Inbox(siteID) now returns a fully-populated Config —
Name = `INBOX_{siteID}` and Subjects = `chat.inbox.{siteID}.>`. This
is an additive change; inbox-worker's existing call reads only
`.Name` and is unaffected. The change centralizes every stream name
and subject pattern in pkg/stream/stream.go so any consumer in the
repo can see at a glance which stream it binds to and with what
subject filter. Cross-site federation (Sources + SubjectTransforms
sourcing from remote OUTBOX streams) stays out of the baseline and
is layered on by the service that owns stream creation.
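The centralized constructor can be sketched as below, assuming a trimmed-down Config carrying only the fields named above (the repo's actual type likely has retention, storage, and federation settings as well):

```go
package main

import "fmt"

// Config is a canonical stream definition. This minimal field set is an
// assumption based on the commit description, not the repo's real type.
type Config struct {
	Name     string
	Subjects []string
}

// Inbox returns a fully-populated INBOX config for one site so every
// consumer binds to the same stream name and subject filter.
func Inbox(siteID string) Config {
	return Config{
		Name:     fmt.Sprintf("INBOX_%s", siteID),
		Subjects: []string{fmt.Sprintf("chat.inbox.%s.>", siteID)},
	}
}

func main() {
	cfg := Inbox("site-a")
	fmt.Println(cfg.Name, cfg.Subjects[0]) // INBOX_site-a chat.inbox.site-a.>
}
```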
Collection interface
- BuildAction now returns []BulkAction so a single JetStream message
can fan out to zero, one, or multiple ES actions. Handler tracks
per-message action ranges and acks/naks each source message as a
unit — any failed action in the range naks the whole source event
for redelivery.
- New FilterSubjects(siteID) so inbox-based collections can subscribe
to both local (chat.inbox.{site}.member_*) and federated
(chat.inbox.{site}.aggregate.member_*) variants via NATS 2.10+
consumer FilterSubjects.
- StreamConfig returns jetstream.StreamConfig, converted from the
canonical pkg/stream.* definition so collections never redefine
stream names locally.
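The per-message range bookkeeping the handler relies on can be sketched like this. `BulkAction` is a stripped-down stand-in and `add` is a hypothetical helper, but the `actionStart`/`actionCount` fields mirror the names used in this PR's handler:

```go
package main

import "fmt"

// BulkAction is one ES bulk operation; minimal fields for the sketch.
type BulkAction struct {
	Action string
	DocID  string
}

// pendingMsg records which slice of the shared action buffer belongs
// to one source JetStream message.
type pendingMsg struct {
	actionStart int
	actionCount int
}

// add appends one message's fan-out actions (0..N) to the shared
// buffer and records the per-message range, so a later flush can ack
// or nak the source message as a unit.
func add(buf []BulkAction, pend []pendingMsg, actions []BulkAction) ([]BulkAction, []pendingMsg) {
	pend = append(pend, pendingMsg{actionStart: len(buf), actionCount: len(actions)})
	return append(buf, actions...), pend
}

func main() {
	var buf []BulkAction
	var pend []pendingMsg
	// message 1 fans out to two actions; message 2 to one
	buf, pend = add(buf, pend, []BulkAction{{"index", "a"}, {"index", "b"}})
	buf, pend = add(buf, pend, []BulkAction{{"delete", "c"}})
	fmt.Println(len(buf), pend[1].actionStart, pend[1].actionCount) // 3 2 1
}
```

Any failed result inside a message's `[actionStart, actionStart+actionCount)` range naks that source message; a fully successful range acks it.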
Shared bits to remove duplication
- inboxMemberCollection base struct centralizes StreamConfig (reading
from pkg/stream.Inbox) and FilterSubjects for spotlight and
user-room. It holds no per-instance state.
- parseMemberEvent helper decodes OutboxEvent + MemberAddedPayload
and validates preconditions shared by both inbox-member collections.
- esPropertiesFromStruct[T any] generic consolidates template-mapping
reflection — messages and spotlight share the same code path.
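A sketch of what such a generic could look like; the `es` struct tag convention and the `spotlightDoc` fixture are illustrative assumptions, not the repo's actual code:

```go
package main

import (
	"fmt"
	"reflect"
	"strings"
)

// esPropertiesFromStruct reflects over T's fields and builds an ES
// "properties" mapping from a hypothetical `es` tag (the ES field
// type) keyed by the field's `json` name.
func esPropertiesFromStruct[T any]() map[string]any {
	props := map[string]any{}
	t := reflect.TypeOf(*new(T))
	for i := 0; i < t.NumField(); i++ {
		f := t.Field(i)
		esType := f.Tag.Get("es")
		if esType == "" {
			continue
		}
		name, _, _ := strings.Cut(f.Tag.Get("json"), ",")
		// Fail closed: skip empty/ignored json names so a bad tag can
		// never emit a mapping entry under "" or "-".
		if name == "" || name == "-" {
			continue
		}
		props[name] = map[string]any{"type": esType}
	}
	return props
}

// spotlightDoc is an illustrative document shape, not the real one.
type spotlightDoc struct {
	RoomName string `json:"roomName" es:"search_as_you_type"`
	Account  string `json:"account" es:"keyword"`
}

func main() {
	fmt.Println(esPropertiesFromStruct[spotlightDoc]()["roomName"])
}
```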
pkg/searchengine
- New ActionUpdate type. Bulk adapter emits a plain `update` meta
without version/version_type because _update is a read-modify-write
operation and ES rejects external versioning on it (applies to both
doc-merge and scripted updates; not specific to painless).
- Index action still uses external versioning (Version = evt.Timestamp)
for messages and spotlight idempotency.
pkg/model
- New MemberAddedPayload{Subscription, Room} — the payload shape
carried by OutboxEvent{Type: "member_added"} so inbox-member
consumers can index without a DB lookup.
- OutboxMemberAdded / OutboxMemberRemoved constants replace stringly-
typed "member_added" / "member_removed" literals in the new code.
pkg/subject
- New InboxMemberAdded / InboxMemberRemoved builders for local-publish
subjects, their Aggregate counterparts for federated (transformed)
subjects, and InboxAggregatePattern for inbox-worker's future
FilterSubject. InboxMemberEventSubjects returns the four-subject
list used by spotlight and user-room consumers.
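Sketches of those builders under the subject scheme described above (the exact signatures are assumptions):

```go
package main

import "fmt"

// Local-publish subjects.
func InboxMemberAdded(siteID string) string {
	return fmt.Sprintf("chat.inbox.%s.member_added", siteID)
}
func InboxMemberRemoved(siteID string) string {
	return fmt.Sprintf("chat.inbox.%s.member_removed", siteID)
}

// Federated (transformed) counterparts.
func InboxAggregateMemberAdded(siteID string) string {
	return fmt.Sprintf("chat.inbox.%s.aggregate.member_added", siteID)
}
func InboxAggregateMemberRemoved(siteID string) string {
	return fmt.Sprintf("chat.inbox.%s.aggregate.member_removed", siteID)
}

// InboxMemberEventSubjects returns the four-subject filter list used
// by the spotlight and user-room consumers.
func InboxMemberEventSubjects(siteID string) []string {
	return []string{
		InboxMemberAdded(siteID),
		InboxMemberRemoved(siteID),
		InboxAggregateMemberAdded(siteID),
		InboxAggregateMemberRemoved(siteID),
	}
}

func main() {
	fmt.Println(InboxMemberEventSubjects("site-a")[0]) // chat.inbox.site-a.member_added
}
```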
spotlightCollection
- Per-subscription docs keyed by Subscription.ID; ActionIndex on
member_added, ActionDelete on member_removed, both with
Version = evt.Timestamp so the external-version check makes
out-of-order delivery safe.
- Template pattern `spotlight-*` with search_as_you_type on roomName
via a whitespace/lowercase custom analyzer.
userRoomCollection (multi-pod safe)
- One doc per user, keyed by user account. rooms is a plain string
array used by the search service as a `terms` filter on message
search queries.
- member_added emits ActionUpdate with a painless script + upsert;
member_removed emits ActionUpdate with a painless script only.
- Restricted rooms (Subscription.HistorySharedSince != nil) are
skipped — the search service handles those via DB+cache at query
time.
- Per-room LWW guard in the scripts: each doc carries a flattened
roomTimestamps map of roomId -> last-applied event timestamp. Both
scripts read the stored timestamp, compare to params.ts, and set
ctx.op = 'none' if the incoming event is stale — ES skips the
write entirely (no version bump, no disk I/O). This makes
user-room-sync safe to run with multiple pods sharing the durable
consumer: ES's primary-shard per-doc atomicity serializes
concurrent _update operations and the guard converges on
highest-timestamp-wins regardless of physical arrival order.
- Timestamp source is OutboxEvent.Timestamp (publish time) NOT
Subscription.JoinedAt, because JoinedAt is immutable on the
subscription row and both added/removed events for the same
subscription would otherwise carry the same value and become
indistinguishable to the guard.
- Template pattern `user-room-*` maps rooms as text+keyword (keeping
existing query behavior) and roomTimestamps as `flattened` to
avoid mapping explosion as new roomIds accumulate.
- Remove path carries only rid + ts — no `now`, no updatedAt stamp,
because removal has no user-visible doc mutation to timestamp.
Bootstrap config (nested, test-only)
- New bootstrapConfig struct groups fields that are meaningful ONLY
when the worker is standing up its own streams in dev / integration
tests. Env vars are all prefixed BOOTSTRAP_ so they're obvious in
deployment manifests.
- BOOTSTRAP_STREAMS (bool) — toggles CreateOrUpdateStream.
- BOOTSTRAP_REMOTE_SITE_IDS (list) — cross-site OUTBOX sources to
attach to INBOX during bootstrap.
- In production streams are owned by their publishers
(message-gatekeeper for MESSAGES_CANONICAL, inbox-worker for INBOX)
and search-sync-worker only manages its own durable consumers.
Neither bootstrap field is consulted.
- Collections hold NO remote-site state. The bootstrap loop in
main.go detects the INBOX stream by comparing against
`stream.Inbox(cfg.SiteID).Name` and swaps the collection's
baseline config for inboxBootstrapStreamConfig (which layers on
cross-site Sources + SubjectTransforms) before calling
CreateOrUpdateStream. Stream creation is deduped by name so
spotlight + user-room don't double-create the shared INBOX stream.
Consumers
- Per-purpose durable names: message-sync, spotlight-sync,
user-room-sync. Graceful shutdown waits on all three runConsumer
goroutines via a doneChs slice.
Scope note
- inbox-worker is intentionally NOT modified here. The enhanced
INBOX behavior (publishing, consuming aggregate.* events,
migrating the handler to the new MemberAddedPayload shape) ships
in a separate PR. The pkg/stream.Inbox change in this PR is
additive — inbox-worker reads only Name and is unaffected.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
…member events

Extend MemberAddedPayload from a single Subscription to []Subscriptions
so a single room-worker publish can carry N users being added/removed
from the same room in one admin action. Rewire BuildAction, the
handler's buffer accounting, and the consumer loop so fan-out events
are bounded correctly against the ES bulk request limit.

Why
---
Bulk invite (admin invites 5 users to a room at once) is a real use
case. The previous event shape forced room-worker to publish N events
for one admin action, which worked for 1:1 delivery, but now that we've
committed to fan-out semantics downstream, pushing the bulk shape
through the event model is cleaner: one publish, one event, one atomic
DB write at the producer, and the subscription list travels together.
This commit lands the payload schema change plus every consumer-side
adjustment needed to ingest it safely.

Payload schema (pkg/model/event.go)
-----------------------------------
- MemberAddedPayload.Subscription Subscription → Subscriptions []Subscription
- All subscriptions in one event MUST target the same Room (documented
  on the struct).
- Round-trip test in pkg/model updated to exercise two subscriptions
  (one restricted, one unrestricted) in one payload.

Fan-out in collections (search-sync-worker)
-------------------------------------------
- spotlightCollection.BuildAction now loops over Subscriptions and
  emits len(subs) actions per event. All actions from one event share
  the same external Version (evt.Timestamp) so a redelivery 409s
  uniformly.
- userRoomCollection.BuildAction loops and emits one ActionUpdate per
  subscription, each keyed by a different user account (distinct ES
  docs). Restricted-room filtering (HistorySharedSince != nil) moves
  INSIDE the loop so a mixed bulk invite (some restricted, some not)
  only produces actions for the unrestricted subscriptions. If every
  subscription in the event is restricted, BuildAction returns an
  empty slice and the handler acks the source message without touching
  ES — same path as existing filter-out semantics.
- messageCollection is unchanged. Message sync stays strictly 1:1 —
  fan-out is only for member events.
- newSpotlightSearchIndex now takes (*Subscription, *Room) instead of
  *MemberAddedPayload so it can be called inside the loop.

Handler action-count bookkeeping (handler.go)
---------------------------------------------
The handler already tracked per-message action ranges
(pendingMsg.actionStart / actionCount), so Flush's
ack-all-or-nak-all-per-source logic is already fan-out-correct. The
change is in the public API:
- New ActionCount() — count of buffered ES bulk actions. This is the
  quantity that should drive the flush decision for fan-out
  collections.
- Renamed BufferLen() → MessageCount() to make it unambiguous that
  this is the source-message count, not the action count. Used for
  diagnostics and the per-source ack/nak accounting.
- Removed BufferFull() — it was checking message count against the
  batch size, which is wrong for fan-out. Callers now compare
  ActionCount() directly.
- Renamed the Handler's internal field batchSize → bulkSize to reflect
  that it bounds buffered actions, not messages.

Consumer loop split: FETCH_BATCH_SIZE vs BULK_BATCH_SIZE (main.go)
------------------------------------------------------------------
Previously one BATCH_SIZE env conflated two distinct concerns. Split
into two clearly-named variables so operators can tune them
independently and so readers can tell which concern any given value
relates to:
- FETCH_BATCH_SIZE (default 100): max JetStream messages pulled per
  cons.Fetch() round-trip. Pure JetStream-client knob — does NOT bound
  ES bulk size.
- BULK_BATCH_SIZE (default 500): soft cap on buffered ES bulk actions
  before a flush is triggered. The real ES-side bound.
- FLUSH_INTERVAL (unchanged): max seconds before a time-based flush.

runConsumer is rewritten to be fan-out-safe:
1. Before each Fetch, clamp fetchCount to
   min(FETCH_BATCH_SIZE, BULK_BATCH_SIZE - ActionCount()). This
   prevents a steady stream of 1:1 messages from overshooting
   BULK_BATCH_SIZE.
2. A mid-message-loop flush catches the single-fat-message case: if
   one fan-out event alone pushes ActionCount past BULK_BATCH_SIZE,
   flush immediately before processing the next message in the fetch
   batch — otherwise the next message's actions would add to an
   already-oversized bulk request.
3. Outer flush conditions unchanged: BULK_BATCH_SIZE hit → flush,
   FLUSH_INTERVAL elapsed with non-empty buffer → flush.

Tests
-----
Unit tests:
- pkg/model: TestMemberAddedPayloadJSON now uses a 2-subscription
  fixture (one restricted, one not).
- spotlight_test: new baseBulkMemberAddedPayload helper;
  TestSpotlightCollection_BuildAction_BulkInvite verifies 3 subs → 3
  actions with shared Version;
  TestSpotlightCollection_BuildAction_BulkRemove verifies 2 subs → 2
  ActionDelete actions.
- user_room_test: new TestUserRoomCollection_BuildAction_BulkInvite
  (3 unrestricted subs → 3 distinct user doc updates);
  TestUserRoomCollection_BuildAction_BulkInviteMixedRestricted (2 of 4
  subs are restricted → only 2 actions emitted);
  TestUserRoomCollection_BuildAction_AllRestrictedIsNoOp (every sub
  restricted → empty slice, no error).
- handler_test: new fanOutCollection stub (emits N actions per msg);
  TestHandler_FanOut covers (a) MessageCount/ActionCount diverge,
  (b) all fan-out succeed → source acked, (c) any fan-out fails →
  source nakked, (d) multi-message mixed success — only the message
  whose range contains a failure gets nakked, the other acks
  independently.
- Existing tests updated: payload.Subscription.X →
  payload.Subscriptions[0].X; BufferLen calls renamed to MessageCount.

Integration tests:
- New buildBulkMemberEventPayload helper for multi-sub scenarios, with
  a memberFixture struct for clean (account, subID, restricted) rows.
  Single-sub helper delegates to it.
- New TestSpotlightSync_BulkInvite: publishes one event with 3
  subscriptions, drains 1 JetStream message, asserts 3 spotlight docs
  land; then publishes bulk remove and asserts all 3 are gone.
- New TestUserRoomSync_BulkInvite: publishes one event with 4
  subscriptions (2 restricted), drains 1 message, asserts only the 2
  unrestricted users got upserted; then bulk-remove asserts the 2 user
  docs have empty rooms arrays (ghost docs retained for LWW
  monotonicity) while restricted users are still absent.

https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
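The fetch-clamping rule can be sketched as a pure function. Names mirror the env vars described above; this is an illustrative sketch, not the worker's actual code:

```go
package main

import "fmt"

// clampFetch computes how many JetStream messages to pull next so that
// buffered ES actions never overshoot the bulk cap on the 1:1 path.
// Fan-out events can still overshoot within one message; that case is
// handled by the mid-loop flush, not here.
func clampFetch(fetchBatchSize, bulkBatchSize, actionCount int) int {
	remaining := bulkBatchSize - actionCount
	if remaining <= 0 {
		return 0 // buffer already at/over cap: flush first, then fetch
	}
	if remaining < fetchBatchSize {
		return remaining
	}
	return fetchBatchSize
}

func main() {
	fmt.Println(clampFetch(100, 500, 0))   // 100
	fmt.Println(clampFetch(100, 500, 450)) // 50
	fmt.Println(clampFetch(100, 500, 520)) // 0 (fan-out overshot; flush)
}
```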
…ing, fail fast on bad config

Follow-up fixes to CodeRabbit's review of 201c715 plus a naming
consistency pass on the flush-interval config.

Rename FlushInterval -> BulkFlushInterval
------------------------------------------
FlushInterval alone was ambiguous — what gets flushed? The variable it
partners with (BulkBatchSize) already has the Bulk prefix, and
FlushInterval sitting next to it without the prefix looked like an
unrelated concept. Rename both the Go field and the env var so the two
ES-bulk-flush triggers (size-based BulkBatchSize, time-based
BulkFlushInterval) share a consistent Bulk* prefix.

FlushInterval (int) -> BulkFlushInterval (int)
FLUSH_INTERVAL (env) -> BULK_FLUSH_INTERVAL (env)

No back-compat shim — the field only ships in this unmerged PR, so
there's no deploy consuming the old name.

Tighten 404 handling in handler.go (CodeRabbit 🟠 major)
--------------------------------------------------------
Previous commit treated ANY 404 on Delete/Update as idempotent
success. That was too broad: `index_not_found_exception` at 404 means
the backing index/template is missing or misconfigured, and silently
acking those would drop messages on a bad deploy with no feedback.

Fix:
pkg/searchengine.BulkResult:
- New ErrorType field (populated from the ES bulk item error.type).
- BulkResult.Error (Reason) remains human-readable; ErrorType is the
  machine-readable classifier callers should match on.

pkg/searchengine/adapter.go:
- Propagates detail.Error.Type into BulkResult.ErrorType alongside the
  existing Reason.

search-sync-worker/handler.go isBulkItemSuccess:
- Delete 404: success ONLY when ErrorType is empty (delete on a
  missing doc sets result=not_found with no error block). Any other
  error type at 404 (notably index_not_found_exception) is a real
  failure.
- Update 404: success ONLY when ErrorType ==
  "document_missing_exception" (user-room remove on an empty doc).
  index_not_found_exception or any unfamiliar error type fails closed.
- Index 404: always a failure (unchanged — indexing should create the
  doc, so 404 means the index itself is missing).

Updated TestIsBulkItemSuccess with 14 cases covering the new shape:
document_missing_exception vs index_not_found_exception on both delete
and update, plus an "unknown error type at 404" fail-closed case.
Updated TestHandler_Flush_404OnDeleteAndUpdate to include end-to-end
cases where ErrorType is index_not_found_exception on both delete and
update actions — these messages must be nakked for redelivery, not
silently acked.
Updated pkg/searchengine/adapter_test.go: TestAdapter_Bulk now has a
"bulk error types propagate" subtest that verifies
document_missing_exception and index_not_found_exception flow into
BulkResult.ErrorType correctly.

Fail fast on non-positive batch/interval settings (CodeRabbit 🟠 major)
----------------------------------------------------------------------
runConsumer assumes FetchBatchSize, BulkBatchSize, and
BulkFlushInterval are all > 0 — otherwise:
- FetchBatchSize <= 0 would call Fetch(0) or go negative and hit the
  remaining<=0 fast path forever (busy loop).
- BulkBatchSize <= 0 keeps remaining negative forever (stall).
- BulkFlushInterval <= 0 makes the time-based flush check fire on
  every iteration.

Add startup validation in main.go immediately after config parsing so
an operator gets a clear slog.Error + os.Exit(1) with the offending
setting name and value. Matches CLAUDE.md's "fail fast on bad config"
rule.

pkg/stream/stream_test.go: convert to testify (CodeRabbit 🟡 minor)
------------------------------------------------------------------
Replaced t.Errorf / t.Fatalf with assert.Equal / require.Len to match
the repo-wide "use testify" guideline in CLAUDE.md §4.

template.go: guard empty/ignored json names (CodeRabbit 🟢 nit)
---------------------------------------------------------------
esPropertiesFromStruct previously would emit a mapping entry under ""
or "-" if a future struct had an `es` tag but no usable `json` tag.
That would silently corrupt the template. Added a skip guard with a
doc comment explaining the fail-closed policy.

inbox_integration_test.go: propagate historyShared timestamp (CodeRabbit 🟢 nit)
-------------------------------------------------------------------------------
memberFixture used to collapse historyShared into a boolean Restricted
flag, dropping the caller's timestamp value. Now carries
HistorySharedSince *time.Time verbatim and uses the Restricted bool
only as a shortcut for "pick a synthetic timestamp for me." Doc
comment spells out the precedence.

https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
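A sketch of the tightened success check; the BulkResult shape follows the description above, while the action-string encoding is an assumption:

```go
package main

import "fmt"

// BulkResult is the per-item bulk response shape described above.
type BulkResult struct {
	Status    int
	ErrorType string // ES error.type, e.g. "index_not_found_exception"
}

// isBulkItemSuccess sketches the 404 policy: delete-404 is idempotent
// success only with no error type; update-404 succeeds only for
// document_missing_exception; everything else at 404 fails closed.
func isBulkItemSuccess(action string, r BulkResult) bool {
	if r.Status >= 200 && r.Status < 300 {
		return true
	}
	if r.Status == 404 {
		switch action {
		case "delete":
			return r.ErrorType == "" // result=not_found, no error block
		case "update":
			return r.ErrorType == "document_missing_exception"
		}
	}
	return false // index 404 and all other statuses are real failures
}

func main() {
	fmt.Println(isBulkItemSuccess("delete", BulkResult{Status: 404}))                                          // true
	fmt.Println(isBulkItemSuccess("delete", BulkResult{Status: 404, ErrorType: "index_not_found_exception"}))  // false
	fmt.Println(isBulkItemSuccess("update", BulkResult{Status: 404, ErrorType: "document_missing_exception"})) // true
}
```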
Add a pair of shared helpers in pkg/natsutil for the repeating
"ack/nak a JetStream message and log the error" pattern so every
service in the repo can use the same shape. Convert search-sync-worker's
handler.go to use them — that's the PR's motivating case.
Why
---
The pattern appears 18 times across 7 services today
(message-gatekeeper, broadcast-worker, inbox-worker, search-sync-worker,
room-worker, notification-worker, message-worker), with divergent
spellings:
"failed to ack message" vs "ack failed" vs "ack malformed message"
"failed to nack message" vs "failed to nak message" vs "nak failed"
"error" vs "err" key in the slog call
Consolidating gives us:
1. One place to add tracing spans, metrics counters, or delivery-context
fields later instead of 18.
2. A consistent structured-log shape ("reason" + "error") so operators
can query by cause across services in log aggregation.
3. Less visual noise at the call site — `natsutil.Ack(msg, "filtered")`
reads as intent; `if err := msg.Ack(); err != nil { slog.Error(...) }`
is mechanical boilerplate.
Scope
-----
This commit does ONLY (a) the helper + tests and (b) the
search-sync-worker conversion. The 6 other services that do the same
pattern (13 call sites) are intentionally left alone so this PR stays
focused on spotlight/user-room sync. A small, mechanical follow-up PR
will migrate them and normalize the divergent spellings.
pkg/natsutil/ack.go
-------------------
- `Acker` / `Naker` are minimal interfaces (`{ Ack() error }` /
`{ Nak() error }`). Both `jetstream.Msg` (nats.go) and otel-wrapped
variants (oteljetstream.Msg) satisfy them, so the helpers work for
every consumer in the repo without a wrapper type.
- `Ack(msg, reason)` / `Nak(msg, reason)` try the op and log any
failure with `slog.Error("ack failed", "reason", ..., "error", ...)`.
Fire-and-forget by design — the caller doesn't branch on the result.
- `reason` is a short label describing WHY the message is being acked
or nakked so operators can query logs by cause.
pkg/natsutil/ack_test.go
------------------------
- Covers success, swallowed error, and compile-time interface
satisfaction via a tiny stubMsg test double.
search-sync-worker/handler.go
-----------------------------
Five call sites converted:
- Add() malformed payload → natsutil.Ack(msg, "build action failed")
- Add() filtered event → natsutil.Ack(msg, "filtered, no actions")
- Flush() all-succeeded → natsutil.Ack(p.jsMsg, "bulk actions succeeded")
- Flush() any-failed → natsutil.Nak(p.jsMsg, "bulk action failed")
- nakAll() loop body → natsutil.Nak(p.jsMsg, reason)
nakAll gains a `reason string` parameter so its two Flush call sites
("bulk request failed", "bulk result count mismatch") emit distinct
reasons downstream — one shared helper, two distinct log labels.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
Replace MemberAddedPayload (Subscriptions + Room) with InboxMemberEvent
(Accounts + RoomName + event-level HistorySharedSince). Collections now
fan out by account, synthesize spotlight DocID as {account}_{roomID},
and short-circuit the entire bulk on restricted-room events.
Integration tests (inbox_integration_test.go) still reference the old
shape — follow-up commit will migrate them.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
Replace memberFixture + MemberAddedPayload-based helpers with
buildInboxMemberEvent / publishInboxMemberEvent. Update DocID
assertions to the synthesized {account}_{roomID} scheme.
Restricted-room behavior now tested as an all-or-nothing event-level
skip (HistorySharedSince != 0), matching the collection logic.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
📝 Walkthrough
Adds INBOX-based member-event indexing.
Sequence Diagram(s)
sequenceDiagram
participant JS as JetStream<br/>INBOX Stream
participant H as Handler
participant C as Collection
participant ES as Elasticsearch
participant A as Ack/Nak
JS->>H: Deliver message(s)
loop Per message
H->>C: BuildAction(payload)
C-->>H: []BulkAction (0..N)
alt zero actions
H->>A: Ack message
else actions produced
H->>H: Append actions to shared buffer<br/>Record pending range
end
end
alt Flush (capacity or timer)
H->>ES: BulkRequest(all buffered actions)
ES-->>H: BulkResult[]
loop For each pending message
H->>H: Evaluate results[actionStart:actionEnd]
alt all actions success
H->>A: Ack message
else
H->>A: Nak message
end
end
end
sequenceDiagram
participant M as Main
participant S as StreamBootstrap
participant Coll as Collection
participant R as runConsumer
participant JS as JetStream
participant H as Handler
participant ES as Elasticsearch
M->>S: Create INBOX stream (optional bootstrap)
S-->>M: Stream with Sources/Transforms
M->>Coll: Upsert ES templates
M->>JS: Create Consumers (per-collection, optional FilterSubjects)
M->>R: Start runConsumer goroutines
loop runConsumer
R->>JS: Fetch (clamped by remaining bulk capacity)
JS-->>R: Messages
loop Per message
R->>H: Add(message)
alt capacity reached mid-batch
R->>H: Flush()
end
end
alt timer triggers flush and actions present
R->>H: Flush()
end
end
Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~120 minutes
🚥 Pre-merge checks: 2 passed, 1 failed (warning)
Actionable comments posted: 8
🧹 Nitpick comments (7)
pkg/subject/subject_test.go (1)
80-95: Use testify for the new slice assertion. This subtest can be
reduced to a single assert.Equal and avoid manual length/index checks.

♻️ Proposed test cleanup

 import (
 	"testing"
+
+	"github.com/stretchr/testify/assert"
+
 	"github.com/hmchangw/chat/pkg/subject"
 )
@@
 	t.Run("InboxMemberEventSubjects", func(t *testing.T) {
 		got := subject.InboxMemberEventSubjects("site-a")
 		want := []string{
 			"chat.inbox.site-a.member_added",
 			"chat.inbox.site-a.member_removed",
 			"chat.inbox.site-a.aggregate.member_added",
 			"chat.inbox.site-a.aggregate.member_removed",
 		}
-		if len(got) != len(want) {
-			t.Fatalf("got %d subjects, want %d", len(got), len(want))
-		}
-		for i := range want {
-			if got[i] != want[i] {
-				t.Errorf("[%d] = %q, want %q", i, got[i], want[i])
-			}
-		}
+		assert.Equal(t, want, got)
 	})

As per coding guidelines, **/*_test.go: "Use standard testing package
with github.com/stretchr/testify/assert and testify/require for
assertions."

pkg/stream/stream.go (1)
74-77: Move the new INBOX wildcard subjects into pkg/subject. These
are NATS subject patterns, so adding subject-builder helpers keeps
stream configs and consumer filters from diverging.

♻️ Proposed shape

 return Config{
 	Name: fmt.Sprintf("INBOX_%s", siteID),
 	Subjects: []string{
-		fmt.Sprintf("chat.inbox.%s.*", siteID),
-		fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID),
+		subject.InboxWildcard(siteID),
+		subject.InboxAggregateWildcard(siteID),
 	},
 }

Add the corresponding helpers in pkg/subject and cover them in
pkg/subject/subject_test.go.

As per coding guidelines, **/*.go: "Use dot-delimited hierarchical
NATS subjects via pkg/subject builders, never raw fmt.Sprintf; use
wildcards * for single-token, > for multi-token tail."

pkg/searchengine/adapter_test.go (1)
107-109: Set Version here so the test proves it is ignored. The
assertion at Line 123 is stronger if the input action carries a
non-zero version; otherwise the test does not catch a regression that
only leaks version when callers set it.

🧪 Proposed test tweak

 results, err := a.Bulk(context.Background(), []BulkAction{
-	{Action: ActionUpdate, Index: "user-room-site1", DocID: "alice", Doc: updateBody},
+	{Action: ActionUpdate, Index: "user-room-site1", DocID: "alice", Version: 123, Doc: updateBody},
 })

pkg/natsutil/ack_test.go (1)
37-55: Assert the logged error path, not just the method call. These
tests are named ErrorIsLoggedNotReturned, but they would still pass if
Ack/Nak stopped logging failures. Capture slog output and assert the
event/reason/error fields so the exported helper contract is actually
covered. As per coding guidelines, tests must cover error paths.

🧪 Proposed test hardening

 import (
+	"bytes"
 	"errors"
+	"log/slog"
 	"testing"
@@
 	"github.com/hmchangw/chat/pkg/natsutil"
 )
+
+func captureSlog(t *testing.T) *bytes.Buffer {
+	t.Helper()
+	var buf bytes.Buffer
+	old := slog.Default()
+	slog.SetDefault(slog.New(slog.NewJSONHandler(&buf, nil)))
+	t.Cleanup(func() {
+		slog.SetDefault(old)
+	})
+	return &buf
+}
@@
 func TestAck_ErrorIsLoggedNotReturned(t *testing.T) {
@@
+	logs := captureSlog(t)
 	msg := &stubMsg{ackErr: errors.New("connection closed")}
 	natsutil.Ack(msg, "filtered")
 	assert.True(t, msg.ackCalled)
+	assert.Contains(t, logs.String(), "ack failed")
+	assert.Contains(t, logs.String(), "filtered")
+	assert.Contains(t, logs.String(), "connection closed")
 }
@@
 func TestNak_ErrorIsLoggedNotReturned(t *testing.T) {
+	logs := captureSlog(t)
 	msg := &stubMsg{nakErr: errors.New("consumer deleted")}
 	natsutil.Nak(msg, "bulk failure")
 	assert.True(t, msg.nakCalled)
+	assert.Contains(t, logs.String(), "nak failed")
+	assert.Contains(t, logs.String(), "bulk failure")
+	assert.Contains(t, logs.String(), "consumer deleted")
 }

search-sync-worker/user_room.go (2)
51-71: Operational note: roomTimestamps grows unbounded across repeat
join/leave cycles.

removeRoomScript calls ctx._source.roomTimestamps.put(params.rid,
params.ts) to serve the LWW guard, but never deletes prior entries —
and addRoomScript only overwrites. For a user who has churned through
many rooms over years (departmental rotations, short-lived project
rooms, DMs with many people), the map grows monotonically. Because
flattened mapping means every scripted update still rewrites the full
_source, write amplification also grows with the map size, and any
match_all scan pulls a larger source body.

A couple of mitigations if this becomes material:
- Add a TTL/compaction step: in addRoomScript and removeRoomScript,
  drop entries older than params.ts - retentionMs.
- Periodically re-issue a clean upsert that keeps only timestamps for
  rooms still in ctx._source.rooms.

Not a blocker for this PR — the LWW semantics are correct — but worth
tracking as a follow-up before heavy users accumulate meaningful
history.
146-171: `now` parameter is derived from the event timestamp, not wall-clock — confirm intent.
`now := time.UnixMilli(ts).UTC().Format(time.RFC3339Nano)` reuses the event timestamp for both the upsert's `createdAt`/`updatedAt` and the script's `params.now`. That's reasonable — it makes replays deterministic — but the variable name reads as wall-clock. If intentional (it matches the fix-for-replay property LWW already relies on), consider renaming to `eventTime` and adding a short comment to avoid future "fix me to use real now" PRs.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/user_room.go` around lines 146 - 171, The variable now in buildAddRoomUpdateBody is derived from the event timestamp (ts) but named like a wall-clock time; rename now to eventTime (or eventTimeStr) and update its usages in the params map and the upsert's CreatedAt/UpdatedAt to eventTime to make intent explicit, and add a brief comment above the assignment stating that this uses the event timestamp for deterministic replays/LWW semantics rather than current wall-clock time.

search-sync-worker/inbox_stream.go (1)
87-99: `parseMemberEvent` correctly centralizes the envelope validation.
Validating `evt.Timestamp > 0` before payload unmarshal gives a consistent error surface across spotlight and user-room, and matches the "timestamp is the external version / LWW key" invariant for both collections. One note: a literal `"payload": null` body unmarshals into a zero `InboxMemberEvent` without error, and downstream (`RoomID == ""`, empty `Accounts`) catches that — worth a one-line comment here so the next reader doesn't add a redundant nil-payload check.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/inbox_stream.go` around lines 87 - 99, parseMemberEvent should keep the existing timestamp validation and payload unmarshal but add a brief one-line comment explaining that a literal `"payload": null` will unmarshal into a zero-value InboxMemberEvent (so downstream checks like payload.RoomID == "" or empty Accounts will catch invalid payloads), to prevent future reviewers from adding redundant nil-payload checks; update the parseMemberEvent function comment/body to mention this behavior and why no extra nil check is performed after json.Unmarshal of evt.Payload into model.InboxMemberEvent.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@docs/superpowers/plans/2026-04-20-search-sync-inbox-recovery.md`:
- Around line 264-266: The fenced code blocks in the markdown lack a language
specifier which triggers markdownlint; update the fences around the commands
(the blocks containing "make test-integration SERVICE=search-sync-worker" and
the subsequent git add/commit/push block) to include "shell" (i.e., change ```
to ```shell) so the code fences are properly marked; search for those exact
command blocks in the document (file:
docs/superpowers/plans/2026-04-20-search-sync-inbox-recovery.md) and update both
occurrences noted (around the make test-integration block and the git
add/commit/push block).
- Around line 110-113: Update the Phase 2 status block under the "### Files
remaining in Phase 2 (not yet committed)" heading: either mark the
`search-sync-worker/inbox_integration_test.go` migration as completed (remove
the "not yet committed" wording and the TODO list entry referencing
MemberAddedPayload) or move that remaining-work list into a clearly labeled
"Historical note" section; ensure references to MemberAddedPayload and the
integration test migration are removed from active TODOs so the document matches
the PR scope.
In `@pkg/model/event.go`:
- Around line 52-61: The InboxMemberEvent struct is missing the BSON tag on the
Timestamp field; update the InboxMemberEvent definition so the Timestamp field
reads with both json and bson tags (i.e., add bson:"timestamp" to the Timestamp
int64 field) by editing the Timestamp line in the InboxMemberEvent struct in
pkg/model/event.go.
In `@pkg/searchengine/adapter.go`:
- Around line 75-86: The comment on the ActionUpdate branch is factually wrong
about Elasticsearch/OpenSearch support for version/version_type on bulk update;
change it to state that the bulk "update" action does support versioning in ES
8.x/OpenSearch 2.x but we intentionally omit version/version_type here because
the update uses scripted/idempotent upserts (see the Painless guards like
roomTimestamps) and adding external versioning is unnecessary and could be
confusing; update the comment near ActionUpdate / bulkActionMeta (updateMeta) to
clearly document this rationale and mention that omission is deliberate.
In `@search-sync-worker/handler.go`:
- Around line 160-183: The current isBulkItemSuccess function treats any 409 as
success; change it so 409 is considered success only for external-versioned
write actions (ActionIndex and ActionDelete) and NOT for ActionUpdate: inside
isBulkItemSuccess (and referring to action and result), check result.Status ==
409 and then return true only when action == searchengine.ActionIndex || action
== searchengine.ActionDelete (leave 2xx handling unchanged); for ActionUpdate on
409 return false so the caller will NAK and allow redelivery of scripted
updates.
In `@search-sync-worker/inbox_integration_test.go`:
- Line 138: The test function names violate the Test<Type>_<Scenario> convention
by missing the underscore before "Integration"; rename
TestSpotlightSyncIntegration to TestSpotlightSync_Integration (and the other
test at the same location noted in the comment) so they follow
TestSpotlightSync_Integration (or Test<Type>_<Method>_<Scenario> as
appropriate), and update any references/imports or test invocations that refer
to those symbols to the new names.
In `@search-sync-worker/spotlight.go`:
- Around line 161-162: The json.Marshal call that assigns data, _ :=
json.Marshal(tmpl) is silently discarding the error; update the code around the
json.Marshal(tmpl) usage to either (a) handle the error by capturing err :=
json.Marshal(tmpl) and returning/logging it (or failing the caller) where
appropriate, or (b) if you truly guarantee tmpl can never error (only JSON-safe
literals), replace the discard with a clear one-line comment immediately above
that explains why the error is intentionally ignored (referencing tmpl and the
json.Marshal call) so the intent is documented for reviewers and static checks.
- Around line 147-151: The whitespace tokenizer config under the
"custom_tokenizer" map is using an unsupported "token_chars" setting which
causes UpsertTemplate (and index creation for the spotlight index) to fail;
remove the "token_chars" entry from the custom_tokenizer map in spotlight.go
and, if needed, replace it with a valid option such as "max_token_length" for
the whitespace tokenizer so the UpsertTemplate JSON only contains valid fields
for the "whitespace" tokenizer.
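The 409 handling called out for handler.go above can be sketched with simplified stand-ins for the searchengine action constants and the bulk item status (names and types here are illustrative, not the repo's actual API):

```go
package main

import "fmt"

// bulkActionType stands in for the searchengine action constants.
type bulkActionType string

const (
	actionIndex  bulkActionType = "index"
	actionDelete bulkActionType = "delete"
	actionUpdate bulkActionType = "update"
)

// isBulkItemSuccess treats 2xx as success for every action, and 409 as
// success only for externally versioned writes (index/delete), where a
// version conflict just means a newer event already won. A 409 on a scripted
// update means the Painless script never ran, so the caller should NAK.
func isBulkItemSuccess(action bulkActionType, status int) bool {
	if status >= 200 && status < 300 {
		return true
	}
	if status == 409 {
		return action == actionIndex || action == actionDelete
	}
	return false
}

func main() {
	fmt.Println(isBulkItemSuccess(actionIndex, 409))  // true: stale external version, safe to ack
	fmt.Println(isBulkItemSuccess(actionUpdate, 409)) // false: NAK for redelivery
}
```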
---
Nitpick comments:
In `@pkg/natsutil/ack_test.go`:
- Around line 37-55: Update the two tests TestAck_ErrorIsLoggedNotReturned and
TestNak_ErrorIsLoggedNotReturned to assert the logged error instead of only
checking ack/nak invocation: install a temporary slog logger/handler that
captures records, call natsutil.Ack(msg, "filtered") / natsutil.Nak(msg, "bulk
failure") with the stubMsg that has ackErr/nakErr set, then assert the captured
log contains an error-level record with expected fields (e.g.,
event/reason/error or similar message text) indicating the ack/nak failure;
finally restore the original logger. Ensure you reference natsutil.Ack,
natsutil.Nak and stubMsg in the test changes so the error path is exercised and
verified.
In `@pkg/searchengine/adapter_test.go`:
- Around line 107-109: The test calls a.Bulk(...) with a BulkAction but doesn't
set Version, so it can't verify that versions from callers are ignored; update
the BulkAction passed to a.Bulk in the adapter_test.go test (the call to a.Bulk
with ActionUpdate/Index "user-room-site1"/DocID "alice"/Doc updateBody) to
include a non-zero Version (e.g. Version: 1) so the test proves any incoming
version is ignored by the implementation.
In `@pkg/stream/stream.go`:
- Around line 74-77: Replace the raw fmt.Sprintf NATS subject patterns in the
Subjects slice in stream.go with calls to new builder helpers in pkg/subject
(e.g., SubjectInbox(siteID) and SubjectInboxAggregatePrefix(siteID) or
appropriately named functions) so the stream config uses dot-delimited builders
and correct wildcards; add those helper functions to pkg/subject to return
"chat.inbox.<siteID>.*" and "chat.inbox.<siteID>.aggregate.>" (using the
package's subject composition utilities) and add unit tests in
pkg/subject/subject_test.go covering both helpers.
In `@pkg/subject/subject_test.go`:
- Around line 80-95: Replace the manual length and element-by-element checks in
the subtest for InboxMemberEventSubjects with a single testify assertion: import
github.com/stretchr/testify/assert (or require) in subject_test.go, then call
assert.Equal(t, want, got) (or require.Equal) inside the t.Run for
InboxMemberEventSubjects to compare the expected slice with the result from
subject.InboxMemberEventSubjects("site-a"); remove the len/got/index loops and
keep the test name and the want/got variables intact.
In `@search-sync-worker/inbox_stream.go`:
- Around line 87-99: parseMemberEvent should keep the existing timestamp
validation and payload unmarshal but add a brief one-line comment explaining
that a literal `"payload": null` will unmarshal into a zero-value
InboxMemberEvent (so downstream checks like payload.RoomID == "" or empty
Accounts will catch invalid payloads), to prevent future reviewers from adding
redundant nil-payload checks; update the parseMemberEvent function comment/body
to mention this behavior and why no extra nil check is performed after
json.Unmarshal of evt.Payload into model.InboxMemberEvent.
In `@search-sync-worker/user_room.go`:
- Around line 51-71: The roomTimestamps map grows unbounded because
removeRoomScript and addRoomScript only overwrite or insert entries; update both
scripts (addRoomScript and removeRoomScript) to accept a retentionMs param and,
before writing, iterate the keys of ctx._source.roomTimestamps and remove
entries whose stored timestamp < params.ts - params.retentionMs to compact old
history; also add a periodic cleanup upsert (server-side job) that reads
ctx._source.rooms and rewrites roomTimestamps keeping only entries for rooms
still present (or within retention window) to avoid long-lived growth and
excessive source rewrite amplification.
- Around line 146-171: The variable now in buildAddRoomUpdateBody is derived
from the event timestamp (ts) but named like a wall-clock time; rename now to
eventTime (or eventTimeStr) and update its usages in the params map and the
upsert's CreatedAt/UpdatedAt to eventTime to make intent explicit, and add a
brief comment above the assignment stating that this uses the event timestamp
for deterministic replays/LWW semantics rather than current wall-clock time.
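The pkg/subject builders suggested in the stream.go nitpick above might look like this minimal sketch (the function names follow the review's suggestion; the real package's subject composition utilities may differ):

```go
package main

import "fmt"

// SubjectInbox returns the wildcard subject matching local direct publishes
// for a site. Name and shape follow the review's suggestion.
func SubjectInbox(siteID string) string {
	return fmt.Sprintf("chat.inbox.%s.*", siteID)
}

// SubjectInboxAggregatePrefix returns the multi-level wildcard matching
// federated events sourced from remote OUTBOX streams.
func SubjectInboxAggregatePrefix(siteID string) string {
	return fmt.Sprintf("chat.inbox.%s.aggregate.>", siteID)
}

func main() {
	fmt.Println(SubjectInbox("site-a"))                // chat.inbox.site-a.*
	fmt.Println(SubjectInboxAggregatePrefix("site-a")) // chat.inbox.site-a.aggregate.>
}
```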
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 463a1929-a5b1-43d6-aaa7-8b35f201b7e5
📒 Files selected for processing (27)
- docs/superpowers/plans/2026-04-20-search-sync-inbox-recovery.md
- pkg/model/event.go
- pkg/model/model_test.go
- pkg/natsutil/ack.go
- pkg/natsutil/ack_test.go
- pkg/searchengine/adapter.go
- pkg/searchengine/adapter_test.go
- pkg/searchengine/searchengine.go
- pkg/stream/stream.go
- pkg/stream/stream_test.go
- pkg/subject/subject.go
- pkg/subject/subject_test.go
- search-sync-worker/collection.go
- search-sync-worker/handler.go
- search-sync-worker/handler_test.go
- search-sync-worker/inbox_integration_test.go
- search-sync-worker/inbox_stream.go
- search-sync-worker/inbox_stream_test.go
- search-sync-worker/integration_test.go
- search-sync-worker/main.go
- search-sync-worker/messages.go
- search-sync-worker/messages_test.go
- search-sync-worker/spotlight.go
- search-sync-worker/spotlight_test.go
- search-sync-worker/template.go
- search-sync-worker/user_room.go
- search-sync-worker/user_room_test.go
…drop invalid token_chars
- handler.go: NAK 409 for ActionUpdate (internal version_conflict from concurrent writers means the painless script didn't run; ack would silently drop the update). 409 stays ack-on-success for externally-versioned ActionIndex/ActionDelete.
- spotlight.go: drop token_chars from whitespace tokenizer — only valid on ngram/edge_ngram. Sending it would reject the UpsertTemplate call.
- pkg/model/event.go: add bson:"timestamp" tag on InboxMemberEvent.Timestamp per the repo-wide "every NATS event struct must have both json + bson tags" rule.
- spotlight.go: document intentional json.Marshal error discard.
- adapter.go: correct comment — ES _update DOES accept version + version_type=external; we omit them because the painless LWW guard already handles ordering.
- inbox_integration_test.go: rename TestSpotlight/UserRoomSyncIntegration to Test<Type>_Integration to follow Test<Type>_<Scenario> convention.
- plan doc: mark Phase 2 complete (integration tests landed in c7a303b); add `shell` language hint on command fences.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
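As a sanity check on the tokenizer fix above, a minimal sketch of a valid whitespace tokenizer definition: max_token_length is the supported knob for the whitespace tokenizer, while token_chars belongs to the ngram/edge_ngram tokenizers (the settings map shape is illustrative, not the PR's actual template):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// customTokenizerSettings sketches the analysis.tokenizer fragment of an
// index template. Only keys valid for the "whitespace" tokenizer appear;
// token_chars is deliberately absent.
func customTokenizerSettings() map[string]any {
	return map[string]any{
		"custom_tokenizer": map[string]any{
			"type":             "whitespace",
			"max_token_length": 255,
		},
	}
}

func main() {
	data, err := json.Marshal(customTokenizerSettings())
	if err != nil {
		panic(err)
	}
	fmt.Println(string(data))
}
```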
099f2fd to
67636fb
Compare
Actionable comments posted: 2
🧹 Nitpick comments (2)
search-sync-worker/spotlight.go (1)
50-63: Optional: short-circuit `HistorySharedSince` before validating `Accounts`.
If a restricted-room event ever arrives with an empty `Accounts` slice, it currently returns an `"empty accounts"` error (which the handler converts to an ack-and-drop at handler.go:70) instead of being silently skipped like every other restricted event. Swapping the two checks makes restricted events uniformly skipped regardless of payload shape and matches the documented event-level contract in pkg/model/event.go:45-50. user_room.go has the same ordering, so apply symmetrically if you change this.
♻️ Proposed nit
```diff
-	if len(payload.Accounts) == 0 {
-		return nil, fmt.Errorf("build spotlight action: empty accounts")
-	}
 	if payload.HistorySharedSince != 0 {
 		return nil, nil
 	}
+	if len(payload.Accounts) == 0 {
+		return nil, fmt.Errorf("build spotlight action: empty accounts")
+	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/spotlight.go` around lines 50 - 63, The BuildAction in spotlightCollection currently validates payload.Accounts before checking payload.HistorySharedSince, which causes restricted-room events with empty Accounts to error instead of be skipped; move the HistorySharedSince check in spotlightCollection.BuildAction so the "if payload.HistorySharedSince != 0 { return nil, nil }" short-circuits before validating Accounts. Also apply the same ordering change to the analogous handler in user_room.go to keep behavior consistent.

search-sync-worker/inbox_integration_test.go (1)
92-116: Optional: surface `batch.Error()` to improve diagnostics on flaky fetches.
If the JetStream server returns an error mid-batch (e.g., consumer deleted, leadership change, or transient connection issue), `batch.Messages()` closes without signaling the error — the outer `require.Equal(expected, received, ...)` will still fail the test, but the actual fetch error is lost. Add a check after the inner loop to surface the error:
♻️ Proposed improvement
```diff
 		batch, err := cons.Fetch(expected-received, jetstream.FetchMaxWait(5*time.Second))
 		require.NoError(t, err)
 		for msg := range batch.Messages() {
 			handler.Add(msg)
 			received++
 		}
+		require.NoError(t, batch.Error(), "batch error after draining")
 	}
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@search-sync-worker/inbox_integration_test.go` around lines 92 - 116, The drainConsumer helper currently ignores errors returned by the jetstream.Batch (created via cons.Fetch), which loses diagnostic context when batch.Messages() closes early; after the inner for msg := range batch.Messages() loop in drainConsumer, call batch.Error() and assert or fail with that error (e.g., require.NoError(t, batch.Error(), "fetch attempt %d: received %d of %d", attempts, received, expected)) so any fetch-level errors from cons.Fetch are surfaced in the test logs; keep existing behavior of handler.Add and handler.Flush.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In @.github/workflows/ci.yml:
- Line 26: Update the pinned CI Go toolchain from GO_VERSION "1.25.8" to the
patched "1.25.9" to pick up the April 7, 2026 security fixes; locate the
GO_VERSION variable in the GitHub Actions workflow (the GO_VERSION entry in the
.github/workflows/ci.yml) and change its value to "1.25.9", then run the
workflow to confirm builds pass with the updated toolchain.
- Around line 35-40: Workflow uses Node20-backed action versions; update
actions/checkout@v4 → actions/checkout@v6 and actions/setup-go@v5 →
actions/setup-go@v6 (and verify golangci/golangci-lint-action is on a
Node24-ready release if needed) in the CI workflow where those actions are
referenced (the lint, test, and test-integration jobs) so the runners use
Node24-backed action releases and avoid deprecation warnings; make the same
substitutions wherever the action versions appear in .github/workflows/ci.yml.
---
Nitpick comments:
In `@search-sync-worker/inbox_integration_test.go`:
- Around line 92-116: The drainConsumer helper currently ignores errors returned
by the jetstream.Batch (created via cons.Fetch), which loses diagnostic context
when batch.Messages() closes early; after the inner for msg := range
batch.Messages() loop in drainConsumer, call batch.Error() and assert or fail
with that error (e.g., require.NoError(t, batch.Error(), "fetch attempt %d:
received %d of %d", attempts, received, expected)) so any fetch-level errors
from cons.Fetch are surfaced in the test logs; keep existing behavior of
handler.Add and handler.Flush.
In `@search-sync-worker/spotlight.go`:
- Around line 50-63: The BuildAction in spotlightCollection currently validates
payload.Accounts before checking payload.HistorySharedSince, which causes
restricted-room events with empty Accounts to error instead of be skipped; move
the HistorySharedSince check in spotlightCollection.BuildAction so the "if
payload.HistorySharedSince != 0 { return nil, nil }" short-circuits before
validating Accounts. Also apply the same ordering change to the analogous
handler in user_room.go to keep behavior consistent.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 5da8b76b-6782-41de-ae8c-9575d94f671a
⛔ Files ignored due to path filters (1)
go.sum is excluded by `!**/*.sum`
📒 Files selected for processing (10)
- .github/workflows/ci.yml
- .golangci.yml
- docs/superpowers/plans/2026-04-20-search-sync-inbox-recovery.md
- go.mod
- pkg/model/event.go
- pkg/searchengine/adapter.go
- search-sync-worker/handler.go
- search-sync-worker/handler_test.go
- search-sync-worker/inbox_integration_test.go
- search-sync-worker/spotlight.go
💤 Files with no reviewable changes (1)
- .golangci.yml
✅ Files skipped from review due to trivial changes (2)
- go.mod
- docs/superpowers/plans/2026-04-20-search-sync-inbox-recovery.md
🚧 Files skipped from review as they are similar to previous changes (2)
- pkg/searchengine/adapter.go
- pkg/model/event.go
Three parallel jobs on push to main and every PR:
- lint: golangci-lint via golangci-lint-action@v9 (v7+ required for our v2 .golangci.yml config; v6 only understands v1 configs)
- test: unit tests with race detector via make test
- test-integration: search-sync-worker integration tests via make test-integration SERVICE=search-sync-worker (Docker is pre-installed on ubuntu-latest runners, so testcontainers-go can start ES + NATS)

Scoped integration to search-sync-worker only — other services have their own azure-pipelines.yml and running all -tags=integration tests in one job would be slow. Expand to a per-service matrix later if needed.

Also drops goimports from .golangci.yml linters.settings — v2.11+ rejects it (additional properties 'goimports' not allowed) because goimports was reclassified as a formatter in v2. The duplicate block under formatters.settings (unchanged) keeps it active with the same local-prefixes.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
…ain helper
- spotlight.go / user_room.go: move the HistorySharedSince short-circuit ahead of the Accounts validation. Previously a restricted-room event with an empty Accounts slice returned an "empty accounts" error instead of being silently skipped like every other restricted event. Matches the event-level contract documented on InboxMemberEvent.
- inbox_integration_test.go: surface batch.Error() after draining each fetch so mid-batch server errors (consumer deleted, leader change) fail the test with their real cause instead of a misleading "drained N of M" mismatch.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
67636fb to
7754ae4
Compare
…x-worker Subjects (#111)

Wires the publisher side of the INBOX-based search-sync pipeline landed in #109. After this, search-sync-worker's spotlight + user-room collections actually receive events end-to-end instead of spinning up with an empty feed.

room-worker changes (handler.go)
--------------------------------
processAddMembers, processRemoveIndividual, processRemoveOrg now split their member list by home site and publish InboxMemberEvent:
- Same-site accounts → one publish per call to `chat.inbox.{site}.member_added/removed`
- Cross-site accounts → existing OUTBOX publish per destination, inner payload migrated MemberAddEvent/MemberRemoveEvent → InboxMemberEvent so the remote site's search-sync sees RoomName + RoomType (needed for spotlight typeahead indexing).

InboxMemberEvent is a strict JSON superset of MemberAddEvent on the fields inbox-worker reads (Accounts, RoomID, SiteID, JoinedAt, HistorySharedSince), so inbox-worker's handleMemberAdded / handleMemberRemoved continue to unmarshal into their existing types and simply ignore the extra RoomName/RoomType fields.

OrgID on MemberRemoveEvent is dropped from the INBOX payload — not currently read by any consumer, and InboxMemberEvent stays focused on what search-sync needs. If inbox-worker ever needs OrgID, add it then. Remove events omit RoomName/RoomType entirely (search-sync keys its deletes by {account}_{roomID} and script-removes by roomID; neither needs room metadata) so we skip the extra DB lookup on the remove path.

inbox-worker changes (main.go)
------------------------------
One-line fix: `CreateOrUpdateStream` was copying only `.Name` from `stream.Inbox()` and dropping `.Subjects`. The INBOX stream was being created with zero subjects, so any publish to `chat.inbox.{site}.*` would have failed once room-worker started publishing. Now passes both.
The remote-site Sources + SubjectTransforms federation config is intentionally left out — production multi-site isn't deployed yet, and when it is we want inbox-worker (not search-sync-worker's bootstrap path) to own that config.

Tests
-----
- TestHandler_ProcessAddMembers_PublishesToInbox — new, verifies local INBOX publish carries RoomName/RoomType + cross-site OUTBOX payload is InboxMemberEvent.
- TestHandler_ProcessRemoveMember_SelfLeave_IndividualOnly + _OwnerRemovesIndividual + _OwnerRemovesOrg — bump expected publish count +1 and assert the local INBOX publish carries the right Accounts.
- TestHandler_ProcessRemoveMember_CrossSiteOutbox — assert OUTBOX payload is InboxMemberEvent (not MemberRemoveEvent) and confirm NO local INBOX publish fires when the removed user is remote.

Follows PR #109.
https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
Co-authored-by: Claude <noreply@anthropic.com>
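The split-by-home-site step described in the room-worker changes could be sketched like this (splitBySite, the accountSites map, and the site-lookup shape are hypothetical; the real handler resolves home sites from account records):

```go
package main

import "fmt"

// splitBySite partitions member accounts into the local batch (one INBOX
// publish) and per-remote-site batches (one OUTBOX publish per destination).
// accountSites maps account → home site; a real handler would look this up.
func splitBySite(accounts []string, accountSites map[string]string, localSite string) (local []string, remote map[string][]string) {
	remote = make(map[string][]string)
	for _, acc := range accounts {
		if accountSites[acc] == localSite {
			local = append(local, acc)
		} else {
			remote[accountSites[acc]] = append(remote[accountSites[acc]], acc)
		}
	}
	return local, remote
}

func main() {
	sites := map[string]string{"alice": "site-a", "bob": "site-b", "carol": "site-a"}
	local, remote := splitBySite([]string{"alice", "bob", "carol"}, sites, "site-a")
	fmt.Println(local)  // [alice carol]
	fmt.Println(remote) // map[site-b:[bob]]
}
```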
Summary
Adds two `Collection` implementations to `search-sync-worker` that consume `member_added`/`member_removed` events from the `INBOX` stream and maintain the spotlight (room typeahead) and user-room (message-search access control) Elasticsearch indexes.

Supersedes #78 — that PR had drifted to a ROOMS-stream architecture which duplicated the existing OUTBOX/INBOX federation pipeline. This PR returns to the INBOX-based design: one federation pipe per site (OUTBOX→INBOX via Sources + SubjectTransforms), many local consumers off INBOX.

See docs/superpowers/plans/2026-04-20-search-sync-inbox-recovery.md for the full architectural rationale and task breakdown.

What's in this PR
New payload type (pkg/model)
- `InboxMemberEvent` — single payload type for `member_added`/`member_removed` on INBOX. Fields: `RoomID, RoomName, RoomType, SiteID, Accounts []string, HistorySharedSince int64, JoinedAt int64, Timestamp int64`.
- `HistorySharedSince` — when non-zero, the entire bulk is skipped (search service handles restricted rooms via DB+cache at query time).

New collections (search-sync-worker)
- `spotlightCollection` (spotlight.go) — per-(user, room) typeahead docs. DocID synthesized as `{account}_{roomID}` (new payload carries no subscription IDs). `member_added` → ActionIndex with external versioning; `member_removed` → ActionDelete.
- `userRoomCollection` (user_room.go) — per-user `rooms[]` array for message-search access control. Painless scripts with per-room LWW guard via `flattened` `roomTimestamps` map.
- `inboxMemberCollection` centralizes the INBOX stream config + subject filters (local + federated aggregate).

Shared helper
- `parseMemberEvent` in inbox_stream.go — unmarshals OutboxEvent + InboxMemberEvent, validates preconditions. Callers apply event-level HSS short-circuit.

pkg/stream + pkg/subject (unchanged from recovery)
- `stream.Inbox(siteID)` returns `INBOX_{siteID}` with two non-overlapping subject patterns: `chat.inbox.{site}.*` (local direct publishes) + `chat.inbox.{site}.aggregate.>` (federated events sourced from remote OUTBOX via SubjectTransform).
- `subject.InboxMemberAdded` / `InboxMemberRemoved` / `InboxMemberAddedAggregate` / `InboxMemberRemovedAggregate` / `InboxMemberEventSubjects` builders.

Bootstrap (dev/test only)
- `BOOTSTRAP_STREAMS` toggles CreateOrUpdateStream at startup; `BOOTSTRAP_REMOTE_SITE_IDS` drives cross-site Sources + SubjectTransforms in inboxBootstrapStreamConfig. In production, inbox-worker will own INBOX stream creation (follow-up PR).

Scope notes
- room-worker is intentionally NOT modified here. The publisher-side migration (publishing InboxMemberEvent to local `chat.inbox.{site}.member_added/removed` for same-site members, to OUTBOX for cross-site) is a separate follow-up PR coordinated with inbox-worker. Integration tests hand-publish to INBOX to prove the collections work end-to-end independent of the publisher change.
- inbox-worker is intentionally NOT modified here. It will own INBOX stream creation with production Sources config in a follow-up.

Test plan
- make lint — 0 issues
- make test — all services green
- go vet -tags=integration ./search-sync-worker/... — clean
- make test-integration SERVICE=search-sync-worker — requires Docker for testcontainers-go; needs CI run

Integration test coverage
- TestSpotlightSyncIntegration — local + federated member_added, federated member_removed, doc shape verification with synthesized `{account}_{roomID}` DocIDs
- TestSpotlightSync_BulkInvite — fan-out: 1 JetStream message × 3 accounts = 3 ES actions; bulk remove evicts all
- TestUserRoomSyncIntegration — multi-room joins, federated upsert for new user, roomTimestamps retention after remove, all-or-nothing restricted-room skip, createdAt/updatedAt stamping
- TestUserRoomSync_BulkInvite — unrestricted bulk upsert + all-restricted event no-op (event-level HSS)
- TestUserRoomSync_LWWGuard — 6-step stateful sequence: initial add → stale add no-op → stale remove no-op → newer remove evicts → re-add restores → another stale add no-op

Known sharp edges
- ActionDelete on a non-existent doc returns 404, which the handler currently treats as failure → nak/retry. Only triggerable by a multi-publisher race that doesn't exist in our topology; worth a follow-up handler fix to treat 404-on-delete as success.
- user-room-sync with multiple pods: safe via the LWW guard for member-event volume. Documented in user_room.go doc comment.

https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN
Summary by CodeRabbit
New Features
Infrastructure
Documentation
Tests
CI