The data access is a Python tool used to access the data associated with the various scenario instances of an OpenBACH platform. Optionally, it can also be used to modify, create, or delete such data.
Please refer to the Introduction to OpenBACH-extra and OpenBACH API page to see how to install and set up your platform to use the API described on this page.
The data access contains tools to interact with, and parse responses from, the InfluxDB and ElasticSearch APIs used as part of the OpenBACH platform. These tools are hosted in the data_access.influxdb_tools and data_access.elasticsearch_tools modules, respectively. Both modules are tied together in the data_access.collector module, which is considered the public interface of the data access. The data_access package itself only exports content from the collector module, such as its main CollectorConnection class. Each of these modules returns Scenario objects from the data_access.result_data module when asked for data about one or several scenario instances.
The data_access.async_collector module simply wraps the methods of the data_access.collector.CollectorConnection class into asynchronous executor calls.
Lastly, the data_access.post_processing module is an extension of the data_access.influxdb_tools module that helps build jobs whose aim is to plot data gathered from job instances.
data_access.collector.CollectorConnection, data_access.influxdb_tools.InfluxDBConnection, and data_access.elasticsearch_tools.ElasticSearchConnection all share some common methods. They query the associated database and return a set containing the requested data. As you may imagine, InfluxDBConnection will query the InfluxDB database to return its information, ElasticSearchConnection will query the ElasticSearch database to return its information, and CollectorConnection will return the union of the sets returned by the other two classes.
On these methods, parameters are shared by both databases except for:
the condition parameter, which is turned into a WHERE clause when querying InfluxDB; see below for information on how to build conditions;
the timestamps parameter, which generates either a WHERE time=… clause or a WHERE (time >= …) AND (time <= …) clause, depending on whether a single value or a pair of values is provided (see the sketch below).
Other possible parameters are used to restrict the data gathered:
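As a minimal sketch of the timestamps parameter, using the scenarios method described further below; the collector address and the millisecond values are assumptions made for the sake of the example:

from data_access import CollectorConnection

collector = CollectorConnection('192.168.1.42')  # hypothetical collector address

# A single value translates into a WHERE time=... clause
at_instant = list(collector.scenarios(timestamps=1609459200000))

# A pair of values translates into WHERE (time >= ...) AND (time <= ...)
in_range = list(collector.scenarios(timestamps=(1609459200000, 1609545600000)))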
The following common methods are found on all three classes and return a set. Among them, the timestamps method accepts an optional boolean controlling whether all timestamps or only a pair of them are returned; the other methods accept optional parameters restricting the data they consider.
The following methods use data_access.result_data.Scenario objects rather than mere identifiers:
scenarios(…) builds Scenario instances based off the data matching the given constraints. This is the only method that will give you access to the actual data generated by your OpenBACH scenarios. It dispatches to statistics in InfluxDBConnection and to logs in ElasticSearchConnection. Its optional fields parameter restricts the SELECT clause when querying InfluxDB and defaults to SELECT * if not provided; it can also be a string to fine-tune the select clause, in which case it is used as-is ("SELECT {}".format(fields)). Note that, in this case, you most likely want to include the OpenBACH tags manually or the Scenario instance would be impossible to construct and return (see the sketch after this list).
import_scenario(…) takes a Scenario instance as parameter and dumps its data into both databases. It dispatches to import_job in both database classes and does not return anything.
orphans(…) returns a pair of a Log and a Scenario instance and also accepts optional parameters. ElasticSearchConnection.orphans returns a Log instance (first element of the pair) that holds ElasticSearch data that were not emitted using the collect-agent API; this is most likely warnings and errors generated by the controller or the agents themselves, outside of a job execution. InfluxDBConnection.orphans returns a Scenario instance (second element of the pair) that has no instance ID and whose Jobs consist of measurements found without the proper OpenBACH tags.
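As a rough illustration of these methods, the snippet below retrieves a scenario while restricting the SELECT clause, then fetches orphaned data; the collector address, statistic names, and scenario instance ID are only examples:

from data_access import CollectorConnection

collector = CollectorConnection('192.168.1.42')  # hypothetical collector address

# Restrict the InfluxDB SELECT clause to a few statistics of interest (hypothetical names)
for scenario in collector.scenarios(scenario_instance_id=1234, fields=['throughput', 'jitter']):
    print('Retrieved scenario instance', scenario.instance_id)

# Retrieve data that could not be tied to a proper scenario instance
logs, unowned_scenario = collector.orphans()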
On top of the common methods, InfluxDBConnection provides a few more specific methods, some of which return raw results rather than a Scenario instance. Raw results are an iterable of pairs of a measurement name and a dictionary representing one line of the measurement.
The data_access.influxdb_tools module also provides utility functions:
one builds a Condition instance from some OpenBACH tags, ready to be used in one of the module's *_query functions;
one parses an InfluxDB response into Scenario instances; it requires that the OpenBACH tags are included in the InfluxDB response;
one parses an InfluxDB response into a single Scenario instance without trying to extract tags out of each row of data, considering every column as a statistic;
one converts a Job instance into data ready to be imported into InfluxDB through the data_write method.
It also provides the Condition classes hierarchy that can be used as the condition parameter in the timestamps, scenarios, remove_statistics, and orphans methods:
the Operator enum holds the comparison operators usable in InfluxDB;
ConditionField(name, operator, value) compares a field value to the given one (examples: ConditionField("throughput", Operator.GreaterThan, 100) or ConditionField("status", Operator.Matches, "Failed"));
ConditionTag(name, operator, value) compares a tag value to the given one (example: ConditionTag("@scenario_instance_id", Operator.Equal, 1234)); it is automatically used by the aforementioned methods to wrap the optional arguments;
ConditionTimestamp(operator, value, unit='ms', from_now=False) compares the time of a record to the given value (example: ConditionTimestamp(Operator.LowerThan, 123456789)); it is automatically used by the aforementioned methods to wrap the optional timestamps arguments through the ConditionTimestamp.from_timestamps classmethod;
ConditionAnd(*conditions) constructs a condition that must match all provided conditions to succeed;
ConditionOr(*conditions) constructs a condition that must match any provided condition to succeed.
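For instance, a compound condition could be built as follows and handed to one of these methods through its condition parameter; the statistic name, threshold, and collector address below are purely illustrative:

from data_access import CollectorConnection
from data_access.influxdb_tools import ConditionAnd, ConditionField, ConditionTag, Operator

# Match records of scenario instance 1234 whose throughput exceeds 100
condition = ConditionAnd(
    ConditionTag('@scenario_instance_id', Operator.Equal, 1234),
    ConditionField('throughput', Operator.GreaterThan, 100),
)

collector = CollectorConnection('192.168.1.42')  # hypothetical collector address
matching = list(collector.scenarios(condition=condition))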
On top of the common methods, ElasticSearchConnection provides a few more specific methods. The query dictionaries they accept must conform to the ElasticSearch Query DSL.
The data_access.elasticsearch_tools module also provides some utility functions:
one parses an ElasticSearch response and builds a Log instance from it; it only uses records that do not include any OpenBACH tag;
one converts a Log instance and some metadata into data ready to be imported into ElasticSearch through the data_write method.
Scenario objects hold the state of a single scenario instance. They can also hold the state of sub-scenarios if they exist.
The following attributes are used to read data from a Scenario instance:
the scenario instance ID (instance_id);
the Job instances launched directly by this scenario;
the Job instances recursively launched by this scenario and all its sub-scenarios;
the Scenario instances corresponding to sub-scenarios launched directly by this scenario;
the Scenario instances corresponding to sub-scenarios recursively launched by this scenario and its sub-scenarios;
the Agent instances that group the jobs launched by this scenario on each agent they were launched on;
the Agent instances that recursively group the jobs launched by this scenario and all its sub-scenarios on each agent they were launched on.
Job objects hold the state of a single job instance. They group the statistics emitted by the job as well as the logs it generated.
The following attributes are used to read the state of a Job:
a Log instance holding the logs sent by this job;
a Statistic instance holding the data generated by this job without a suffix; it can also be called as a method to specify the desired suffix (e.g. job.statistics(suffix='Flow1'); see the sketch below).
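A short sketch of reading per-job statistics out of a retrieved scenario; the scenario variable is assumed to come from a previous call to scenarios() and the suffix name is only illustrative:

# `scenario` is assumed to be a Scenario instance returned by CollectorConnection.scenarios
for job in scenario.jobs:
    default_stats = job.statistics                # data emitted without a suffix
    flow_stats = job.statistics(suffix='Flow1')   # data emitted under a (hypothetical) 'Flow1' suffix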
A collection of data generated by a single job instance under a given suffix.
Statistic objects store their data into the dated_data instance attribute, which is a dictionary:
A collection of logs generated by a single job instance.
Log objects store their data into the numbered_data instance attribute, which is a dictionary whose values are _LogEntry instances providing the following attributes:
Each of the described classes has a .json attribute that can serialize the content of an instance into a dictionary suitable to be turned into a JSON string by the json module. They also have a .load(data) class-method that can re-create an instance directly from such a dictionary. The read_scenario(filename) utility function is a shortcut to recreate a Scenario instance from a file containing such JSON data.
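For example, a Scenario instance could be saved to disk and re-created later on; the file name is arbitrary and read_scenario is assumed to be importable from the data_access.result_data module:

import json
from data_access.result_data import read_scenario

# Serialize a previously retrieved Scenario instance into a JSON file
with open('scenario_1234.json', 'w') as scenario_file:
    json.dump(scenario.json, scenario_file)

# Re-create an equivalent Scenario instance from that file
restored = read_scenario('scenario_1234.json')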
The described classes also define some get_or_create_* methods that rely on the _get_or_create(container, constructor, *key, args=None) utility function: if the container does not contain the requested key, it will create it by calling the constructor with the unpacked args (if args is None, it uses key instead); it then returns container[key]. This utility function is used to implement the following methods that help populate various instances:
Scenario.get_or_create_subscenario(self, instance_id)
Scenario.get_or_create_job(self, name, instance_id, agent)
Agent.get_or_create_job(self, name, instance_id)
Job.get_or_create_statistics(self, suffix=None)
The get_or_create_scenario(scenario_id, cache) utility function also makes use of this mechanism to help maintain a dictionary (cache) of Scenario instances and ease building the owner/subscenario hierarchy.
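As an illustration, a Scenario hierarchy might be populated by hand using these helpers; the import location, job name, agent name, and identifiers below are assumptions:

from data_access.result_data import get_or_create_scenario

cache = {}
scenario = get_or_create_scenario(1234, cache)               # created on first call, reused afterwards
sub = scenario.get_or_create_subscenario(5678)               # sub-scenario instance 5678
job = sub.get_or_create_job('iperf3', 42, 'client-agent')    # job name, job instance ID, agent name
stats = job.get_or_create_statistics(suffix='Flow1')         # per-suffix Statistic container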
Lastly, extract_jobs(scenario) is a utility generator akin to scenario.jobs, except that instead of Job instances it yields triplets of (scenario_instance_id, owner_scenario_instance_id, Job_instance).
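A small sketch of walking through every job together with its ownership information; the scenario variable is assumed to be an existing Scenario instance and extract_jobs to be importable from data_access.result_data:

from data_access.result_data import extract_jobs

for scenario_id, owner_id, job in extract_jobs(scenario):
    print('job from scenario instance', scenario_id, 'owned by scenario instance', owner_id)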
The following example demonstrates simple usage of data retrieval and import:
from data_access import CollectorConnection

def main(src_address, dest_address, scenario_id):
    src_collector = CollectorConnection(src_address)
    scenario, = src_collector.scenarios(scenario_instance_id=scenario_id)  # Note the comma as we're unpacking a generator
    dest_collector = CollectorConnection(dest_address)
    dest_collector.import_scenario(scenario)

if __name__ == '__main__':
    src = input('Address of the collector to extract data from: ')
    dest = input('Address of the collector to import data into: ')
    scenario = input('Scenario instance ID to transfer between collectors: ')
    main(src, dest, scenario)