User Tools

Site Tools


Sidebar

openbach:exploitation:reference_scenarios:network:rate:new_scenario_dev

The following scenario library launches nuttcp to measure link capacity by means of a UDP flow.

lib_rate_subscenario.py
def analyse_rate(
        scenario, server_entity, client_entity, command_port,
        server_ip, port, receiver, duration, rate_limit,
        wait_finished=None, wait_launched=None, wait_delay=0):
    """Append a nuttcp UDP rate measurement to *scenario*.

    Three OpenBACH functions are added, in this order:

    1. a ``start_job_instance`` configured to run the ``nuttcp`` job in
       server mode on *server_entity*; its scheduling is driven by
       *wait_finished*, *wait_launched* and *wait_delay*;
    2. a ``start_job_instance`` configured to run the ``nuttcp`` job in
       client mode (with the ``udp`` option) on *client_entity*; it waits
       for the server function to be launched, with a delay of 2;
    3. a ``stop_job_instance`` configured with the server function; it waits
       for the client function to finish.

    Args:
        scenario: scenario object the functions are added to.
        server_entity: entity on which the nuttcp server job is configured.
        client_entity: entity on which the nuttcp client job is configured.
        command_port: ``command_port`` value given to both nuttcp jobs.
        server_ip: ``server_ip`` value of the client configuration.
        port: ``port`` value of the client configuration.
        receiver: ``receiver`` value of the client configuration.
        duration: ``duration`` value of the client configuration.
        rate_limit: ``rate_limit`` value of the client configuration.
        wait_finished: forwarded as-is to the server ``add_function`` call.
        wait_launched: forwarded as-is to the server ``add_function`` call.
        wait_delay: forwarded as-is to the server ``add_function`` call.

    Returns:
        A one-element list holding the server function.
    """
    # Scheduling constraints supplied by the caller only apply to the server.
    server_schedule = dict(
            wait_finished=wait_finished,
            wait_launched=wait_launched,
            wait_delay=wait_delay)
    nuttcp_server = scenario.add_function(
            'start_job_instance', **server_schedule)
    nuttcp_server.configure(
            'nuttcp', server_entity, offset=0,
            command_port=command_port, server={})

    client_options = {
        'server_ip': server_ip,
        'port': port,
        'receiver': receiver,
        'duration': duration,
        'rate_limit': rate_limit,
        'udp': {},
    }
    # The client is only started once the server function has been launched.
    nuttcp_client = scenario.add_function(
            'start_job_instance',
            wait_launched=[nuttcp_server],
            wait_delay=2)
    nuttcp_client.configure(
            'nuttcp', client_entity, offset=0,
            command_port=command_port,
            client=client_options)

    # Once the client is done, the server instance is stopped.
    server_stopper = scenario.add_function(
            'stop_job_instance', wait_finished=[nuttcp_client])
    server_stopper.configure(nuttcp_server)

    return [nuttcp_server]
 

The scenario library above can then be used to build an OpenBACH scenario (with the scenario_builder), and to launch the scenario and monitor the nuttcp statistics it produces (with the ScenarioObserver). Finally, the post-processing job time_series is launched to plot the results in terms of rate (b/s).

build_scenario.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
 
# NOTE(review): ScenarioConstructor and analyse_one_way_delay are not used by
# the code shown below; they are kept untouched pending confirmation.
from scenario_observer import ScenarioConstructor
from scenario_observer import ScenarioObserver

import scenario_builder as sb
from scenario_builder.openbach_functions import StartJobInstance

from lib.lib_owd_subscenario import (analyse_one_way_delay)
from lib.lib_rate_subscenario import analyse_rate
 
def extract_jobs_to_postprocess(scenario):
    """Yield the indices of the nuttcp client functions of *scenario*.

    ``scenario.openbach_functions`` is walked in order; the position of a
    function is yielded when it is a ``StartJobInstance`` whose job name is
    ``nuttcp`` and whose ``nuttcp`` configuration contains a ``client`` entry.
    """
    for index, candidate in enumerate(scenario.openbach_functions):
        # Short-circuit evaluation keeps the checks in their original order,
        # so attributes are only read on objects that passed the type check.
        is_nuttcp_client = (
                isinstance(candidate, StartJobInstance)
                and candidate.job_name == 'nuttcp'
                and 'client' in candidate.start_job_instance['nuttcp'])
        if is_nuttcp_client:
            yield index
 
 
