# Slixmpp: The Slick XMPP Library
# This file is part of Slixmpp
# See the file LICENSE for copying permission
import uuid
import logging
from slixmpp import JID
from slixmpp.stanza import Message, Iq
from slixmpp.exceptions import XMPPError
from slixmpp.xmlstream.handler import CoroutineCallback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.plugins import BasePlugin
from slixmpp.plugins.xep_0047 import stanza, Open, Close, Data, IBBytestream
log = logging.getLogger(__name__)
[docs]
class XEP_0047(BasePlugin):
"""
XEP-0047: In-Band Bytestreams
Events registered by this plugin:
- :term:`ibb_stream_start`
- :term:`ibb_stream_end`
- :term:`ibb_stream_data`
- :term:`stream:[stream id]:[peer jid]`
Plugin Parameters:
- ``block_size`` (default: ``4096``): default block size to negotiate
- ``max_block_size`` (default: ``8192``): max block size to accept
- ``auto_accept`` (default: ``False``): if incoming streams should be
accepted automatically.
- :term:`authorized (0047 version)`
- :term:`authorized_sid (0047 version)`
- :term:`preauthorize_sid (0047 version)`
- :term:`get_stream`
- :term:`set_stream`
- :term:`del_stream`
"""
name = 'xep_0047'
description = 'XEP-0047: In-Band Bytestreams'
dependencies = {'xep_0030'}
stanza = stanza
default_config = {
'block_size': 4096,
'max_block_size': 8192,
'auto_accept': False,
}
def plugin_init(self):
self._streams = {}
self._preauthed_sids = {}
register_stanza_plugin(Iq, Open)
register_stanza_plugin(Iq, Close)
register_stanza_plugin(Iq, Data)
register_stanza_plugin(Message, Data)
self.xmpp.register_handler(CoroutineCallback(
'IBB Open',
StanzaPath('iq@type=set/ibb_open'),
self._handle_open_request))
self.xmpp.register_handler(CoroutineCallback(
'IBB Close',
StanzaPath('iq@type=set/ibb_close'),
self._handle_close))
self.xmpp.register_handler(CoroutineCallback(
'IBB Data',
StanzaPath('iq@type=set/ibb_data'),
self._handle_data))
self.xmpp.register_handler(CoroutineCallback(
'IBB Message Data',
StanzaPath('message/ibb_data'),
self._handle_data))
self.api.register(self._authorized, 'authorized', default=True)
self.api.register(self._authorized_sid, 'authorized_sid', default=True)
self.api.register(self._preauthorize_sid, 'preauthorize_sid', default=True)
self.api.register(self._get_stream, 'get_stream', default=True)
self.api.register(self._set_stream, 'set_stream', default=True)
self.api.register(self._del_stream, 'del_stream', default=True)
def plugin_end(self):
self.xmpp.remove_handler('IBB Open')
self.xmpp.remove_handler('IBB Close')
self.xmpp.remove_handler('IBB Data')
self.xmpp.remove_handler('IBB Message Data')
self.xmpp['xep_0030'].del_feature(feature='http://jabber.org/protocol/ibb')
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('http://jabber.org/protocol/ibb')
def _get_stream(self, jid, sid, peer_jid, data):
return self._streams.get((jid, sid, peer_jid), None)
def _set_stream(self, jid, sid, peer_jid, stream):
self._streams[(jid, sid, peer_jid)] = stream
def _del_stream(self, jid, sid, peer_jid, data):
if (jid, sid, peer_jid) in self._streams:
del self._streams[(jid, sid, peer_jid)]
async def _accept_stream(self, iq):
receiver = iq['to']
sender = iq['from']
sid = iq['ibb_open']['sid']
if await self.api['authorized_sid'](receiver, sid, sender, iq):
return True
return await self.api['authorized'](receiver, sid, sender, iq)
def _authorized(self, jid, sid, ifrom, iq):
if self.auto_accept:
return True
return False
def _authorized_sid(self, jid, sid, ifrom, iq):
if (jid, sid, ifrom) in self._preauthed_sids:
del self._preauthed_sids[(jid, sid, ifrom)]
return True
return False
def _preauthorize_sid(self, jid, sid, ifrom, data):
self._preauthed_sids[(jid, sid, ifrom)] = True
[docs]
async def open_stream(self, jid: JID, *, block_size: int | None = None,
sid: str | None = None, use_messages: bool = False,
ifrom: JID | None = None,
**iqkwargs) -> IBBytestream:
"""Open an IBB stream with a peer JID.
.. versionchanged:: 1.8.0
This function is now a coroutine and must be awaited.
All parameters except ``jid`` are keyword-args only.
:param jid: The remote JID to initiate the stream with.
:param block_size: The block size to advertise.
:param sid: The IBB stream id (if not provided, will be auto-generated).
:param use_messages: If the stream should use message stanzas instead of iqs.
:returns: The opened byte stream with the remote JID
:raises .IqError: When the remote entity denied the stream.
"""
if sid is None:
sid = str(uuid.uuid4())
if block_size is None:
block_size = self.block_size
iq = self.xmpp.make_iq_set(ito=jid, ifrom=ifrom)
iq['ibb_open']['block_size'] = block_size
iq['ibb_open']['sid'] = sid
iq['ibb_open']['stanza'] = 'message' if use_messages else 'iq'
stream = IBBytestream(self.xmpp, sid, block_size,
iq['from'], iq['to'], use_messages)
callback = iqkwargs.pop('callback', None)
result = await iq.send(**iqkwargs)
log.debug('IBB stream (%s) accepted by %s', stream.sid, result['from'])
stream.self_jid = result['to']
stream.peer_jid = result['from']
stream.stream_started = True
await self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
if callback is not None:
self.xmpp.add_event_handler('ibb_stream_start', callback, disposable=True)
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (stream.sid, stream.peer_jid), stream)
return stream
async def _handle_open_request(self, iq: Iq):
sid = iq['ibb_open']['sid']
size = iq['ibb_open']['block_size'] or self.block_size
log.debug('Received IBB stream request from %s', iq['from'])
if not sid:
raise XMPPError(etype='modify', condition='bad-request')
if not await self._accept_stream(iq):
raise XMPPError(etype='cancel', condition='not-acceptable')
if size > self.max_block_size:
raise XMPPError('resource-constraint')
stream = IBBytestream(self.xmpp, sid, size,
iq['to'], iq['from'])
stream.stream_started = True
await self.api['set_stream'](stream.self_jid, stream.sid, stream.peer_jid, stream)
iq.reply().send()
self.xmpp.event('ibb_stream_start', stream)
self.xmpp.event('stream:%s:%s' % (sid, stream.peer_jid), stream)
async def _handle_data(self, stanza: Iq | Message):
sid = stanza['ibb_data']['sid']
stream = await self.api['get_stream'](stanza['to'], sid, stanza['from'])
if stream is not None and stanza['from'] == stream.peer_jid:
stream._recv_data(stanza)
else:
raise XMPPError('item-not-found')
async def _handle_close(self, iq: Iq):
sid = iq['ibb_close']['sid']
stream = await self.api['get_stream'](iq['to'], sid, iq['from'])
if stream is not None and iq['from'] == stream.peer_jid:
stream._closed(iq)
await self.api['del_stream'](stream.self_jid, stream.sid, stream.peer_jid)
else:
raise XMPPError('item-not-found')