sprig.streamutils module

class sprig.streamutils.BucketMerger(sort_key: Callable[[_T], _SortableT], bucket_key: Callable[[_T], _HashableT], callback: Callable[[_T], Any])

Bases: Generic[sprig.streamutils._T, sprig.streamutils._HashableT, sprig.streamutils._SortableT]

close() → None
put(msg: _T) → None
register(src: _HashableT) → None
unregister(src: _HashableT) → None
class sprig.streamutils.ManagedMerger(callback: Callable[[_T], Any])

Bases: Generic[sprig.streamutils._T, sprig.streamutils._HashableT, sprig.streamutils._SortableT]

Sort a partially sorted iterator

It must be possible to bucket the iterator into individually sorted iterators.

A typical use case is sorting incoming TCP messages from several sources: thanks to TCP, the messages from each source are guaranteed to be ordered, and thanks to IP, the messages can be bucketed by source.

Notice in the example below that

* the vowels and consonants are each sorted in the input, and
* the output is sorted without regard for the case of the letter.

>>> emitted = []
>>> merger = ManagedMerger(emitted.append)
>>> merger.register(False)
>>> merger.register(True)
>>> for c in "aEbcdifgh": merger.put(c, c.lower() in "aeiou", c.lower())
>>> merger.close()
>>> "".join(emitted)
'abcdEfghi'
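
The behaviour shown above can be approximated with a small, self-contained sketch. This is not sprig's implementation: the class name `SketchMerger` and its internals are hypothetical, and `unregister` is left out for brevity. The assumption is that a message may be emitted only once every registered sender has at least one message pending, because only then is the smallest pending time known to be the global minimum.

```python
import heapq
from collections import deque
from typing import Any, Callable, Deque, Dict, Hashable, Tuple


class SketchMerger:
    """Hypothetical illustration of merging per-sender sorted streams."""

    def __init__(self, callback: Callable[[Any], Any]) -> None:
        self._callback = callback
        # One FIFO queue of (time, msg) pairs per registered sender.
        self._queues: Dict[Hashable, Deque[Tuple[Any, Any]]] = {}

    def register(self, sender: Hashable) -> None:
        self._queues.setdefault(sender, deque())

    def put(self, msg: Any, sender: Hashable, time: Any) -> None:
        self._queues[sender].append((time, msg))
        self._flush()

    def _flush(self) -> None:
        # Emit while every sender has a pending message; the smallest
        # head is then guaranteed to precede anything still to come.
        while self._queues and all(self._queues.values()):
            sender = min(self._queues, key=lambda s: self._queues[s][0][0])
            _, msg = self._queues[sender].popleft()
            self._callback(msg)

    def close(self) -> None:
        # No more input: drain whatever is left in time order.
        pending = heapq.merge(*self._queues.values(), key=lambda pair: pair[0])
        for _, msg in pending:
            self._callback(msg)
        self._queues.clear()


emitted = []
merger = SketchMerger(emitted.append)
merger.register(False)
merger.register(True)
for c in "aEbcdifgh":
    merger.put(c, c.lower() in "aeiou", c.lower())
merger.close()
print("".join(emitted))  # abcdEfghi
```

Note that the trailing `i` is only emitted by `close()`: until then the consonant queue is empty, so the sketch cannot rule out an earlier consonant arriving.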
close() → None
put(msg: _T, sender: _HashableT, time: _SortableT) → None
register(sender: _HashableT) → None
unregister(sender: _HashableT) → None
class sprig.streamutils.SimpleBucketMerger(sort_key: Callable[[_T], sprig.streamutils._SupportsSort], bucket_key: Callable[[_T], _HashableT], bucket_keys: Iterable[_HashableT], callback: Callable[[_T], Any])

Bases: Generic[sprig.streamutils._T]

A simpler and less flexible version of BucketMerger.

Implemented as a warm-up and kept for comparison (for now).

close() → None
put(item: _T) → None
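
For comparison, when the whole input is available up front and the set of buckets is fixed, the same ordering can be produced with the standard library alone. The sketch below is an assumption-laden illustration rather than a description of how SimpleBucketMerger works internally: `merge_batch` is a hypothetical helper whose parameter names merely mirror the constructor arguments above.

```python
import heapq
from typing import Any, Callable, Dict, Hashable, Iterable, List


def merge_batch(
    items: Iterable[Any],
    sort_key: Callable[[Any], Any],
    bucket_key: Callable[[Any], Hashable],
    bucket_keys: Iterable[Hashable],
) -> List[Any]:
    """Hypothetical batch equivalent: bucket the items, then k-way merge."""
    # Each bucket preserves arrival order, which is assumed to be sorted.
    buckets: Dict[Hashable, List[Any]] = {key: [] for key in bucket_keys}
    for item in items:
        buckets[bucket_key(item)].append(item)
    # heapq.merge lazily merges already-sorted inputs into one sorted stream.
    return list(heapq.merge(*buckets.values(), key=sort_key))


merged = merge_batch(
    "aEbcdifgh",
    sort_key=str.lower,
    bucket_key=lambda c: c.lower() in "aeiou",
    bucket_keys=[False, True],
)
print("".join(merged))  # abcdEfghi
```

Unlike the streaming classes, this version cannot emit anything until all input has been read.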
class sprig.streamutils.TimeoutMerger(callback: Callable[[_T], Any], timeout: _TimeT)

Bases: Generic[sprig.streamutils._T, sprig.streamutils._HashableT, sprig.streamutils._TimeT]

Merge sorted streams of messages

This class automatically handles the registering of senders joining the cluster and the unregistering of senders leaving the cluster using a timeout heuristic: if one sender falls behind the leading sender by more than a given duration, then the silent sender is presumed to have left.

Convenient when senders rarely leave the cluster, but probably a bad fit otherwise.
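
The timeout heuristic described above can be sketched as a small standalone function. How TimeoutMerger actually tracks sender times is not stated here, so the function name `presumed_gone` and the `latest` mapping (most recent message time per sender) are assumptions made purely for illustration.

```python
from typing import Dict, Hashable, Set


def presumed_gone(latest: Dict[Hashable, float], timeout: float) -> Set[Hashable]:
    """Return the senders trailing the leading sender by more than `timeout`."""
    if not latest:
        return set()
    leader_time = max(latest.values())
    # "More than" is strict: a sender exactly `timeout` behind is kept.
    return {
        sender
        for sender, time in latest.items()
        if leader_time - time > timeout
    }


latest = {"a": 10.0, "b": 9.5, "c": 3.0}
gone = presumed_gone(latest, timeout=5.0)
print(gone)  # {'c'}
```

A frequently changing cluster would make this heuristic stall for up to `timeout` each time a sender leaves, which is consistent with the caveat above.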

put(msg: _T, sender: _HashableT, time: _TimeT) → None
property senders