complement/tests/room_timestamp_to_event_test.go
kegsay 503bf6e126
Refactor the match.CheckOff API to use functional options (#711)
This means you don't need to tack on `, nil` all the time, and can
optionally specify when you want to allow unwanted items, obviating
the need for `JSONCheckOffAllowUnwanted`. This composes better and
supports adding additional functionality e.g allowing duplicate items.
2024-02-16 17:26:35 +00:00

468 lines
18 KiB
Go

//go:build !dendrite_blacklist
// +build !dendrite_blacklist
// This file contains tests for the `/timestamp_to_event` client and federation API
// endpoints (also known as *jump to date*). As defined by MSC3030, which you can read
// here: https://github.com/matrix-org/matrix-doc/pull/3030
package tests
import (
"fmt"
"net/url"
"strconv"
"sync/atomic"
"testing"
"time"
"github.com/matrix-org/complement"
"github.com/matrix-org/complement/b"
"github.com/matrix-org/complement/client"
"github.com/matrix-org/complement/helpers"
"github.com/matrix-org/complement/match"
"github.com/matrix-org/complement/must"
"github.com/tidwall/gjson"
"golang.org/x/exp/slices"
)
// TestJumpToDateEndpoint exercises the `/timestamp_to_event` client endpoint
// (MSC3030, "jump to date"). It covers lookups made by a local user on hs1,
// access checks for users who are not in the room, and lookups made by a user
// on hs2 who joins the room via hs1 (the "federation" subtests).
func TestJumpToDateEndpoint(t *testing.T) {
deployment := complement.OldDeploy(t, b.BlueprintHSWithApplicationService)
defer deployment.Destroy(t)
// Create the normal user which will send messages in the room
alice := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
// Create the federated user which will fetch the messages from a remote homeserver
remoteCharlie := deployment.Register(t, "hs2", helpers.RegistrationOpts{})
// Create the application service bridge user that can use the ?ts query parameter
asUserID := "@the-bridge-user:hs1"
as := deployment.AppServiceUser(t, "hs1", asUserID)
// All subtests below call t.Parallel(). Wrapping them in this group matters:
// t.Run does not return until its parallel subtests have completed, so the
// deferred deployment.Destroy above only runs once every subtest is done.
t.Run("parallel", func(t *testing.T) {
// Looking forwards from just before event A was sent should land on event A.
t.Run("should find event after given timestmap", func(t *testing.T) {
t.Parallel()
roomID, eventA, _ := createTestRoom(t, alice)
mustCheckEventisReturnedForTime(t, alice, roomID, eventA.BeforeTimestamp, "f", eventA.EventID)
})
// Looking backwards from just after event B was sent should land on event B.
t.Run("should find event before given timestmap", func(t *testing.T) {
t.Parallel()
roomID, _, eventB := createTestRoom(t, alice)
mustCheckEventisReturnedForTime(t, alice, roomID, eventB.AfterTimestamp, "b", eventB.EventID)
})
// An empty expected event ID means "expect no event to be found".
t.Run("should find nothing before the earliest timestmap", func(t *testing.T) {
t.Parallel()
timeBeforeRoomCreation := time.Now()
roomID, _, _ := createTestRoom(t, alice)
mustCheckEventisReturnedForTime(t, alice, roomID, timeBeforeRoomCreation, "b", "")
})
// An empty expected event ID means "expect no event to be found".
t.Run("should find nothing after the latest timestmap", func(t *testing.T) {
t.Parallel()
roomID, _, eventB := createTestRoom(t, alice)
mustCheckEventisReturnedForTime(t, alice, roomID, eventB.AfterTimestamp, "f", "")
})
t.Run("should find next event topologically before given timestamp when all message timestamps are the same", func(t *testing.T) {
t.Parallel()
roomID, _, _ := createTestRoom(t, alice)
// Join from the application service bridge user so we can use it to send
// some messages at a specific time.
as.MustJoinRoom(t, roomID, []string{"hs1"})
// Send a couple of messages with the same timestamp after the other test
// messages in the room.
timeBeforeMessageCreation := time.Now()
sendMessageWithTimestamp(t, as, alice, roomID, timeBeforeMessageCreation, "messageWithSameTime1")
messageIDWithSameTime2 := sendMessageWithTimestamp(t, as, alice, roomID, timeBeforeMessageCreation, "messageWithSameTime2")
// Looking backwards from the time the messages were sent, we should find
// message2. A naive MSC3030 implementation that only sorts by timestamp
// will probably return message1 since it's the first one in the database.
mustCheckEventisReturnedForTime(t, alice, roomID, timeBeforeMessageCreation, "b", messageIDWithSameTime2)
})
t.Run("should find next event topologically after given timestmap when all message timestamps are the same", func(t *testing.T) {
t.Parallel()
roomID, _, _ := createTestRoom(t, alice)
// Join from the application service bridge user so we can use it to send
// some messages at a specific time.
as.MustJoinRoom(t, roomID, []string{"hs1"})
// Send a couple of messages with the same timestamp after the other test
// messages in the room.
timeBeforeMessageCreation := time.Now()
messageIDWithSameTime1 := sendMessageWithTimestamp(t, as, alice, roomID, timeBeforeMessageCreation, "messageWithSameTime1")
sendMessageWithTimestamp(t, as, alice, roomID, timeBeforeMessageCreation, "messageWithSameTime2")
// Looking forwards from the time the messages were sent, we should find
// message1.
mustCheckEventisReturnedForTime(t, alice, roomID, timeBeforeMessageCreation, "f", messageIDWithSameTime1)
})
// Just a sanity check that we're not leaking anything from the `/timestamp_to_event` endpoint
t.Run("should not be able to query a private room you are not a member of", func(t *testing.T) {
t.Parallel()
timeBeforeRoomCreation := time.Now()
// Alice will create the private room
roomID := alice.MustCreateRoom(t, map[string]interface{}{
"preset": "private_chat",
})
// We will use a freshly registered user ("Bob") to query the room they're not a member of
nonMemberUser := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
// Make the `/timestamp_to_event` request from Bob's perspective (non room member).
// Do (rather than MustDo) is used so the status code can be inspected below.
timestamp := makeTimestampFromTime(timeBeforeRoomCreation)
timestampString := strconv.FormatInt(timestamp, 10)
timestampToEventRes := nonMemberUser.Do(t, "GET", []string{"_matrix", "client", "v1", "rooms", roomID, "timestamp_to_event"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"ts": []string{timestampString},
"dir": []string{"f"},
}))
// A random user is not allowed to query for events in a private room
// they're not a member of (forbidden).
if timestampToEventRes.StatusCode != 403 {
t.Fatalf("/timestamp_to_event returned %d HTTP status code but expected %d", timestampToEventRes.StatusCode, 403)
}
})
// Just a sanity check that we're not leaking anything from the `/timestamp_to_event` endpoint
t.Run("should not be able to query a public room you are not a member of", func(t *testing.T) {
t.Parallel()
timeBeforeRoomCreation := time.Now()
// Alice will create the public room
roomID := alice.MustCreateRoom(t, map[string]interface{}{
"preset": "public_chat",
})
// We will use a freshly registered user ("Bob") to query the room they're not a member of
nonMemberUser := deployment.Register(t, "hs1", helpers.RegistrationOpts{})
// Make the `/timestamp_to_event` request from Bob's perspective (non room member).
// Do (rather than MustDo) is used so the status code can be inspected below.
timestamp := makeTimestampFromTime(timeBeforeRoomCreation)
timestampString := strconv.FormatInt(timestamp, 10)
timestampToEventRes := nonMemberUser.Do(t, "GET", []string{"_matrix", "client", "v1", "rooms", roomID, "timestamp_to_event"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"ts": []string{timestampString},
"dir": []string{"f"},
}))
// A random user is not allowed to query for events in a public room
// they're not a member of (forbidden).
if timestampToEventRes.StatusCode != 403 {
t.Fatalf("/timestamp_to_event returned %d HTTP status code but expected %d", timestampToEventRes.StatusCode, 403)
}
})
// The subtests in this group query from remoteCharlie (registered on hs2)
// after joining a room that alice created on hs1.
t.Run("federation", func(t *testing.T) {
t.Run("looking forwards, should be able to find event that was sent before we joined", func(t *testing.T) {
t.Parallel()
roomID, eventA, _ := createTestRoom(t, alice)
remoteCharlie.MustJoinRoom(t, roomID, []string{"hs1"})
mustCheckEventisReturnedForTime(t, remoteCharlie, roomID, eventA.BeforeTimestamp, "f", eventA.EventID)
})
t.Run("looking backwards, should be able to find event that was sent before we joined", func(t *testing.T) {
t.Parallel()
roomID, _, eventB := createTestRoom(t, alice)
remoteCharlie.MustJoinRoom(t, roomID, []string{"hs1"})
mustCheckEventisReturnedForTime(t, remoteCharlie, roomID, eventB.AfterTimestamp, "b", eventB.EventID)
})
t.Run("when looking backwards before the room was created, should be able to find event that was imported", func(t *testing.T) {
t.Parallel()
timeBeforeRoomCreation := time.Now()
roomID, _, _ := createTestRoom(t, alice)
// Join from the application service bridge user so we can use it to send
// some messages at a specific time.
as.MustJoinRoom(t, roomID, []string{"hs1"})
// Import a message into the room with a timestamp from before the room was created
importTime := time.Date(2022, 01, 03, 0, 0, 0, 0, time.Local)
importedEventID := sendMessageWithTimestamp(t, as, alice, roomID, importTime, "old imported event")
remoteCharlie.MustJoinRoom(t, roomID, []string{"hs1"})
mustCheckEventisReturnedForTime(t, remoteCharlie, roomID, timeBeforeRoomCreation, "b", importedEventID)
})
t.Run("can paginate after getting remote event from timestamp to event endpoint", func(t *testing.T) {
t.Parallel()
roomID, eventA, eventB := createTestRoom(t, alice)
remoteCharlie.MustJoinRoom(t, roomID, []string{"hs1"})
mustCheckEventisReturnedForTime(t, remoteCharlie, roomID, eventB.AfterTimestamp, "b", eventB.EventID)
// Get a pagination token from eventB. With a limit of 0 we only care about
// the "end" token in the `/context` response, not the surrounding events.
contextRes := remoteCharlie.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "context", eventB.EventID}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"limit": []string{"0"},
}))
contextResResBody := client.ParseJSON(t, contextRes)
paginationToken := client.GetJSONFieldStr(t, contextResResBody, "end")
// Hit `/messages` until `eventA` has been backfilled and replicated across
// workers (the worker persisting events isn't necessarily the same as the worker
// serving `/messages`)
fetchUntilMessagesResponseHas(t, remoteCharlie, roomID, func(ev gjson.Result) bool {
return ev.Get("event_id").Str == eventA.EventID
})
// Paginate backwards from eventB
messagesRes := remoteCharlie.MustDo(t, "GET", []string{"_matrix", "client", "r0", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
"dir": []string{"b"},
"limit": []string{"100"},
"from": []string{paginationToken},
}))
// Make sure both messages are visible. Each chunk entry is mapped to its
// event ID before being checked off; other events in the chunk are allowed.
must.MatchResponse(t, messagesRes, match.HTTPResponse{
JSON: []match.JSON{
match.JSONCheckOff("chunk", []interface{}{eventA.EventID, eventB.EventID}, match.CheckOffMapper(func(r gjson.Result) interface{} {
return r.Get("event_id").Str
}), match.CheckOffAllowUnwanted()),
},
})
})
})
})
}
// eventTime pairs an event ID with two wall-clock readings taken by the test
// around the moment the event was sent.
type eventTime struct {
	// EventID is the ID of the event that was sent.
	EventID string
	// BeforeTimestamp was captured before the event was sent and
	// AfterTimestamp after the send call returned.
	BeforeTimestamp, AfterTimestamp time.Time
}
// txnCounter is the number of transaction IDs handed out so far by getTxnID.
// It must only be accessed through the sync/atomic functions.
var txnCounter int64

