Code source de xphs1903.outils

# Copyright (C) 2026 Émile Jetzer, Polytechnique Montréal
"""Fonctionnalités de base."""

# Activer la journalisation
import logging
import time
from collections.abc import Callable, Iterable, Iterator
from queue import Queue, ShutDown
from threading import Thread

logging.getLogger(__name__).addHandler(logging.NullHandler())


def paires(x: Iterable, n: int = 2) -> Iterator[tuple]:
    """Yield consecutive, non-overlapping groups of ``n`` items from ``x``.

    Args:
        x: Any iterable; it is consumed lazily, ``n`` items at a time.
        n: Size of each group (2 by default, i.e. pairs).

    Yields:
        Tuples of ``n`` consecutive items. A trailing group with fewer than
        ``n`` items is dropped, and nothing is yielded when ``n`` is 0 or
        negative (``zip`` then receives no iterables).
    """
    # Source - https://stackoverflow.com/a/1335618
    # Posted by Nadia Alramli, modified by community. See post 'Timeline'
    # for change history. Retrieved 2026-05-14, License - CC BY-SA 2.5
    #     list_of_slices = zip(*(iter(the_list),) * slice_size)
    #
    # The same iterator object is repeated n times, so each tuple built by
    # zip pulls n successive items from it.
    yield from zip(*(iter(x),) * n)
def se_tourner_les_pouces():
    """Block the calling thread until a ``KeyboardInterrupt`` is received.

    The function sleeps in 0.1 s slices and returns ``None`` as soon as the
    interrupt arrives.
    """
    # The try wraps the whole loop so that an interrupt delivered between
    # two sleeps is caught as well.
    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        pass
[docs] class FilCopie(Thread): def __init__(self, orig: Queue, *args: Queue | Callable): self.orig = orig self.args = args super().__init__() def shutdown(self): for q, conv in paires(self.args): q.shutdown() super().shutdown()
[docs] def join(self): for q, conv in paires(self.args): q.join() super().join()
[docs] def run(self): while True: try: x = self.orig.get() for q, conv in paires(self.args): q.put(conv(x)) self.orig.task_done() except ShutDown: self.shutdown() break