Source code for gearshift

# -*- coding: utf-8 -*-
#
# Copyright (c) 2021 European Union;
# Licensed under the EUPL, Version 1.2 or – as soon they will be approved by the European Commission
# – subsequent versions of the EUPL (the "Licence");
#
# You may not use this work except in compliance with the Licence.
# You may obtain a copy of the Licence at: https://joinup.ec.europa.eu/collection/eupl/eupl-text-eupl-12
"""
Defines the file processing chain model `dsp`.

Sub-Modules:

.. currentmodule:: gearshift

.. autosummary::
    :nosignatures:
    :toctree: toctree/gearshift/

    core
    cli
"""
import tqdm
import logging
import os.path as osp
import schedula as sh

log = logging.getLogger(__name__)
dsp = sh.BlueDispatcher(name="process")


[docs]def init_conf(inputs): """ Initialize GEARSHIFT model configurations. :param inputs: Initialization inputs. :type inputs: dict | schedula.Token :return: Initialization inputs. :rtype: dict | schedula.Token """ return inputs
def _yield_files(*paths, cache=None, ext=("xlsx")): import glob cache = set() if cache is None else cache for path in paths: path = osp.abspath(path) if path in cache or osp.basename(path).startswith("~"): continue cache.add(path) if osp.isdir(path): yield from _yield_files( *filter(osp.isfile, glob.glob(osp.join(path, "*"))), cache=cache ) elif osp.isfile(path) and path.lower().endswith(ext): yield path else: log.info('Skipping file "%s".' % path) dsp.add_data(sh.START, filters=[init_conf, lambda x: sh.NONE])
[docs]@sh.add_function(dsp, outputs=["core_model"]) def register_core(): """ Register core model. :return: GEARSHIFT core model. :rtype: schedula.Dispatcher """ from .core import dsp return dsp.register(memo={})
class _ProgressBar(tqdm.tqdm): def __init__(self, *args, _format_meter=None, **kwargs): if _format_meter: self._format_meter = _format_meter super(_ProgressBar, self).__init__(*args, **kwargs) @staticmethod def _format_meter(bar, data): return "%s: Processing %s\n" % (bar, data) # noinspection PyMissingOrEmptyDocstring def format_meter(self, n, *args, **kwargs): bar = super(_ProgressBar, self).format_meter(n, *args, **kwargs) try: return self._format_meter(bar, self.iterable[n]) except IndexError: return bar
[docs]@sh.add_function(dsp, outputs=["timestamp"]) def default_timestamp(start_time): """ Returns the default timestamp. :param start_time: Run start time. :type start_time: datetime.datetime :return: Run timestamp. :rtype: str """ return start_time.strftime("%Y%m%d_%H%M%S")
[docs]@sh.add_function(dsp, outputs=["core_solutions"]) def run_core( core_model, input_files, output_folder, cmd_flags, timestamp, output_format, **kwargs ): """ Run core model. :param core_model: GEARSHIFT core model. :type core_model: schedula.Dispatcher :param cmd_flags: Command line options. :type cmd_flags: dict :param timestamp: Run timestamp. :type timestamp: str :param input_files: List of input files and/or folders. :type input_files: iterable :return: Core model solutions. :rtype: dict[str, schedula.Solution] """ solutions, it = {}, list(_yield_files(*input_files)) if it: for fp in _ProgressBar(it): solutions[fp] = core_model( dict( input_file_name=fp, cmd_flags=cmd_flags, output_folder=output_folder, timestamp=timestamp, output_format=output_format, ), **kwargs ) return solutions
def _check_demo_flag(output_folder, demo_flag): return demo_flag
[docs]@sh.add_function(dsp, outputs=["demo"], input_domain=_check_demo_flag) def save_demo_files(output_folder, demo_flag): """ Save GEARSHIFT demo files. :param output_folder: Output folder. :type output_folder: str """ import glob import os from shutil import copy2 from pkg_resources import resource_filename os.makedirs(output_folder or ".", exist_ok=True) for src in glob.glob(resource_filename("gearshift", "demos/*")): copy2(src, osp.join(output_folder, osp.basename(src))) log.info("GEARSHIFT demos written into (%s).", output_folder)
[docs]@sh.add_function(dsp, outputs=["start_time"]) def default_start_time(): """ Returns the default run start time. :return: Run start time. :rtype: datetime.datetime """ import datetime return datetime.datetime.today()
[docs]@sh.add_function(dsp, outputs=["done"], weight=sh.inf(100, 0)) def log_done(start_time): """ Logs the overall execution time. :param start_time: Run start time. :type start_time: datetime.datetime :return: Execution time [s]. :rtype: datetime.datetime """ import datetime sec = (datetime.datetime.today() - start_time).total_seconds() log.info(" Done! [%.2f sec]" % sec) return sec