diff options
author | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2023-03-05 19:22:58 +0100 |
commit | a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch) | |
tree | fb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/lib | |
parent | 7582c280ad5324a2f0427999911c7e7abc14a6ab (diff) | |
parent | af81318ae9311fd0b0e16949cef3cfaf7996970b (diff) | |
download | bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.tar.gz bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.tar.bz2 bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.zip |
Diffstat (limited to 'bsie/lib')
-rw-r--r-- | bsie/lib/__init__.py | 10 | ||||
-rw-r--r-- | bsie/lib/bsie.py | 21 | ||||
-rw-r--r-- | bsie/lib/builder.py | 75 | ||||
-rw-r--r-- | bsie/lib/naming_policy.py | 115 | ||||
-rw-r--r-- | bsie/lib/pipeline.py | 138 |
5 files changed, 343 insertions, 16 deletions
diff --git a/bsie/lib/__init__.py b/bsie/lib/__init__.py index 578c2c4..f44fb74 100644 --- a/bsie/lib/__init__.py +++ b/bsie/lib/__init__.py @@ -1,18 +1,16 @@ -""" -Part of the bsie module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" -# imports +# standard imports import typing # inner-module imports from .bsie import BSIE +from .builder import PipelineBuilder +from .naming_policy import DefaultNamingPolicy # exports __all__: typing.Sequence[str] = ( 'BSIE', + 'PipelineBuilder', ) ## EOF ## diff --git a/bsie/lib/bsie.py b/bsie/lib/bsie.py index e087fa9..b02e707 100644 --- a/bsie/lib/bsie.py +++ b/bsie/lib/bsie.py @@ -1,16 +1,14 @@ -""" -Part of the bsie module. -A copy of the license is provided with the project. -Author: Matthias Baumgartner, 2022 -""" -# imports +# standard imports import typing # bsie imports -from bsie.tools import Pipeline from bsie.utils import bsfs, node, ns +# inner-module imports +from .naming_policy import NamingPolicy +from .pipeline import Pipeline + # exports __all__: typing.Sequence[str] = ( 'BSIE', @@ -39,15 +37,18 @@ class BSIE(): def __init__( self, - # pipeline builder. + # pipeline. pipeline: Pipeline, + # naming policy + naming_policy: NamingPolicy, # principals to extract at most. None implies all available w.r.t. extractors. collect: typing.Optional[typing.Iterable[bsfs.URI]] = None, # principals to discard. discard: typing.Optional[typing.Iterable[bsfs.URI]] = None, ): - # store pipeline + # store pipeline and naming policy self._pipeline = pipeline + self._naming_policy = naming_policy # start off with available principals self._principals = {pred.uri for pred in self._pipeline.principals} # limit principals to specified ones by argument. @@ -87,6 +88,6 @@ class BSIE(): # predicate lookup principals = {self.schema.predicate(pred) for pred in principals} # invoke pipeline - yield from self._pipeline(path, principals) + yield from self._naming_policy(self._pipeline(path, principals)) ## EOF ## diff --git a/bsie/lib/builder.py b/bsie/lib/builder.py new file mode 100644 index 0000000..3a15311 --- /dev/null +++ b/bsie/lib/builder.py @@ -0,0 +1,75 @@ + +# standard imports +import logging +import typing + +# bsie imports +from bsie.extractor import ExtractorBuilder +from bsie.reader import ReaderBuilder +from bsie.utils import errors + +# inner-module imports +from . import pipeline + +# exports +__all__: typing.Sequence[str] = ( + 'PipelineBuilder', + ) + + +## code ## + +logger = logging.getLogger(__name__) + +class PipelineBuilder(): + """Build `bsie.tools.pipeline.Pipeline` instances.""" + + # builder for Readers. + rbuild: ReaderBuilder + + # builder for Extractors. + ebuild: ExtractorBuilder + + def __init__( + self, + reader_builder: ReaderBuilder, + extractor_builder: ExtractorBuilder, + ): + self.rbuild = reader_builder + self.ebuild = extractor_builder + + def build(self) -> pipeline.Pipeline: + """Return a Pipeline instance.""" + ext2rdr = {} + + for eidx in self.ebuild: + # build extractor + try: + ext = self.ebuild.build(eidx) + + except errors.LoaderError as err: # failed to load extractor; skip + logger.error('failed to load extractor: %s', err) + continue + + except errors.BuilderError as err: # failed to build instance; skip + logger.error(str(err)) + continue + + try: + # get reader required by extractor + if ext.CONTENT_READER is not None: + rdr = self.rbuild.build(ext.CONTENT_READER) + else: + rdr = None + # store extractor + ext2rdr[ext] = rdr + + except errors.LoaderError as err: # failed to load reader + logger.error('failed to load reader: %s', err) + + except errors.BuilderError as err: # failed to build reader + logger.error(str(err)) + + return pipeline.Pipeline(ext2rdr) + +## EOF ## diff --git a/bsie/lib/naming_policy.py b/bsie/lib/naming_policy.py new file mode 100644 index 0000000..9b9a45d --- /dev/null +++ b/bsie/lib/naming_policy.py @@ -0,0 +1,115 @@ + +# standard imports +import abc +import os +import typing + +# bsie imports +from bsie.utils import bsfs, errors, ns +from bsie.utils.node import Node + +# exports +__all__: typing.Sequence[str] = ( + 'DefaultNamingPolicy', + ) + + +## code ## + +class NamingPolicy(): + """Determine node uri's from node hints.""" + def __call__( + self, + iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]], + ): + """Apply the policy on a triple iterator.""" + return NamingPolicyIterator(self, iterable) + + @abc.abstractmethod + def handle_node(self, node: Node) -> Node: + """Apply the policy on a node.""" + + +class NamingPolicyIterator(): + """Iterates over triples, determines uris according to a *policy* as it goes.""" + + # source triple iterator. + _iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]] + + # naming policy + _policy: NamingPolicy + + def __init__( + self, + policy: NamingPolicy, + iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]], + ): + self._iterable = iterable + self._policy = policy + + def __iter__(self): + for node, pred, value in self._iterable: + # handle subject + self._policy.handle_node(node) + # handle value + if isinstance(value, Node): + self._policy.handle_node(value) + # yield triple + yield node, pred, value + + +class DefaultNamingPolicy(NamingPolicy): + """Compose URIs as <host/user/node_type#fragment> + + What information is used as fragment depends on the node type. + Typically, the default is to use the "ucid" hint. + The fallback in all cases is to generate a random uuid. + + Never changes previously assigned uris. Sets uris in-place. + + """ + + def __init__( + self, + host: bsfs.URI, + user: str, + ): + self._prefix = bsfs.Namespace(os.path.join(host, user)) + self._uuid = bsfs.uuid.UUID() + + def handle_node(self, node: Node) -> Node: + if node.uri is not None: + return node + if node.node_type == ns.bsn.Entity : + return self.name_file(node) + if node.node_type == ns.bsn.Preview: + return self.name_preview(node) + raise errors.ProgrammingError('no naming policy available for {node.node_type}') + + def name_file(self, node: Node) -> Node: + """Set a bsfs:File node's uri fragment to its ucid.""" + if 'ucid' in node.hints: # content id + fragment = node.hints['ucid'] + else: # random name + fragment = self._uuid() + node.uri = getattr(self._prefix.file(), fragment) + return node + + def name_preview(self, node: Node) -> Node: + """Set a bsfs:Preview node's uri fragment to its ucid. + Uses its source fragment as fallback. Appends the size if provided. + """ + fragment = None + if 'ucid' in node.hints: # content id + fragment = node.hints['ucid'] + if fragment is None and 'source' in node.hints: # source id + self.handle_node(node.hints['source']) + fragment = node.hints['source'].uri.get('fragment', None) + if fragment is None: # random name + fragment = self._uuid() + if 'size' in node.hints: # append size + fragment += '_s' + str(node.hints['size']) + node.uri = getattr(self._prefix.preview(), fragment) + return node + +## EOF ## diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py new file mode 100644 index 0000000..30fd6fd --- /dev/null +++ b/bsie/lib/pipeline.py @@ -0,0 +1,138 @@ + +# standard imports +from collections import defaultdict +import logging +import typing + +# bsie imports +from bsie.extractor import Extractor +from bsie.reader import Reader +from bsie.utils import bsfs, errors, node, ns + +# exports +__all__: typing.Sequence[str] = ( + 'Pipeline', + ) + + +## code ## + +logger = logging.getLogger(__name__) + +class Pipeline(): + """Extraction pipeline to generate triples from files. + + The Pipeline binds readers and extractors, and performs + the necessary operations to produce triples from a file. + It takes a best-effort approach to extract as many triples + as possible. Errors during the extraction are passed over + and reported to the log. + + """ + + # combined extractor schemas. + _schema: bsfs.schema.Schema + + # extractor -> reader mapping + _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]] + + def __init__( + self, + ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]] + ): + # store core members + self._ext2rdr = ext2rdr + # compile schema from all extractors + self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr) + + def __str__(self) -> str: + return bsfs.typename(self) + + def __repr__(self) -> str: + return f'{bsfs.typename(self)}(...)' + + def __hash__(self) -> int: + return hash((type(self), self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + + def __eq__(self, other: typing.Any) -> bool: + return isinstance(other, type(self)) \ + and self._schema == other._schema \ + and self._ext2rdr == other._ext2rdr + + @property + def schema(self) -> bsfs.schema.Schema: + """Return the pipeline's schema (combined from all extractors).""" + return self._schema + + @property + def principals(self) -> typing.Iterator[bsfs.schema.Predicate]: + """Return the principal predicates that can be extracted.""" + return iter({pred for ext in self._ext2rdr for pred in ext.principals}) + + def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema: + """Return the subset of the schema that supports the given *principals*.""" + # materialize principals + principals = set(principals) + # collect and combine schemas from extractors + return bsfs.schema.Schema.Union({ + ext.schema + for ext + in self._ext2rdr + if not set(ext.principals).isdisjoint(principals) + }) + + def __call__( + self, + path: bsfs.URI, + principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None, + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + """Extract triples from the file at *path*. Optionally, limit triples to *principals*.""" + # get principals + principals = set(principals) if principals is not None else set(self.schema.predicates()) + + # get extractors + extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)} + + # corner-case short-cut + if len(extractors) == 0: + return + + # get readers -> extractors mapping + rdr2ext = defaultdict(set) + for ext in extractors: + rdr = self._ext2rdr[ext] + rdr2ext[rdr].add(ext) + + # create subject for file + subject = node.Node(ns.bsn.Entity, + ucid=bsfs.uuid.UCID.from_path(path), + ) + + # extract information + for rdr, extrs in rdr2ext.items(): + try: + # get content + content = rdr(path) if rdr is not None else None + #logger.info('extracted %s from %s', rdr, path) + + # apply extractors on this content + for ext in extrs: + try: + # get predicate/value tuples + yield from ext.extract(subject, content, principals) + + except errors.ExtractorError as err: + # critical extractor failure. + logger.error('%s failed to extract triples from content: %s', ext, err) + + except errors.UnsupportedFileFormatError: + # failed to read the file format. skip. + #logger.warning('%s could not process the file format of %s', rdr, err) + pass + + except errors.ReaderError as err: + # failed to read any content. skip. + logger.error('%s failed to read content: %s', rdr, err) + + +## EOF ## |