Common API

class aioftp.StreamIO(reader: StreamReader, writer: StreamWriter, *, timeout: float | int | None = None, read_timeout: float | int | None = None, write_timeout: float | int | None = None)

Stream input/output wrapper with timeout.

Parameters:
  • reader (asyncio.StreamReader) – stream reader

  • writer (asyncio.StreamWriter) – stream writer

  • timeout (int, float or None) – socket timeout for read/write operations

  • read_timeout (int, float or None) – socket timeout for read operations, overrides timeout

  • write_timeout (int, float or None) – socket timeout for write operations, overrides timeout

close() None

Close connection.

async read(count: int = -1) bytes

asyncio.coroutine()

Proxy for asyncio.StreamReader.read().

Parameters:

count (int) – block size for read operation

async readexactly(count: int) bytes

asyncio.coroutine()

Proxy for asyncio.StreamReader.readexactly().

Parameters:

count (int) – block size for read operation

async readline() bytes

asyncio.coroutine()

Proxy for asyncio.StreamReader.readline().

async start_tls(sslcontext: SSLContext, server_hostname: str | None) None

Upgrades the connection to TLS

async write(data: bytes) None

asyncio.coroutine()

Combination of asyncio.StreamWriter.write() and asyncio.StreamWriter.drain().

Parameters:

data (bytes) – data to write

class aioftp.Throttle(*, limit: int | None = None, reset_rate: float | int = 10)

Throttle for streams.

Parameters:
  • limit (int or None) – speed limit in bytes or None for unlimited

  • reset_rate (int or float) – time in seconds for «round» throttle memory (to deal with float precision when divide)

append(data: bytes, start: float) None

Count data for throttle

Parameters:
  • data (bytes) – bytes of data for count

  • start (float) – start of read/write time from asyncio.BaseEventLoop.time()

clone() Throttle

Clone throttle without memory

property limit: int | None

Throttle limit

async wait() None

asyncio.coroutine()

Wait until can do IO

class aioftp.StreamThrottle(read, write)

Stream throttle with read and write aioftp.Throttle

Parameters:
clone() StreamThrottle

Clone throttles without memory

classmethod from_limits(read_speed_limit: int | None = None, write_speed_limit: int | None = None) StreamThrottle

Simple wrapper for creation aioftp.StreamThrottle

Parameters:
  • read_speed_limit (int or None) – stream read speed limit in bytes or None for unlimited

  • write_speed_limit (int or None) – stream write speed limit in bytes or None for unlimited

class aioftp.ThrottleStreamIO(reader: StreamReader, writer: StreamWriter, throttles: dict[str, StreamThrottle] = {}, *, timeout: float | int | None = None, read_timeout: float | int | None = None, write_timeout: float | int | None = None)

Bases: StreamIO

Throttled aioftp.StreamIO. ThrottleStreamIO is subclass of aioftp.StreamIO. throttles attribute is dictionary of name: aioftp.StreamThrottle pairs

Parameters:
>>> self.stream = ThrottleStreamIO(
...     reader,
...     writer,
...     throttles={
...         "main": StreamThrottle(
...             read=Throttle(...),
...             write=Throttle(...)
...         )
...     },
...     timeout=timeout
... )
append(name: str, data: bytes, start: float) None

Update timeout for all throttles

Parameters:
  • name (str) – name of throttle to append to (“read” or “write”)

  • data (bytes) – bytes of data for count

  • start (float) – start of read/write time from asyncio.BaseEventLoop.time()

iter_by_block(count: int = 8192) AsyncStreamIterator

Read/iterate stream by block.

Return type:

aioftp.AsyncStreamIterator

>>> async for block in stream.iter_by_block(block_size):
...     ...
iter_by_line() AsyncStreamIterator

Read/iterate stream by line.

Return type:

aioftp.AsyncStreamIterator

>>> async for line in stream.iter_by_line():
...     ...
async read(count: int = -1) bytes

asyncio.coroutine()

aioftp.StreamIO.read() proxy

async readline() bytes

asyncio.coroutine()

aioftp.StreamIO.readline() proxy

async wait(name: str) None

asyncio.coroutine()

Wait for all throttles

Parameters:

name (str) – name of throttle to acquire (“read” or “write”)

async write(data: bytes) None

asyncio.coroutine()

aioftp.StreamIO.write() proxy

class aioftp.AsyncListerMixin

Add ability to async for context to collect data to list via await.

>>> class Context(AsyncListerMixin):
...     ...
>>> results = await Context(...)
class aioftp.AbstractAsyncLister(*, timeout: float | int | None = None)

Abstract context with ability to collect all iterables into list via await with optional timeout (via aioftp.with_timeout())

Parameters:

timeout (None, int or float) – timeout for __anext__ operation

>>> class Lister(AbstractAsyncLister):
...
...     @with_timeout
...     async def __anext__(self):
...         ...
>>> async for block in Lister(...):
...     ...
>>> result = await Lister(...)
>>> result
[block, block, block, ...]
aioftp.with_timeout(name: str) Callable[[Callable[[WithTimeOutParamSpec], Awaitable[WithTimeOutReturnType]]], Callable[[WithTimeOutParamSpec], Awaitable[WithTimeOutReturnType]]]
aioftp.with_timeout(name: Callable[[WithTimeOutParamSpec], Awaitable[WithTimeOutReturnType]]) Callable[[WithTimeOutParamSpec], Awaitable[WithTimeOutReturnType]]

Method decorator, wraps method with asyncio.wait_for(). timeout argument takes from name decorator argument or “timeout”.

Parameters:

name (str) – name of timeout attribute

Raises:

asyncio.TimeoutError – if coroutine does not finished in timeout

Wait for self.timeout

>>> def __init__(self, ...):
...
...     self.timeout = 1
...
... @with_timeout
... async def foo(self, ...):
...
...     pass

Wait for custom timeout

>>> def __init__(self, ...):
...
...     self.foo_timeout = 1
...
... @with_timeout("foo_timeout")
... async def foo(self, ...):
...
...     pass
aioftp.async_enterable(f: Callable[[AsyncEnterableParamSpec], Awaitable[AsyncEnterableReturnType]]) Callable[[AsyncEnterableParamSpec], AsyncEnterableInstanceProtocol[AsyncEnterableReturnType]]

Decorator. Bring coroutine result up, so it can be used as async context

>>> async def foo():
...
...     ...
...     return AsyncContextInstance(...)
...
... ctx = await foo()
... async with ctx:
...
...     # do
>>> @async_enterable
... async def foo():
...
...     ...
...     return AsyncContextInstance(...)
...
... async with foo() as ctx:
...
...     # do
...
... ctx = await foo()
... async with ctx:
...
...     # do
aioftp.setlocale(name: str) Generator[str, None, None]

Context manager with threading lock for set locale on enter, and set it back to original state on exit.

>>> with setlocale("C"):
...     ...