# Source code for gearshift.core

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 European Union;
# Licensed under the EUPL, Version 1.2 or – as soon they will be approved by the European Commission
# – subsequent versions of the EUPL (the "Licence");
#
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
"""
Functions and `dsp` model to process a JR-Shift input file.

Sub-Modules:

.. currentmodule:: gearshift.core

.. autosummary::
    :nosignatures:
    :toctree: core/

    load
    model
    write
"""
import logging
import schedula as sh
import os.path as osp
from .load import dsp as _load
from .model import dsp as _model
from .write import dsp as _write

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Top-level "core" dispatcher blueprint; the functions and sub-dispatchers
# declared in the rest of this module are registered on it.
dsp = sh.BlueDispatcher(name="core", description="Processes a GEARSHIFT input file.")

# Plug in the loading sub-dispatcher (`.load.dsp`): it is wired to the
# "input_file_name" and "input_file" inputs and to the "base" and "dice"
# outputs.
dsp.add_dispatcher(
    dsp=_load, inputs=("input_file_name", "input_file"), outputs=("base", "dice")
)


@sh.add_function(dsp, outputs=["model"])
def register_model():
    """
    Register the core model and hand back the result.

    The function takes no inputs; it is attached to the "core" dispatcher as
    the provider of the ``model`` output.

    :return:
        GEARSHIFT core model.
    :rtype: schedula.Dispatcher
    """
    # Imported under an alias so the module-level `dsp` is not shadowed
    # inside this function.
    from .model import dsp as model_dsp

    registered = model_dsp.register(memo={})
    return registered
# Column names of the "case" table -> parameter names used in the
# "execution_case" dictionary.
_CASE_RENAME = {
    "do_dsc": "ApplyDownscaling",
    "do_cap": "ApplySpeedCap",
    "do_cmp": "ApplyDistanceCompensation",
    "calc_dsc": "UseCalculatedDownscalingPercentage",
    "f_dsc": "DownscalingPercentage",
    "v_max": "MaximumVelocityDefined",
    "v_cap": "CappedSpeed",
    "n_min1": "MinDriveEngineSpeed1st",
    "n_min12": "MinDriveEngineSpeed1stTo2nd",
    "n_min2d": "MinDriveEngineSpeed2ndDecel",
    "n_min2": "MinDriveEngineSpeed2nd",
    "n_min3": "MinDriveEngineSpeedGreater2nd",
    "n_min3a": "MinDriveEngineSpeedGreater2ndAccel",
    "n_min3d": "MinDriveEngineSpeedGreater2ndDecel",
    "n_min3as": "MinDriveEngineSpeedGreater2ndAccelStartPhase",
    "n_min3ds": "MinDriveEngineSpeedGreater2ndDecelStartPhase",
    "t_start": "TimeEndOfStartPhase",
    "supp0": "SuppressGear0DuringDownshifts",
    "excl1": "ExcludeCrawlerGear",
    "autom": "AutomaticClutchOperation",
    "n_lim": "EngineSpeedLimitVMax",
    "asm_0": "AdditionalSafetyMargin0",
    "n_asm_s": "StartEngineSpeed",
    "n_asm_e": "EndEngineSpeed",
}

# Column names of the "vehicle" table -> parameter names.
_VEHICLE_RENAME = {
    "p_rated": "RatedEnginePower",
    "n_rated": "RatedEngineSpeed",
    "n_idle": "IdlingEngineSpeed",
    "n_max1": "Max95EngineSpeed",
    "#g": "NoOfGears",
    "m_test": "VehicleTestMass",
    "f0": "f0",
    "f1": "f1",
    "f2": "f2",
    "SM": "SafetyMargin",
}

# Column names of the "phase" table -> parameter names.
_PHASE_RENAME = {"length": "PhaseLengths"}

# Column names of the "gearbox_ratios" table -> parameter names.
_GEARBOX_RENAME = {"gear": "gear_nbrs", "ndv": "Ndv"}

# Column names of the "scale" table -> parameter names.
_SCALE_RENAME = {
    "algo": "ScalingAlgorithms",
    "t_beg": "ScalingStartTimes",
    "t_max": "ScalingCorrectionTimes",
    "t_end": "ScalingEndTimes",
    "r0": "r0",
    "a1": "a1",
    "b1": "b1",
}

# Class labels that are used unchanged to look up the "scale" and "phase"
# tables; any other label is normalised first (see `_obtain_inputs`).
_PLAIN_CLASSES = ("class1", "class2", "class3a", "class3b")


def _rename_keys(mapping, rename):
    """Return the entries of `mapping` listed in `rename`, under their new names."""
    return {rename[key]: value for key, value in mapping.items() if key in rename}


def _rows_where(frame, column, value):
    """Return the rows of `frame` whose `column` equals `value`."""
    return frame.loc[frame[column] == value]


