flush stanzas in batches
This commit is contained in:
parent
cdc239b040
commit
6bd552f6a3
|
@ -8,12 +8,15 @@ import java.io.OutputStreamWriter;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import eu.siacs.conversations.Config;
|
import eu.siacs.conversations.Config;
|
||||||
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
|
import eu.siacs.conversations.xmpp.stanzas.AbstractStanza;
|
||||||
|
|
||||||
public class TagWriter {
|
public class TagWriter {
|
||||||
|
|
||||||
|
private static final int FLUSH_DELAY = 400;
|
||||||
|
|
||||||
private OutputStreamWriter outputStream;
|
private OutputStreamWriter outputStream;
|
||||||
private boolean finished = false;
|
private boolean finished = false;
|
||||||
private final LinkedBlockingQueue<AbstractStanza> writeQueue = new LinkedBlockingQueue<AbstractStanza>();
|
private final LinkedBlockingQueue<AbstractStanza> writeQueue = new LinkedBlockingQueue<AbstractStanza>();
|
||||||
|
@ -21,6 +24,8 @@ public class TagWriter {
|
||||||
|
|
||||||
private final Thread asyncStanzaWriter = new Thread() {
|
private final Thread asyncStanzaWriter = new Thread() {
|
||||||
|
|
||||||
|
private final AtomicInteger batchStanzaCount = new AtomicInteger(0);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
stanzaWriterCountDownLatch = new CountDownLatch(1);
|
stanzaWriterCountDownLatch = new CountDownLatch(1);
|
||||||
|
@ -29,12 +34,21 @@ public class TagWriter {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
AbstractStanza output = writeQueue.take();
|
final AbstractStanza stanza = writeQueue.poll(FLUSH_DELAY, TimeUnit.MILLISECONDS);
|
||||||
outputStream.write(output.toString());
|
if (stanza != null) {
|
||||||
if (writeQueue.size() == 0) {
|
batchStanzaCount.incrementAndGet();
|
||||||
|
outputStream.write(stanza.toString());
|
||||||
|
} else {
|
||||||
|
final int batch = batchStanzaCount.getAndSet(0);
|
||||||
|
if (batch > 1) {
|
||||||
|
Log.d(Config.LOGTAG, "flushing " + batch + " stanzas");
|
||||||
|
}
|
||||||
outputStream.flush();
|
outputStream.flush();
|
||||||
|
final AbstractStanza nextStanza = writeQueue.take();
|
||||||
|
batchStanzaCount.incrementAndGet();
|
||||||
|
outputStream.write(nextStanza.toString());
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (final Exception e) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue