Programming Guide

This guide covers the txaio API and common usage patterns.


Application Programming Interface

The API is identical whether you’re using Twisted or asyncio under the hood. Two bool variables are available if you need to know which framework is in use, and two helpers to enforce one or the other framework.

Explicitly Selecting a Framework

Until you explicitly select a framework, all txaio API methods just throw a usage error. So, you must call .use_twisted() or .use_asyncio() as appropriate. These will fail with ImportError if you don’t have the correct dependencies.

import txaio
txaio.use_twisted()
txaio.use_asyncio()

Set an Event Loop / Reactor

You can set txaio.config.loop to either an EventLoop instance (if using asyncio) or an explicit reactor (if using Twisted). By default, reactor is imported from twisted.internet on the first call_later invocation. For asyncio, asyncio.get_event_loop() is called at import time.

If you’ve installed your reactor before import txaio you shouldn’t need to do anything.

Note that under Twisted, only the IReactorTime interface is required.

Test Helpers

Test utilities are in txaio.testutil. There is a context-manager for testing delayed calls; see test_call_later.py for an example.

txaio module

using_twisted

True only if we’re using Twisted as our underlying event framework

using_asyncio

True only if we’re using asyncio as our underlying event framework

use_asyncio()[source]

Select asyncio framework (uses trollius/tulip on Pythons that lack asyncio).

use_twisted()[source]

Select the Twisted framework (will fail if Twisted is not installed).

create_future(result=None, error=None, canceller=None)[source]

Create and return a new framework-specific future object. On asyncio this returns a Future, on Twisted it returns a Deferred.

Parameters:
  • result – if not None, the future is already fulfilled, with the given result.

  • error (IFailedFuture or Exception) – if not None then the future is already failed, with the given error.

  • canceller – a single-argument callable which is invoked if this future is cancelled (the single argument is the future object which has been cancelled)

Raises:

ValueError – if both value and error are provided.

Returns:

under Twisted a Deferred, under asyncio a Future

as_future(func, *args, **kwargs)[source]

Call func with the provided arguments and keyword arguments, and always return a Future/Deferred. If func itself returns a future, that is directly returned. If it immediately succeed or failed then an already-resolved Future/Deferred is returned instead.

This allows you to write code that calls functions (e.g. possibly provided from user-code) and treat them uniformly. For example:

p = txaio.as_future(some_function, 1, 2, key='word')
txaio.add_callbacks(p, do_something, it_failed)

You therefore don’t have to worry if the underlying function was itself asynchronous or not – your code always treats it as asynchronous.

reject(future, error=None)[source]

Resolve the given future as failed. This will call any errbacks registered against this Future/Deferred. On Twisted, the errback is called with a bare Failure instance; on asyncio we provide an object that implements IFailedFuture because there is no equivalent in asyncio (this mimics part of the Failure API).

Parameters:
  • future – an unresolved Deferred/Future as returned by create_future()

  • error (IFailedFuture or Exception) – The error to fail the Deferred/Future with. If this is None, sys.exc_info() is used to create an IFailedFuture (or Failure) wrapping the current exception (so in this case it must be called inside an except: clause).

cancel(future)[source]

Cancel the given future. If a canceller was registered, it is invoked now. It is invalid to resolve or reject the future after cancelling it.

Parameters:

future – an unresolved Deferred/Future as returned by create_future()

resolve(future, value)[source]

Resolve the given future with the provided value. This triggers any callbacks registered against this Future/Deferred.

add_callbacks(future, callback, errback)[source]

Adds the provided callback and/or errback to the given future. To add multiple callbacks, call this method multiple times. For example, to add just an errback, call add_callbacks(p, None, my_errback)

Note that txaio doesn’t do anything special with regards to callback or errback chaining – it is highly recommended that you always return the incoming argument unmodified in your callback/errback so that Twisted and asyncio behave the same. For example:

def callback_or_errback(value):
    # other code
    return value
Raises:

ValueError – if both callback and errback are None

failure_message(fail)[source]