// getTxnID returns a transaction ID of the form "<prefix>-<n>", where n starts
// at 0 and increases by one on every call.
//
// The counter is advanced and read in a single atomic operation. Loading the
// value and incrementing it as two separate steps would let concurrent callers
// (for example subtests running under t.Parallel()) observe the same value and
// end up with the same transaction ID.
func getTxnID(prefix string) (txnID string) {
	// AddInt64 returns the value after the increment; subtract one to get the
	// slot this call reserved, which keeps the sequence starting at 0.
	n := atomic.AddInt64(&txnCounter, 1) - 1
	return fmt.Sprintf("%s-%d", prefix, n)
}
// createTestRoom creates a room with the "public_chat" preset as user c and
// sends two text messages into it: "Message A" followed by "Message B".
//
// The wall clock is sampled before A, between A and B, and after B. The reading
// taken between the two sends is shared: it is eventA's AfterTimestamp and
// eventB's BeforeTimestamp.
func createTestRoom(t *testing.T, c *client.CSAPI) (roomID string, eventA, eventB *eventTime) {
	t.Helper()

	// textMessage builds an m.text message event with the given body.
	textMessage := func(body string) b.Event {
		return b.Event{
			Type: "m.room.message",
			Content: map[string]interface{}{
				"msgtype": "m.text",
				"body":    body,
			},
		}
	}

	roomID = c.MustCreateRoom(t, map[string]interface{}{"preset": "public_chat"})

	beforeA := time.Now()
	idA := c.SendEventSynced(t, roomID, textMessage("Message A"))
	betweenAAndB := time.Now()
	idB := c.SendEventSynced(t, roomID, textMessage("Message B"))
	afterB := time.Now()

	eventA = &eventTime{
		EventID:         idA,
		BeforeTimestamp: beforeA,
		AfterTimestamp:  betweenAAndB,
	}
	eventB = &eventTime{
		EventID:         idB,
		BeforeTimestamp: betweenAAndB,
		AfterTimestamp:  afterB,
	}
	return roomID, eventA, eventB
}
// sendMessageWithTimestamp sends an m.text message with body `message` into
// roomID as the application service user `as`, passing messageTime (in
// milliseconds) via the `?ts` query parameter. It then blocks until the event
// shows up in the room timeline of `c`'s sync and returns the event ID.
func sendMessageWithTimestamp(t *testing.T, as *client.CSAPI, c *client.CSAPI, roomID string, messageTime time.Time, message string) (messageEventID string) {
	t.Helper()

	tsMillis := strconv.FormatInt(makeTimestampFromTime(messageTime), 10)
	sendPath := []string{
		"_matrix", "client", "v3", "rooms", roomID,
		"send", "m.room.message", getTxnID("sendMessageWithTimestamp-txn"),
	}

	// The send has to go through an application service user: they are the
	// only ones allowed to use the `?ts` query parameter.
	//
	// as.SendEventSynced(...) is not an option here because application
	// services can't use the /sync API.
	res := as.Do(
		t, "PUT", sendPath,
		client.WithContentType("application/json"),
		client.WithJSONBody(t, map[string]interface{}{
			"body":    message,
			"msgtype": "m.text",
		}),
		client.WithQueries(url.Values{
			"ts": []string{tsMillis},
		}),
	)
	messageEventID = client.GetJSONFieldStr(t, client.ParseJSON(t, res), "event_id")

	// Wait for the imported event to become visible to the regular user.
	isSentEvent := func(ev gjson.Result) bool {
		return ev.Get("event_id").Str == messageEventID
	}
	c.MustSyncUntil(t, client.SyncReq{}, client.SyncTimelineHas(roomID, isSentEvent))

	return messageEventID
}
// mustCheckEventisReturnedForTime queries /timestamp_to_event for roomID as
// user c, using givenTime (converted to milliseconds) and direction, and fails
// the test unless the returned event ID equals expectedEventId.
//
// An empty expectedEventId means no event is expected; in that case a 404
// response is accepted. On a mismatch the failure message includes a listing
// of the room's recent messages to help with debugging.
func mustCheckEventisReturnedForTime(t *testing.T, c *client.CSAPI, roomID string, givenTime time.Time, direction string, expectedEventId string) {
	t.Helper()

	givenTimestamp := makeTimestampFromTime(givenTime)
	timestampString := strconv.FormatInt(givenTimestamp, 10)

	query := url.Values{
		"ts":  []string{timestampString},
		"dir": []string{direction},
	}
	res := c.Do(
		t, "GET",
		[]string{"_matrix", "client", "v1", "rooms", roomID, "timestamp_to_event"},
		client.WithContentType("application/json"),
		client.WithQueries(query),
	)
	resBody := client.ParseJSON(t, res)

	// A 200 means an event was found. A 404 means nothing was found, which is
	// only acceptable when the caller expects nothing. Every other outcome is
	// treated as an application error.
	var actualEventId string
	switch status := res.StatusCode; {
	case status == 200:
		actualEventId = client.GetJSONFieldStr(t, resBody, "event_id")
	case status == 404 && expectedEventId == "":
		// Nothing found and nothing expected.
	default:
		t.Fatalf("mustCheckEventisReturnedForTime: /timestamp_to_event request failed with status=%d body=%s", res.StatusCode, string(resBody))
	}

	if actualEventId == expectedEventId {
		return
	}

	debugMessageList := getDebugMessageListFromMessagesResponse(t, c, roomID, expectedEventId, actualEventId, givenTimestamp)
	t.Fatalf(
		"Want %s given %s but got %s\n%s",
		decorateStringWithAnsiColor(expectedEventId, AnsiColorGreen),
		decorateStringWithAnsiColor(timestampString, AnsiColorYellow),
		decorateStringWithAnsiColor(actualEventId, AnsiColorRed),
		debugMessageList,
	)
}
// fetchUntilMessagesResponseHas polls `/messages` (backwards, limit 100) for
// roomID as user c until check returns true for one of the events in the
// "chunk" array. It fails the test if c.SyncUntilTimeout elapses first, or if
// a response has no "chunk" array.
func fetchUntilMessagesResponseHas(t *testing.T, c *client.CSAPI, roomID string, check func(gjson.Result) bool) {
	t.Helper()

	const chunkKey = "chunk"
	startedAt := time.Now()
	attempts := 0

	for time.Since(startedAt) <= c.SyncUntilTimeout {
		res := c.MustDo(
			t, "GET",
			[]string{"_matrix", "client", "v3", "rooms", roomID, "messages"},
			client.WithContentType("application/json"),
			client.WithQueries(url.Values{
				"dir":   []string{"b"},
				"limit": []string{"100"},
			}),
		)
		chunk := gjson.GetBytes(client.ParseJSON(t, res), chunkKey)
		if !chunk.Exists() {
			t.Fatalf("missing key '%s'", chunkKey)
		}
		if !chunk.IsArray() {
			t.Fatalf("key '%s' is not an array (was %s)", chunkKey, chunk.Type)
		}

		for _, event := range chunk.Array() {
			if check(event) {
				return
			}
		}
		attempts++

		// Back off briefly so we don't hammer the messages endpoint.
		time.Sleep(500 * time.Millisecond)
	}

	t.Fatalf("fetchUntilMessagesResponseHas timed out. Called check function %d times", attempts)
}
// getDebugMessageListFromMessagesResponse builds a human-readable listing of
// up to 100 of the most recent events in roomID (as seen by c via `/messages`),
// ordered oldest to newest, for use in test failure output.
//
// Each line shows the event ID, its origin_server_ts and its type. The event
// matching expectedEventId is labelled "(want)" and coloured green, the one
// matching actualEventId is labelled "(got)" and coloured red, and a yellow
// marker line shows where givenTimestamp falls among the events. The marker is
// placed before the first event whose origin_server_ts is strictly greater
// than givenTimestamp, or after the last event if there is no such event.
//
// The test is failed if the response has no "chunk" array or the array is
// empty.
func getDebugMessageListFromMessagesResponse(t *testing.T, c *client.CSAPI, roomID string, expectedEventId string, actualEventId string, givenTimestamp int64) string {
	t.Helper()

	messagesRes := c.MustDo(t, "GET", []string{"_matrix", "client", "v3", "rooms", roomID, "messages"}, client.WithContentType("application/json"), client.WithQueries(url.Values{
		// The events returned will be from the newest -> oldest since we're going backwards
		"dir":   []string{"b"},
		"limit": []string{"100"},
	}))
	messagesResBody := client.ParseJSON(t, messagesRes)

	wantKey := "chunk"
	keyRes := gjson.GetBytes(messagesResBody, wantKey)
	if !keyRes.Exists() {
		t.Fatalf("missing key '%s'", wantKey)
	}
	if !keyRes.IsArray() {
		t.Fatalf("key '%s' is not an array (was %s)", wantKey, keyRes.Type)
	}

	events := keyRes.Array()
	if len(events) == 0 {
		t.Fatalf(
			"getDebugMessageListFromMessagesResponse found no messages in the room(%s).",
			roomID,
		)
	}
	// Make the events go from oldest-in-time -> newest-in-time
	slices.Reverse(events)

	// Lines without a label are padded so that they align with labelled lines.
	// The padding is as wide as the labels below ("(want) " and " (got) ").
	const (
		wantLabel     = "(want) "
		gotLabel      = " (got) "
		paddingString = "       "
	)

	resultantString := fmt.Sprintf("%s-- oldest events --\n", paddingString)
	givenTimestampAlreadyInserted := false
	givenTimestampMarker := decorateStringWithAnsiColor(fmt.Sprintf("%s-- givenTimestamp=%s --\n", paddingString, strconv.FormatInt(givenTimestamp, 10)), AnsiColorYellow)

	// We're iterating over the events from oldest-in-time -> newest-in-time
	for _, ev := range events {
		originServerTS := ev.Get("origin_server_ts").Int()

		// As we go, keep checking whether the givenTimestamp is
		// older(before-in-time) than the current event and insert a timestamp
		// marker as soon as we find the spot
		if !givenTimestampAlreadyInserted && givenTimestamp < originServerTS {
			resultantString += givenTimestampMarker
			givenTimestampAlreadyInserted = true
		}

		eventID := ev.Get("event_id").String()
		eventIDString := eventID
		labelString := paddingString
		switch eventID {
		case expectedEventId:
			eventIDString = decorateStringWithAnsiColor(eventID, AnsiColorGreen)
			labelString = wantLabel
		case actualEventId:
			eventIDString = decorateStringWithAnsiColor(eventID, AnsiColorRed)
			labelString = gotLabel
		}

		resultantString += fmt.Sprintf(
			"%s%s (%s) - %s\n",
			labelString,
			eventIDString,
			strconv.FormatInt(originServerTS, 10),
			ev.Get("type").String(),
		)
	}

	// No event is strictly newer than the givenTimestamp, so the marker belongs
	// at the end. This also covers the case where the givenTimestamp is equal
	// to the newest event's timestamp, which would otherwise leave the marker
	// out of the listing entirely.
	if !givenTimestampAlreadyInserted {
		resultantString += givenTimestampMarker
	}

	resultantString += fmt.Sprintf("%s-- newest events --\n", paddingString)
	return resultantString
}
// makeTimestampFromTime converts t to a Unix timestamp in milliseconds, the
// unit this file passes as the `ts` query parameter.
//
// Time.UnixMilli is used instead of dividing UnixNano by a million: UnixNano
// is undefined when the instant does not fit in an int64 count of nanoseconds
// (dates before 1678 or after 2262), and integer division would round
// pre-epoch instants towards zero instead of down.
func makeTimestampFromTime(t time.Time) int64 {
	return t.UnixMilli()
}
// ANSI SGR foreground colour codes, for use with decorateStringWithAnsiColor.
const (
	// AnsiColorRed is the SGR code for a red foreground.
	AnsiColorRed string = "31"
	// AnsiColorGreen is the SGR code for a green foreground.
	AnsiColorGreen string = "32"
	// AnsiColorYellow is the SGR code for a yellow foreground.
	AnsiColorYellow string = "33"
)
// decorateStringWithAnsiColor wraps inputString in an ANSI escape sequence
// that selects decorationColor (an SGR code such as "31") and appends the
// reset sequence so that text following the result is unaffected.
func decorateStringWithAnsiColor(inputString, decorationColor string) string {
	const ansiWrapFormat = "\033[%sm%s\033[0m"
	decorated := fmt.Sprintf(ansiWrapFormat, decorationColor, inputString)
	return decorated
}