2019-06-23 12:53:18 +00:00
using Gee ;
using Xmpp ;
using Xmpp.Xep ;
namespace Xmpp.Xep.InBandBytestreams {
private const string NS_URI = " http://jabber.org/protocol/ibb " ;
private const int SEQ_MODULUS = 65536 ;
2019-06-23 12:51:33 +00:00
public class Module : XmppStreamModule , Iq . Handler {
2019-06-23 12:53:18 +00:00
public static Xmpp . ModuleIdentity < Module > IDENTITY = new Xmpp . ModuleIdentity < Module > ( NS_URI , " 0047_in_band_bytestreams " ) ;
public override void attach ( XmppStream stream ) {
stream . add_flag ( new Flag ( ) ) ;
2019-06-23 12:51:33 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . register_for_namespace ( NS_URI , this ) ;
2019-06-23 12:53:18 +00:00
}
2019-08-06 13:37:49 +00:00
public override void detach ( XmppStream stream ) {
stream . get_module ( Iq . Module . IDENTITY ) . unregister_from_namespace ( NS_URI , this ) ;
}
2019-06-23 12:53:18 +00:00
2020-04-24 12:19:42 +00:00
public async void on_iq_set ( XmppStream stream , Iq . Stanza iq ) {
2019-06-23 12:51:33 +00:00
// the iq module ensures that there's only one child node
StanzaNode ? node = null ;
node = ( node ! = null ) ? node : iq . stanza . get_subnode ( " open " , NS_URI ) ;
node = ( node ! = null ) ? node : iq . stanza . get_subnode ( " data " , NS_URI ) ;
node = ( node ! = null ) ? node : iq . stanza . get_subnode ( " close " , NS_URI ) ;
if ( node = = null ) {
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . bad_request ( " unknown IBB action " ) ) { to = iq . from } ) ;
2019-06-23 12:53:18 +00:00
return ;
}
2019-06-23 12:51:33 +00:00
string ? sid = node . get_attribute ( " sid " ) ;
if ( sid = = null ) {
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . bad_request ( " missing sid " ) ) { to = iq . from } ) ;
2019-06-23 12:53:18 +00:00
return ;
}
2019-06-23 12:51:33 +00:00
Connection ? conn = stream . get_flag ( Flag . IDENTITY ) . get_connection ( sid ) ;
if ( node . name = = " open " ) {
if ( conn = = null ) {
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . not_acceptable ( " unexpected IBB connection " ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
2019-07-18 01:12:05 +00:00
if ( conn . state ! = Connection . State . WAITING_FOR_CONNECT ) {
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . bad_request ( " IBB open for already open connection " ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
conn . handle_open ( stream , node , iq ) ;
} else {
if ( conn = = null | | conn . state ! = Connection . State . CONNECTED ) {
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . item_not_found ( ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
if ( node . name = = " close " ) {
conn . handle_close ( stream , node , iq ) ;
} else {
conn . handle_data ( stream , node , iq ) ;
}
2019-06-23 12:53:18 +00:00
}
}
public override string get_ns ( ) { return NS_URI ; }
public override string get_id ( ) { return IDENTITY . id ; }
}
2019-06-23 12:51:33 +00:00
public class Connection : IOStream {
public class Input : InputStream {
2019-08-05 15:05:33 +00:00
private weak Connection connection ;
2019-06-23 12:51:33 +00:00
public Input ( Connection connection ) {
this . connection = connection ;
}
public override ssize_t read ( uint8 [ ] buffer , Cancellable ? cancellable = null ) throws IOError {
throw new IOError . NOT_SUPPORTED ( " can't do non-async reads on in-band bytestreams " ) ;
}
public override async ssize_t read_async ( uint8 [ ] ? buffer , int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
return yield connection . read_async ( buffer , io_priority , cancellable ) ;
}
public override bool close ( Cancellable ? cancellable = null ) throws IOError {
2019-08-05 15:05:33 +00:00
throw new IOError . NOT_SUPPORTED ( " can't do non-async closes on in-band bytestreams " ) ;
2019-06-23 12:51:33 +00:00
}
public override async bool close_async ( int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
return yield connection . close_read_async ( io_priority , cancellable ) ;
}
}
public class Output : OutputStream {
2019-08-05 15:05:33 +00:00
private weak Connection connection ;
2019-06-23 12:51:33 +00:00
public Output ( Connection connection ) {
this . connection = connection ;
}
public override ssize_t write ( uint8 [ ] buffer , Cancellable ? cancellable = null ) throws IOError {
throw new IOError . NOT_SUPPORTED ( " can't do non-async writes on in-band bytestreams " ) ;
}
public override async ssize_t write_async ( uint8 [ ] ? buffer , int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
return yield connection . write_async ( buffer , io_priority , cancellable ) ;
}
public override bool close ( Cancellable ? cancellable = null ) throws IOError {
2019-08-05 15:05:33 +00:00
throw new IOError . NOT_SUPPORTED ( " can't do non-async closes on in-band bytestreams " ) ;
2019-06-23 12:51:33 +00:00
}
public override async bool close_async ( int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
return yield connection . close_write_async ( io_priority , cancellable ) ;
}
}
private Input input ;
private Output output ;
public override InputStream input_stream { get { return input ; } }
public override OutputStream output_stream { get { return output ; } }
2019-06-23 12:53:18 +00:00
public enum State {
2019-06-23 12:51:33 +00:00
WAITING_FOR_CONNECT ,
2019-06-23 12:53:18 +00:00
CONNECTING ,
CONNECTED ,
DISCONNECTING ,
DISCONNECTED ,
ERROR ,
}
2019-06-23 12:51:33 +00:00
public State state { get ; private set ; }
2019-06-23 12:53:18 +00:00
Jid receiver_full_jid ;
public string sid { get ; private set ; }
int block_size ;
int local_seq = 0 ;
int remote_ack = 0 ;
internal int remote_seq = 0 ;
2019-06-23 12:51:33 +00:00
bool input_closed = false ;
bool output_closed = false ;
2019-06-23 12:53:18 +00:00
2019-06-23 12:51:33 +00:00
// ERROR
string ? error = null ;
XmppStream stream ;
2019-07-11 23:53:28 +00:00
int read_callback_priority ;
Cancellable ? read_callback_cancellable = null ;
ulong read_callback_cancellable_id ;
2019-06-23 12:51:33 +00:00
SourceFunc ? read_callback = null ;
2019-07-11 23:53:28 +00:00
int write_callback_priority ;
2019-06-23 12:51:33 +00:00
SourceFunc ? write_callback = null ;
2019-07-11 23:53:28 +00:00
ulong write_callback_cancellable_id ;
Cancellable ? write_callback_cancellable = null ;
2019-06-23 12:51:33 +00:00
// Need `Bytes` instead of `uint8[]` because the latter doesn't work in
// parameter position of `LinkedList`.
LinkedList < Bytes > received = new LinkedList < Bytes > ( ) ;
private Connection ( XmppStream stream , Jid receiver_full_jid , string sid , int block_size , bool initiate ) {
this . stream = stream ;
2019-06-23 12:53:18 +00:00
this . receiver_full_jid = receiver_full_jid ;
this . sid = sid ;
this . block_size = block_size ;
2019-06-23 12:51:33 +00:00
this . state = initiate ? State . CONNECTING : State . WAITING_FOR_CONNECT ;
2019-06-23 12:53:18 +00:00
2019-06-23 12:51:33 +00:00
input = new Input ( this ) ;
output = new Output ( this ) ;
}
2019-06-23 12:53:18 +00:00
2019-07-22 08:41:26 +00:00
public void set_read_callback ( owned SourceFunc callback , Cancellable ? cancellable , int io_priority ) throws IOError {
2019-06-23 12:51:33 +00:00
if ( read_callback ! = null ) {
throw new IOError . PENDING ( " only one async read is permitted at a time on an in-band bytestream " ) ;
}
2019-07-11 23:53:28 +00:00
if ( cancellable ! = null ) {
read_callback_cancellable_id = cancellable . connect ( trigger_read_callback ) ;
}
2019-07-22 08:41:26 +00:00
read_callback = ( owned ) callback ;
2019-07-11 23:53:28 +00:00
read_callback_cancellable = cancellable ;
read_callback_priority = io_priority ;
2019-06-23 12:51:33 +00:00
}
2019-07-22 08:41:26 +00:00
public void set_write_callback ( owned SourceFunc callback , Cancellable ? cancellable , int io_priority ) throws IOError {
2019-06-23 12:51:33 +00:00
if ( write_callback ! = null ) {
throw new IOError . PENDING ( " only one async write is permitted at a time on an in-band bytestream " ) ;
}
2019-07-11 23:53:28 +00:00
if ( cancellable ! = null ) {
write_callback_cancellable_id = cancellable . connect ( trigger_write_callback ) ;
}
2019-07-22 08:41:26 +00:00
write_callback = ( owned ) callback ;
2019-07-11 23:53:28 +00:00
write_callback_cancellable = cancellable ;
write_callback_priority = io_priority ;
2019-06-23 12:51:33 +00:00
}
public void trigger_read_callback ( ) {
if ( read_callback ! = null ) {
2019-07-11 23:53:28 +00:00
Idle . add ( ( owned ) read_callback , read_callback_priority ) ;
2019-06-23 12:51:33 +00:00
read_callback = null ;
2019-07-11 23:53:28 +00:00
if ( read_callback_cancellable ! = null ) {
read_callback_cancellable . disconnect ( read_callback_cancellable_id ) ;
}
read_callback_cancellable = null ;
2019-06-23 12:51:33 +00:00
}
}
public void trigger_write_callback ( ) {
if ( write_callback ! = null ) {
2019-07-11 23:53:28 +00:00
Idle . add ( ( owned ) write_callback , write_callback_priority ) ;
2019-06-23 12:51:33 +00:00
write_callback = null ;
2019-07-11 23:53:28 +00:00
if ( write_callback_cancellable ! = null ) {
write_callback_cancellable . disconnect ( write_callback_cancellable_id ) ;
}
write_callback_cancellable = null ;
2019-06-23 12:51:33 +00:00
}
2019-06-23 12:53:18 +00:00
}
2019-06-23 12:51:33 +00:00
public async ssize_t read_async ( uint8 [ ] ? buffer , int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
while ( true ) {
2019-07-11 23:53:28 +00:00
if ( cancellable ! = null ) {
cancellable . set_error_if_cancelled ( ) ;
}
2019-06-23 12:51:33 +00:00
if ( input_closed ) {
return 0 ;
}
Bytes ? chunk = received . poll ( ) ;
if ( chunk ! = null ) {
int read = int . min ( buffer . length , chunk . length ) ;
for ( int i = 0 ; i < read ; i + + ) {
buffer [ i ] = chunk [ i ] ;
}
if ( buffer . length < chunk . length ) {
received . offer_head ( chunk [ buffer . length : chunk . length ] ) ;
}
return read ;
}
2019-07-18 01:12:05 +00:00
if ( state = = Connection . State . DISCONNECTED ) {
2019-06-23 12:51:33 +00:00
return 0 ;
}
2019-07-11 23:53:28 +00:00
set_read_callback ( read_async . callback , cancellable , io_priority ) ;
2019-06-23 12:51:33 +00:00
yield ;
}
2019-06-23 12:53:18 +00:00
}
2019-06-23 12:51:33 +00:00
public async ssize_t write_async ( uint8 [ ] ? buffer , int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
2019-07-18 01:12:05 +00:00
while ( state = = State . WAITING_FOR_CONNECT | | state = = State . CONNECTING ) {
2019-07-11 23:53:28 +00:00
if ( cancellable ! = null ) {
cancellable . set_error_if_cancelled ( ) ;
}
set_write_callback ( write_async . callback , cancellable , io_priority ) ;
2019-06-23 12:51:33 +00:00
yield ;
}
throw_if_closed ( ) ;
2019-07-18 01:12:05 +00:00
assert ( state = = State . CONNECTED ) ;
2019-06-23 12:51:33 +00:00
// TODO(hrxi): merging?
2019-06-23 12:53:18 +00:00
int seq = local_seq ;
local_seq = ( local_seq + 1 ) % SEQ_MODULUS ;
2019-06-23 12:51:33 +00:00
if ( buffer . length > block_size ) {
buffer = buffer [ 0 : block_size ] ;
}
2019-06-23 12:53:18 +00:00
StanzaNode data = new StanzaNode . build ( " data " , NS_URI )
. add_self_xmlns ( )
. put_attribute ( " sid " , sid )
. put_attribute ( " seq " , seq . to_string ( ) )
2019-06-23 12:51:33 +00:00
. put_node ( new StanzaNode . text ( Base64 . encode ( buffer ) ) ) ;
2019-06-23 12:53:18 +00:00
Iq . Stanza iq = new Iq . Stanza . set ( data ) { to = receiver_full_jid } ;
2019-07-11 23:53:28 +00:00
set_write_callback ( write_async . callback , cancellable , io_priority ) ;
2019-06-23 12:53:18 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , iq , ( stream , iq ) = > {
if ( iq . is_error ( ) ) {
2019-06-23 12:51:33 +00:00
set_error ( " sending failed " ) ;
} else if ( remote_ack ! = seq ) {
set_error ( " out of order acks " ) ;
} else {
remote_ack = ( remote_ack + 1 ) % SEQ_MODULUS ;
if ( local_seq = = remote_ack ) {
trigger_write_callback ( ) ;
}
2019-06-23 12:53:18 +00:00
}
} ) ;
2019-06-23 12:51:33 +00:00
yield ;
2019-07-11 23:53:28 +00:00
if ( cancellable ! = null ) {
cancellable . set_error_if_cancelled ( ) ;
}
2019-06-23 12:51:33 +00:00
throw_if_error ( ) ;
return buffer . length ;
2019-06-23 12:53:18 +00:00
}
2019-06-23 12:51:33 +00:00
public async bool close_read_async ( int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
input_closed = true ;
if ( ! output_closed ) {
return true ;
}
return yield close_async_impl ( io_priority , cancellable ) ;
}
public async bool close_write_async ( int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
output_closed = true ;
if ( ! input_closed ) {
return true ;
}
return yield close_async_impl ( io_priority , cancellable ) ;
}
delegate void OnClose ( bool success ) ;
2019-08-05 15:05:33 +00:00
private bool close_impl ( Cancellable ? cancellable , OnClose on_close ) {
2019-07-18 01:12:05 +00:00
if ( state = = State . DISCONNECTING | | state = = State . DISCONNECTED | | state = = State . ERROR ) {
2019-06-23 12:51:33 +00:00
on_close ( true ) ;
return true ;
}
2019-07-18 01:12:05 +00:00
if ( state = = State . WAITING_FOR_CONNECT ) {
state = State . DISCONNECTED ;
2019-06-23 12:51:33 +00:00
stream . get_flag ( Flag . IDENTITY ) . remove_connection ( this ) ;
trigger_read_callback ( ) ;
on_close ( true ) ;
return true ;
}
2019-07-18 01:12:05 +00:00
state = State . DISCONNECTING ;
2019-06-23 12:53:18 +00:00
StanzaNode close = new StanzaNode . build ( " close " , NS_URI )
. add_self_xmlns ( )
. put_attribute ( " sid " , sid ) ;
Iq . Stanza iq = new Iq . Stanza . set ( close ) { to = receiver_full_jid } ;
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , iq , ( stream , iq ) = > {
2019-07-18 01:12:05 +00:00
assert ( state = = State . DISCONNECTING ) ;
2019-06-23 12:53:18 +00:00
if ( iq . is_error ( ) ) {
2019-06-23 12:51:33 +00:00
set_error ( " disconnecting failed " ) ;
} else {
2019-07-18 01:12:05 +00:00
state = State . DISCONNECTED ;
2019-06-23 12:53:18 +00:00
}
2019-06-23 12:51:33 +00:00
stream . get_flag ( Flag . IDENTITY ) . remove_connection ( this ) ;
trigger_read_callback ( ) ;
on_close ( ! iq . is_error ( ) ) ;
2019-06-23 12:53:18 +00:00
} ) ;
2019-06-23 12:51:33 +00:00
return true ;
}
private async bool close_async_impl ( int io_priority = GLib . Priority . DEFAULT , Cancellable ? cancellable = null ) throws IOError {
SourceFunc callback = close_async_impl . callback ;
close_impl ( cancellable , ( ) = > { Idle . add ( ( owned ) callback ) ; } ) ;
yield ;
throw_if_error ( ) ;
return true ;
}
public static Connection create ( XmppStream stream , Jid receiver_full_jid , string sid , int block_size , bool initiate ) {
Connection conn = new Connection ( stream , receiver_full_jid , sid , block_size , initiate ) ;
if ( initiate ) {
StanzaNode open = new StanzaNode . build ( " open " , NS_URI )
. add_self_xmlns ( )
. put_attribute ( " block-size " , block_size . to_string ( ) )
. put_attribute ( " sid " , sid ) ;
Iq . Stanza iq = new Iq . Stanza . set ( open ) { to = receiver_full_jid } ;
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , iq , ( stream , iq ) = > {
2019-07-18 01:12:05 +00:00
if ( conn . state ! = State . CONNECTING ) {
assert ( conn . state ! = State . CONNECTED ) ;
2019-06-23 12:51:33 +00:00
return ;
}
if ( ! iq . is_error ( ) ) {
2019-07-18 01:12:05 +00:00
conn . state = State . CONNECTED ;
2019-06-23 12:51:33 +00:00
stream . get_flag ( Flag . IDENTITY ) . add_connection ( conn ) ;
conn . trigger_write_callback ( ) ;
} else {
conn . set_error ( " connection failed " ) ;
}
} ) ;
} else {
stream . get_flag ( Flag . IDENTITY ) . add_connection ( conn ) ;
}
return conn ;
}
void throw_if_error ( ) throws IOError {
2019-07-18 01:12:05 +00:00
if ( state = = State . ERROR ) {
2019-06-23 12:51:33 +00:00
throw new IOError . FAILED ( error ) ;
}
}
void throw_if_closed ( ) throws IOError {
throw_if_error ( ) ;
2019-07-18 01:12:05 +00:00
if ( state = = State . DISCONNECTING | | state = = State . DISCONNECTED ) {
2019-06-23 12:51:33 +00:00
throw new IOError . CLOSED ( " can't read/write on a closed connection " ) ;
}
}
void set_error ( string error ) {
2019-07-18 01:12:05 +00:00
if ( state ! = State . WAITING_FOR_CONNECT & & state ! = State . DISCONNECTING & & state ! = State . DISCONNECTED & & state ! = State . ERROR ) {
2019-06-23 12:51:33 +00:00
close_async . begin ( ) ;
}
2019-07-18 01:12:05 +00:00
state = State . ERROR ;
2019-06-23 12:51:33 +00:00
this . error = error ;
stream . get_flag ( Flag . IDENTITY ) . remove_connection ( this ) ;
}
public void handle_open ( XmppStream stream , StanzaNode open , Iq . Stanza iq ) {
2019-07-18 01:12:05 +00:00
assert ( state = = State . WAITING_FOR_CONNECT ) ;
2019-06-23 12:51:33 +00:00
int block_size = open . get_attribute_int ( " block-size " ) ;
string ? stanza = open . get_attribute ( " stanza " ) ;
if ( block_size < 0 | | ( stanza ! = null & & stanza ! = " iq " & & stanza ! = " message " ) ) {
set_error ( " invalid open " ) ;
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . bad_request ( " missing block_size or invalid stanza " ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
if ( stanza ! = null & & stanza ! = " iq " ) {
set_error ( " invalid open " ) ;
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . feature_not_implemented ( " cannot use message stanzas for IBB " ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
if ( block_size > this . block_size ) {
set_error ( " invalid open " ) ;
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . build ( ErrorStanza . TYPE_CANCEL , ErrorStanza . CONDITION_RESOURCE_CONSTRAINT , " opening a connection with a greater than negotiated/acceptable block size " , null ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
this . block_size = block_size ;
2019-07-18 01:12:05 +00:00
state = State . CONNECTED ;
2019-06-23 12:51:33 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . result ( iq ) ) ;
trigger_write_callback ( ) ;
}
public void handle_data ( XmppStream stream , StanzaNode data , Iq . Stanza iq ) {
2019-07-18 01:12:05 +00:00
assert ( state = = State . CONNECTED ) ;
2019-06-23 12:51:33 +00:00
if ( input_closed ) {
set_error ( " unexpected data " ) ;
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . not_allowed ( " unexpected data " ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
int seq = data . get_attribute_int ( " seq " ) ;
// TODO(hrxi): return an error on malformed base64 (need to do this
// according to the xep)
uint8 [ ] content = Base64 . decode ( data . get_string_content ( ) ) ;
if ( content . length > block_size ) {
set_error ( " data longer than negotiated block size " ) ;
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . bad_request ( " data longer than negotiated block size " ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
if ( seq < 0 | | seq ! = remote_seq ) {
set_error ( " out of order data packets " ) ;
2019-08-06 18:20:43 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . error ( iq , new ErrorStanza . build ( ErrorStanza . TYPE_CANCEL , ErrorStanza . CONDITION_UNEXPECTED_REQUEST , " out of order data packets " , null ) ) { to = iq . from } ) ;
2019-06-23 12:51:33 +00:00
return ;
}
remote_seq = ( remote_seq + 1 ) % SEQ_MODULUS ;
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . result ( iq ) ) ;
if ( content . length ! = 0 ) {
received . offer ( new Bytes . take ( content ) ) ;
trigger_read_callback ( ) ;
}
}
public void handle_close ( XmppStream stream , StanzaNode close , Iq . Stanza iq ) {
2019-07-18 01:12:05 +00:00
assert ( state = = State . CONNECTED ) ;
2019-06-23 12:51:33 +00:00
stream . get_module ( Iq . Module . IDENTITY ) . send_iq ( stream , new Iq . Stanza . result ( iq ) ) ;
stream . get_flag ( Flag . IDENTITY ) . remove_connection ( this ) ;
input_closed = true ;
output_closed = true ;
2019-07-18 01:12:05 +00:00
state = State . DISCONNECTED ;
2019-06-23 12:51:33 +00:00
trigger_read_callback ( ) ;
2019-06-23 12:53:18 +00:00
}
}
public class Flag : XmppStreamFlag {
public static FlagIdentity < Flag > IDENTITY = new FlagIdentity < Flag > ( NS_URI , " in_band_bytestreams " ) ;
private HashMap < string , Connection > active = new HashMap < string , Connection > ( ) ;
public void add_connection ( Connection conn ) {
active [ conn . sid ] = conn ;
}
public Connection ? get_connection ( string sid ) {
return active . has_key ( sid ) ? active [ sid ] : null ;
}
public void remove_connection ( Connection conn ) {
active . unset ( conn . sid ) ;
}
public override string get_ns ( ) { return NS_URI ; }
public override string get_id ( ) { return IDENTITY . id ; }
}
}