# Source code for dynamix.events.tools

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
This file provides tools for event handling.

"""

from __future__ import print_function, division, unicode_literals, absolute_import, generators

from time import sleep, time
from random import random, seed
import heapq

__author__ = "Pascal Held"
__email__ = "paheld@gmail.com"


class MakeID(object):
    """Converter object for sender/receiver labels.

    Every distinct label is replaced by an integer key. Keys are assigned
    consecutively, starting at 0, in order of first appearance.

    Notes
    -----
    ``ids`` and ``ids_reverse`` are class attributes that are mutated in
    place, so the label mapping is shared by all instances of this class.
    """

    # label -> integer key
    ids = {}
    # integer key -> label
    ids_reverse = {}

    def make_id(self, generator):
        """Replaces all sender and receiver values by integer keys.

        Parameters
        ----------
        generator : iterable or single event
            Events

        Yields
        ------
        event : dict
            The incoming event (modified in place) with its sender and
            receiver labels replaced by their integer keys.
        """
        # Maybe only one event
        if isinstance(generator, dict):
            generator = [generator]

        for event in generator:
            # Normalise to lists so single labels and label lists are
            # handled by the same loop.
            event = make_lists(event)
            for field in ("sender", "receiver"):
                if field not in event:
                    continue
                id_list = []
                for elem in event[field]:
                    eid = self.ids.get(elem)
                    if eid is None:
                        # Unknown label: the next free key is the current
                        # number of known labels.
                        eid = len(self.ids)
                        self.ids[elem] = eid
                        self.ids_reverse[eid] = elem
                    id_list.append(eid)
                event[field] = id_list
            # Collapse one-element lists back into plain values.
            yield make_literals(event)

    __call__ = make_id
def make_lists(event):
    """Wraps scalar sender and receiver values of an event into lists.

    ``event["sender"]`` is always processed, ``event["receiver"]`` only if
    the key is present. Values that already are a list or tuple are left
    untouched.

    Parameters
    ----------
    event : dict
        Event in dict representation; it is modified in place.

    Returns
    -------
    event : dict
        The same event object that was passed in.
    """
    sequence_types = (list, tuple)

    sender = event["sender"]
    if not isinstance(sender, sequence_types):
        event["sender"] = [sender]

    if "receiver" in event:
        receiver = event["receiver"]
        if not isinstance(receiver, sequence_types):
            event["receiver"] = [receiver]

    return event
def make_literals(event):
    """Unwraps one-element sender and receiver lists of an event.

    A list or tuple holding exactly one element is replaced by that element.
    ``event["sender"]`` is always processed, ``event["receiver"]`` only if
    the key is present. All other values are left untouched.

    Parameters
    ----------
    event : dict
        Event in dict representation; it is modified in place.

    Returns
    -------
    event : dict
        The same event object that was passed in.
    """
    sequence_types = (list, tuple)

    sender = event["sender"]
    if isinstance(sender, sequence_types) and len(sender) == 1:
        event["sender"] = sender[0]

    if "receiver" not in event:
        return event

    receiver = event["receiver"]
    if isinstance(receiver, sequence_types) and len(receiver) == 1:
        event["receiver"] = receiver[0]

    return event
def _labels_to_string(labels):
    """Renders one sender/receiver field: collections are comma-joined, anything else goes through str()."""
    # Tuples are included because make_lists/make_literals also accept them
    # as label collections.
    if isinstance(labels, (list, tuple, set)):
        return ",".join([str(x) for x in labels])
    return str(labels)


def event_to_string(event):
    """
    Converts the Event-Dictionary into a string representation.

    Parameters
    ----------
    event : dict
        Event in dict representation. Must contain ``"time"`` and
        ``"sender"``; ``"receiver"`` is optional.

    Returns
    -------
    line : str
        String representation of the event: ``time;sender[;receiver]``,
        where multiple senders/receivers are separated by commas.

    Raises
    ------
    KeyError
        If ``"time"`` or ``"sender"`` is missing.

    Notes
    -----
    Example: ::

        >>> event_to_string({"time":1,"sender":[1,2,3],"receiver":[4,5,6]})
        '1;1,2,3;4,5,6'

        >>> event_to_string({"time":1,"sender":[1,2,3]})
        '1;1,2,3'

        >>> event_to_string({"time":1,"sender":1,"receiver":2})
        '1;1;2'

    """
    sender = _labels_to_string(event["sender"])

    if "receiver" in event:
        receiver = _labels_to_string(event["receiver"])
        return "{};{};{}".format(event["time"], sender, receiver)

    return "{};{}".format(event["time"], sender)
def _parse_labels(field):
    """Splits a comma separated field into labels; labels that look like integers become int."""
    labels = []
    for token in field.split(","):
        try:
            labels.append(int(token))
        except ValueError:
            # Not an integer: keep the label as a string.
            labels.append(token)
    return labels


def string_to_event(string):
    """
    Converts the string representation of an event into the dictionary representation.

    Parameters
    ----------
    string : str
        Event in string representation (``time;sender[;receiver]``).
        Leading and trailing whitespace is ignored.

    Returns
    -------
    event : dict
        Dictionary representation of the event. Sender and receiver are
        lists, or plain values if they contain exactly one label.

    Raises
    ------
    ValueError
        If the string does not consist of two or three ``;`` separated
        fields, or if the time field is not a number.

    Notes
    -----
    Example: ::

        >>> event = string_to_event('1;1,2,3;4,5,6')
        >>> event == {'time': 1, 'sender': [1, 2, 3], 'receiver': [4, 5, 6]}
        True

        >>> event = string_to_event('1;1,2,3')
        >>> event == {'time': 1, 'sender': [1, 2, 3]}
        True

        >>> event = string_to_event('1;1;2')
        >>> event == {'time': 1, 'sender': 1, 'receiver': 2}
        True

    """
    string = string.strip()
    parts = string.split(";")
    if not 2 <= len(parts) <= 3:
        raise ValueError(
            "expected 'time;sender[;receiver]', got {!r}".format(string)
        )

    event = {}

    # Prefer an integer time stamp; fall back to float for values like "1.5".
    try:
        event["time"] = int(parts[0])
    except ValueError:
        event["time"] = float(parts[0])

    event["sender"] = _parse_labels(parts[1])

    if len(parts) > 2:
        # The key must be "receiver" (singular): that is what make_literals
        # and every other function working on events looks up.
        event["receiver"] = _parse_labels(parts[2])

    return make_literals(event)
def simplify(generator):
    """Simplifies the compact representation with multiple sender and receivers.

    The function consumes an iterator of events and creates a new event for
    every sender-receiver combination. Events without a receiver are split
    into one event per sender.

    Parameters
    ----------
    generator : iterable or single event
        Events

    Yields
    ------
    event : dict
        New event with a single sender and, if the input event had
        receivers, a single receiver.
    """
    # Maybe only one event
    if isinstance(generator, dict):
        generator = [generator]

    for event in generator:
        event = make_lists(event)
        senders = event["sender"]

        # Test for the optional key explicitly rather than catching KeyError
        # around the whole loop: a KeyError from a missing "time" must not be
        # mistaken for "this event has no receiver".
        if "receiver" in event:
            receivers = event["receiver"]
            for s in senders:
                for r in receivers:
                    yield {
                        "time": event["time"],
                        "sender": s,
                        "receiver": r
                    }
        else:
            for s in senders:
                yield {
                    "time": event["time"],
                    "sender": s
                }
def expand_sender_events(generator, directed=True):
    """Expands sender events from each sender to each other sender.

    The function consumes an iterator of events and creates a new event for
    every combination of two different senders of the same event.

    Parameters
    ----------
    generator : iterable or single event
        Events
    directed : boolean
        If true generates A->B and B->A. Otherwise only the pair whose
        sender does not compare greater than its receiver is generated.

    Yields
    ------
    event : dict
        New event with one sender and one receiver.
    """
    # Maybe only one event
    if isinstance(generator, dict):
        generator = [generator]

    for event in generator:
        event = make_lists(event)
        participants = event["sender"]

        for source in participants:
            for target in participants:
                # Never connect a sender with itself.
                if source == target:
                    continue
                # Undirected: drop the mirrored half of each pair.
                if not directed and source > target:
                    continue
                yield {
                    "time": event["time"],
                    "sender": source,
                    "receiver": target
                }
def save(generator, filename):
    """Saves events to file, one event per line.

    Each event is written in the string representation produced by
    ``event_to_string``.

    Parameters
    ----------
    generator : iterable or single event
        Events
    filename : str
        Filename which is used to save the file. The file is opened in
        write mode.
    """
    # Maybe only one event
    events = [generator] if isinstance(generator, dict) else generator

    with open(filename, "w") as handle:
        for event in events:
            line = event_to_string(event)
            handle.write("{}\n".format(line))
def load(filename):
    """Loads events from file.

    The file is read line by line and every line is converted with
    ``string_to_event``.

    Parameters
    ----------
    filename : str
        Filename which is used to load the data.

    Yields
    ------
    event : dict
        Dictionary representation of the event stored in the current line.
    """
    with open(filename) as handle:
        for raw_line in handle:
            event = string_to_event(raw_line)
            yield event
def throttle(generator, factor=1.):
    """Throttles event processing.

    Each event is held back until the wall-clock time passed since the first
    event was received is at least ``factor`` times the difference between
    the time stamp of the current event and that of the first event.

    Parameters
    ----------
    generator : iterable or single event
        Events
    factor : float
        factor of time delay, default 1.0

    Yields
    ------
    event : dict
        The unmodified input events, in their original order.

    Notes
    -----
    The time will be interpreted as seconds.
    """
    # Maybe only one event
    if isinstance(generator, dict):
        generator = [generator]

    init_time = None    # wall-clock time at which the first event arrived
    first_event = None  # time stamp of the first event

    for event in generator:
        # Compare against None explicitly so that the reference point is set
        # exactly once, independent of the value returned by time().
        if init_time is None:
            init_time = time()
            first_event = event["time"]

        # Scaled event-time offset minus the real time that already passed.
        sleep_time = (event["time"] - first_event) * factor - (time() - init_time)
        if sleep_time > 0:
            sleep(sleep_time)
        yield event
def jitter_equal(generator, interval=(-1, 1)):
    """Adds noise to the event time.

    A uniformly distributed random offset from the given interval is added
    to the ``"time"`` value of every event. The events are modified in place.

    Parameters
    ----------
    generator : iterable or single event
        Events
    interval : tuple or list
        2 element tuple with min and max jitter

    Yields
    ------
    event : dict
        The incoming event object with the shifted time.

    Notes
    -----
    Events could be out of order after adding jitter.
    """
    # Maybe only one event
    events = [generator] if isinstance(generator, dict) else generator

    lower = interval[0]
    width = interval[1] - lower

    for event in events:
        # random() is in [0, 1); scale it to the interval width and shift it
        # to the lower bound.
        offset = random() * width + lower
        event["time"] += offset
        yield event
def sort(generator, window_size=5.0):
    """Sorts events in an event stream.

    Events are buffered in a heap and released in time order once a later
    event shows that they are older than the given time window. Events with
    the same time are released in their arrival order.

    Parameters
    ----------
    generator : iterable or single event
        Events
    window_size : int or float
        time window size

    Yields
    ------
    event : dict
        The input events, ordered by time within the window.

    Notes
    -----
    Events would be delayed until next event with timedelta > window_size arrives.
    """
    # Maybe only one event
    if isinstance(generator, dict):
        generator = [generator]

    heap = []
    for seq, event in enumerate(generator):
        t = event["time"]
        # The arrival index breaks ties between equal time stamps. Without it
        # heapq would fall back to comparing the event dicts, which raises
        # TypeError on Python 3.
        heapq.heappush(heap, (t, seq, event))

        # Release everything that is older than the window behind the newest
        # event. The emptiness check keeps a negative window from indexing
        # into an empty heap.
        while heap and heap[0][0] < t - window_size:
            yield heapq.heappop(heap)[2]

    # Input exhausted: flush the remaining events in order.
    while heap:
        yield heapq.heappop(heap)[2]
if __name__ == "__main__":
    # Small demo: shift the time of three events by a random offset from
    # (-2, 2), then re-sort the stream with a window of 1 and print it.
    # The random generator is seeded with a fixed value first.
    seed(2)

    demo_events = [
        {'receiver': [1, 5, "haus"], 'sender': ["maus", 2, 3], 'time': 0},
        {'receiver': "maus", 'sender': "haus", 'time': 1},
        {'receiver': "maus", 'sender': "haus", 'time': 2}
    ]

    jittered = jitter_equal(demo_events, (-2, 2))
    for sorted_event in sort(jittered, 1):
        print(sorted_event)