From 0f9058ffefb6c1481ffc0c517a235d301a158fe1 Mon Sep 17 00:00:00 2001 From: Daniel Gultsch Date: Thu, 17 Dec 2015 15:19:58 +0100 Subject: throw exception at the end of the stream --- .../siacs/conversations/xmpp/XmppConnection.java | 250 ++++++++++----------- 1 file changed, 122 insertions(+), 128 deletions(-) (limited to 'src/main/java/eu') diff --git a/src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java b/src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java index efb67072..af3c7533 100644 --- a/src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java +++ b/src/main/java/eu/siacs/conversations/xmpp/XmppConnection.java @@ -357,135 +357,129 @@ public class XmppConnection implements Runnable { } private void processStream() throws XmlPullParserException, IOException, NoSuchAlgorithmException { - Tag nextTag = tagReader.readTag(); - while (nextTag != null && !nextTag.isEnd("stream")) { - if (nextTag.isStart("error")) { - processStreamError(nextTag); - } else if (nextTag.isStart("features")) { - processStreamFeatures(nextTag); - } else if (nextTag.isStart("proceed")) { - switchOverToTls(nextTag); - } else if (nextTag.isStart("success")) { - final String challenge = tagReader.readElement(nextTag).getContent(); - try { - saslMechanism.getResponse(challenge); - } catch (final SaslMechanism.AuthenticationException e) { - disconnect(true); - Log.e(Config.LOGTAG, String.valueOf(e)); - } - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": logged in"); - account.setKey(Account.PINNED_MECHANISM_KEY, - String.valueOf(saslMechanism.getPriority())); - tagReader.reset(); - sendStartStream(); - final Tag tag = tagReader.readTag(); - if (tag != null && tag.isStart("stream")) { - processStream(); - } else { - throw new IOException("server didn't restart stream after successful auth"); - } - break; - } else if (nextTag.isStart("failure")) { - throw new UnauthorizedException(); - } else if (nextTag.isStart("challenge")) { - final String challenge = tagReader.readElement(nextTag).getContent(); - final Element response = new Element("response"); - response.setAttribute("xmlns", - "urn:ietf:params:xml:ns:xmpp-sasl"); - try { - response.setContent(saslMechanism.getResponse(challenge)); - } catch (final SaslMechanism.AuthenticationException e) { - // TODO: Send auth abort tag. - Log.e(Config.LOGTAG, e.toString()); - } - tagWriter.writeElement(response); - } else if (nextTag.isStart("enabled")) { - final Element enabled = tagReader.readElement(nextTag); - if ("true".equals(enabled.getAttribute("resume"))) { - this.streamId = enabled.getAttribute("id"); - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() - + ": stream managment(" + smVersion - + ") enabled (resumable)"); - } else { - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() - + ": stream management(" + smVersion + ") enabled"); - } - this.stanzasReceived = 0; - final RequestPacket r = new RequestPacket(smVersion); - tagWriter.writeStanzaAsync(r); - } else if (nextTag.isStart("resumed")) { - lastPacketReceived = SystemClock.elapsedRealtime(); - final Element resumed = tagReader.readElement(nextTag); - final String h = resumed.getAttribute("h"); - try { - final int serverCount = Integer.parseInt(h); - if (serverCount != stanzasSent) { - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() - + ": session resumed with lost packages"); - stanzasSent = serverCount; - } else { - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed"); - } - acknowledgeStanzaUpTo(serverCount); - ArrayList failedStanzas = new ArrayList<>(); - for(int i = 0; i < this.mStanzaQueue.size(); ++i) { - failedStanzas.add(mStanzaQueue.valueAt(i)); - } - mStanzaQueue.clear(); - Log.d(Config.LOGTAG,"resending "+failedStanzas.size()+" stanzas"); - for(AbstractAcknowledgeableStanza packet : failedStanzas) { - if (packet instanceof MessagePacket) { - MessagePacket message = (MessagePacket) packet; - mXmppConnectionService.markMessage(account, - message.getTo().toBareJid(), - message.getId(), - Message.STATUS_UNSEND); - } - sendPacket(packet); - } - } catch (final NumberFormatException ignored) { - } - Log.d(Config.LOGTAG, account.getJid().toBareJid()+ ": online with resource " + account.getResource()); - changeStatus(Account.State.ONLINE); - } else if (nextTag.isStart("r")) { - tagReader.readElement(nextTag); - if (Config.EXTENDED_SM_LOGGING) { - Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": acknowledging stanza #" + this.stanzasReceived); - } - final AckPacket ack = new AckPacket(this.stanzasReceived, smVersion); - tagWriter.writeStanzaAsync(ack); - } else if (nextTag.isStart("a")) { - final Element ack = tagReader.readElement(nextTag); - lastPacketReceived = SystemClock.elapsedRealtime(); - try { - final int serverSequence = Integer.parseInt(ack.getAttribute("h")); - acknowledgeStanzaUpTo(serverSequence); - } catch (NumberFormatException e) { - Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number"); - } - } else if (nextTag.isStart("failed")) { - tagReader.readElement(nextTag); - Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": resumption failed"); - streamId = null; - if (account.getStatus() != Account.State.ONLINE) { - sendBindRequest(); - } - } else if (nextTag.isStart("iq")) { - processIq(nextTag); - } else if (nextTag.isStart("message")) { - processMessage(nextTag); - } else if (nextTag.isStart("presence")) { - processPresence(nextTag); - } - nextTag = tagReader.readTag(); - } - Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": last tag was " + nextTag); - if (account.getStatus() == Account.State.ONLINE) { - account. setStatus(Account.State.OFFLINE); - if (statusListener != null) { - statusListener.onStatusChanged(account); - } + Tag nextTag = tagReader.readTag(); + while (nextTag != null && !nextTag.isEnd("stream")) { + if (nextTag.isStart("error")) { + processStreamError(nextTag); + } else if (nextTag.isStart("features")) { + processStreamFeatures(nextTag); + } else if (nextTag.isStart("proceed")) { + switchOverToTls(nextTag); + } else if (nextTag.isStart("success")) { + final String challenge = tagReader.readElement(nextTag).getContent(); + try { + saslMechanism.getResponse(challenge); + } catch (final SaslMechanism.AuthenticationException e) { + disconnect(true); + Log.e(Config.LOGTAG, String.valueOf(e)); + } + Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": logged in"); + account.setKey(Account.PINNED_MECHANISM_KEY, + String.valueOf(saslMechanism.getPriority())); + tagReader.reset(); + sendStartStream(); + final Tag tag = tagReader.readTag(); + if (tag != null && tag.isStart("stream")) { + processStream(); + } else { + throw new IOException("server didn't restart stream after successful auth"); + } + break; + } else if (nextTag.isStart("failure")) { + throw new UnauthorizedException(); + } else if (nextTag.isStart("challenge")) { + final String challenge = tagReader.readElement(nextTag).getContent(); + final Element response = new Element("response"); + response.setAttribute("xmlns", + "urn:ietf:params:xml:ns:xmpp-sasl"); + try { + response.setContent(saslMechanism.getResponse(challenge)); + } catch (final SaslMechanism.AuthenticationException e) { + // TODO: Send auth abort tag. + Log.e(Config.LOGTAG, e.toString()); + } + tagWriter.writeElement(response); + } else if (nextTag.isStart("enabled")) { + final Element enabled = tagReader.readElement(nextTag); + if ("true".equals(enabled.getAttribute("resume"))) { + this.streamId = enabled.getAttribute("id"); + Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + + ": stream managment(" + smVersion + + ") enabled (resumable)"); + } else { + Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + + ": stream management(" + smVersion + ") enabled"); + } + this.stanzasReceived = 0; + final RequestPacket r = new RequestPacket(smVersion); + tagWriter.writeStanzaAsync(r); + } else if (nextTag.isStart("resumed")) { + lastPacketReceived = SystemClock.elapsedRealtime(); + final Element resumed = tagReader.readElement(nextTag); + final String h = resumed.getAttribute("h"); + try { + final int serverCount = Integer.parseInt(h); + if (serverCount != stanzasSent) { + Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + + ": session resumed with lost packages"); + stanzasSent = serverCount; + } else { + Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": session resumed"); + } + acknowledgeStanzaUpTo(serverCount); + ArrayList failedStanzas = new ArrayList<>(); + for(int i = 0; i < this.mStanzaQueue.size(); ++i) { + failedStanzas.add(mStanzaQueue.valueAt(i)); + } + mStanzaQueue.clear(); + Log.d(Config.LOGTAG,"resending "+failedStanzas.size()+" stanzas"); + for(AbstractAcknowledgeableStanza packet : failedStanzas) { + if (packet instanceof MessagePacket) { + MessagePacket message = (MessagePacket) packet; + mXmppConnectionService.markMessage(account, + message.getTo().toBareJid(), + message.getId(), + Message.STATUS_UNSEND); } + sendPacket(packet); + } + } catch (final NumberFormatException ignored) { + } + Log.d(Config.LOGTAG, account.getJid().toBareJid()+ ": online with resource " + account.getResource()); + changeStatus(Account.State.ONLINE); + } else if (nextTag.isStart("r")) { + tagReader.readElement(nextTag); + if (Config.EXTENDED_SM_LOGGING) { + Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": acknowledging stanza #" + this.stanzasReceived); + } + final AckPacket ack = new AckPacket(this.stanzasReceived, smVersion); + tagWriter.writeStanzaAsync(ack); + } else if (nextTag.isStart("a")) { + final Element ack = tagReader.readElement(nextTag); + lastPacketReceived = SystemClock.elapsedRealtime(); + try { + final int serverSequence = Integer.parseInt(ack.getAttribute("h")); + acknowledgeStanzaUpTo(serverSequence); + } catch (NumberFormatException e) { + Log.d(Config.LOGTAG,account.getJid().toBareJid()+": server send ack without sequence number"); + } + } else if (nextTag.isStart("failed")) { + tagReader.readElement(nextTag); + Log.d(Config.LOGTAG, account.getJid().toBareJid().toString() + ": resumption failed"); + streamId = null; + if (account.getStatus() != Account.State.ONLINE) { + sendBindRequest(); + } + } else if (nextTag.isStart("iq")) { + processIq(nextTag); + } else if (nextTag.isStart("message")) { + processMessage(nextTag); + } else if (nextTag.isStart("presence")) { + processPresence(nextTag); + } + nextTag = tagReader.readTag(); + } + throw new IOException("reached end of stream. last tag was "+nextTag); } private void acknowledgeStanzaUpTo(int serverCount) { -- cgit v1.2.3