Source code for slixmpp.plugins.xep_0319.idle
# Slixmpp: The Slick XMPP Library
# Copyright (C) 2013 Nathanael C. Fritz, Lance J.T. Stout
# This file is part of Slixmpp.
# See the file LICENSE for copying permission.
from datetime import datetime, timezone
from slixmpp import JID
from slixmpp.stanza import Presence
from slixmpp.plugins import BasePlugin
from slixmpp.xmlstream import register_stanza_plugin
from slixmpp.xmlstream.handler import Callback
from slixmpp.xmlstream.matcher import StanzaPath
from slixmpp.plugins.xep_0319 import stanza
def get_local_timezone():
return datetime.now(timezone.utc).astimezone().tzinfo
[docs]
class XEP_0319(BasePlugin):
name = 'xep_0319'
description = 'XEP-0319: Last User Interaction in Presence'
dependencies = {'xep_0012'}
stanza = stanza
def plugin_init(self):
self._idle_stamps = {}
register_stanza_plugin(Presence, stanza.Idle)
self.api.register(self._set_idle, 'set_idle', default=True)
self.api.register(self._get_idle, 'get_idle', default=True)
self.xmpp.register_handler(Callback(
'Idle Presence',
StanzaPath('presence/idle'),
self._idle_presence
))
self.xmpp.add_filter('out', self._stamp_idle_presence)
def session_bind(self, jid):
self.xmpp['xep_0030'].add_feature('urn:xmpp:idle:1')
def plugin_end(self):
self.xmpp['xep_0030'].del_feature(feature='urn:xmpp:idle:1')
self.xmpp.del_filter('out', self._stamp_idle_presence)
self.xmpp.remove_handler('Idle Presence')
[docs]
async def idle(self, jid: JID | None = None,
since: datetime | None = None):
"""Set an idle duration for a JID
.. versionchanged:: 1.8.0
This function is now a coroutine.
"""
seconds = None
timezone = get_local_timezone()
if since is None:
since = datetime.now(timezone)
else:
seconds = datetime.now(timezone) - since
await self.api['set_idle'](jid, None, None, since)
await self.xmpp['xep_0012'].set_last_activity(jid=jid, seconds=seconds)
[docs]
async def active(self, jid: JID | None = None):
"""Reset the idle timer.
.. versionchanged:: 1.8.0
This function is now a coroutine.
"""
await self.api['set_idle'](jid, None, None, None)
await self.xmpp['xep_0012'].del_last_activity(jid)
def _set_idle(self, jid, node, ifrom, data):
self._idle_stamps[jid] = data
def _get_idle(self, jid, node, ifrom, data):
return self._idle_stamps.get(jid, None)
def _idle_presence(self, pres):
self.xmpp.event('presence_idle', pres)
async def _stamp_idle_presence(self, stanza):
if isinstance(stanza, Presence):
since = await self.api['get_idle'](stanza['from'] or self.xmpp.boundjid)
if since:
stanza['idle']['since'] = since
return stanza