Takes an IFailedFuture instance and returns a formatted message suitable to show to a user. This will be a str with no newlines for the form: {exception_name}: {error_message} where error_message is the result of running str() on the exception instance (under asyncio) or the result of .getErrorMessage() on the Failure under Twisted.

failure_traceback(fail)[source]

Take an IFailedFuture instance and returns the Python traceback instance associated with the failure.

failure_format_traceback(fail):

Take an IFailedFuture instance and returns a formatted string showing the traceback. Typically, this will have many newlines in it and look like a “normal” Python traceback.

call_later(delay, func, *args, **kwargs)[source]

This calls the function func with the given parameters at the specified time in the future. Although asyncio doesn’t directly support kwargs with loop.call_later we wrap it in a functools.partial, as asyncio documentation suggests.

Note: see txaio.make_batched_timer() if you may have a lot of timers, and their absolute accuracy isn’t very important.

Parameters:

delay – how many seconds in the future to make the call

Returns:

The underlying library object, which will at least have a .cancel() method on it. It’s really IDelayedCall in Twisted and a Handle in asyncio.

make_batched_timer(seconds_per_bucket, chunk_size)[source]

This returns an object implementing IBatchedTimer such that any .call_later calls done through it (instead of via txaio.call_later()) will be “quantized” into buckets and processed in chunk_size batches “near” the time they are supposed to fire. seconds_per_bucket is only accurate to “milliseconds”.

When there are “tens of thousands” of outstanding timers, CPU usage can become a problem – if the accuracy of the timers isn’t very important, using “batched” timers can greatly reduce the number of “real” delayed calls in the event loop.

For example, Autobahn uses this feature for auto-ping timeouts, where the exact time of the event isn’t extremely important – but there are 2 outstanding calls per connection.

gather(futures, consume_exceptions=True)[source]

Returns a new Future that waits for the results from all the futures provided.

The Future/Deferred returned will callback with a list the same length as futures containing either the return value from each future, or an IFailedFuture/Failure instance if it failed.

Note that on Twisted, we use DeferredList which usually returns a list of 2-tuples of (status, value). We do inject a callback that unpacks this list to be just the value (or Failure) so that your callback can be identical on Twisted and asyncio.

make_logger()[source]

Creates and returns an instance of ILogger. This can pick up context from where it’s instantiated (e.g. the containing class or module) so the best way to use this is to create a logger for each class that produces logs; see the example in ILogger ‘s documentation

class ILogger[source]

This defines the methods you can call on the object returned from txaio.make_logger() – although the actual object may have additional methods, you should only call the methods listed here.

All the log methods have the same signature, they just differ in what “log level” they represent to the handlers/emitters. The message argument is a format string using PEP3101-style references to things from the kwargs. Note that there are also the following keys added to the kwargs: log_time and log_level.

For example:

class MyThing(object):
    log = txaio.make_logger()

    def something_interesting(self, things=dict(one=1, two=2)):
        try:
            self.log.debug("Called with {things[one]}", things=things)
            result = self._method_call()
            self.log.info("Got '{result}'.", result=result)
        except Exception:
            fail = txaio.create_failure()
            self.log.critical(txaio.failure_format_traceback(fail))

The philsophy behind txaio’s interface is fairly similar to Twisted’s logging APIs after version 15. See Twisted’s documentation for details.

class IFailedFuture[source]

This defines the interface for a common object encapsulating a failure from either an asyncio task/coroutine or a Twisted Deferred.

An instance implementing this interface is given to any errback callables you provide via txaio.add_callbacks()

In your errback you can extract information from an IFailedFuture with txaio.failure_message() and txaio.failure_traceback() or use .value to get the Exception instance.

Depending on other details or methods will probably cause incompatibilities between asyncio and Twisted.


Explicit Event Loops

Twisted has a single, global reactor (for now). As such, txaio was built with a single, global (but configurable) event-loop. However, asyncio supports multiple event-loops.

After version 2.7.0 it is possible to use txaio with multiple event-loops, and thereby offer asyncio users the chance to pass one. Of course, it’s still not possible to use multiple event-loops at once with Twisted.