def _first_record(frame, column, value, table):
    """Return the first row of `frame` matching `column == value` as a dict."""
    records = _rows_where(frame, column, value).to_dict("records")
    if not records:
        # Same exception type as indexing the empty list, with a usable message.
        raise IndexError(
            "no row in base[{!r}] with {} == {!r}".format(table, column, value)
        )
    return records[0]


def _obtain_inputs(case, base):
    """
    Assemble the model inputs for a single case.

    :param case:
        One row of the "case" table. It must provide the "vehicle" and
        "class" entries; its remaining entries are copied over when they are
        listed in the case rename table.
    :type case: pandas.Series

    :param base:
        Input tables keyed by name: "engine", "gearbox_ratios", "trace",
        "vehicle", "scale" and "phase".
    :type base: dict[str, pandas.DataFrame]

    :return:
        A dictionary with the single key "execution_case", whose value maps
        parameter names to the values collected for this case.
    :rtype: dict

    :raises IndexError:
        If the "vehicle" or "scale" table has no row for this case.
    """
    vehicle_id = case["vehicle"]
    case_class = case["class"]

    # `copy=True` guarantees a private, writable array: with pandas
    # Copy-on-Write `to_numpy()` may return a read-only view, and the in-place
    # scaling below would then fail.
    full_power_curve = (
        _rows_where(base["engine"], "vehicle", vehicle_id)
        .drop(["vehicle"], axis=1)
        .to_numpy(copy=True)
    )
    # Third remaining column is multiplied by 100 (presumably a fraction
    # turned into a percentage -- TODO confirm against the input format).
    full_power_curve[:, 2] *= 100

    gearbox_ratios = _rename_keys(
        _rows_where(base["gearbox_ratios"], "vehicle", vehicle_id).to_dict("list"),
        _GEARBOX_RENAME,
    )

    trace = (
        _rows_where(base["trace"], "class", case_class)
        .drop(["class"], axis=1)
        .to_numpy()
    )

    input_case = _rename_keys(case.to_dict(), _CASE_RENAME)

    vehicle = _rename_keys(
        _first_record(base["vehicle"], "vehicle", vehicle_id, "vehicle"),
        _VEHICLE_RENAME,
    )

    # Labels outside the plain set are looked up in the "scale"/"phase"
    # tables without blanks and without their last character.
    if case_class in _PLAIN_CLASSES:
        table_class = case_class
    else:
        table_class = case_class.replace(" ", "")[:-1]

    scale = _rename_keys(
        _first_record(base["scale"], "class", table_class, "scale"),
        _SCALE_RENAME,
    )

    phase = _rename_keys(
        _rows_where(base["phase"], "class", table_class).to_dict("list"),
        _PHASE_RENAME,
    )

    # Merge in a fixed order; on a key clash the later group wins.
    execution_case = {"FullPowerCurve": full_power_curve, "Trace": trace}
    for group in (gearbox_ratios, input_case, vehicle, scale, phase):
        execution_case.update(group)

    return {"execution_case": execution_case}
@sh.add_function(dsp, outputs=["sol"])
def run_model(base, model):
    """
    Run whole model (scale trace and gearshifts) for each case.

    :param base:
        Input tables keyed by name. The "case" table is iterated row by row;
        the whole dictionary is also passed to `_obtain_inputs`.
    :type base: dict

    :param model:
        Callable invoked once per case with the inputs dictionary; its result
        must provide a "shift_points" entry containing "NoOfGearsFinal".
    :type model: callable

    :return sol:
        List of dictionaries that contains the solution for different cases.
    :rtype: list
    """
    from tqdm import tqdm

    cases = base["case"]
    sol = []

    # The context manager closes the progress bar even if a case fails.
    with tqdm(total=len(cases), desc="Executing gearshift model", position=0) as pbar:
        for _, row in cases.iterrows():
            sol_case = model(dict(_obtain_inputs(row, base)))
            shift_points = sol_case["shift_points"]

            dict_case = {
                "Case": row.to_dict()["case"],
                "NoOfGears": shift_points["NoOfGearsFinal"],
            }
            # Flatten the two-level "shift_points" mapping: only entries that
            # have a second-level key are copied, under that key.
            for keys, value in sh.stack_nested_keys(shift_points, depth=2):
                if len(keys) >= 2:
                    dict_case[str(keys[1])] = value

            sol.append(dict_case)
            # Advance only once the case has actually been processed.
            pbar.update(1)

    return sol
# Plug in the writing sub-dispatcher (`.write.dsp`): it is wired to the
# "output_folder", "sol", "timestamp" and "output_format" inputs, and its
# outputs are mapped to "output_file_name" and `sh.SINK`.
dsp.add_dispatcher(
    dsp=_write,
    inputs=("output_folder", "sol", "timestamp", "output_format"),
    outputs=("output_file_name", sh.SINK),
)