package csapi_tests

import (
	"encoding/json"
	"fmt"
	"testing"
	"time"

	"github.com/tidwall/gjson"

	"github.com/matrix-org/complement"
	"github.com/matrix-org/complement/b"
	"github.com/matrix-org/complement/client"
	"github.com/matrix-org/complement/federation"
	"github.com/matrix-org/complement/helpers"
	"github.com/matrix-org/complement/runtime"
	"github.com/matrix-org/gomatrixserverlib"
	"github.com/matrix-org/gomatrixserverlib/fclient"
	"github.com/matrix-org/util"
)

// Observes "first bug" from https://github.com/matrix-org/dendrite/pull/1394#issuecomment-687056673
|
|
func TestCumulativeJoinLeaveJoinSync(t *testing.T) {
|
|
deployment := complement.Deploy(t, 1)
|
|
defer deployment.Destroy(t)
|
|
|
|
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
|
|
bob := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
|
|
|
|
roomID := bob.MustCreateRoom(t, map[string]interface{}{
|
|
"preset": "public_chat",
|
|
})
|
|
|
|
var since string
|
|
|
|
// Get floating next_batch from before joining at all
|
|
_, since = alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
|
|
|
|
alice.MustJoinRoom(t, roomID, nil)
|
|
|
|
// This assumes that sync does not have side-effects in servers.
|
|
//
|
|
// The alternative would be to sleep, but that is not acceptable here.
|
|
sinceJoin := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
|
|
|
|
alice.MustLeaveRoom(t, roomID)
|
|
|
|
sinceLeave := alice.MustSyncUntil(t, client.SyncReq{Since: sinceJoin}, client.SyncLeftFrom(alice.UserID, roomID))
|
|
|
|
alice.MustJoinRoom(t, roomID, nil)
|
|
|
|
alice.MustSyncUntil(t, client.SyncReq{Since: sinceLeave}, client.SyncJoinedTo(alice.UserID, roomID))
|
|
|
|
jsonRes, _ := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0", Since: since})
|
|
if jsonRes.Get("rooms.leave." + client.GjsonEscape(roomID)).Exists() {
|
|
t.Errorf("Incremental sync has joined-left-joined room showing up in leave section, this shouldnt be the case.")
|
|
}
|
|
}
|
|
|
|
// Observes "second bug" from https://github.com/matrix-org/dendrite/pull/1394#issuecomment-687056673
|
|
func TestTentativeEventualJoiningAfterRejecting(t *testing.T) {
|
|
deployment := complement.Deploy(t, 1)
|
|
defer deployment.Destroy(t)
|
|
|
|
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
|
|
bob := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
|
|
|
|
roomID := alice.MustCreateRoom(t, map[string]interface{}{
|
|
"preset": "public_chat",
|
|
})
|
|
|
|
var since string
|
|
var jsonRes gjson.Result
|
|
|
|
// Get floating current next_batch
|
|
_, since = alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
|
|
|
|
alice.MustInviteRoom(t, roomID, bob.UserID)
|
|
|
|
bob.MustSyncUntil(t, client.SyncReq{}, client.SyncInvitedTo(bob.UserID, roomID))
|
|
|
|
// This rejects the invite
|
|
bob.MustLeaveRoom(t, roomID)
|
|
|
|
// Full sync
|
|
leaveExists := false
|
|
start := time.Now()
|
|
for !leaveExists && time.Since(start) < 1*time.Second {
|
|
jsonRes, since = bob.MustSync(t, client.SyncReq{TimeoutMillis: "0", FullState: true, Since: since})
|
|
leaveExists = jsonRes.Get("rooms.leave." + client.GjsonEscape(roomID)).Exists()
|
|
}
|
|
if !leaveExists {
|
|
t.Errorf("Bob just rejected an invite, it should show up under 'leave' in a full sync")
|
|
}
|
|
|
|
bob.MustJoinRoom(t, roomID, nil)
|
|
|
|
start = time.Now()
|
|
leaveExists = true
|
|
for leaveExists && time.Since(start) < 1*time.Second {
|
|
jsonRes, since = bob.MustSync(t, client.SyncReq{TimeoutMillis: "0", FullState: true, Since: since})
|
|
leaveExists = jsonRes.Get("rooms.leave." + client.GjsonEscape(roomID)).Exists()
|
|
}
|
|
if leaveExists {
|
|
t.Errorf("Bob has rejected an invite, but then just joined the public room anyways, it should not show up under 'leave' in a full sync %s", since)
|
|
}
|
|
}
|
|
|
|
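// TestSync covers a set of /sync behaviours, run as parallel subtests against
// a shared deployment and a shared timeline filter.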
func TestSync(t *testing.T) {
	runtime.SkipIf(t, runtime.Dendrite) // FIXME: https://github.com/matrix-org/dendrite/issues/1324
	// sytest: Can sync
	deployment := complement.Deploy(t, 1)
	defer deployment.Destroy(t)
	alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
	bob := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

	filterID := createFilter(t, alice, map[string]interface{}{
		"room": map[string]interface{}{
			"timeline": map[string]interface{}{
				"limit": 10,
			},
		},
	})

t.Run("parallel", func(t *testing.T) {
|
|
// sytest: Can sync a joined room
|
|
t.Run("Can sync a joined room", func(t *testing.T) {
|
|
t.Parallel()
|
|
roomID := alice.MustCreateRoom(t, map[string]interface{}{})
|
|
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
|
|
res, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID})
|
|
// check all required fields exist
|
|
checkJoinFieldsExist(t, res, roomID)
|
|
// sync again
|
|
res, _ = alice.MustSync(t, client.SyncReq{Filter: filterID, Since: nextBatch})
|
|
if res.Get("rooms.join." + client.GjsonEscape(roomID)).Exists() {
|
|
t.Errorf("unchanged room %s should not be in the sync", roomID)
|
|
}
|
|
})
|
|
// sytest: Full state sync includes joined rooms
|
|
t.Run("Full state sync includes joined rooms", func(t *testing.T) {
|
|
t.Parallel()
|
|
roomID := alice.MustCreateRoom(t, map[string]interface{}{})
|
|
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
|
|
_, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID})
|
|
|
|
res, _ := alice.MustSync(t, client.SyncReq{Filter: filterID, Since: nextBatch, FullState: true})
|
|
checkJoinFieldsExist(t, res, roomID)
|
|
})
|
|
// sytest: Newly joined room is included in an incremental sync
|
|
t.Run("Newly joined room is included in an incremental sync", func(t *testing.T) {
|
|
t.Parallel()
|
|
_, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID})
|
|
roomID := alice.MustCreateRoom(t, map[string]interface{}{})
|
|
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
|
|
res, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID, Since: nextBatch})
|
|
checkJoinFieldsExist(t, res, roomID)
|
|
res, _ = alice.MustSync(t, client.SyncReq{Filter: filterID, Since: nextBatch})
|
|
if res.Get("rooms.join." + client.GjsonEscape(roomID)).Exists() {
|
|
t.Errorf("unchanged room %s should not be in the sync", roomID)
|
|
}
|
|
})
|
|
		// sytest: Newly joined room has correct timeline in incremental sync
		t.Run("Newly joined room has correct timeline in incremental sync", func(t *testing.T) {
			runtime.SkipIf(t, runtime.Dendrite) // FIXME: https://github.com/matrix-org/dendrite/issues/1324
			t.Parallel()

			filterBob := createFilter(t, bob, map[string]interface{}{
				"room": map[string]interface{}{
					"timeline": map[string]interface{}{
						"limit": 10,
						"types": []string{"m.room.message"},
					},
					"state": map[string]interface{}{
						"types": []string{},
					},
				},
			})

			roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
			alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))

			sendMessages(t, alice, roomID, "alice message 1-", 4)
			_, nextBatch := bob.MustSync(t, client.SyncReq{Filter: filterBob})
			sendMessages(t, alice, roomID, "alice message 2-", 4)
			bob.MustJoinRoom(t, roomID, []string{})
			alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
			res, _ := bob.MustSync(t, client.SyncReq{Filter: filterBob, Since: nextBatch})
			room := res.Get("rooms.join." + client.GjsonEscape(roomID))
			timeline := room.Get("timeline")
			limited := timeline.Get("limited").Bool()
			timelineEvents := timeline.Get("events").Array()
			for _, event := range timelineEvents {
				if event.Get("type").Str != "m.room.message" {
					t.Errorf("Only expected 'm.room.message' events")
				}
			}
			if len(timelineEvents) == 6 {
				if limited {
					t.Errorf("Timeline has all the events so shouldn't be limited: %+v", timeline)
				}
			} else {
				if !limited {
					t.Errorf("Timeline doesn't have all the events so should be limited: %+v", timeline)
				}
			}
		})
		// sytest: Newly joined room includes presence in incremental sync
		t.Run("Newly joined room includes presence in incremental sync", func(t *testing.T) {
			runtime.SkipIf(t, runtime.Dendrite) // FIXME: https://github.com/matrix-org/dendrite/issues/1324
			roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
			alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
			_, nextBatch := bob.MustSync(t, client.SyncReq{})
			bob.MustJoinRoom(t, roomID, []string{})
			alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))
			nextBatch = bob.MustSyncUntil(t, client.SyncReq{Since: nextBatch}, func(userID string, sync gjson.Result) error {
				presence := sync.Get("presence")
				if len(presence.Get("events").Array()) == 0 {
					return fmt.Errorf("presence.events is empty: %+v", presence)
				}
				usersInPresenceEvents(t, presence, []string{alice.UserID})
				return nil
			})
			// There should be no new presence events
			res, _ := bob.MustSync(t, client.SyncReq{Since: nextBatch})
			usersInPresenceEvents(t, res.Get("presence"), []string{})
		})
		// sytest: Get presence for newly joined members in incremental sync
		t.Run("Get presence for newly joined members in incremental sync", func(t *testing.T) {
			runtime.SkipIf(t, runtime.Dendrite) // FIXME: https://github.com/matrix-org/dendrite/issues/1324
			roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
			nextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, roomID))
			sendMessages(t, alice, roomID, "dummy message", 1)
			_, nextBatch = alice.MustSync(t, client.SyncReq{Since: nextBatch})
			bob.MustJoinRoom(t, roomID, []string{})
			alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(bob.UserID, roomID))

			// wait until there are presence events
			nextBatch = alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch}, func(userID string, sync gjson.Result) error {
				presence := sync.Get("presence")
				if len(presence.Get("events").Array()) == 0 {
					return fmt.Errorf("presence.events is empty: %+v", presence)
				}
				usersInPresenceEvents(t, presence, []string{bob.UserID})
				return nil
			})
			// There should be no new presence events
			res, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch})
			usersInPresenceEvents(t, res.Get("presence"), []string{})
		})

t.Run("sync should succeed even if the sync token points to a redaction of an unknown event", func(t *testing.T) {
|
|
// this is a regression test for https://github.com/matrix-org/synapse/issues/12864
|
|
//
|
|
// The idea here is that we need a sync token which points to a redaction
|
|
// for an event which doesn't exist. Such a redaction may not be served to
|
|
// the client. This can lead to server bugs when the server tries to fetch
|
|
// the event corresponding to the sync token.
|
|
//
|
|
// The C-S API does not permit us to generate such a redaction event, so
|
|
// we have to poke it in from a federated server.
|
|
//
|
|
// The situation is complicated further by the very fact that we
|
|
// cannot see the faulty redaction, and therefore cannot tell whether
|
|
// our sync token includes it or not. The normal trick here would be
|
|
// to send another (regular) event as a sentinel, and then if that sentinel
|
|
// is returned by /sync, we can be sure the faulty event has also been
|
|
// processed. However, that doesn't work here, because doing so will mean
|
|
// that the sync token points to the sentinel rather than the redaction,
|
|
// negating the whole point of the test.
|
|
//
|
|
// Instead, as a rough proxy, we send a sentinel in a *different* room.
|
|
// There is no guarantee that the target server will process the events
|
|
// in the order we send them, but in practice it seems to get close
|
|
// enough.
|
|
|
|
t.Parallel()
|
|
|
|
// alice creates two rooms, which charlie (on our test server) joins
|
|
srv := federation.NewServer(t, deployment,
|
|
federation.HandleKeyRequests(),
|
|
federation.HandleTransactionRequests(nil, nil),
|
|
)
|
|
cancel := srv.Listen()
|
|
defer cancel()
|
|
|
|
charlie := srv.UserID("charlie")
|
|
|
|
redactionRoomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
|
|
redactionRoom := srv.MustJoinRoom(t, deployment, "hs1", redactionRoomID, charlie)
|
|
|
|
sentinelRoomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
|
|
sentinelRoom := srv.MustJoinRoom(t, deployment, "hs1", sentinelRoomID, charlie)
|
|
|
|
// charlie creates a bogus redaction, which he sends out, followed by
|
|
// a good event - in another room - to act as a sentinel. It's not
|
|
// guaranteed, but hopefully if the sentinel is received, so was the
|
|
// redaction.
|
|
redactionEvent := srv.MustCreateEvent(t, redactionRoom, federation.Event{
|
|
Type: "m.room.redaction",
|
|
Sender: charlie,
|
|
Content: map[string]interface{}{},
|
|
Redacts: "$12345"})
|
|
redactionRoom.AddEvent(redactionEvent)
|
|
t.Logf("Created redaction event %s", redactionEvent.EventID())
|
|
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{redactionEvent.JSON()}, nil)
|
|
|
|
sentinelEvent := srv.MustCreateEvent(t, sentinelRoom, federation.Event{
|
|
Type: "m.room.test",
|
|
Sender: charlie,
|
|
Content: map[string]interface{}{"body": "1234"},
|
|
})
|
|
sentinelRoom.AddEvent(sentinelEvent)
|
|
t.Logf("Created sentinel event %s", sentinelEvent.EventID())
|
|
srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{redactionEvent.JSON(), sentinelEvent.JSON()}, nil)
|
|
|
|
// wait for the sentinel to arrive
|
|
nextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(sentinelRoomID, sentinelEvent.EventID()))
|
|
|
|
// charlie sends another batch of events to force a gappy sync.
|
|
// We have to send 11 events to force a gap, since we use a filter with a timeline limit of 10 events.
|
|
pdus := make([]json.RawMessage, 11)
|
|
var lastSentEventId string
|
|
for i := range pdus {
|
|
ev := srv.MustCreateEvent(t, redactionRoom, federation.Event{
|
|
Type: "m.room.message",
|
|
Sender: charlie,
|
|
Content: map[string]interface{}{},
|
|
})
|
|
redactionRoom.AddEvent(ev)
|
|
pdus[i] = ev.JSON()
|
|
lastSentEventId = ev.EventID()
|
|
}
|
|
srv.MustSendTransaction(t, deployment, "hs1", pdus, nil)
|
|
t.Logf("Sent filler events, with final event %s", lastSentEventId)
|
|
|
|
// sync, starting from the same ?since each time, until the final message turns up.
|
|
// This is basically an inlining of MustSyncUntil, with the key difference that we
|
|
// keep the same ?since each time, instead of incrementally syncing on each pass.
|
|
numResponsesReturned := 0
|
|
start := time.Now()
|
|
t.Logf("Will sync with since=%s", nextBatch)
|
|
|
|
// This part of the test is flaky for workerised Synapse with the default 5 second timeout,
|
|
// so bump it up to 10 seconds.
|
|
alice.SyncUntilTimeout = 10 * time.Second
|
|
|
|
for {
|
|
if time.Since(start) > alice.SyncUntilTimeout {
|
|
t.Fatalf("%s: timed out after %v. Seen %d /sync responses", alice.UserID, time.Since(start), numResponsesReturned)
|
|
}
|
|
// sync, using a filter with a limit smaller than the number of PDUs we sent.
|
|
syncResponse, _ := alice.MustSync(t, client.SyncReq{Filter: filterID, Since: nextBatch})
|
|
numResponsesReturned += 1
|
|
timeline := syncResponse.Get("rooms.join." + client.GjsonEscape(redactionRoomID) + ".timeline")
|
|
timelineEvents := timeline.Get("events").Array()
|
|
|
|
if len(timelineEvents) > 0 {
|
|
lastEventIdInSync := timelineEvents[len(timelineEvents)-1].Get("event_id").String()
|
|
t.Logf("Iteration %d: /sync returned %d events, with final event %s", numResponsesReturned, len(timelineEvents), lastEventIdInSync)
|
|
|
|
if lastEventIdInSync == lastSentEventId {
|
|
// check we actually got a gappy sync - else this test isn't testing the right thing
|
|
if !timeline.Get("limited").Bool() {
|
|
t.Fatalf("Not a gappy sync after redaction")
|
|
}
|
|
break
|
|
}
|
|
} else {
|
|
t.Logf("Iteration %d: /sync returned %d events", numResponsesReturned, len(timelineEvents))
|
|
}
|
|
|
|
}
|
|
|
|
// that's it - we successfully did a gappy sync.
|
|
})
|
|
|
|
t.Run("Device list tracking", func(t *testing.T) {
|
|
// syncDeviceListsHas checks that `device_lists.changed` or `device_lists.left` contains a
|
|
// given user ID.
|
|
syncDeviceListsHas := func(section string, expectedUserID string) client.SyncCheckOpt {
|
|
jsonPath := fmt.Sprintf("device_lists.%s", section)
|
|
return func(clientUserID string, topLevelSyncJSON gjson.Result) error {
|
|
usersWithChangedDeviceListsArray := topLevelSyncJSON.Get(jsonPath).Array()
|
|
for _, userID := range usersWithChangedDeviceListsArray {
|
|
if userID.Str == expectedUserID {
|
|
return nil
|
|
}
|
|
}
|
|
return fmt.Errorf(
|
|
"syncDeviceListsHas: %s not found in %s",
|
|
expectedUserID,
|
|
jsonPath,
|
|
)
|
|
}
|
|
}
|
|
|
|
t.Run("User is correctly listed when they leave, even when lazy loading is enabled", func(t *testing.T) {
|
|
// Alice creates a room, and starts syncing with lazy-loading enabled.
|
|
// Charlie joins, sends an event, and then leaves, the room.
|
|
// We check that Charlie appears under the "device_lists" section of the sync.
|
|
//
|
|
// Regression test for https://github.com/element-hq/synapse/issues/16948
|
|
|
|
charlie := deployment.Register(t, "hs1", helpers.RegistrationOpts{LocalpartSuffix: "charlie"})
|
|
roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
|
|
|
|
aliceSyncFilter := `{
|
|
"room": {
|
|
"timeline": { "lazy_load_members": true },
|
|
"state": { "lazy_load_members": true }
|
|
}
|
|
}`
|
|
_, initialSyncToken := alice.MustSync(t, client.SyncReq{Filter: aliceSyncFilter})
|
|
|
|
charlie.MustJoinRoom(t, roomID, nil)
|
|
syncToken := alice.MustSyncUntil(t, client.SyncReq{Filter: aliceSyncFilter, Since: initialSyncToken},
|
|
syncDeviceListsHas("changed", charlie.UserID),
|
|
)
|
|
|
|
// Charlie sends a message, and leaves
|
|
sendMessages(t, charlie, roomID, "test", 1)
|
|
charlie.MustLeaveRoom(t, roomID)
|
|
|
|
// Alice sees charlie in the "left" section
|
|
alice.MustSyncUntil(t, client.SyncReq{Filter: aliceSyncFilter, Since: syncToken},
|
|
syncDeviceListsHas("left", charlie.UserID),
|
|
)
|
|
|
|
// ... even if she makes the request twice
|
|
resp, _ := alice.MustSync(t, client.SyncReq{Filter: aliceSyncFilter, Since: syncToken})
|
|
if err := syncDeviceListsHas("left", charlie.UserID)(alice.UserID, resp); err != nil {
|
|
t.Error(err)
|
|
}
|
|
})
|
|
})
|
|
})
|
|
}
|
|
|
|
// This is a regression test for
// https://github.com/matrix-org/synapse/issues/16463
//
// We test this by having a local user (alice) and remote user (charlie) in a
// room. Charlie sends 50+ messages into the room without sending to Alice's
// server. Charlie then sends one more, which does get sent to Alice.
//
// Alice should observe that she receives some (though not all) of charlie's
// events, with the `limited` flag set.
func TestSyncTimelineGap(t *testing.T) {
	runtime.SkipIf(t, runtime.Dendrite)
	deployment := complement.Deploy(t, 1)
	defer deployment.Destroy(t)
	alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

	srv := federation.NewServer(t, deployment,
		federation.HandleKeyRequests(),
		federation.HandleTransactionRequests(nil, nil),
	)
	cancel := srv.Listen()
	defer cancel()

	charlie := srv.UserID("charlie")

	roomID := alice.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})
	room := srv.MustJoinRoom(t, deployment, "hs1", roomID, charlie)

	filterID := createFilter(t, alice, map[string]interface{}{
		"room": map[string]interface{}{
			"timeline": map[string]interface{}{
				"limit": 20,
			},
		},
	})
	_, nextBatch := alice.MustSync(t, client.SyncReq{Filter: filterID})
	t.Logf("Next batch %s", nextBatch)

	alice.SendEventSynced(t, roomID, b.Event{
		Type:   "m.room.message",
		Sender: alice.UserID,
		Content: map[string]interface{}{
			"body":    "Hi from Alice!",
			"msgtype": "m.text",
		},
	})

	// Create 50 messages, but don't send them to Alice
	var missingEvents []gomatrixserverlib.PDU
	for i := 0; i < 50; i++ {
		event := srv.MustCreateEvent(t, room, federation.Event{
			Type:   "m.room.message",
			Sender: charlie,
			Content: map[string]interface{}{
				"body":    "Remote message",
				"msgtype": "m.text",
			},
		})
		room.AddEvent(event)
		missingEvents = append(missingEvents, event)
	}

	// Create one more event that we will send to Alice, which references the
	// previous 50.
	lastEvent := srv.MustCreateEvent(t, room, federation.Event{
		Type:   "m.room.message",
		Sender: charlie,
		Content: map[string]interface{}{
			"body":    "End",
			"msgtype": "m.text",
		},
	})
	room.AddEvent(lastEvent)

	// Alice's HS will try and fill in the gap, so we need to respond to those
	// requests.
	respondToGetMissingEventsEndpoints(t, srv, room, missingEvents)

	srv.MustSendTransaction(t, deployment, "hs1", []json.RawMessage{lastEvent.JSON()}, nil)

	// We now test that two different modes of /sync work. The first is when we
	// are syncing as the server receives the `lastEvent` (and so, at least
	// Synapse, will start sending down some events immediately). In this mode
	// we may see alice's message, but charlie's messages should set the limited
	// flag.
	//
	// The second mode is when we do an incremental sync *after* all the events
	// have finished being persisted, and so we get only charlie's messages.
t.Run("incremental", func(t *testing.T) {
|
|
timelineSequence := make([]gjson.Result, 0)
|
|
|
|
t.Logf("Doing incremental syncs from %s", nextBatch)
|
|
|
|
// This just reads all timeline batches into `timelineSequence` until we see `lastEvent` come down
|
|
alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch, Filter: filterID}, func(clientUserID string, topLevelSyncJSON gjson.Result) error {
|
|
t.Logf("next batch %s", topLevelSyncJSON.Get("next_batch").Str)
|
|
|
|
roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID) + ".timeline")
|
|
if !roomResult.Exists() {
|
|
return fmt.Errorf("No entry for room (%s)", roomID)
|
|
}
|
|
|
|
timelineSequence = append(timelineSequence, roomResult)
|
|
|
|
events := roomResult.Get("events")
|
|
if !events.Exists() || !events.IsArray() {
|
|
return fmt.Errorf("Invalid events entry (%s)", roomResult.Raw)
|
|
}
|
|
|
|
foundLastEvent := false
|
|
for _, ev := range events.Array() {
|
|
if ev.Get("event_id").Str == lastEvent.EventID() {
|
|
foundLastEvent = true
|
|
}
|
|
}
|
|
|
|
if !foundLastEvent {
|
|
return fmt.Errorf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
t.Logf("Got timeline sequence: %s", timelineSequence)
|
|
|
|
// Check that we only see Alice's message from before the gap *before*
|
|
// we seen any limited batches, and vice versa for Charlie's messages.
|
|
limited := false
|
|
for _, section := range timelineSequence {
|
|
limited = limited || section.Get("limited").Bool()
|
|
events := section.Get("events").Array()
|
|
for _, ev := range events {
|
|
if limited {
|
|
if ev.Get("sender").Str == alice.UserID {
|
|
t.Fatalf("Got message from alice after limited flag")
|
|
}
|
|
} else {
|
|
if ev.Get("sender").Str == charlie {
|
|
t.Fatalf("Got message from remote without limited flag being set")
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
if !limited {
|
|
t.Fatalf("No timeline batch for the room was limited")
|
|
}
|
|
})
|
|
|
|
t.Run("full", func(t *testing.T) {
|
|
// Wait until we see `lastEvent` come down sync implying that all events have been persisted
|
|
// by alice's homeserver.
|
|
alice.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHasEventID(roomID, lastEvent.EventID()))
|
|
|
|
// Now an incremental sync from before should return a limited batch for
|
|
// the room, with just Charlie's messages.
|
|
topLevelSyncJSON, _ := alice.MustSync(t, client.SyncReq{Since: nextBatch, Filter: filterID})
|
|
roomResult := topLevelSyncJSON.Get("rooms.join." + client.GjsonEscape(roomID))
|
|
if !roomResult.Exists() {
|
|
t.Fatalf("No entry for room (%s)", roomID)
|
|
}
|
|
|
|
eventsJson := roomResult.Get("timeline.events")
|
|
if !eventsJson.Exists() || !eventsJson.IsArray() {
|
|
t.Fatalf("Invalid events entry (%s)", roomResult.Raw)
|
|
}
|
|
|
|
eventsArray := eventsJson.Array()
|
|
|
|
if eventsArray[len(eventsArray)-1].Get("event_id").Str != lastEvent.EventID() {
|
|
t.Fatalf("Did not find lastEvent (%s) in timeline batch: (%s)", lastEvent.EventID(), roomResult.Raw)
|
|
}
|
|
|
|
if roomResult.Get("timeline.limited").Bool() == false {
|
|
t.Fatalf("Timeline batch was not limited (%s)", roomResult.Raw)
|
|
}
|
|
|
|
for _, ev := range eventsArray {
|
|
if ev.Get("sender").Str == alice.UserID {
|
|
t.Fatalf("Found an event from alice in batch (%s)", roomResult.Raw)
|
|
}
|
|
}
|
|
})
|
|
|
|
}
|
|
|
|
// Test presence from people in 2 different rooms in incremental sync
func TestPresenceSyncDifferentRooms(t *testing.T) {
	deployment := complement.Deploy(t, 1)
	defer deployment.Destroy(t)

	alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
	bob := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

	charlie := deployment.Register(t, "hs1", helpers.RegistrationOpts{
		LocalpartSuffix: "charlie",
	})

	// Alice creates two rooms: one with her and Bob, and a second with her and Charlie.
	bobRoomID := alice.MustCreateRoom(t, map[string]interface{}{})
	charlieRoomID := alice.MustCreateRoom(t, map[string]interface{}{})
	nextBatch := alice.MustSyncUntil(t, client.SyncReq{}, client.SyncJoinedTo(alice.UserID, bobRoomID), client.SyncJoinedTo(alice.UserID, charlieRoomID))

	alice.MustInviteRoom(t, bobRoomID, bob.UserID)
	alice.MustInviteRoom(t, charlieRoomID, charlie.UserID)
	bob.MustJoinRoom(t, bobRoomID, nil)
	charlie.MustJoinRoom(t, charlieRoomID, nil)

	nextBatch = alice.MustSyncUntil(t,
		client.SyncReq{Since: nextBatch},
		client.SyncJoinedTo(bob.UserID, bobRoomID),
		client.SyncJoinedTo(charlie.UserID, charlieRoomID),
	)

	// Bob and Charlie mark themselves as online.
	reqBody := client.WithJSONBody(t, map[string]interface{}{
		"presence": "online",
	})
	bob.Do(t, "PUT", []string{"_matrix", "client", "v3", "presence", bob.UserID, "status"}, reqBody)
	charlie.Do(t, "PUT", []string{"_matrix", "client", "v3", "presence", charlie.UserID, "status"}, reqBody)

	// Alice should see that Bob and Charlie are online. She may see this happen
	// simultaneously in one /sync response, or separately in two /sync
	// responses.
	seenBobOnline, seenCharlieOnline := false, false

	alice.MustSyncUntil(t, client.SyncReq{Since: nextBatch}, func(clientUserID string, sync gjson.Result) error {
		presenceArray := sync.Get("presence").Get("events").Array()
		if len(presenceArray) == 0 {
			return fmt.Errorf("presence.events is empty")
		}
		for _, x := range presenceArray {
			if x.Get("content").Get("presence").Str != "online" {
				continue
			}
			if x.Get("sender").Str == bob.UserID {
				seenBobOnline = true
			}
			if x.Get("sender").Str == charlie.UserID {
				seenCharlieOnline = true
			}
			if seenBobOnline && seenCharlieOnline {
				return nil
			}
		}
		return fmt.Errorf("not all users present yet: bob %t, charlie %t", seenBobOnline, seenCharlieOnline)
	})
}

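// TestRoomSummary checks that the m.joined_member_count and
// m.invited_member_count fields of the room summary stay correct as Bob is
// first invited to, and then joins, Alice's room.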
func TestRoomSummary(t *testing.T) {
	runtime.SkipIf(t, runtime.Synapse) // Currently more of a Dendrite test, so skip on Synapse
	deployment := complement.Deploy(t, 1)
	defer deployment.Destroy(t)
	alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
	bob := deployment.Register(t, "hs1", helpers.RegistrationOpts{})

	_, aliceSince := alice.MustSync(t, client.SyncReq{TimeoutMillis: "0"})
	roomID := alice.MustCreateRoom(t, map[string]interface{}{
		"preset": "public_chat",
		"invite": []string{bob.UserID},
	})
	aliceSince = alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince},
		client.SyncJoinedTo(alice.UserID, roomID),
		func(clientUserID string, syncResp gjson.Result) error {
			summary := syncResp.Get("rooms.join." + client.GjsonEscape(roomID) + ".summary")
			invitedUsers := summary.Get(client.GjsonEscape("m.invited_member_count")).Int()
			joinedUsers := summary.Get(client.GjsonEscape("m.joined_member_count")).Int()
			// We expect there to be one joined and one invited user
			if invitedUsers != 1 || joinedUsers != 1 {
				return fmt.Errorf("expected one invited and one joined user, got %d and %d: %v", invitedUsers, joinedUsers, summary.Raw)
			}
			return nil
		},
	)

	joinedCheck := func(clientUserID string, syncResp gjson.Result) error {
		summary := syncResp.Get("rooms.join." + client.GjsonEscape(roomID) + ".summary")
		invitedUsers := summary.Get(client.GjsonEscape("m.invited_member_count")).Int()
		joinedUsers := summary.Get(client.GjsonEscape("m.joined_member_count")).Int()
		// We expect there to be two joined users and no invited users
		if invitedUsers != 0 || joinedUsers != 2 {
			return fmt.Errorf("expected no invited and two joined users, got %d and %d: %v", invitedUsers, joinedUsers, summary.Raw)
		}
		return nil
	}

	sinceToken := bob.MustSyncUntil(t, client.SyncReq{}, client.SyncInvitedTo(bob.UserID, roomID))
	bob.MustJoinRoom(t, roomID, []string{})
	// Verify Bob sees the correct room summary
	bob.MustSyncUntil(t, client.SyncReq{Since: sinceToken}, client.SyncJoinedTo(bob.UserID, roomID), joinedCheck)
	// .. and Alice as well.
	alice.MustSyncUntil(t, client.SyncReq{Since: aliceSince}, client.SyncJoinedTo(bob.UserID, roomID), joinedCheck)
}

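// sendMessages sends `count` m.room.message events into the room, each with a
// body of the given prefix followed by a sequence number, waiting for each
// event to come back down the sender's own /sync.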
func sendMessages(t *testing.T, client *client.CSAPI, roomID string, prefix string, count int) {
	t.Helper()
	for i := 0; i < count; i++ {
		client.SendEventSynced(t, roomID, b.Event{
			Sender: client.UserID,
			Type:   "m.room.message",
			Content: map[string]interface{}{
				"body":    fmt.Sprintf("%s%d", prefix, i),
				"msgtype": "m.text",
			},
		})
	}
}

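// checkJoinFieldsExist verifies that the timeline, state and ephemeral
// sections of a joined room in a /sync response carry their required
// sub-fields, where those sections are present.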
func checkJoinFieldsExist(t *testing.T, res gjson.Result, roomID string) {
	t.Helper()
	room := res.Get("rooms.join." + client.GjsonEscape(roomID))
	timeline := room.Get("timeline")
	if timeline.Exists() {
		for _, x := range []string{"events", "limited"} {
			if !timeline.Get(x).Exists() {
				t.Errorf("timeline %s does not exist", x)
			}
		}
	}
	state := room.Get("state")
	if state.Exists() {
		if !state.Get("events").Exists() {
			t.Errorf("state events do not exist")
		}
		if !state.Get("events").IsArray() {
			t.Errorf("state events is not an array")
		}
	}
	ephemeral := room.Get("ephemeral")
	if ephemeral.Exists() {
		if !ephemeral.Get("events").Exists() {
			t.Errorf("ephemeral events do not exist")
		}
		if !ephemeral.Get("events").IsArray() {
			t.Errorf("ephemeral events is not an array")
		}
	}
}

// usersInPresenceEvents checks that all users are present in presence.events. If the users list is empty,
// it is expected that presence.events is empty as well. Also verifies that all needed fields are present.
func usersInPresenceEvents(t *testing.T, presence gjson.Result, users []string) {
	t.Helper()
	if users == nil {
		t.Fatal("cannot use nil as string slice")
	}

	presenceEvents := presence.Get("events").Array()

	if len(users) > len(presenceEvents) {
		t.Fatalf("expected at least %d presence events, got %d", len(users), len(presenceEvents))
	}

	foundCounter := 0
	for i := range users {
		for _, x := range presenceEvents {
			if x.Get("sender").Str == users[i] {
				foundCounter++
			}
			ok := x.Get("type").Exists() && x.Get("sender").Exists() && x.Get("content").Exists()
			if !ok {
				t.Fatalf("missing field for presence event: %s", x.Raw)
			}
			if x.Get("type").Str != "m.presence" {
				t.Fatalf("expected event type to be 'm.presence', got %s", x.Get("type").Str)
			}
		}
	}

	if len(users) != foundCounter {
		t.Fatalf("expected %d presence events, got %d: %+v", len(users), foundCounter, presenceEvents)
	}
}

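// eventIDsFromEvents maps a slice of PDUs to the corresponding event IDs.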
func eventIDsFromEvents(he []gomatrixserverlib.PDU) []string {
	eventIDs := make([]string, len(he))
	for i := range he {
		eventIDs[i] = he[i].EventID()
	}
	return eventIDs
}

// respondToGetMissingEventsEndpoints registers handlers for the federation
// APIs involved in fetching missing events.
func respondToGetMissingEventsEndpoints(t *testing.T, srv *federation.Server, room *federation.ServerRoom, missingEvents []gomatrixserverlib.PDU) {
	srv.Mux().HandleFunc(
		"/_matrix/federation/v1/state_ids/{roomID}",
		srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
			t.Logf("Got /state_ids for %s", pathParams["roomID"])
			if pathParams["roomID"] != room.RoomID {
				t.Errorf("Received /state_ids for the wrong room: %s", room.RoomID)
				return util.JSONResponse{
					Code: 400,
					JSON: "wrong room",
				}
			}

			roomState := room.AllCurrentState()
			return util.JSONResponse{
				Code: 200,
				JSON: map[string]interface{}{
					"pdu_ids":        eventIDsFromEvents(roomState),
					"auth_chain_ids": eventIDsFromEvents(room.AuthChainForEvents(roomState)),
				},
			}
		})).Methods("GET")

	srv.Mux().HandleFunc(
		"/_matrix/federation/v1/state/{roomID}",
		srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
			t.Logf("Got /state for %s", pathParams["roomID"])
			if pathParams["roomID"] != room.RoomID {
				t.Errorf("Received /state for the wrong room: %s", room.RoomID)
				return util.JSONResponse{
					Code: 400,
					JSON: "wrong room",
				}
			}

			roomState := room.AllCurrentState()
			return util.JSONResponse{
				Code: 200,
				JSON: map[string]interface{}{
					"pdus":       roomState,
					"auth_chain": room.AuthChainForEvents(roomState),
				},
			}
		})).Methods("GET")

	srv.Mux().HandleFunc(
		"/_matrix/federation/v1/event/{eventID}",
		srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
			t.Logf("Got /event for %s", pathParams["eventID"])

			for _, ev := range missingEvents {
				if ev.EventID() == pathParams["eventID"] {
					t.Logf("Returning event %s", pathParams["eventID"])
					return util.JSONResponse{
						Code: 200,
						JSON: map[string]interface{}{
							"origin":           srv.ServerName(),
							"origin_server_ts": 0,
							"pdus":             []json.RawMessage{ev.JSON()},
						},
					}
				}
			}

			t.Logf("No event found")
			return util.JSONResponse{
				Code: 404,
				JSON: map[string]interface{}{},
			}
		})).Methods("GET")

	srv.Mux().HandleFunc(
		"/_matrix/federation/v1/get_missing_events/{roomID}",
		srv.ValidFederationRequest(t, func(fr *fclient.FederationRequest, pathParams map[string]string) util.JSONResponse {
			t.Logf("Got /get_missing_events for %s", pathParams["roomID"])
			if pathParams["roomID"] != room.RoomID {
				t.Errorf("Received /get_missing_events for the wrong room: %s", room.RoomID)
				return util.JSONResponse{
					Code: 400,
					JSON: "wrong room",
				}
			}

			return util.JSONResponse{
				Code: 200,
				JSON: map[string]interface{}{
					// Return only the latest 10 of the missing events, so the
					// homeserver still sees a gap it has to resolve.
					"events": missingEvents[len(missingEvents)-10:],
				},
			}
		}),
	).Methods("POST")
}