Skip to content

Commit 711cad4

Browse files
Joey0538claude
andauthored
feat(room-worker): publish InboxMemberEvent to local INBOX + fix inbox-worker Subjects (#111)
Wires the publisher side of the INBOX-based search-sync pipeline landed in #109. After this, search-sync-worker's spotlight + user-room collections actually receive events end-to-end instead of spinning up with an empty feed. room-worker changes (handler.go) -------------------------------- processAddMembers, processRemoveIndividual, processRemoveOrg now split their member list by home site and publish InboxMemberEvent: - Same-site accounts → one publish per call to `chat.inbox.{site}.member_added/removed` - Cross-site accounts → existing OUTBOX publish per destination, inner payload migrated MemberAddEvent/MemberRemoveEvent → InboxMemberEvent so the remote site's search-sync sees RoomName + RoomType (needed for spotlight typeahead indexing). InboxMemberEvent is a strict JSON superset of MemberAddEvent on the fields inbox-worker reads (Accounts, RoomID, SiteID, JoinedAt, HistorySharedSince), so inbox-worker's handleMemberAdded / handleMemberRemoved continue to unmarshal into their existing types and simply ignore the extra RoomName/RoomType fields. OrgID on MemberRemoveEvent is dropped from the INBOX payload — not currently read by any consumer, and InboxMemberEvent stays focused on what search-sync needs. If inbox-worker ever needs OrgID, add it then. Remove events omit RoomName/RoomType entirely (search-sync keys its deletes by {account}_{roomID} and script-removes by roomID; neither needs room metadata) so we skip the extra DB lookup on the remove path. inbox-worker changes (main.go) ------------------------------ One-line fix: `CreateOrUpdateStream` was copying only `.Name` from `stream.Inbox()` and dropping `.Subjects`. The INBOX stream was being created with zero subjects, so any publish to `chat.inbox.{site}.*` would have failed once room-worker started publishing. Now passes both. The remote-site Sources + SubjectTransforms federation config is intentionally left out — production multi-site isn't deployed yet, and when it is we want inbox-worker (not search-sync-worker's bootstrap path) to own that config. Tests ----- - TestHandler_ProcessAddMembers_PublishesToInbox — new, verifies local INBOX publish carries RoomName/RoomType + cross-site OUTBOX payload is InboxMemberEvent. - TestHandler_ProcessRemoveMember_SelfLeave_IndividualOnly + _OwnerRemovesIndividual + _OwnerRemovesOrg — bump expected publish count +1 and assert the local INBOX publish carries the right Accounts. - TestHandler_ProcessRemoveMember_CrossSiteOutbox — assert OUTBOX payload is InboxMemberEvent (not MemberRemoveEvent) and confirm NO local INBOX publish fires when the removed user is remote. Follows PR #109. https://claude.ai/code/session_01XTmSpmv5dT6UXX7NpRdYqN Co-authored-by: Claude <noreply@anthropic.com>
1 parent 7754ae4 commit 711cad4

3 files changed

Lines changed: 257 additions & 36 deletions

File tree

inbox-worker/main.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -147,7 +147,8 @@ func main() {
147147

148148
inboxCfg := stream.Inbox(cfg.SiteID)
149149
if _, err = js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
150-
Name: inboxCfg.Name,
150+
Name: inboxCfg.Name,
151+
Subjects: inboxCfg.Subjects,
151152
}); err != nil {
152153
slog.Error("create inbox stream failed", "error", err)
153154
os.Exit(1)

room-worker/handler.go

Lines changed: 108 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -312,17 +312,37 @@ func (h *Handler) processRemoveIndividual(ctx context.Context, req *model.Remove
312312
slog.Error("system message publish failed", "error", err, "roomID", req.RoomID)
313313
}
314314

315-
// Cross-site outbox for federated users
316-
if user.SiteID != h.siteID {
315+
// Publish InboxMemberEvent for downstream search-sync indexing. Goes to
316+
// local INBOX when the removed user is on this site, cross-site OUTBOX
317+
// when federated. Remove events don't need RoomName/RoomType (search-
318+
// sync keys deletes by {account}_{roomID}), so they're omitted to avoid
319+
// an extra DB lookup.
320+
inboxEvt := model.InboxMemberEvent{
321+
RoomID: req.RoomID,
322+
SiteID: h.siteID,
323+
Accounts: []string{req.Account},
324+
Timestamp: now.UnixMilli(),
325+
}
326+
if user.SiteID == h.siteID {
317327
outbox := model.OutboxEvent{
318-
Type: "member_removed",
328+
Type: model.OutboxMemberRemoved,
329+
SiteID: h.siteID,
330+
DestSiteID: h.siteID,
331+
Payload: mustMarshal(inboxEvt),
332+
Timestamp: now.UnixMilli(),
333+
}
334+
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), mustMarshal(outbox)); err != nil {
335+
slog.Error("inbox member_removed publish failed", "error", err, "roomID", req.RoomID)
336+
}
337+
} else {
338+
outbox := model.OutboxEvent{
339+
Type: model.OutboxMemberRemoved,
319340
SiteID: h.siteID,
320341
DestSiteID: user.SiteID,
321-
Payload: memberEvtData,
342+
Payload: mustMarshal(inboxEvt),
322343
Timestamp: now.UnixMilli(),
323344
}
324-
outboxData, _ := json.Marshal(outbox)
325-
if err := h.publish(ctx, subject.Outbox(h.siteID, user.SiteID, "member_removed"), outboxData); err != nil {
345+
if err := h.publish(ctx, subject.Outbox(h.siteID, user.SiteID, "member_removed"), mustMarshal(outbox)); err != nil {
326346
slog.Error("outbox publish failed", "error", err, "destSiteID", user.SiteID)
327347
}
328348
}
@@ -427,24 +447,50 @@ func (h *Handler) processRemoveOrg(ctx context.Context, req *model.RemoveMemberR
427447
slog.Error("system message publish failed", "error", err, "roomID", req.RoomID)
428448
}
429449

430-
// Cross-site outbox grouped by destination site
450+
// Publish InboxMemberEvent for downstream search-sync indexing. Split
451+
// by home site — local accounts go to local INBOX, remote accounts go
452+
// to per-site OUTBOX. Like processRemoveIndividual, remove events omit
453+
// RoomName/RoomType (not needed for spotlight deletes or user-room
454+
// script-removes). OrgID is NOT carried on InboxMemberEvent — it's an
455+
// inbox-worker concern for org-level cleanup, not a search-index one.
456+
var localAccounts []string
431457
siteAccounts := make(map[string][]string)
432458
for _, m := range toRemove {
433-
if m.SiteID != h.siteID {
459+
if m.SiteID == h.siteID {
460+
localAccounts = append(localAccounts, m.Account)
461+
} else {
434462
siteAccounts[m.SiteID] = append(siteAccounts[m.SiteID], m.Account)
435463
}
436464
}
465+
466+
if len(localAccounts) > 0 {
467+
inboxEvt := model.InboxMemberEvent{
468+
RoomID: req.RoomID,
469+
SiteID: h.siteID,
470+
Accounts: localAccounts,
471+
Timestamp: now.UnixMilli(),
472+
}
473+
outbox := model.OutboxEvent{
474+
Type: model.OutboxMemberRemoved,
475+
SiteID: h.siteID,
476+
DestSiteID: h.siteID,
477+
Payload: mustMarshal(inboxEvt),
478+
Timestamp: now.UnixMilli(),
479+
}
480+
if err := h.publish(ctx, subject.InboxMemberRemoved(h.siteID), mustMarshal(outbox)); err != nil {
481+
slog.Error("inbox member_removed publish failed", "error", err, "roomID", req.RoomID)
482+
}
483+
}
484+
437485
for destSiteID, accounts := range siteAccounts {
438-
evt := model.MemberRemoveEvent{
439-
Type: "member_removed",
486+
evt := model.InboxMemberEvent{
440487
RoomID: req.RoomID,
441-
Accounts: accounts,
442488
SiteID: h.siteID,
443-
OrgID: req.OrgID,
489+
Accounts: accounts,
444490
Timestamp: now.UnixMilli(),
445491
}
446492
outbox := model.OutboxEvent{
447-
Type: "member_removed",
493+
Type: model.OutboxMemberRemoved,
448494
SiteID: h.siteID,
449495
DestSiteID: destSiteID,
450496
Payload: mustMarshal(evt),
@@ -665,32 +711,64 @@ func (h *Handler) processAddMembers(ctx context.Context, data []byte) error {
665711
slog.Error("system message publish failed", "error", err, "roomID", req.RoomID)
666712
}
667713

668-
// 10. Outbox for cross-site members — batched by destination site
714+
// 10. Publish InboxMemberEvent to local INBOX (same-site accounts) and
715+
// OUTBOX (cross-site accounts). Split subscriptions by home site and
716+
// fan out: one INBOX publish for all local accounts, one OUTBOX
717+
// publish per remote destination site. InboxMemberEvent is a
718+
// superset of MemberAddEvent (adds RoomName + RoomType), so
719+
// inbox-worker on the destination site continues to unmarshal it
720+
// into MemberAddEvent and ignores the extra fields.
721+
var localAccounts []string
669722
remoteSiteMembers := make(map[string][]string)
670723
for _, sub := range subs {
671724
user, ok := userMap[sub.User.Account]
672-
if !ok || user.SiteID == room.SiteID {
725+
if !ok {
673726
continue
674727
}
675-
remoteSiteMembers[user.SiteID] = append(remoteSiteMembers[user.SiteID], sub.User.Account)
728+
if user.SiteID == room.SiteID {
729+
localAccounts = append(localAccounts, sub.User.Account)
730+
} else {
731+
remoteSiteMembers[user.SiteID] = append(remoteSiteMembers[user.SiteID], sub.User.Account)
732+
}
676733
}
734+
735+
inboxEvt := model.InboxMemberEvent{
736+
RoomID: req.RoomID,
737+
RoomName: room.Name,
738+
RoomType: room.Type,
739+
SiteID: room.SiteID,
740+
Accounts: nil, // filled per publish below
741+
HistorySharedSince: historySharedSince,
742+
JoinedAt: req.Timestamp,
743+
Timestamp: now.UnixMilli(),
744+
}
745+
746+
if len(localAccounts) > 0 {
747+
local := inboxEvt
748+
local.Accounts = localAccounts
749+
outbox := model.OutboxEvent{
750+
Type: model.OutboxMemberAdded,
751+
SiteID: room.SiteID,
752+
DestSiteID: room.SiteID,
753+
Payload: mustMarshal(local),
754+
Timestamp: now.UnixMilli(),
755+
}
756+
if err := h.publish(ctx, subject.InboxMemberAdded(room.SiteID), mustMarshal(outbox)); err != nil {
757+
slog.Error("inbox member_added publish failed", "error", err, "roomID", req.RoomID)
758+
}
759+
}
760+
677761
for destSiteID, accounts := range remoteSiteMembers {
678-
siteEvt := model.MemberAddEvent{
679-
Type: "member_added",
680-
RoomID: req.RoomID,
681-
Accounts: accounts,
682-
SiteID: room.SiteID,
683-
JoinedAt: req.Timestamp,
684-
HistorySharedSince: historySharedSince,
685-
Timestamp: now.UnixMilli(),
686-
}
687-
siteEvtData, _ := json.Marshal(siteEvt)
762+
remote := inboxEvt
763+
remote.Accounts = accounts
688764
outbox := model.OutboxEvent{
689-
Type: "member_added", SiteID: room.SiteID, DestSiteID: destSiteID,
690-
Payload: siteEvtData, Timestamp: now.UnixMilli(),
765+
Type: model.OutboxMemberAdded,
766+
SiteID: room.SiteID,
767+
DestSiteID: destSiteID,
768+
Payload: mustMarshal(remote),
769+
Timestamp: now.UnixMilli(),
691770
}
692-
outboxData, _ := json.Marshal(outbox)
693-
if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, "member_added"), outboxData); err != nil {
771+
if err := h.publish(ctx, subject.Outbox(room.SiteID, destSiteID, "member_added"), mustMarshal(outbox)); err != nil {
694772
return fmt.Errorf("outbox publish to %s failed: %w", destSiteID, err)
695773
}
696774
}

0 commit comments

Comments
 (0)