#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
This file provides tools for event handling.
"""
from __future__ import print_function, division, unicode_literals, absolute_import, generators
from time import sleep, time
from random import random, seed
import heapq
__author__ = "Pascal Held"
__email__ = "paheld@gmail.com"
class MakeID(object):
    """Converter object for sender/receiver labels.

    Every distinct label is mapped to an integer key. Keys are handed out
    consecutively, starting at 0, in the order the labels are first seen.

    Notes
    -----
    ``ids`` and ``ids_reverse`` are class attributes and are only mutated,
    never rebound, so the mapping is shared by all instances of this class.
    """

    # label -> integer key
    ids = {}
    # integer key -> label
    ids_reverse = {}

    def make_id(self, generator):
        """Replaces all sender and receiver values by integer keys.

        Parameters
        ----------
        generator : iterable or single event
            Events

        Yields
        ------
        event : dict
            The incoming event (modified in place) with every sender and
            receiver label replaced by its integer key. Fields holding a
            single key are collapsed to a literal by ``make_literals``.
        """
        # Maybe only one event
        if isinstance(generator, dict):
            generator = [generator]
        for event in generator:
            # Work on lists so single labels and label lists are handled alike
            event = make_lists(event)
            for field in ("sender", "receiver"):
                if field not in event:
                    continue
                id_list = []
                for elem in event[field]:
                    eid = self.ids.get(elem)
                    if eid is None:
                        # Unknown label: the next free key is the current size
                        eid = len(self.ids)
                        self.ids[elem] = eid
                        self.ids_reverse[eid] = elem
                    id_list.append(eid)
                event[field] = id_list
            yield make_literals(event)

    # Allow instances to be used like a function: MakeID()(events)
    __call__ = make_id
def make_lists(event):
    """Wraps single sender and receiver values of an event into lists.

    Parameters
    ----------
    event : dict
        Event in dict representation. ``"sender"`` is required,
        ``"receiver"`` is optional.

    Returns
    -------
    event : dict
        The same dict object, modified in place. Values that already are
        a list or tuple are left untouched.
    """
    sequence_types = (list, tuple)

    sender = event["sender"]
    if not isinstance(sender, sequence_types):
        event["sender"] = [sender]

    if "receiver" in event:
        receiver = event["receiver"]
        if not isinstance(receiver, sequence_types):
            event["receiver"] = [receiver]

    return event
def make_literals(event):
    """Unwraps one-element sender and receiver sequences of an event.

    Parameters
    ----------
    event : dict
        Event in dict representation. ``"sender"`` is required,
        ``"receiver"`` is optional.

    Returns
    -------
    event : dict
        The same dict object, modified in place. A list or tuple holding
        exactly one element is replaced by that element; everything else
        is left untouched.
    """
    def holds_single_item(value):
        return isinstance(value, (list, tuple)) and len(value) == 1

    if holds_single_item(event["sender"]):
        event["sender"] = event["sender"][0]

    if "receiver" in event and holds_single_item(event["receiver"]):
        event["receiver"] = event["receiver"][0]

    return event
def _labels_to_string(labels):
    """Renders one label, or a collection of labels joined by commas."""
    # Tuples are accepted as well, matching make_lists / make_literals
    if isinstance(labels, (list, tuple, set, frozenset)):
        return ",".join(str(label) for label in labels)
    return str(labels)


def event_to_string(event):
    """
    Converts the Event-Dictionary into a string representation.

    The fields are joined with ``;`` in the order time, sender, receiver.
    The receiver part is omitted when the event has no ``"receiver"`` key.

    Parameters
    ----------
    event : dict
        Event in dict representation

    Returns
    -------
    line : str
        String representation of the event

    Notes
    -----
    Example: ::

        >>> event_to_string({"time":1,"sender":[1,2,3],"receiver":[4,5,6]})
        '1;1,2,3;4,5,6'
        >>> event_to_string({"time":1,"sender":[1,2,3]})
        '1;1,2,3'
        >>> event_to_string({"time":1,"sender":1,"receiver":2})
        '1;1;2'
        >>> event_to_string({"time":1,"sender":(1,2)})
        '1;1,2'
    """
    sender = _labels_to_string(event["sender"])
    if "receiver" not in event:
        return "{};{}".format(event["time"], sender)
    receiver = _labels_to_string(event["receiver"])
    return "{};{};{}".format(event["time"], sender, receiver)
def _parse_labels(field):
    """Splits a comma separated field, converting integer-like items to int."""
    labels = []
    for item in field.split(","):
        try:
            labels.append(int(item))
        except ValueError:
            # Not an integer: keep the label as the original string
            labels.append(item)
    return labels


def string_to_event(string):
    """
    Converts the string representation of an event into the dictionary representation.

    The expected format is ``time;sender[;receiver]``, where sender and
    receiver may each be a comma separated list of labels. The time is
    parsed as int if possible and as float otherwise. Labels are parsed as
    int if possible and kept as strings otherwise.

    Parameters
    ----------
    string : str
        Event in string representation

    Returns
    -------
    event : dict
        Dictionary representation of the event

    Raises
    ------
    ValueError
        If the string does not consist of two or three ``;`` separated
        parts, or if the time is neither an int nor a float.

    Notes
    -----
    Example: ::

        >>> event = string_to_event('1;1,2,3;4,5,6')
        >>> event == {'time': 1, 'sender': [1, 2, 3], 'receiver': [4, 5, 6]}
        True
        >>> event = string_to_event('1;1,2,3')
        >>> event == {'time': 1, 'sender': [1, 2, 3]}
        True
        >>> event = string_to_event('1;1;2')
        >>> event == {'time': 1, 'sender': 1, 'receiver': 2}
        True
    """
    parts = string.strip().split(";")
    if not 2 <= len(parts) <= 3:
        raise ValueError(
            "expected 'time;sender[;receiver]', got {!r}".format(string)
        )

    event = {}
    try:
        event["time"] = int(parts[0])
    except ValueError:
        # Not an integer timestamp; a non-numeric time raises ValueError here
        event["time"] = float(parts[0])

    event["sender"] = _parse_labels(parts[1])
    if len(parts) > 2:
        # The key must be "receiver" so that the other helpers recognise it
        event["receiver"] = _parse_labels(parts[2])

    # Collapse one-element lists so '1;1;2' yields plain values
    return make_literals(event)
def simplify(generator):
    """Expands compact events into one event per sender-receiver pair.

    An event with several senders and/or receivers is replaced by a new
    event for every sender-receiver combination. Events without a receiver
    produce one new event per sender.

    Parameters
    ----------
    generator : iterable or single event
        Events

    Yields
    ------
    event : dict
        New dict with ``"time"``, a single ``"sender"`` and, if the source
        event had one, a single ``"receiver"``.
    """
    # A lone event is handled like a one-element stream
    if isinstance(generator, dict):
        generator = [generator]

    for compact in generator:
        compact = make_lists(compact)
        senders = compact["sender"]

        if "receiver" in compact:
            receivers = compact["receiver"]
            for src in senders:
                for dst in receivers:
                    yield {
                        "time": compact["time"],
                        "sender": src,
                        "receiver": dst
                    }
        else:
            # No receiver given: only split up the senders
            for src in senders:
                yield {
                    "time": compact["time"],
                    "sender": src
                }
def save(generator, filename):
    """Writes events to a text file, one event per line.

    Each event is converted with ``event_to_string`` and terminated by a
    newline. The file is opened in write mode, so existing content is
    replaced.

    Parameters
    ----------
    generator : iterable or single event
        Events
    filename : str
        Filename which is used to save the file.
    """
    # A lone event is handled like a one-element stream
    events = [generator] if isinstance(generator, dict) else generator
    with open(filename, "w") as handle:
        for event in events:
            line = "{}\n".format(event_to_string(event))
            handle.write(line)
def load(filename):
    """Loads events from file

    The file is read lazily, line by line, and every line is converted with
    ``string_to_event``. Blank lines (for example a trailing empty line at
    the end of the file) are skipped instead of raising ``ValueError``.

    Parameters
    ----------
    filename : str
        Filename which is used to load the data.

    Yields
    ------
    event : dict
        Dictionary representation of each event line.
    """
    with open(filename) as handle:
        for line in handle:
            # A whitespace-only line carries no event
            if not line.strip():
                continue
            yield string_to_event(line)
def throttle(generator, factor=1.):
    """Throttles event processing

    An event is held back until the wall-clock time elapsed since the first
    event was seen is at least ``factor`` times the difference between the
    event's time and the first event's time.

    Parameters
    ----------
    generator : iterable or single event
        Events
    factor : float
        factor of time delay, default 1.0

    Yields
    ------
    event : dict
        The unchanged events, in their original order.

    Notes
    -----
    The time will be interpreted as seconds.
    """
    init_time = None
    first_event = None
    if isinstance(generator, dict):
        generator = [generator]
    for event in generator:
        # Compare against None: a truthiness test would reset the baseline
        # whenever the clock reading happens to be 0
        if init_time is None:
            init_time = time()
            first_event = event["time"]
        # Scaled event-time offset minus the wall-clock time already spent
        sleep_time = (event["time"] - first_event) * factor - (time() - init_time)
        if sleep_time > 0:
            sleep(sleep_time)
        yield event
def jitter_equal(generator, interval=(-1, 1)):
    """Shifts every event time by uniformly distributed random noise.

    For each event one value is drawn with ``random()`` and scaled to the
    given interval; the result is added to the event's ``"time"`` in place.

    Parameters
    ----------
    generator : iterable or single event
        Events
    interval : tuple or list
        2 element tuple with min and max jitter

    Yields
    ------
    event : dict
        The same event objects with the modified time.

    Notes
    -----
    Events could be out of order after adding jitter.
    """
    # A lone event is handled like a one-element stream
    stream = [generator] if isinstance(generator, dict) else generator
    width = interval[1] - interval[0]
    for item in stream:
        # random() is in [0, 1); stretch it to the interval width and shift
        offset = random() * width + interval[0]
        item["time"] += offset
        yield item
def sort(generator, window_size=5.0):
    """Sorts events in an event stream.

    Events are buffered in a heap ordered by time. Whenever a new event
    arrives, all buffered events whose time is more than ``window_size``
    below the new event's time are released in ascending time order. The
    remaining events are released once the input is exhausted. Events with
    equal time keep their arrival order.

    Parameters
    ----------
    generator : iterable or single event
        Events
    window_size : int or float
        time window size

    Yields
    ------
    event : dict
        The unchanged events, sorted within the time window.

    Notes
    -----
    Events would be delayed until next event with timedelta > window_size arrives.
    """
    if isinstance(generator, dict):
        generator = [generator]
    heap = []
    for seq, event in enumerate(generator):
        t = event["time"]
        # The arrival number breaks ties between equal times, so heapq never
        # has to compare the event dicts (a TypeError on Python 3)
        heapq.heappush(heap, (t, seq, event))
        # The heap check guards against draining it completely, which is
        # possible when window_size is negative
        while heap and heap[0][0] < t - window_size:
            yield heapq.heappop(heap)[-1]
    # Input exhausted: flush whatever is still buffered, in order
    while heap:
        yield heapq.heappop(heap)[-1]
if __name__ == "__main__":
    # Small demo: jitter three sample events by up to +/-2 and print them
    # after re-sorting with a time window of 1. The fixed seed makes the
    # jitter reproducible.
    seed(2)
    events = [
        {'receiver': [1, 5, "haus"], 'sender': ["maus", 2, 3], 'time': 0},
        {'receiver': "maus", 'sender': "haus", 'time': 1},
        {'receiver': "maus", 'sender': "haus", 'time': 2}
    ]
    jittered = jitter_equal(events, (-2, 2))
    for sorted_event in sort(jittered, 1):
        print(sorted_event)