Event Processing
Overview
The Event Processing library allows you to run arbitrary code (event handlers) at predefined events (also called hooks) within the PredictiveGrid platform. For example, you might upload new COMTRADE files through the COMTRADE ingress and want a transform or analytical process to run immediately after the import.
Notifications
After your event handler runs, an email is sent to the success or failure email address supplied during registration. If the handler succeeds, the email contains any text returned by your Python function. If the handler raises an exception, it is regarded as unsuccessful and a notification with the exception details is sent to the failure email address.
Dependencies
The platform provides a base Python 3.7.3 environment similar to that of our Jupyter servers, including standard libraries such as:
btrdb
matplotlib
numpy
pandas
ray
scikit-learn
scipy
statsmodels
tensorflow
torch
torchvision
Connections
Each of the following calls requires a Connection object (different from the btrdb library connection object) as its first argument. When you register a new event handler, the API key in the Connection object is used as the execution-time security context: all handler code is executed as the registering user.
The Connection object requires the BTrDB address/port as well as an API key as follows.
>>> from btrdbextras import Connection
>>> conn = Connection("api.example.com:4411", "C27489F2BFACE794A3")
If no arguments are supplied, the Connection object instead looks for the BTRDB_ENDPOINTS and BTRDB_API_KEY environment variables.
>>> from btrdbextras import Connection
>>> conn = Connection()
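Because calling Connection() with no arguments depends silently on the environment, it can help to confirm that both documented variables are set before connecting. The sketch below uses a hypothetical helper, missing_connection_env_vars (it is not part of btrdbextras), that reports which of the two variables are unset or empty; it does not check whether their values are valid.

```python
import os
from typing import List, Mapping, Optional

# Environment variables that Connection() reads when called without arguments.
REQUIRED_VARS = ("BTRDB_ENDPOINTS", "BTRDB_API_KEY")


def missing_connection_env_vars(environ: Optional[Mapping[str, str]] = None) -> List[str]:
    """Hypothetical helper: return the documented variables that are unset or empty."""
    env = os.environ if environ is None else environ
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_connection_env_vars()
    if missing:
        print("Set these before calling Connection():", ", ".join(missing))
    else:
        print("Both variables are set; Connection() can read them.")
```

Passing a mapping explicitly (for example a plain dict) makes the check easy to exercise without touching the real environment.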
Interactions
The following calls can be made to the platform to manage your event handling code.
Listing Known Hooks
You can think of hooks as labels that represent specific points during the execution of some process (such as importing data). To see the available hooks, call the hooks function. New hooks will be added to the platform over time, allowing you to integrate your own code into more processes.
>>> from btrdbextras import Connection
>>> from btrdbextras.eventproc import hooks
>>> conn = Connection()
>>> hooks(conn)
['ctingress.on_complete']
Registering Event Handlers
You supply the event handlers: code you would like executed when a specific hook/event occurs within the larger platform. Your Python callable, typically just a function, is pickled using the dill library for execution in the platform. As a result, keep the usual Python serialization concerns in mind, such as the total size of the callable and the libraries available in the execution environment.
To submit a new event handler, apply the register decorator to your Python callable. As with all objects in this library, you can use the help function to view its docstring.
>>> from btrdbextras.eventproc import register
>>> help(register)
Help on function register in module btrdbextras.eventproc.eventproc:
register(conn, name, hook, notify_on_success, notify_on_failure, tags=None)
decorator to submit (register) an event handler function
Parameters
----------
conn: Connection
btrdbextras Connection object containing a valid address and api key.
name: str
Friendly name of this event handler for display purposes.
hook: str
Name of the hook that this event handler responds to.
notify_on_success: str
Email address of user to notify when event handler completes successfully.
notify_on_failure: str
Email address of user to notify when event handler does not complete
successfully.
tags: list of str
Filtering tags that users can choose when identifying handlers to
execute. An empty list will match all tags.
As the docstring shows, every argument except tags is required. A trivial example is shown below.
>>> from btrdbextras import Connection
>>> from btrdbextras.eventproc import register
>>> conn = Connection()
>>>
>>> @register(
... conn=conn,
... name="trivial-handler",
... hook="ctingress.on_complete",
... notify_on_success="success@example.com",
... notify_on_failure="failure@example.com",
... tags=["demo", "anomaly-detection"]
... )
... def trivial(btrdb, *args, **kwargs):
...     print(args, kwargs)
...     return "completed successfully"
You can also update an existing event handler with the register decorator. No extra steps are needed: register the new version with the same hook and name, and it will replace the old one.
Listing Registered Handlers
To view the existing event handlers, call the list_handlers function, which returns a list of Handler objects. You may also provide the optional hooks argument to restrict the results to the hooks/events you are interested in.
>>> from btrdbextras import Connection
>>> from btrdbextras.eventproc import list_handlers
>>> conn = Connection()
>>>
>>> list_handlers(conn)
[Handler(id=3, name='sample-73', hook='ctingress.on_complete', version=0, notify_on_success='success@example.com', notify_on_failure='failure@example.com', tags=['red', 'blue'], created_at=datetime.datetime(2020, 10, 21, 21, 35, 20, 365664, tzinfo=<UTC>), created_by='allen', updated_at=datetime.datetime(2020, 10, 21, 21, 35, 20, 365664, tzinfo=<UTC>), updated_by='allen')]
The Handler object is just a lightweight namedtuple and does not offer any functionality itself.
Deleting Existing Handlers
To remove an existing event handler, use the deregister function. It requires only the ID of your handler, which was assigned when you first registered it. You can use list_handlers to find the ID for your handler if needed.
>>> from btrdbextras import Connection
>>> from btrdbextras.eventproc import list_handlers, deregister
>>> conn = Connection()
>>>
>>> list_handlers(conn)
[Handler(id=5, name='trivial-handler', hook='ctingress.on_complete', version=0, notify_on_success='success@example.com', notify_on_failure='failure@example.com', tags=['demo', 'anomaly-detection'], created_at=datetime.datetime(2020, 10, 23, 15, 6, 48, 390720, tzinfo=<UTC>), created_by='allen', updated_at=datetime.datetime(2020, 10, 23, 15, 6, 48, 390720, tzinfo=<UTC>), updated_by='allen')]
>>>
>>> deregister(conn, 5)
True
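Because Handler is a plain namedtuple, finding the ID to pass to deregister is ordinary attribute filtering. So that the sketch runs without a platform connection, it defines a stand-in Handler namedtuple with the fields shown in the list_handlers output above; with a live connection you would filter the result of list_handlers(conn) instead. find_handler_ids is a hypothetical helper, not part of btrdbextras.

```python
from collections import namedtuple
from typing import Iterable, List, Optional

# Stand-in with the same fields as the Handler objects printed by list_handlers.
# With a live connection, use the objects returned by list_handlers(conn) instead.
Handler = namedtuple(
    "Handler",
    ["id", "name", "hook", "version", "notify_on_success", "notify_on_failure",
     "tags", "created_at", "created_by", "updated_at", "updated_by"],
)


def find_handler_ids(handlers: Iterable, name: str, hook: Optional[str] = None) -> List[int]:
    """Hypothetical helper: IDs of handlers matching `name` (and `hook`, if given)."""
    return [
        h.id for h in handlers
        if h.name == name and (hook is None or h.hook == hook)
    ]


sample = [
    Handler(5, "trivial-handler", "ctingress.on_complete", 0,
            "success@example.com", "failure@example.com",
            ["demo", "anomaly-detection"], None, "allen", None, "allen"),
    Handler(3, "sample-73", "ctingress.on_complete", 0,
            "success@example.com", "failure@example.com",
            ["red", "blue"], None, "allen", None, "allen"),
]

print(find_handler_ids(sample, "trivial-handler"))  # [5]

# With a live connection (not run here):
# for handler_id in find_handler_ids(list_handlers(conn), "trivial-handler"):
#     deregister(conn, handler_id)
```

Matching on both name and hook mirrors how re-registration identifies the handler it replaces.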
Writing Handlers
Code running in a handler can import packages and read and write files, much as it would in the environment where you defined and registered the handler, subject to the rules below.
Using Packages
Any packages used within a handler must be imported within that handler as follows.
>>> def import_handler(*args, **kwargs):
...     import pandas as pd
...     df = pd.DataFrame([['a', 'b'], ['c', 'd']],
...                       index=['row 1', 'row 2'],
...                       columns=['col 1', 'col 2'])
...     return "done"
The exception to this is the btrdbextras.eventproc module, which is already imported as ep in the handler’s execution environment. As a result, both of the following uses of the eventproc module are valid.
>>> def upload_handler_v1(*args, **kwargs):
...     from btrdbextras.eventproc import upload_file
...     path = "demo_filepath"
...     name_on_download = "hello.txt"  # file name shown to whoever downloads it
...     with open(path, "w") as f:
...         f.write("hello world!")
...     url = upload_file(path, name_on_download)
...     return url
>>>
>>> def upload_handler_v2(*args, **kwargs):
...     path = "demo_filepath"
...     name_on_download = "hello.txt"
...     with open(path, "w") as f:
...         f.write("hello world!")
...     # ep is already imported in the handler's execution environment
...     url = ep.upload_file(path, name_on_download)
...     return url
Working with Files
Any file a handler creates is deleted when the handler completes and does not persist in the handler’s execution environment. To access a file written by a handler outside of the handler’s execution context, use the upload_file function. For example, the following code registers a handler that creates and uploads an Excel file. The return value is a link for downloading the file (downloading it requires username and password authentication).
>>> @register(
... conn=conn,
... name="excel-demo",
... hook="ctingress.on_complete",
... notify_on_success="success@example.com",
... notify_on_failure="failure@example.com",
... tags=["excel-demo"]
... )
... def upload_handler(*args, **kwargs):
...     import pandas as pd
...     path = "output.xlsx"
...     name_on_download = "excel_test.xlsx"
...     df = pd.DataFrame([['a', 'b'], ['c', 'd']],
...                       index=['row 1', 'row 2'],
...                       columns=['col 1', 'col 2'])
...     df.to_excel(path)
...     url = ep.upload_file(path, name_on_download)
...     return url
Troubleshooting
There are only a few troubleshooting tips at the moment; we expect to add more over time.
Most importantly, make sure you are using Python 3.7.3. This is the version supplied by our Jupyter servers, so you only need to check this if you are making calls from your local machine. In the future we plan to support all versions of Python greater than 3.6.0.
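If you register handlers from a local machine, a quick way to act on this tip is to compare sys.version_info with the documented platform version first. The sketch below uses a hypothetical helper, matches_platform_python; it only prints a warning and does not prevent registration.

```python
import sys
from typing import Optional, Sequence

# Python version of the handler execution environment, as documented above.
PLATFORM_PYTHON = (3, 7, 3)


def matches_platform_python(version: Optional[Sequence] = None) -> bool:
    """Hypothetical helper: True if `version` (default: this interpreter) is 3.7.3."""
    if version is None:
        version = sys.version_info
    return tuple(version[:3]) == PLATFORM_PYTHON


if not matches_platform_python():
    local = ".".join(str(part) for part in sys.version_info[:3])
    target = ".".join(str(part) for part in PLATFORM_PYTHON)
    print(f"Warning: running Python {local}, but the platform uses {target}")
```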
Hooks
The available hooks are listed below. Over time new hooks will be added to the platform.
ctingress.on_complete
Event handlers for this hook will be executed after processing an entire archive of COMTRADE files. The signature for your event handler should match the following arguments:
- handler(btrdb, DataSets, File, RequestID, SubmittedAt, **kwargs):
This function is executed after the COMTRADE ingress processes a new archive containing COMTRADE data. All inputs are sent as keyword arguments.
Implementers will find the ‘DataSets’ parameter the most useful, as it contains a list of dictionaries representing the status of each COMTRADE config/data file pairing. If an error is encountered, its message appears under the ‘Error’ key; if no issues were encountered, the ‘Success’ key equals True. The ‘Name’ key holds the config file path within the archive. The ‘Streams’ key contains a list of dictionaries representing the individual streams within the COMTRADE file. Each stream entry includes the start and end time of the included data (StartNs, EndNs), the time ingress began processing the stream (ProcessedAtNs), and the UUID of the stream (Uuid).
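Although **kwargs appears in the signature above, it is not needed at this time. Further arguments may be added in the future, however, and because all inputs arrive as keyword arguments, including **kwargs keeps your handler from failing when they are.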
- btrdb
A Python btrdb connection object for querying the time series database.
- DataSets
A list of dictionaries representing the processing status of each COMTRADE config/data file pair.
- File
The name of the uploaded archive file containing COMTRADE file pairs.
- RequestID
A unique identifier for this submission.
- SubmittedAt
The date/time for this submission.
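Because this hook passes every input as a keyword argument, a handler whose parameter names differ from the ones listed above will fail with a TypeError when the platform calls it. A local check with the standard library's inspect.signature(...).bind can catch this before you register. This is a sketch: accepts_ctingress_kwargs is a hypothetical helper, and it only checks the five documented arguments, not what the handler does with them.

```python
import inspect

# Keyword arguments documented for the ctingress.on_complete hook.
CTINGRESS_KWARGS = ("btrdb", "DataSets", "File", "RequestID", "SubmittedAt")


def accepts_ctingress_kwargs(handler) -> bool:
    """Hypothetical helper: can `handler` be called with the documented keywords?"""
    dummy = {name: None for name in CTINGRESS_KWARGS}
    try:
        # bind() raises TypeError for unexpected or missing arguments
        inspect.signature(handler).bind(**dummy)
    except TypeError:
        return False
    return True


def good_handler(btrdb, DataSets, File, RequestID, SubmittedAt, **kwargs):
    return "ok"


def bad_handler(conn, datasets):  # names do not match the keywords the hook sends
    return "ok"


print(accepts_ctingress_kwargs(good_handler))  # True
print(accepts_ctingress_kwargs(bad_handler))   # False
```

The trivial handler from the registration example, trivial(btrdb, *args, **kwargs), also passes this check because **kwargs absorbs the remaining keywords.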
Assuming you chose to use variadic arguments (i.e., **kwargs) in your event handler, the input would look similar to the following. This sample shows an archive file that included one pair of config/data COMTRADE files along with an erroneous file to show what an error would look like.
{
    'DataSets': [
        {
            'Error': '',
            'Name': '/DFR 2/200610,230000000,-4t,R108-North Anna #1,APP601,Dominion,M1094.cfg',
            'Streams': [
                {'EndNs': 1591844539983333333,
                 'ProcessedAtNs': 0,
                 'StartNs': 1591844350000000000,
                 'Uuid': '585cc764-3c80-5d3f-a7d7-59d48abbab0b'},
                {'EndNs': 1591844539983333333,
                 'ProcessedAtNs': 0,
                 'StartNs': 1591844350000000000,
                 'Uuid': 'ee614a71-803a-511e-bbb7-6fb84210228d'},
                {'EndNs': 1591844539983333333,
                 'ProcessedAtNs': 0,
                 'StartNs': 1591844350000000000,
                 'Uuid': '6575e59b-e486-5630-8596-9ef657f65a99'},
                {'EndNs': 1591844539983333333,
                 'ProcessedAtNs': 0,
                 'StartNs': 1591844350000000000,
                 'Uuid': '801674d5-d6eb-5031-84f6-8746dce0761c'},
                {'EndNs': 1591844539983333333,
                 'ProcessedAtNs': 0,
                 'StartNs': 1591844350000000000,
                 'Uuid': '597f8584-2acb-5a92-85bd-0002733fb920'}
            ],
            'Success': True
        },
        {
            'Error': 'cfg format error',
            'Name': '/__MACOSX/DFR 2/._200610,230000000,-4t,R108-North Anna #1,APP601,Dominion,M1094.cfg',
            'Streams': None,
            'Success': False
        }
    ],
    'File': 'comtrade_samples.zip',
    'RequestID': '0bbcacdb-e7aa-4e01-af88-9ba865ea32bd',
    'SubmittedAt': '2020-10-29T21:58:15.09246084Z'
}
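A handler for this hook will usually walk DataSets to see what was ingested. The sketch below, written against the key names in the sample input above, returns a short text summary; per the Notifications section, that returned text becomes the body of the success email. summarize_ingest is a hypothetical name, the btrdb connection is unused here, and treating StartNs/EndNs as nanoseconds since the Unix epoch is an assumption based on the Ns suffix and the sample values. To deploy it you would decorate it with register, as in the trivial example.

```python
def summarize_ingest(btrdb, DataSets, File, RequestID, SubmittedAt, **kwargs):
    """Hypothetical ctingress.on_complete handler: summarize one archive's ingest."""
    # Packages must be imported inside the handler itself.
    from datetime import datetime, timezone

    def utc(ns):
        # Assumption: StartNs/EndNs are nanoseconds since the Unix epoch.
        return datetime.fromtimestamp(ns / 1e9, tz=timezone.utc).isoformat()

    lines = [f"Archive {File} (request {RequestID}, submitted {SubmittedAt})"]
    for ds in DataSets:
        if not ds["Success"]:
            lines.append(f"FAILED {ds['Name']}: {ds['Error']}")
            continue
        streams = ds["Streams"] or []  # Streams is None for the failed pair in the sample
        lines.append(f"OK {ds['Name']}: {len(streams)} stream(s)")
        if streams:
            start = min(s["StartNs"] for s in streams)
            end = max(s["EndNs"] for s in streams)
            lines.append(f"   data from {utc(start)} to {utc(end)}")
    return "\n".join(lines)


# Abbreviated version of the sample input above (two streams, shortened file names).
sample_input = {
    "btrdb": None,  # on the platform this is a live btrdb connection
    "DataSets": [
        {"Error": "", "Name": "/DFR 2/M1094.cfg", "Success": True,
         "Streams": [
             {"EndNs": 1591844539983333333, "ProcessedAtNs": 0,
              "StartNs": 1591844350000000000,
              "Uuid": "585cc764-3c80-5d3f-a7d7-59d48abbab0b"},
             {"EndNs": 1591844539983333333, "ProcessedAtNs": 0,
              "StartNs": 1591844350000000000,
              "Uuid": "ee614a71-803a-511e-bbb7-6fb84210228d"},
         ]},
        {"Error": "cfg format error", "Name": "/__MACOSX/DFR 2/._M1094.cfg",
         "Streams": None, "Success": False},
    ],
    "File": "comtrade_samples.zip",
    "RequestID": "0bbcacdb-e7aa-4e01-af88-9ba865ea32bd",
    "SubmittedAt": "2020-10-29T21:58:15.09246084Z",
}

print(summarize_ingest(**sample_input))
```

Returning a string keeps failed file pairs visible in the success email; raising an exception instead would route the run to the failure address.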