# standard imports
import os
import typing

# external imports
import yaml

# bsie imports
from bsie.extractor import ExtractorBuilder
from bsie.lib import PipelineBuilder
from bsie.lib.pipeline import Pipeline
from bsie.reader import ReaderBuilder

# constants
# Default configuration shipped next to this module.
DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__), 'default_config.yaml')

# exports
__all__: typing.Sequence[str] = (
    'DEFAULT_CONFIG_FILE',
    'load_pipeline',
    )


## code ##

def load_pipeline(path: str = DEFAULT_CONFIG_FILE) -> Pipeline:
    """Build and return a pipeline from the YAML config file at *path*.

    The file is read as UTF-8 text and parsed with ``yaml.safe_load``.
    Its ``ReaderBuilder`` and ``ExtractorBuilder`` entries are handed to
    the respective builder classes, which are then combined by a
    ``PipelineBuilder`` to produce the pipeline.

    If *path* is omitted, ``DEFAULT_CONFIG_FILE`` is used.
    """
    # parse the configuration
    with open(path, 'rt', encoding='utf-8') as config_file:
        config = yaml.safe_load(config_file)

    # one builder per config section; the reader builder is created first
    reader_builder = ReaderBuilder(config['ReaderBuilder'])
    extractor_builder = ExtractorBuilder(config['ExtractorBuilder'])

    # assemble and return the pipeline
    return PipelineBuilder(reader_builder, extractor_builder).build()

## EOF ##