fixed interupt handling

This commit is contained in:
Daniel Gultsch 2018-01-20 21:57:09 +01:00
parent 2b39acf352
commit 3a8855a672
2 changed files with 33 additions and 17 deletions

View file

@ -27,7 +27,9 @@ public class TagWriter {
try { try {
AbstractStanza output = writeQueue.take(); AbstractStanza output = writeQueue.take();
outputStream.write(output.toString()); outputStream.write(output.toString());
outputStream.flush(); if (writeQueue.size() == 0) {
outputStream.flush();
}
} catch (Exception e) { } catch (Exception e) {
return; return;
} }

View file

@ -37,6 +37,8 @@ import java.util.Hashtable;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Matcher; import java.util.regex.Matcher;
@ -145,6 +147,8 @@ public class XmppConnection implements Runnable {
private SaslMechanism saslMechanism; private SaslMechanism saslMechanism;
private URL redirectionUrl = null; private URL redirectionUrl = null;
private String verifiedHostname = null; private String verifiedHostname = null;
private Thread mThread;
private CountDownLatch mStreamCountDownLatch;
private class MyKeyManager implements X509KeyManager { private class MyKeyManager implements X509KeyManager {
@Override @Override
@ -502,7 +506,8 @@ public class XmppConnection implements Runnable {
@Override @Override
public void run() { public void run() {
synchronized (this) { synchronized (this) {
if (Thread.currentThread().isInterrupted()) { this.mThread = Thread.currentThread();
if (this.mThread.isInterrupted()) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": aborting connect because thread was interrupted"); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": aborting connect because thread was interrupted");
return; return;
} }
@ -512,6 +517,8 @@ public class XmppConnection implements Runnable {
} }
private void processStream() throws XmlPullParserException, IOException, NoSuchAlgorithmException { private void processStream() throws XmlPullParserException, IOException, NoSuchAlgorithmException {
final CountDownLatch streamCountDownLatch = new CountDownLatch(1);
this.mStreamCountDownLatch = streamCountDownLatch;
Tag nextTag = tagReader.readTag(); Tag nextTag = tagReader.readTag();
while (nextTag != null && !nextTag.isEnd("stream")) { while (nextTag != null && !nextTag.isEnd("stream")) {
if (nextTag.isStart("error")) { if (nextTag.isStart("error")) {
@ -681,6 +688,9 @@ public class XmppConnection implements Runnable {
} }
nextTag = tagReader.readTag(); nextTag = tagReader.readTag();
} }
if (nextTag != null && nextTag.isEnd("stream")) {
streamCountDownLatch.countDown();
}
} }
private void acknowledgeStanzaUpTo(int serverCount) { private void acknowledgeStanzaUpTo(int serverCount) {
@ -1460,7 +1470,9 @@ public class XmppConnection implements Runnable {
} }
public void interrupt() { public void interrupt() {
Thread.currentThread().interrupt(); if (this.mThread != null) {
this.mThread.interrupt();
}
} }
public void disconnect(final boolean force) { public void disconnect(final boolean force) {
@ -1469,28 +1481,30 @@ public class XmppConnection implements Runnable {
if (force) { if (force) {
forceCloseSocket(); forceCloseSocket();
} else { } else {
if (tagWriter.isActive()) { final TagWriter currentTagWriter = this.tagWriter;
tagWriter.finish(); if (currentTagWriter.isActive()) {
final Socket currentSocket = socket; currentTagWriter.finish();
final Socket currentSocket = this.socket;
final CountDownLatch streamCountDownLatch = this.mStreamCountDownLatch;
try { try {
for (int i = 0; i <= 10 && !tagWriter.finished() && !currentSocket.isClosed(); ++i) { for (int i = 0; i <= 10 && !currentTagWriter.finished() && !currentSocket.isClosed(); ++i) {
uninterruptedSleep(100); Thread.sleep(100);
} }
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": closing stream"); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": closing stream");
tagWriter.writeTag(Tag.end("stream:stream")); currentTagWriter.writeTag(Tag.end("stream:stream"));
for (int i = 0; i <= 20 && !currentSocket.isClosed(); ++i) { if (streamCountDownLatch != null) {
uninterruptedSleep(100); if (streamCountDownLatch.await(1, TimeUnit.SECONDS)) {
} Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": remote ended stream");
if (currentSocket.isClosed()) { } else {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": remote closed socket"); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": remote has not closed socket. force closing");
} else { }
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": remote has not closed socket. force closing");
} }
} catch (InterruptedException e) {
Log.d(Config.LOGTAG,account.getJid().toBareJid()+": interrupted while gracefully closing stream");
} catch (final IOException e) { } catch (final IOException e) {
Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": io exception during disconnect (" + e.getMessage() + ")"); Log.d(Config.LOGTAG, account.getJid().toBareJid() + ": io exception during disconnect (" + e.getMessage() + ")");
} finally { } finally {
FileBackend.close(currentSocket); FileBackend.close(currentSocket);
forceCloseSocket();
} }
} else { } else {
forceCloseSocket(); forceCloseSocket();