feat: Support processing concurrent background transactions #1428
Reference: continuwuation/continuwuity!1428
This pull request fixes an issue where servers sending you transactions containing events for large rooms may time out, causing them to back off from your server. This can result in things like missed encryption keys, sometimes for weeks on end. It also causes huge amounts of repeated work, meaning your server may be burning through CPU and RAM for no real reason.
It fixes this by adding some new constraints:
This is a huge performance and reliability improvement for servers in large or old rooms that take a long time to process.
Pull request checklist:

- This PR is not targeted at the main branch, and the branch is named something other than main.
- I have reviewed the changes myself, if applicable. This includes ensuring code compiles.
process_inbound_transaction (879da73d90, 4c506df99f)

Warning to prospective testers and anyone tempted to review this before I mark it as ready: while this does work (and it instantly shows massive improvements; I've got it deployed to my main server right now), the caches are unbounded in size and the channels are potentially leaky, with a capacity to deadlock until I re-add proper error handling to the internal handle call. There should not be explosions, but have a fire extinguisher on standby.
max_active_txns actually configurable (333bc67550)
!admin federation incoming-federation (1ed87b0ada)

Changed title from "WIP: feat: Support processing concurrent background transactions" to "feat: Support processing concurrent background transactions"

```diff
@@ -67,0 +72,4 @@
+    return Ok(response);
+}
+// Or are currently processing it
+if let Some(receiver) = services.transaction_ids.get_active_federation_txn(&txn_key) {
```

There's technically a race condition here but I don't care that much.
It seems to only result in an error rather than duplicate transactions.
Yeah, that's something I accounted for. The chances of this race condition actually being triggered are slim to none, and because of the way get_active_federation_txn holds the lock anyway, it'd just result in the duplicate being rejected.

b8f22af642, 47fd9ea6ed

```diff
@@ -209,0 +301,4 @@
+if let Err(e) = services.server.check_running() {
+    debug_warn!("Server shutting down, returning partial transaction results: {e}");
+    results.push((event_id, Err(e)));
+    results.extend(event_ids.map(|id| (id, Err(err!("Server is shutting down")))));
```

We should not allow partial failures in this condition - servers don't use returned transaction errors for anything but debugging at the moment, so this will induce event loss. A 5XX error (semantically 503) should be used to get the sender to retry the whole transaction - re-processing part of the transaction is better than losing part of it.
Ah, didn't know that.
Should be better now
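The behaviour settled on above can be sketched minimally (all names here are illustrative, not the PR's actual types): on shutdown, fail the whole transaction with a single retryable 503 instead of recording partial per-event errors that the sender would never act on.

```rust
// Sketch only: TxnOutcome and handle_events are hypothetical names,
// not the PR's real code.
#[derive(Debug, PartialEq)]
enum TxnOutcome {
    /// Per-event results; senders only use the error strings for debugging.
    Partial(Vec<(String, Result<(), String>)>),
    /// HTTP 503: the sender retries the whole (now de-duplicated) transaction.
    RetryWhole { status: u16 },
}

fn handle_events(shutting_down: bool, event_ids: Vec<String>) -> TxnOutcome {
    if shutting_down {
        // Fail everything with one retryable status instead of partial
        // errors, since partial failures would silently lose events.
        return TxnOutcome::RetryWhole { status: 503 };
    }
    TxnOutcome::Partial(event_ids.into_iter().map(|id| (id, Ok(()))).collect())
}

fn main() {
    let out = handle_events(true, vec!["$event_a".to_string()]);
    assert_eq!(out, TxnOutcome::RetryWhole { status: 503 });
}
```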
```diff
@@ -6,0 +27,4 @@
+/// Minimum interval between cache cleanup runs.
+/// Exists to prevent thrashing when the cache is full of things that can't be
+/// cleared
+const CLEANUP_INTERVAL_SECS: u64 = 30;
```

60 seconds would make my brain happier but i don't think it matters that much
:3
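For illustration, a cleanup throttle like the one the doc comment describes can be sketched with the standard library alone (CacheCleaner and should_run are hypothetical names, not the PR's code):

```rust
use std::time::{Duration, Instant};

const CLEANUP_INTERVAL_SECS: u64 = 30;

// Hypothetical throttle: only allow a cleanup run if the minimum
// interval has elapsed since the last one, to prevent thrashing.
struct CacheCleaner {
    last_run: Option<Instant>,
}

impl CacheCleaner {
    fn should_run(&mut self, now: Instant) -> bool {
        match self.last_run {
            Some(t) if now.duration_since(t) < Duration::from_secs(CLEANUP_INTERVAL_SECS) => false,
            _ => {
                self.last_run = Some(now);
                true
            },
        }
    }
}

fn main() {
    let mut cleaner = CacheCleaner { last_run: None };
    let t0 = Instant::now();
    assert!(cleaner.should_run(t0)); // first run always allowed
    assert!(!cleaner.should_run(t0 + Duration::from_secs(10))); // throttled
    assert!(cleaner.should_run(t0 + Duration::from_secs(31))); // interval elapsed
}
```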
05f8536a18, 914a8ab2eb

```diff
@@ -131,0 +222,4 @@
+/// Converts a TransactionError into an appropriate HTTP error response.
+fn transaction_error_to_response(err: &TransactionError) -> Error {
+    match err {
+        | TransactionError::ShuttingDown => Error::Request(
```

Isn't a match with one arm a bit redundant?
This is meant for when we actually get other error types. Not sure if that'll actually happen tho
```diff
@@ -6,0 +37,4 @@
+impl fmt::Display for TransactionError {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            | Self::ShuttingDown => write!(f, "Server is shutting down"),
```

again a single-arm match feels redundant
914a8ab2eb, 92351df925

I am nex and I approve this pull request ✅
very solid PR in general :3 just a few nits. also consider renaming the service to just transactions

```diff
@@ -89,0 +110,4 @@
+async fn wait_for_result(
+    mut recv: Receiver<WrappedTransactionResponse>,
+) -> Result<send_transaction_message::v1::Response> {
+    if tokio::time::timeout(Duration::from_secs(50), recv.changed())
```

why 50 seconds 🧌
Synapse only waits for 60 seconds; elsewhere (and in my own patches) I use 55 seconds, but there needs to be a few seconds of leeway to return the response within the deadline. Since a timeout means the sender will just retry the now de-duplicated transaction immediately after, this doesn't really cause any issues.
Or more concisely: waiting the exact amount of time we think the sender is also going to wait may result in them disconnecting before we can finish returning the response to them
sounds good 👍
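The leeway arithmetic above can be made concrete with a tiny sketch (the constant names and the split into a named leeway are illustrative; only the 60-second and 50-second figures come from the thread):

```rust
use std::time::Duration;

// Illustrative constants, not the PR's code.
const SENDER_TIMEOUT: Duration = Duration::from_secs(60); // what Synapse waits
const LEEWAY: Duration = Duration::from_secs(10); // time to serialize and return the response

fn receiver_deadline() -> Duration {
    // Wait strictly less than the sender does, or they may disconnect
    // before we can finish returning the response to them.
    SENDER_TIMEOUT - LEEWAY
}

fn main() {
    assert_eq!(receiver_deadline(), Duration::from_secs(50));
}
```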
```diff
@@ -131,0 +216,4 @@
+// Send the error to any waiters
+sender.send(Some(Err(err))).expect("couldn't send error to channel");
```

this probably shouldn't be an expect(), it could panic if (for example) there's only one waiter and it hits the timeout and stops listening. I think this should be a let _ =

The channels are buffered, aren't they? Not sure why sending to a sender with no receiver would cause a panic, but maybe I haven't read the docs enough
https://docs.rs/tokio/latest/tokio/sync/watch/struct.Sender.html#method.send:~:text=This%20method%20fails%20if%20the%20channel%20is%20closed%2C%20which%20is%20the%20case%20when%20every%20receiver%20has%20been%20dropped%2E
yeah okay I just didn't read the docs for it then lmao
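The PR uses tokio's watch channel, but the same failure mode can be demonstrated with std's mpsc channel: once every receiver has been dropped, send returns an error, so .expect() would panic while let _ = quietly discards it. A minimal sketch:

```rust
use std::sync::mpsc;

fn main() {
    let (tx, rx) = mpsc::channel::<i32>();
    // Every receiver dropped, as when the lone waiter times out
    // and stops listening.
    drop(rx);
    // send() now returns Err; calling .expect() on it would panic.
    assert!(tx.send(42).is_err());
    // Discarding the error is the safe pattern suggested in the review.
    let _ = tx.send(42);
}
```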
```diff
@@ -172,0 +276,4 @@
+/// dependencies, however it is ultimately the sender's responsibility to send
+/// them in a processable order, so this is just a best effort attempt. It does
+/// not account for power levels or other tie breaks.
+async fn build_local_dag(
```

this will panic if the CanonicalJsonObject is shaped weird, is that a good thing?

don't send weird shaped objects
yes but what if someone does
their transaction will fail
```diff
@@ -54,0 +192,4 @@
+let max_active_txns = self.services.config.max_concurrent_inbound_transactions;
+// Check if we're at capacity
+if state.len() >= max_active_txns
```

could a semaphore be used for this capacity logic?
Wouldn't a semaphore imply waiting for a free slot, rather than rejecting when there are no more free slots?
shrug. I guess you know what you're doing
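The distinction raised here can be sketched without tokio: a try-acquire style slot counter rejects immediately at capacity, which is the behaviour the code wants, whereas acquiring a semaphore permit typically waits (tokio's Semaphore does also offer a try_acquire that would behave like this). All names below are illustrative:

```rust
use std::sync::Mutex;

// Hypothetical capacity gate, not the PR's code.
struct TxnSlots {
    active: Mutex<usize>,
    max: usize,
}

impl TxnSlots {
    /// Reject immediately when at capacity instead of waiting for a slot.
    fn try_acquire(&self) -> bool {
        let mut n = self.active.lock().unwrap();
        if *n >= self.max {
            false
        } else {
            *n += 1;
            true
        }
    }

    fn release(&self) {
        *self.active.lock().unwrap() -= 1;
    }
}

fn main() {
    let slots = TxnSlots { active: Mutex::new(0), max: 2 };
    assert!(slots.try_acquire());
    assert!(slots.try_acquire());
    assert!(!slots.try_acquire()); // at capacity: reject, don't wait
    slots.release();
    assert!(slots.try_acquire()); // a freed slot can be taken again
}
```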
```diff
@@ -54,0 +243,4 @@
+.send(Some(Ok(response)))
+.expect("couldn't send response to channel");
+// explicitly close
```

inconsistent comment capitalization 🤨
transaction_ids -> transactions