Complex Rate Scenario

The following scenario compares different parameters (MTU size, ToS, number of parallel flows) in UDP/TCP mode, with a configurable number of iterations per test and a post-processing phase that plots the throughput timeseries and the CDF for each test. This is done thanks to the scenario builder and data_access tools, as well as the auditorium scripts.

The scenario is created, launched and post-processed within the same script. You only need to set the configuration correctly (mainly your project name, your entity names and the test parameters) and launch the script from the auditorium scripts directory as follows:

 # ./rate_metrology.py 

Below, we describe the different script parts:

Variables/constants

This part declares the different parameters:

  • Related to the project/scenario name.
  • The configuration of the measurement jobs (iperf3/nuttcp): number of parallel flows, MTU size, ToS, UDP/TCP, iterations.
  • Initialisation of some variables for the post-processing.
# Scenario/project information
SCENARIO_NAME = 'Rate_Metrology'
SCENARIO_JSON_FILENAME = '{}.json'.format(SCENARIO_NAME)
SCENARIO_DESC = 'Rate metrology scenario for measuring network bandwidth'
PROJECT_NAME = 'rate_jobs'
OVERRIDE = True # Override existing scenario with the same name
 
# Jobs configuration
CLIENT = 'client' # OpenBACH entity name of the client
SERVER = 'server' # OpenBACH entity name of the server
JOBS = ['iperf3', 'nuttcp']  # The list of job names to test (iperf3, nuttcp), default: jobs = ["iperf3"]
PARALLEL_FLOWS = [1, 5] # A list with the number of parallel flows to launch, default: parallel_flows = [1]
MTU_SIZES = [800, 1200] # A list with the mtu sizes to test, default: mtu_sizes = [1100]
TOS_VALUES = ['0x00'] # A list with the ToS values to test, default: tos_values = ["0x00"]
interval = 0.5 # The interval in seconds between two statistics reports (default 1)
iterations = 1 # Number of times you perform the same test in order to obtain an average (min: 1)
UDP = True # True if you want to perform UDP tests
if UDP: #FYI: UDP tests are only performed with nuttcp job
    JOBS = ['nuttcp']
 
udp_rate_limits = [15000000, 17000000] # min and max UDP rate to test (in b/s)
udp_rate_steps = 4000000 # in b/s
# Postprocessing information
POSTPROC = {}  # Dictionary for saving scenario information used in post-processing
POSTPROC['description'] = 'jobs instance id information for post-processing'
plot = []
colors = 'bgrcmykw'
markers = ['+', '.', 'x', '*', 9, 10, 5, 4]
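
For instance, the list of UDP rates actually swept by the scenario can be derived from these two constants. The helper below is only an illustrative sketch (the name build_udp_rates and the inclusive-range logic are assumptions, not necessarily what the original script does):

def build_udp_rates(limits, step):
    # Sweep from the minimum to the maximum limit (inclusive), values in b/s.
    minimum, maximum = limits
    return list(range(minimum, maximum + 1, step))

# With the defaults above, build_udp_rates([15000000, 17000000], 4000000) returns [15000000];
# a smaller step such as 1000000 would return [15000000, 16000000, 17000000].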

The main function is composed of:

  1. Creation of the scenario. This is done thanks to the auditorium scripts, which also allow to start/monitor the scenario.
  2. Building of the scenario by means of the scenario builder tool, which generates your scenario (several nuttcp/iperf3 clients and servers launched with different parameters). See the function create_scenario() used in this part of the code for building the scenario.
  3. Launch of the scenario and wait for its completion.
  4. Post-processing of the data collected from the jobs: compute an average over the iterations and plot the results by means of matplotlib. The job instance id of each test is recovered with save_job_id(), which fills the POSTPROC dictionary (its expected structure is sketched just below), and the graphs are created with plot_figure() (a sketch is given after the main() listing).
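
Before the main() listing, the sketch below shows the structure that the post-processing loop expects to find in POSTPROC['plot']: one dictionary per launched measurement job and per iteration. The field values are hypothetical examples; the exact way save_job_id() extracts them from the scenario status response is not reproduced here (see the full script linked at the end).

# Illustrative content of POSTPROC['plot'] after save_job_id() has run.
# These are the keys consumed by the post-processing loop in main().
POSTPROC['plot'] = [
    {
        'job': 'iperf3',                 # job name ('iperf3' or 'nuttcp')
        'job_instance_id': 123,          # hypothetical instance id returned by the controller
        'agent': 'server',               # entity on which the statistics were collected
        'iteration': 0,                  # iteration index within the same test configuration
        'plot_name': 'iperf3 1 flow mtu 800 tos 0x00',  # hypothetical legend label
    },
    # ... one dictionary per test configuration and per iteration
]
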
def main():
 
    #### Use of auditorium script for creating/configuring and starting the Scenario ####
    labels = []
    stat_mean_throughput = []
    stat_throughput = []
    # Check if the scenario already exists
    try:
        scenario = GetScenario()
        scenario.parse(['--project', PROJECT_NAME, SCENARIO_NAME])
        r = scenario.execute()
        r.raise_for_status()
        print('Scenario {} already exists in the project {}.'.format(SCENARIO_NAME, PROJECT_NAME))
    except requests.exceptions.HTTPError as ex:
        # if it does not exist
        if ex.response.status_code == 404:
            try:
                scenario_create_json = create_scenario()
                create_scenario = CreateScenario()
                create_scenario.parse(['--project', PROJECT_NAME, SCENARIO_JSON_FILENAME])
                r = create_scenario.execute()
                r.raise_for_status()
                print('Scenario {} has been successfully created in the project {}.'.format(SCENARIO_NAME, PROJECT_NAME))
            except requests.exceptions.HTTPError as ex:
                raise ValueError('Error creating scenario: {}'.format(ex))
    else:
        # if it does exist
        if OVERRIDE:
            try:
                scenario_create_json = create_scenario()
                modify_scenario = ModifyScenario()
                modify_scenario.parse(['--project', PROJECT_NAME, SCENARIO_NAME, SCENARIO_JSON_FILENAME])
                r = modify_scenario.execute()
                r.raise_for_status()
                print('Scenario {} has been modified in the project {}.'.format(SCENARIO_NAME, PROJECT_NAME))
            except requests.exceptions.HTTPError as ex:
                raise ValueError('Error modifying scenario: {}'.format(ex))
        else:
            raise ValueError('Scenario {} already exists in the project {}'.format(SCENARIO_NAME, PROJECT_NAME))
 
    # Start scenario_instance and Get the scenario_instance_id
    print('# ==> Starting Scenario ...')
    start = StartScenarioInstance()
    start.parse(['--project', PROJECT_NAME, SCENARIO_NAME])
    response = start.execute(False)
    try:
        scenario_id = response.json()['scenario_instance_id']
        print("Scenario instance id: ", scenario_id)
    except KeyError as ex:
        raise KeyError('Error starting scenario instance: {}'.format(ex))
    except ValueError as ex:
        raise ValueError('Error starting scenario instance: {}'.format(ex))
 
    # Save scenario id for postprocessing
    POSTPROC['scenario_id'] = scenario_id
    # Get scenario status
    status_scenario = StatusScenarioInstance()
    status_scenario.parse([str(scenario_id)])
    req_status = status_scenario.execute(False)
    status = req_status.json()['status']
 
    # Wait for scenario to finish
    while (status == "Scheduled" or status ==
           "Scheduling" or status == "Running"):
        time.sleep(5)
        req_status = status_scenario.execute(False)
        status = req_status.json()['status']
        print('Scenario {0} status: {1} '.format(SCENARIO_NAME, status))
    if (status == "Finished KO" or status == "Stopped"):
        raise ValueError('Error/problem during scenario')
    else:
          print("Scenario {} is finished OK".format(SCENARIO_NAME))
 
    # Get job instance ids of jobs for postprocessing
    req_status = status_scenario.execute(False)
    save_job_id(req_status)
    print('# ==> Your dictionary with the required post-processing information')
    print(json.dumps(POSTPROC, indent=4))
 
    #### Post-processing and plotting graphs ####
    # Connect to OpenBACH collector for retrieving data with the data access API
    success = collect_agent.register_collect(
            '/opt/openbach/agent/jobs/postprocess_stats/'
            'postprocess_stats_rstats_filter.conf')
    if not success:
        raise ValueError('Error connecting to OpenBACH collector')
    requester = CollectorConnection('localhost')
 
    # Import results from Collector Database using data access API
 
    for test in POSTPROC['plot']:
        if test['iteration'] == 0 and len(stat_throughput) != 0: # if the new group of iterations begins --> compute the mean over all iterations of the previous test type
             stat_mean_throughput.append([sum(t)/len(t) for t in zip(*stat_throughput)])
             labels.append(last_test)
             stat_throughput = []
 
        stat_name = 'throughput' if test['job'] == 'iperf3' else 'rate' # for nuttcp, the statistic is named 'rate'
        try:
            scenario, = requester.scenarios(scenario_instance_id=scenario_id, job_name=test['job'])
        except Exception as ex:
            raise ValueError('Error getting stats from collector {}'.format(ex))
 
        job_key = (test['job'], test['job_instance_id'], test['agent'])
        job_data = scenario.job_instances[job_key]
        print(job_data.json)
        for series in job_data.json['statistics']:
            flow = series['suffix'] # In case different flows are launched in parallel, each flow is saved with a suffix
            if (test['job'] == 'nuttcp'
                    or (test['job'] == 'iperf3' and flow is None and len(job_data.json['statistics']) > 1)
                    or (test['job'] == 'iperf3' and len(job_data.json['statistics']) == 1 and flow == "Flow1")):
                stat_throughput.append([stat[stat_name] for stat in series['data'][0:-1]])
        if test == POSTPROC['plot'][-1]:
            stat_mean_throughput.append([sum(t)/len(t) for t in zip(*stat_throughput)])
            labels.append(test['plot_name'])
        last_test = test['plot_name']
 
    # Plot graphs
    name_file = '{0}-id{1}'.format(SCENARIO_NAME, scenario_id)
    plt_cdf = plot_figure('CDF', 'Throughput (b/s)', 'CDF of Throughput', 'cdf',
                    'cdf-{}.png'.format(name_file), stat_mean_throughput, labels)
    plt_throughput = plot_figure('Throughput (b/s)', 'Time (s)', 'Comparison of throughput', 'timeseries',
                'throughput-{}.png'.format(name_file), stat_mean_throughput, labels, interval)
    plt_cdf.show()
    plt_throughput.show()
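
The graphs themselves are produced by a plot_figure() helper that is not reproduced above. The sketch below is only a minimal illustration consistent with the two calls in main(); the argument order, the empirical CDF computation and the matplotlib/numpy usage are assumptions, and the actual implementation is in the full script linked below.

import numpy as np
import matplotlib.pyplot as plt

def plot_figure(ylabel, xlabel, title, figure_type, filename, stats, labels, interval=1):
    # `stats` holds one averaged throughput series per test, `labels` the matching legend entries.
    # `colors` and `markers` are the module-level constants defined in the configuration section.
    plt.figure()
    for index, (series, label) in enumerate(zip(stats, labels)):
        if figure_type == 'cdf':
            # Empirical CDF: sorted throughput values on x, cumulative probability on y
            x = np.sort(series)
            y = np.arange(1, len(x) + 1) / len(x)
        else:
            # Timeseries: one sample every `interval` seconds
            x = [interval * i for i in range(len(series))]
            y = series
        plt.plot(x, y, color=colors[index % len(colors)],
                 marker=markers[index % len(markers)], label=label)
    plt.xlabel(xlabel)
    plt.ylabel(ylabel)
    plt.title(title)
    plt.legend()
    plt.savefig(filename)
    return plt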

The whole script can be found at https://forge.net4sat.org/openbach/openbach-extra/blob/master/scenario_examples/rate_scenario/rate_metrology.py. Some examples of the resulting plots (throughput timeseries and CDF) are shown below:
