From 08023210ba875f3cd088eca2b4b3df7410966344 Mon Sep 17 00:00:00 2001 From: Daniel Gultsch Date: Mon, 10 Mar 2014 19:22:13 +0100 Subject: basic stream managment functionality --- .../siacs/conversations/xmpp/XmppConnection.java | 141 +++++++++++++++++---- 1 file changed, 113 insertions(+), 28 deletions(-) (limited to 'src/eu/siacs/conversations/xmpp/XmppConnection.java') diff --git a/src/eu/siacs/conversations/xmpp/XmppConnection.java b/src/eu/siacs/conversations/xmpp/XmppConnection.java index 53f9b85a7..2ffe21fb1 100644 --- a/src/eu/siacs/conversations/xmpp/XmppConnection.java +++ b/src/eu/siacs/conversations/xmpp/XmppConnection.java @@ -26,8 +26,10 @@ import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; import javax.net.ssl.X509TrustManager; +import org.json.JSONException; import org.xmlpull.v1.XmlPullParserException; +import android.content.IntentSender.SendIntentException; import android.os.Bundle; import android.os.PowerManager; import android.util.Log; @@ -38,6 +40,14 @@ import eu.siacs.conversations.xml.Element; import eu.siacs.conversations.xml.Tag; import eu.siacs.conversations.xml.TagWriter; import eu.siacs.conversations.xml.XmlReader; +import eu.siacs.conversations.xmpp.stanzas.AbstractStanza; +import eu.siacs.conversations.xmpp.stanzas.IqPacket; +import eu.siacs.conversations.xmpp.stanzas.MessagePacket; +import eu.siacs.conversations.xmpp.stanzas.PresencePacket; +import eu.siacs.conversations.xmpp.stanzas.streammgmt.AckPacket; +import eu.siacs.conversations.xmpp.stanzas.streammgmt.EnablePacket; +import eu.siacs.conversations.xmpp.stanzas.streammgmt.RequestPacket; +import eu.siacs.conversations.xmpp.stanzas.streammgmt.ResumePacket; public class XmppConnection implements Runnable { @@ -56,6 +66,11 @@ public class XmppConnection implements Runnable { private boolean shouldAuthenticate = true; private Element streamFeatures; private HashSet discoFeatures = new HashSet(); + + private String streamId = null; + + private int stanzasReceived = 0; + private int stanzasSent = 0; private static final int PACKET_IQ = 0; private static final int PACKET_MESSAGE = 1; @@ -176,6 +191,34 @@ public class XmppConnection implements Runnable { } else if (nextTag.isStart("failure")) { Element failure = tagReader.readElement(nextTag); changeStatus(Account.STATUS_UNAUTHORIZED); + } else if (nextTag.isStart("enabled")) { + this.stanzasSent = 0; + Element enabled = tagReader.readElement(nextTag); + if ("true".equals(enabled.getAttribute("resume"))) { + this.streamId = enabled.getAttribute("id"); + Log.d(LOGTAG,account.getJid()+": stream managment enabled (resumable)"); + } else { + Log.d(LOGTAG,account.getJid()+": stream managment enabled"); + } + this.stanzasReceived = 0; + RequestPacket r = new RequestPacket(); + tagWriter.writeStanzaAsync(r); + } else if (nextTag.isStart("resumed")) { + tagReader.readElement(nextTag); + changeStatus(Account.STATUS_ONLINE); + Log.d(LOGTAG,account.getJid()+": session resumed"); + } else if (nextTag.isStart("r")) { + tagReader.readElement(nextTag); + AckPacket ack = new AckPacket(this.stanzasReceived); + //Log.d(LOGTAG,ack.toString()); + tagWriter.writeStanzaAsync(ack); + } else if (nextTag.isStart("a")) { + Element ack = tagReader.readElement(nextTag); + int serverSequence = Integer.parseInt(ack.getAttribute("h")); + if (serverSequence>this.stanzasSent) { + this.stanzasSent = serverSequence; + } + //Log.d(LOGTAG,"server ack"+ack.toString()+" ("+this.stanzasSent+")"); } else if (nextTag.isStart("iq")) { processIq(nextTag); } else if (nextTag.isStart("message")) { @@ -221,6 +264,7 @@ public class XmppConnection implements Runnable { } nextTag = tagReader.readTag(); } + ++stanzasReceived; return element; } @@ -271,7 +315,7 @@ public class XmppConnection implements Runnable { } } - private void sendStartTLS() { + private void sendStartTLS() throws IOException { Tag startTLS = Tag.empty("starttls"); startTLS.setAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-tls"); tagWriter.writeTag(startTLS); @@ -378,23 +422,43 @@ public class XmppConnection implements Runnable { } else if (this.streamFeatures.hasChild("mechanisms") && shouldAuthenticate) { sendSaslAuth(); - } - if (this.streamFeatures.hasChild("bind") && shouldBind) { + } else if (this.streamFeatures.hasChild("sm") && streamId != null) { + Log.d(LOGTAG,"found old stream id. trying to remuse"); + ResumePacket resume = new ResumePacket(this.streamId,stanzasReceived); + this.tagWriter.writeStanzaAsync(resume); + } else if (this.streamFeatures.hasChild("bind") && shouldBind) { sendBindRequest(); if (this.streamFeatures.hasChild("session")) { + Log.d(LOGTAG,"sending session"); IqPacket startSession = new IqPacket(IqPacket.TYPE_SET); Element session = new Element("session"); session.setAttribute("xmlns", "urn:ietf:params:xml:ns:xmpp-session"); session.setContent(""); startSession.addChild(session); - sendIqPacket(startSession, null); - tagWriter.writeElement(startSession); + this.sendIqPacket(startSession, null); } - Element presence = new Element("presence"); + } + } - tagWriter.writeElement(presence); + private void sendInitialPresence() { + PresencePacket packet = new PresencePacket(); + packet.setAttribute("from", account.getFullJid()); + if (account.getKeys().has("pgp_signature")) { + try { + String signature = account.getKeys().getString("pgp_signature"); + Element status = new Element("status"); + status.setContent("online"); + packet.addChild(status); + Element x = new Element("x"); + x.setAttribute("xmlns", "jabber:x:signed"); + x.setContent(signature); + packet.addChild(x); + } catch (JSONException e) { + // + } } + this.sendPresencePacket(packet); } private void sendBindRequest() throws IOException { @@ -412,10 +476,15 @@ public class XmppConnection implements Runnable { .getContent().split("/")[1]; account.setResource(resource); account.setStatus(Account.STATUS_ONLINE); + if (streamFeatures.hasChild("sm")) { + EnablePacket enable = new EnablePacket(); + tagWriter.writeStanzaAsync(enable); + } + sendInitialPresence(); + sendServiceDiscovery(); if (statusListener != null) { statusListener.onStatusChanged(account); } - sendServiceDiscovery(); } }); } @@ -471,7 +540,7 @@ public class XmppConnection implements Runnable { Log.d(LOGTAG, "processStreamError"); } - private void sendStartStream() { + private void sendStartStream() throws IOException { Tag stream = Tag.start("stream:stream"); stream.setAttribute("from", account.getJid()); stream.setAttribute("to", account.getServer()); @@ -489,39 +558,36 @@ public class XmppConnection implements Runnable { public void sendIqPacket(IqPacket packet, OnIqPacketReceived callback) { String id = nextRandomId(); packet.setAttribute("id", id); - tagWriter.writeElement(packet); - if (callback != null) { - packetCallbacks.put(id, callback); - } + this.sendPacket(packet, callback); } public void sendMessagePacket(MessagePacket packet) { - this.sendMessagePacket(packet, null); + this.sendPacket(packet, null); } public void sendMessagePacket(MessagePacket packet, OnMessagePacketReceived callback) { - String id = nextRandomId(); - packet.setAttribute("id", id); - tagWriter.writeElement(packet); - if (callback != null) { - packetCallbacks.put(id, callback); - } + this.sendPacket(packet, callback); } public void sendPresencePacket(PresencePacket packet) { - this.sendPresencePacket(packet, null); + this.sendPacket(packet, null); } - public PresencePacket sendPresencePacket(PresencePacket packet, + public void sendPresencePacket(PresencePacket packet, OnPresencePacketReceived callback) { - String id = nextRandomId(); - packet.setAttribute("id", id); - tagWriter.writeElement(packet); + this.sendPacket(packet, callback); + } + + private synchronized void sendPacket(final AbstractStanza packet, PacketReceived callback) { + ++stanzasSent; + tagWriter.writeStanzaAsync(packet); if (callback != null) { - packetCallbacks.put(id, callback); + if (packet.getId()==null) { + packet.setId(nextRandomId()); + } + packetCallbacks.put(packet.getId(), callback); } - return packet; } public void setOnMessagePacketReceivedListener( @@ -547,8 +613,23 @@ public class XmppConnection implements Runnable { this.tlsListener = listener; } - public void disconnect() { + public void disconnect(boolean force) { + Log.d(LOGTAG,"disconnecting"); + try { + if (force) { + socket.close(); + } + tagWriter.finish(); + while(!tagWriter.finished()) { + Log.d(LOGTAG,"not yet finished"); + Thread.sleep(100); + } tagWriter.writeTag(Tag.end("stream:stream")); + } catch (IOException e) { + Log.d(LOGTAG,"io exception during disconnect"); + } catch (InterruptedException e) { + Log.d(LOGTAG,"interupted while waiting for disconnect"); + } } public boolean hasFeatureRosterManagment() { @@ -558,4 +639,8 @@ public class XmppConnection implements Runnable { return this.streamFeatures.hasChild("ver"); } } + + public void r() { + this.tagWriter.writeStanzaAsync(new RequestPacket()); + } } -- cgit v1.2.3