sprig.streamutils module¶
-
class
sprig.streamutils.
BucketMerger
(sort_key: Callable[[_T], _SortableT], bucket_key: Callable[[_T], _HashableT], callback: Callable[[_T], Any])¶ Bases:
Generic
[sprig.streamutils._T
,sprig.streamutils._HashableT
,sprig.streamutils._SortableT
]-
close
() → None¶
-
put
(msg: _T) → None¶
-
register
(src: _HashableT) → None¶
-
unregister
(src: _HashableT) → None¶
-
-
class
sprig.streamutils.
ManagedMerger
(callback: Callable[[_T], Any])¶ Bases:
Generic
[sprig.streamutils._T
,sprig.streamutils._HashableT
,sprig.streamutils._SortableT
]Sort a partially sorted iterator
It must be possible to bucket the iterator into individually sorted iterators.
A typical use case for this would be sorting incoming TCP messages from several sources; Thanks to TCP the messages from each source are guaranteed to be ordered and thanks to IP the messages can be bucketed by the source.
Notice in the example below that * the vowels and consonants are each sorted in the input, and * the output is sorted without regard for the case of the letter.
>>> emitted = [] >>> merger = ManagedMerger(emitted.append) >>> merger.register(False) >>> merger.register(True) >>> for c in "aEbcdifgh": merger.put(c, c.lower() in "aeiou", c.lower()) >>> merger.close() >>> "".join(emitted) 'abcdEfghi'
-
close
() → None¶
-
put
(msg: _T, sender: _HashableT, time: _SortableT) → None¶
-
register
(sender: _HashableT) → None¶
-
unregister
(sender: _HashableT) → None¶
-
-
class
sprig.streamutils.
SimpleBucketMerger
(sort_key: Callable[[_T], sprig.streamutils._SupportsSort], bucket_key: Callable[[_T], _HashableT], bucket_keys: Iterable[_HashableT], callback: Callable[[_T], Any])¶ Bases:
Generic
[sprig.streamutils._T
]A simpler and less flexible version
Implemented as warm up, kept for comparison (for now).
-
close
() → None¶
-
put
(item: _T) → None¶
-
-
class
sprig.streamutils.
TimeoutMerger
(callback: Callable[[_T], Any], timeout: _TimeT)¶ Bases:
Generic
[sprig.streamutils._T
,sprig.streamutils._HashableT
,sprig.streamutils._TimeT
]Merge sorted streams of messages
This class automatically handles the registering of senders joining the cluster and the unregistering of senders leaving the clusters using a timeout heuristic; if one sender falls behind the leading sender by more than a given duration, then the silent sender is presumed to have left.
Convenient when senders leaving the cluster is a rare occurrence but probably a bad fit otherwise.
-
put
(msg: _T, sender: _HashableT, time: _TimeT) → None¶
-
property
senders
¶
-