def build_metrology_scenario(
        client1, server1, rate_test_duration):
    """Build the sub-scenario measuring the UDP rate between two entities.

    The scenario declares three constants (``com_port``, ``udp_rate`` and
    ``duration``) and three arguments (``port``, ``dest_ip_server_A`` and
    ``interface_server_A``), then calls ``analyse_rate`` to add the nuttcp
    functions, referencing those constants and arguments through ``$name``
    placeholders.

    Args:
        client1: entity handed to ``analyse_rate`` as the client entity.
        server1: entity handed to ``analyse_rate`` as the server entity.
        rate_test_duration: duration of the test; converted with ``str`` and
            stored in the ``duration`` constant.

    Returns:
        The configured ``sb.Scenario`` instance.
    """
    scenario = sb.Scenario(
            'QoS metrics',
            'This scenario analyses the path between server and client in '
            'terms of UDP rate (b/s)')
    scenario.add_constant('com_port', '6000')
    scenario.add_constant('udp_rate', '30M')
    scenario.add_constant('duration', str(rate_test_duration))
    scenario.add_argument('port', 'The port of the nuttcp/iperf3 server')
    scenario.add_argument('dest_ip_server_A', 'ServerA IP')
    # NOTE(review): 'interface_server_A' is declared but not referenced by
    # any function added below -- confirm whether it is still needed.
    scenario.add_argument('interface_server_A', 'Iface of serverA')

    # The returned list of functions is not needed: nothing else is chained
    # after the rate measurement inside this sub-scenario.
    analyse_rate(
            scenario, server1, client1, '$com_port',
            '$dest_ip_server_A', '$port', 'False',
            '$duration', '$udp_rate',
            wait_finished=[], wait_delay=2)

    return scenario
 
 
def main(
        project_name='Rate_metrology', scenario_name='Rate test',
        client1='my_client_entity', server1='my_server_entity',
        server1_IP='192.168.42.1'):
    """Build, launch and post-process the UDP rate scenario.

    The command line is parsed through a ``ScenarioObserver``; the project
    and scenario names fall back to *project_name* and *scenario_name* when
    they are not set on the parsed arguments. A top-level scenario is then
    built that starts the metrology sub-scenario and, once it has finished,
    a ``time_series`` job plotting the ``rate`` statistic of the nuttcp
    client(s). Finally the scenario is handed to
    ``observer.launch_and_wait``.

    Args:
        project_name: fallback project name.
        scenario_name: fallback scenario name.
        client1: entity used as nuttcp client.
        server1: entity used as nuttcp server.
        server1_IP: value given to the ``dest_ip_server_A`` argument of the
            metrology sub-scenario.
    """
    observer = ScenarioObserver()
    observer.parse()
    if not observer.args.project:
        observer.args.project = project_name
    if not observer.args.name:
        observer.args.name = scenario_name

    # NOTE(review): ``args.duration`` and ``args.post_processing`` are read
    # below but nothing in this function registers them on the observer's
    # parser -- confirm they are provided by ScenarioObserver itself.
    metrology_scenario = build_metrology_scenario(
            client1, server1,
            observer.args.duration)

    scenario = sb.Scenario(
            observer.args.name,
            'This scenario aims at measuring the UDP Rate (b/s) of a link')

    # The sub-scenario has no prerequisite in this script, so it is started
    # without any wait condition (the original listing waited on an
    # undefined ``start_emulation`` function).
    start_metrology = scenario.add_function('start_scenario_instance')
    # NOTE(review): the sub-scenario also declares the 'port' and
    # 'interface_server_A' arguments; only 'dest_ip_server_A' is supplied
    # here -- confirm whether the others must be passed as well.
    start_metrology.configure(
            metrology_scenario,
            dest_ip_server_A=server1_IP)

    entity = observer.args.post_processing
    # Each job to plot is identified by the function that started the
    # sub-scenario plus the index of the job inside that sub-scenario.
    post_processed = [
            [start_metrology, function_id]
            for function_id in extract_jobs_to_postprocess(metrology_scenario)
    ]

    time_series = scenario.add_function(
            'start_job_instance',
            wait_finished=[start_metrology],
            wait_delay=2)
    time_series.configure(
            'time_series', entity, offset=0,
            jobs=[post_processed],
            statistics=[['rate']],
            label=[['Transmitted data (bits)']],
            title=[['Validation of rate']])

    observer.launch_and_wait(scenario)
 
 
# Run the scenario with main()'s default settings when executed as a script.
if __name__ == '__main__':
    main()
openbach/exploitation/reference_scenarios/network/rate/new_scenario_dev.txt ยท Last modified: 2019/06/11 16:21 (external edit)