Source code for dynamix.events.tools

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
This file provides tools for event handling.

"""

from __future__ import print_function, division, unicode_literals, absolute_import, generators

__author__ = "Pascal Held"
__email__ = "paheld@gmail.com"


[docs]class MakeID(object): """Converter object for sender/reciever labels. """ ids = {} ids_reverse = {}
[docs] def make_id(self, generator): """Replaces all sender and receiver values by integer keys. Parameters ---------- generator : iterable or single event Events """ if isinstance(generator, dict): generator = [dict] for event in generator: event = make_lists(event) for field in ("sender","reciever"): if not field in event: continue id_list = [] for elem in event[field]: eid = self.ids.get(elem, None) if eid is None: eid = len(self.ids) self.ids[elem] = eid self.ids_reverse[eid] = elem id_list.append(eid) event[field] = id_list yield make_literals(event)
__call__ = make_id
[docs]def make_lists(event): """Converts all sender and receiver literals into lists """ if not isinstance(event["sender"], (list, tuple)): event["sender"] = [event["sender"]] if "reveiver" in event and not isinstance(event["reveiver"], (list, tuple)): event["reveiver"] = [event["reveiver"]] return event
[docs]def make_literals(event): """Converts all sender and receiver lists into literals if they contain only one element """ if isinstance(event["sender"], (list, tuple)) and len(event["sender"]) == 1: event["sender"] = event["sender"][0] if "reveiver" in event and isinstance(event["reveiver"], (list, tuple)) and len(event["reveiver"] == 1): event["reveiver"] = event["reveiver"][0] return event
[docs]def event_to_string(event): """ Converts the Event-Dictionary into a string representation. Parameters ---------- event : dict Event in dict representation Returns ------- line : str String representation of the event Notes ----- Example: :: >>> event_to_string({"time":1,"sender":[1,2,3],"receiver":[4,5,6]}) '1;1,2,3;4,5,6' >>> event_to_string({"time":1,"sender":[1,2,3]}) '1;1,2,3' >>> event_to_string({"time":1,"sender":1,"receiver":2}) '1;1;2' """ if isinstance(event["sender"], (list, set)): sender = ",".join([str(x) for x in event["sender"]]) else: sender = str(event["sender"]) if "receiver" in event: if isinstance(event["receiver"], (list, set)): receiver = ",".join([str(x) for x in event["receiver"]]) else: receiver = str(event["receiver"]) return "{};{};{}".format(event["time"], sender, receiver) else: return "{};{}".format(event["time"], sender)
[docs]def string_to_event(string): """ Converts the string representation of an event into the dictionary representation. Parameters ---------- string : str Event in string representation Returns ------- event : dict Dictionary representation of the event Notes ----- Example: :: >>> event = string_to_event('1;1,2,3;4,5,6') >>> event == {'time': 1, 'sender': [1, 2, 3], 'reciever': [4, 5, 6]} True >>> event = string_to_event('1;1,2,3') >>> event == {'time': 1, 'sender': [1, 2, 3]} True >>> event = string_to_event('1;1;2') >>> event == {'time': 1, 'sender': 1, 'reciever': 2} True """ string = string.strip() parts = string.split(";") if not 2 <= len(parts) <= 3: raise ValueError event = {} try: event["time"] = int(parts[0]) except ValueError: event["time"] = float(parts[0]) senders = [] for s in parts[1].split(","): try: senders.append(int(s)) except ValueError: senders.append(s) event["sender"] = senders if len(parts) > 2: recievers = [] for r in parts[2].split(","): try: recievers.append(int(r)) except ValueError: recievers.append(r) event["reciever"] = recievers return make_literals(event)
[docs]def simplify(generator): """Simplifies the compact representation with multiple sender and receivers. The functions uses an iterator of events and creates for every sender-receiver combination a new event. Parameters ---------- generator : iterable or single event Events """ # Maybe only one event if isinstance(generator, dict): generator = [generator] for event in generator: event = make_lists(event) sender = event["sender"] try: receiver = event["receiver"] for s in sender: for r in receiver: yield { "time": event["time"], "sender": s, "receiver": r } # Maybe no receiver except KeyError: for s in sender: yield { "time": event["time"], "sender": s }
[docs]def save(generator, filename): """Saves events to file Parameters ---------- generator : iterable or single event Events filename : str Filename which is used to save the file. """ #maybe only one event if isinstance(generator, dict): generator = [generator] with open(filename, "w") as file: for event in generator: file.write("{}\n".format(event_to_string(event)))
[docs]def load(filename): """Loads events from file Parameters ---------- filename : str Filename which is used to load the data. """ with open(filename) as file: for line in file: yield string_to_event(line)
if __name__ == "__main__": events = [ {'reciever': [1,5,"haus"], 'sender': ["maus", 2, 3], 'time': 0}, {'reciever': "maus", 'sender': "haus", 'time': 1} ] ids = MakeID() for e in MakeID(events): print(e) print(ids.ids) print(ids.ids_reverse)