Send attachments in order

Not perfect. It means we lose paralellism of uploads which is maybe bad?
If any one errors the subsequent ones stay queued I think. If you're
offline or similar then it all gets written to the db and we stil lose
ordering.

May still be worth it, worth testing.

(cherry picked from commit 0683b065f5a99d214e343fa92aa468b62e1cada8)
This commit is contained in:
Stephen Paul Weber 2024-11-20 05:49:39 +01:00 committed by Arne
parent 1b57462cdb
commit 2d76569b9f
5 changed files with 116 additions and 87 deletions

View file

@ -111,6 +111,10 @@ public class HttpConnectionManager extends AbstractConnectionManager {
}
public void createNewUploadConnection(final Message message, boolean delay) {
createNewUploadConnection(message, delay, null);
}
public void createNewUploadConnection(final Message message, boolean delay, final Runnable cb) {
synchronized (this.uploadConnections) {
for (HttpUploadConnection connection : this.uploadConnections) {
if (connection.getMessage() == message) {
@ -118,7 +122,7 @@ public class HttpConnectionManager extends AbstractConnectionManager {
return;
}
}
HttpUploadConnection connection = new HttpUploadConnection(message, Method.determine(message.getConversation().getAccount()), this);
HttpUploadConnection connection = new HttpUploadConnection(message, Method.determine(message.getConversation().getAccount()), this, cb);
connection.init(delay);
this.uploadConnections.add(connection);
}

View file

@ -52,12 +52,14 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
private long transmitted = 0;
private Call mostRecentCall;
private ListenableFuture<SlotRequester.Slot> slotFuture;
private Runnable cb;
public HttpUploadConnection(Message message, Method method, HttpConnectionManager httpConnectionManager) {
public HttpUploadConnection(Message message, Method method, HttpConnectionManager httpConnectionManager, Runnable cb) {
this.message = message;
this.method = method;
this.mHttpConnectionManager = httpConnectionManager;
this.mXmppConnectionService = httpConnectionManager.getXmppConnectionService();
this.cb = cb;
}
@Override
@ -104,6 +106,7 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
final Future<SlotRequester.Slot> slotFuture = this.slotFuture;
final boolean cancelled = (call != null && call.isCanceled()) || (slotFuture != null && slotFuture.isCancelled());
mXmppConnectionService.markMessage(message, Message.STATUS_SEND_FAILED, cancelled ? Message.ERROR_MESSAGE_CANCELLED : errorMessage);
if (cb != null) cb.run();
}
private void finish() {
@ -191,7 +194,7 @@ public class HttpUploadConnection implements Transferable, AbstractConnectionMan
if (!message.isPrivateMessage()) {
message.setCounterpart(message.getConversation().getJid().asBareJid());
}
mXmppConnectionService.resendMessage(message, delayed);
mXmppConnectionService.resendMessage(message, delayed, cb);
} else {
Log.d(Config.LOGTAG, "http upload failed because response code was " + code);
fail("http upload failed because response code was " + code);

View file

@ -63,8 +63,7 @@ public class AttachFileToConversationRunnable implements Runnable, TranscoderLis
final int encryption = message.getEncryption();
mXmppConnectionService.getHttpConnectionManager().createNewDownloadConnection(message, false, (file) -> {
message.setEncryption(encryption);
mXmppConnectionService.sendMessage(message);
callback.success(message);
mXmppConnectionService.sendMessage(message, () -> callback.success(message));
});
} else if (path != null && !FileBackend.isPathBlacklisted(path)) {
message.setRelativeFilePath(path);
@ -72,8 +71,7 @@ public class AttachFileToConversationRunnable implements Runnable, TranscoderLis
if (message.getEncryption() == Message.ENCRYPTION_DECRYPTED) {
mXmppConnectionService.getPgpEngine().encrypt(message, callback);
} else {
mXmppConnectionService.sendMessage(message);
callback.success(message);
mXmppConnectionService.sendMessage(message, () -> callback.success(message));
}
} else {
try {
@ -87,8 +85,7 @@ public class AttachFileToConversationRunnable implements Runnable, TranscoderLis
callback.error(R.string.unable_to_connect_to_keychain, null);
}
} else {
mXmppConnectionService.sendMessage(message);
callback.success(message);
mXmppConnectionService.sendMessage(message, () -> callback.success(message));
}
} catch (FileBackend.FileCopyException e) {
callback.error(e.getResId(), message);
@ -163,8 +160,7 @@ public class AttachFileToConversationRunnable implements Runnable, TranscoderLis
if (message.getEncryption() == Message.ENCRYPTION_DECRYPTED) {
mXmppConnectionService.getPgpEngine().encrypt(message, callback);
} else {
mXmppConnectionService.sendMessage(message);
callback.success(message);
mXmppConnectionService.sendMessage(message, () -> callback.success(message));
}
}

View file

@ -747,8 +747,7 @@ public class XmppConnectionService extends Service {
callback.error(R.string.unable_to_connect_to_keychain, null);
}
} else {
sendMessage(message);
callback.success(message);
sendMessage(message, false, false, false, () -> callback.success(message));
}
});
}
@ -1912,22 +1911,27 @@ public class XmppConnectionService extends Service {
}
}
private void sendFileMessage(final Message message, final boolean delay) {
private void sendFileMessage(final Message message, final boolean delay, final Runnable cb) {
Log.d(Config.LOGTAG, "send file message");
final Account account = message.getConversation().getAccount();
if (account.httpUploadAvailable(fileBackend.getFile(message, false).getSize())
|| message.getConversation().getMode() == Conversation.MODE_MULTI) {
mHttpConnectionManager.createNewUploadConnection(message, delay);
mHttpConnectionManager.createNewUploadConnection(message, delay, cb);
} else {
mJingleConnectionManager.startJingleFileTransfer(message);
if (cb != null) cb.run();
}
}
public void sendMessage(final Message message) {
sendMessage(message, false, false, false);
sendMessage(message, false, false, false, null);
}
private void sendMessage(final Message message, final boolean resend, final boolean previewedLinks, final boolean delay) {
public void sendMessage(final Message message, final Runnable cb) {
sendMessage(message, false, false, false, cb);
}
private void sendMessage(final Message message, final boolean resend, final boolean previewedLinks, final boolean delay, final Runnable cb) {
final Account account = message.getConversation().getAccount();
if (account.setShowErrorNotification(true)) {
databaseBackend.updateAccount(account);
@ -1936,7 +1940,6 @@ public class XmppConnectionService extends Service {
final Conversation conversation = (Conversation) message.getConversation();
account.deactivateGracePeriod();
if (QuickConversationsService.isQuicksy() && conversation.getMode() == Conversation.MODE_SINGLE) {
final Contact contact = conversation.getContact();
if (!contact.showInRoster() && contact.getOption(Contact.Options.SYNCED_VIA_OTHER)) {
@ -2014,7 +2017,7 @@ public class XmppConnectionService extends Service {
getHttpConnectionManager().createNewDownloadConnection(message, false, (file) -> {
message.setEncryption(encryption);
synchronized (message.getConversation()) {
if (message.getStatus() == Message.STATUS_WAITING) sendMessage(message, true, true, false);
if (message.getStatus() == Message.STATUS_WAITING) sendMessage(message, true, true, false, cb);
}
});
return;
@ -2068,13 +2071,14 @@ public class XmppConnectionService extends Service {
}
}
synchronized (message.getConversation()) {
if (message.getStatus() == Message.STATUS_WAITING) sendMessage(message, true, true, false);
if (message.getStatus() == Message.STATUS_WAITING) sendMessage(message, true, true, false, cb);
}
});
}
}
}
boolean passedCbOn = false;
if (account.isOnlineAndConnected() && !inProgressJoin && !waitForPreview && message.getTimeSent() <= System.currentTimeMillis()) {
switch (message.getEncryption()) {
case Message.ENCRYPTION_NONE:
@ -2082,7 +2086,8 @@ public class XmppConnectionService extends Service {
if (account.httpUploadAvailable(fileBackend.getFile(message, false).getSize())
|| conversation.getMode() == Conversation.MODE_MULTI
|| message.fixCounterpart()) {
this.sendFileMessage(message, delay);
this.sendFileMessage(message, delay, cb);
passedCbOn = true;
} else {
break;
}
@ -2096,7 +2101,8 @@ public class XmppConnectionService extends Service {
if (account.httpUploadAvailable(fileBackend.getFile(message, false).getSize())
|| conversation.getMode() == Conversation.MODE_MULTI
|| message.fixCounterpart()) {
this.sendFileMessage(message, delay);
this.sendFileMessage(message, delay, cb);
passedCbOn = true;
} else {
break;
}
@ -2134,7 +2140,8 @@ public class XmppConnectionService extends Service {
if (account.httpUploadAvailable(fileBackend.getFile(message, false).getSize())
|| conversation.getMode() == Conversation.MODE_MULTI
|| message.fixCounterpart()) {
this.sendFileMessage(message, delay);
this.sendFileMessage(message, delay, cb);
passedCbOn = true;
} else {
break;
}
@ -2172,6 +2179,7 @@ public class XmppConnectionService extends Service {
Log.e(Config.LOGTAG, "error updated message in DB after edit");
}
updateConversationUi();
if (!waitForPreview && cb != null) cb.run();
return;
} else {
databaseBackend.createMessage(message);
@ -2242,6 +2250,7 @@ public class XmppConnectionService extends Service {
if (message.getConversation() instanceof Conversation) presenceToMuc((Conversation) message.getConversation());
}
}
if (!waitForPreview && !passedCbOn && cb != null) { Log.d("WUT cb", message.getRawBody()); cb.run(); }
}
private boolean isJoinInProgress(final Conversation conversation) {
@ -2268,11 +2277,15 @@ public class XmppConnectionService extends Service {
}
public void resendMessage(final Message message, final boolean delay) {
sendMessage(message, true, false, delay);
sendMessage(message, true, false, delay, null);
}
public void resendMessage(final Message message, final boolean delay, final Runnable cb) {
sendMessage(message, true, false, delay, cb);
}
public void resendMessage(final Message message, final boolean delay, final boolean previewedLinks) {
sendMessage(message, true, previewedLinks, delay);
sendMessage(message, true, previewedLinks, delay, null);
}
public Pair<Account,Account> onboardingIncomplete() {

View file

@ -1076,7 +1076,7 @@ public class ConversationFragment extends XmppFragment
});
}
private void attachFileToConversation(Conversation conversation, Uri uri, String type) {
private void attachFileToConversation(Conversation conversation, Uri uri, String type, Runnable next) {
if (conversation == null) {
return;
}
@ -1100,10 +1100,14 @@ public class ConversationFragment extends XmppFragment
@Override
public void success(Message message) {
if (next == null) {
runOnUiThread(() -> {
activity.hideToast();
messageSent();
});
} else {
runOnUiThread(next);
}
hidePrepareFileToast(prepareFileToast);
}
@ -1126,7 +1130,7 @@ public class ConversationFragment extends XmppFragment
toggleInputMethod();
}
private void attachImageToConversation(Conversation conversation, Uri uri, String type) {
private void attachImageToConversation(Conversation conversation, Uri uri, String type, Runnable next) {
if (conversation == null) {
return;
}
@ -1150,7 +1154,11 @@ public class ConversationFragment extends XmppFragment
@Override
public void success(Message message) {
hidePrepareFileToast(prepareFileToast);
if (next == null) {
runOnUiThread(() -> messageSent());
} else {
runOnUiThread(next);
}
}
@Override
@ -1506,26 +1514,31 @@ public class ConversationFragment extends XmppFragment
}
final PresenceSelector.OnPresenceSelected callback =
() -> {
for (Iterator<Attachment> i = attachments.iterator(); i.hasNext(); i.remove()) {
final Iterator<Attachment> i = attachments.iterator();
final Runnable next = new Runnable() {
@Override
public void run() {
final Attachment attachment = i.next();
if (attachment.getType() == Attachment.Type.LOCATION) {
attachLocationToConversation(conversation, attachment.getUri());
if (i.hasNext()) runOnUiThread(this);
} else if (attachment.getType() == Attachment.Type.IMAGE) {
Log.d(
Config.LOGTAG,
"ConversationsActivity.commitAttachments() - attaching image to conversations. CHOOSE_IMAGE");
attachImageToConversation(
conversation, attachment.getUri(), attachment.getMime());
attachImageToConversation(conversation, attachment.getUri(), attachment.getMime(), i.hasNext() ? this : null);
} else {
Log.d(
Config.LOGTAG,
"ConversationsActivity.commitAttachments() - attaching file to conversations. CHOOSE_FILE/RECORD_VOICE/RECORD_VIDEO");
attachFileToConversation(
conversation, attachment.getUri(), attachment.getMime());
}
attachFileToConversation(conversation, attachment.getUri(), attachment.getMime(), i.hasNext() ? this : null);
}
i.remove();
mediaPreviewAdapter.notifyDataSetChanged();
toggleInputMethod();
}
};
next.run();
};
if (conversation == null
|| conversation.getMode() == Conversation.MODE_MULTI