To start using multiple event-loops with txaio, use txaio.with_config to return a new “instance” of the txaio API with the given config (the only thing you can configure currently is the event-loop). On Twisted, it’s an error if you try to use a different reactor.

The object returned by txaio.with_config is a drop-in replacement for every txaio.* call, so you can go from code like this:

import txaio
f = txaio.create_future()

…and instead make your code do look like this:

import asyncio
import txaio
txa = txaio.with_config(loop=asyncio.new_event_loop())
f = txa.create_future()

If you’re doing this inside a class, you could use self._txa or similar instead. This gives you an easy path to opt-in to this multiple event-loop API:

  • replace all txaio.* calls to use an object, like self._txa.

  • assign this to the txaio module (self._txa = txaio) or use the new API right away (self._txa = txaio.with_config())

  • add a public API to your library to pass in an event loop

  • when this is used, you set self._txa = txaio.with_config(loop=loop)

See the example in examples/multiloop.py.

Logging

If you are developing a new application, you can take advantage of more structured logging by using txaio’s APIs throughout. This API is similar to Twisted’s logging in many ways, but not identical. If you’re integrating txaio into existing code, it should “play nicely” with the logging module, Twisted’s newest logger, and the pre-15.2.0 “legacy” Twisted logger.

To create an object suitable for logging, call txaio.make_logger(). This will return an instance which has a series of methods indicating the “severity” or “level” of the log – see txaio.interfaces.ILogger for an example and more details.

So, given some code like:

import txaio
txaio.use_twisted()

class Bunny(object):
    log = txaio.make_logger()

    def hop(self, times=1):
        self.log.trace("Bunny.hop(times={times})", times=times)
        self.log.debug("Hopping {times} times.", times=times)
        try:
            1 / 0
        except Exception:
            fail = txaio.create_failure()
            self.log.critical(txaio.failure_format_traceback(fail))

print("output before start_logging")
txaio.start_logging(level='debug')
print("output after start_logging")
jack = Bunny()
jack.hop(42)

Then you should see output approximately like this:

output before start_logging
2016-01-21T01:02:03-0100 output after start_logging
2016-01-21T01:02:03-0100 Hopping 42 times.
2016-01-21T01:02:03-0100 Traceback (most recent call last):
  File "logging-example.py", line 21, in <module>
    jack.hop(42)
--- <exception caught here> ---
  File "logging-example.py", line 12, in hop
    raise RuntimeError("Fox spotted!")
exceptions.RuntimeError: Fox spotted!

Note that the trace-level message wasn’t logged. If you don’t like to see full tracebacks except with debugging, you can use this idiom:

self.log.critical(txaio.failure_message(fail))
self.log.debug(txaio.failure_format_traceback(fail))

It’s worth noting the code doesn’t change at all if you do .use_asyncio() at the top instead – of course this is the whole point of txaio!

Logging Interoperability

When you’re using libraries that are already doing logging, but not using the txaio APIs, you shouldn’t need to do anything. For example:

import txaio
txaio.use_twisted()


def existing_code():
    from twisted.python import log
    log.msg("A legacy Twisted logger message")

txaio.start_logging(level='debug')
existing_code()

If you’re using asyncio (or just built-in Python logging), it could look like this:

import txaio
txaio.use_asyncio()


def existing_code():
    import logging
    log = logging.getLogger("roy")
    log.info("Python stdlib message: %s", "txaio was here")

txaio.start_logging(level='debug')
existing_code()

Starting Logging Yourself

If you are already starting your favourite logging system yourself (be that Twiste’d logger via globalLogBeginner or Python stdlib logging), any library using txaio’s logging should play nicely with it. Not ever calling txaio.start_logging has a slight drawback, however: as part of setting up logging, we re-bind all the “unused” logging methods to do-nothing. For example, if the log level is set to 'info' than the .debug method on all txaio-created logger instances becomes a no-op.

For fully-worked examples of this, look in examples/log_interop_stdlib.py and examples/log_interop_twisted.py.