aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/lib
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
committerMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
commita35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch)
treefb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/lib
parent7582c280ad5324a2f0427999911c7e7abc14a6ab (diff)
parentaf81318ae9311fd0b0e16949cef3cfaf7996970b (diff)
downloadbsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.tar.gz
bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.tar.bz2
bsie-a35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d.zip
Merge branch 'develop'HEADv0.23.03releasemain
Diffstat (limited to 'bsie/lib')
-rw-r--r--bsie/lib/__init__.py10
-rw-r--r--bsie/lib/bsie.py21
-rw-r--r--bsie/lib/builder.py75
-rw-r--r--bsie/lib/naming_policy.py115
-rw-r--r--bsie/lib/pipeline.py138
5 files changed, 343 insertions, 16 deletions
diff --git a/bsie/lib/__init__.py b/bsie/lib/__init__.py
index 578c2c4..f44fb74 100644
--- a/bsie/lib/__init__.py
+++ b/bsie/lib/__init__.py
@@ -1,18 +1,16 @@
-"""
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
+# standard imports
import typing
# inner-module imports
from .bsie import BSIE
+from .builder import PipelineBuilder
+from .naming_policy import DefaultNamingPolicy
# exports
__all__: typing.Sequence[str] = (
'BSIE',
+ 'PipelineBuilder',
)
## EOF ##
diff --git a/bsie/lib/bsie.py b/bsie/lib/bsie.py
index e087fa9..b02e707 100644
--- a/bsie/lib/bsie.py
+++ b/bsie/lib/bsie.py
@@ -1,16 +1,14 @@
-"""
-Part of the bsie module.
-A copy of the license is provided with the project.
-Author: Matthias Baumgartner, 2022
-"""
-# imports
+# standard imports
import typing
# bsie imports
-from bsie.tools import Pipeline
from bsie.utils import bsfs, node, ns
+# inner-module imports
+from .naming_policy import NamingPolicy
+from .pipeline import Pipeline
+
# exports
__all__: typing.Sequence[str] = (
'BSIE',
@@ -39,15 +37,18 @@ class BSIE():
def __init__(
self,
- # pipeline builder.
+ # pipeline.
pipeline: Pipeline,
+ # naming policy
+ naming_policy: NamingPolicy,
# principals to extract at most. None implies all available w.r.t. extractors.
collect: typing.Optional[typing.Iterable[bsfs.URI]] = None,
# principals to discard.
discard: typing.Optional[typing.Iterable[bsfs.URI]] = None,
):
- # store pipeline
+ # store pipeline and naming policy
self._pipeline = pipeline
+ self._naming_policy = naming_policy
# start off with available principals
self._principals = {pred.uri for pred in self._pipeline.principals}
# limit principals to specified ones by argument.
@@ -87,6 +88,6 @@ class BSIE():
# predicate lookup
principals = {self.schema.predicate(pred) for pred in principals}
# invoke pipeline
- yield from self._pipeline(path, principals)
+ yield from self._naming_policy(self._pipeline(path, principals))
## EOF ##
diff --git a/bsie/lib/builder.py b/bsie/lib/builder.py
new file mode 100644
index 0000000..3a15311
--- /dev/null
+++ b/bsie/lib/builder.py
@@ -0,0 +1,75 @@
+
+# standard imports
+import logging
+import typing
+
+# bsie imports
+from bsie.extractor import ExtractorBuilder
+from bsie.reader import ReaderBuilder
+from bsie.utils import errors
+
+# inner-module imports
+from . import pipeline
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'PipelineBuilder',
+ )
+
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+class PipelineBuilder():
+ """Build `bsie.tools.pipeline.Pipeline` instances."""
+
+ # builder for Readers.
+ rbuild: ReaderBuilder
+
+ # builder for Extractors.
+ ebuild: ExtractorBuilder
+
+ def __init__(
+ self,
+ reader_builder: ReaderBuilder,
+ extractor_builder: ExtractorBuilder,
+ ):
+ self.rbuild = reader_builder
+ self.ebuild = extractor_builder
+
+ def build(self) -> pipeline.Pipeline:
+ """Return a Pipeline instance."""
+ ext2rdr = {}
+
+ for eidx in self.ebuild:
+ # build extractor
+ try:
+ ext = self.ebuild.build(eidx)
+
+ except errors.LoaderError as err: # failed to load extractor; skip
+ logger.error('failed to load extractor: %s', err)
+ continue
+
+ except errors.BuilderError as err: # failed to build instance; skip
+ logger.error(str(err))
+ continue
+
+ try:
+ # get reader required by extractor
+ if ext.CONTENT_READER is not None:
+ rdr = self.rbuild.build(ext.CONTENT_READER)
+ else:
+ rdr = None
+ # store extractor
+ ext2rdr[ext] = rdr
+
+ except errors.LoaderError as err: # failed to load reader
+ logger.error('failed to load reader: %s', err)
+
+ except errors.BuilderError as err: # failed to build reader
+ logger.error(str(err))
+
+ return pipeline.Pipeline(ext2rdr)
+
+## EOF ##
diff --git a/bsie/lib/naming_policy.py b/bsie/lib/naming_policy.py
new file mode 100644
index 0000000..9b9a45d
--- /dev/null
+++ b/bsie/lib/naming_policy.py
@@ -0,0 +1,115 @@
+
+# standard imports
+import abc
+import os
+import typing
+
+# bsie imports
+from bsie.utils import bsfs, errors, ns
+from bsie.utils.node import Node
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'DefaultNamingPolicy',
+ )
+
+
+## code ##
+
+class NamingPolicy():
+ """Determine node uri's from node hints."""
+ def __call__(
+ self,
+ iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]],
+ ):
+ """Apply the policy on a triple iterator."""
+ return NamingPolicyIterator(self, iterable)
+
+ @abc.abstractmethod
+ def handle_node(self, node: Node) -> Node:
+ """Apply the policy on a node."""
+
+
+class NamingPolicyIterator():
+ """Iterates over triples, determines uris according to a *policy* as it goes."""
+
+ # source triple iterator.
+ _iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]]
+
+ # naming policy
+ _policy: NamingPolicy
+
+ def __init__(
+ self,
+ policy: NamingPolicy,
+ iterable: typing.Iterable[typing.Tuple[Node, bsfs.URI, typing.Any]],
+ ):
+ self._iterable = iterable
+ self._policy = policy
+
+ def __iter__(self):
+ for node, pred, value in self._iterable:
+ # handle subject
+ self._policy.handle_node(node)
+ # handle value
+ if isinstance(value, Node):
+ self._policy.handle_node(value)
+ # yield triple
+ yield node, pred, value
+
+
+class DefaultNamingPolicy(NamingPolicy):
+ """Compose URIs as <host/user/node_type#fragment>
+
+ What information is used as fragment depends on the node type.
+ Typically, the default is to use the "ucid" hint.
+ The fallback in all cases is to generate a random uuid.
+
+ Never changes previously assigned uris. Sets uris in-place.
+
+ """
+
+ def __init__(
+ self,
+ host: bsfs.URI,
+ user: str,
+ ):
+ self._prefix = bsfs.Namespace(os.path.join(host, user))
+ self._uuid = bsfs.uuid.UUID()
+
+ def handle_node(self, node: Node) -> Node:
+ if node.uri is not None:
+ return node
+ if node.node_type == ns.bsn.Entity :
+ return self.name_file(node)
+ if node.node_type == ns.bsn.Preview:
+ return self.name_preview(node)
+ raise errors.ProgrammingError('no naming policy available for {node.node_type}')
+
+ def name_file(self, node: Node) -> Node:
+ """Set a bsfs:File node's uri fragment to its ucid."""
+ if 'ucid' in node.hints: # content id
+ fragment = node.hints['ucid']
+ else: # random name
+ fragment = self._uuid()
+ node.uri = getattr(self._prefix.file(), fragment)
+ return node
+
+ def name_preview(self, node: Node) -> Node:
+ """Set a bsfs:Preview node's uri fragment to its ucid.
+ Uses its source fragment as fallback. Appends the size if provided.
+ """
+ fragment = None
+ if 'ucid' in node.hints: # content id
+ fragment = node.hints['ucid']
+ if fragment is None and 'source' in node.hints: # source id
+ self.handle_node(node.hints['source'])
+ fragment = node.hints['source'].uri.get('fragment', None)
+ if fragment is None: # random name
+ fragment = self._uuid()
+ if 'size' in node.hints: # append size
+ fragment += '_s' + str(node.hints['size'])
+ node.uri = getattr(self._prefix.preview(), fragment)
+ return node
+
+## EOF ##
diff --git a/bsie/lib/pipeline.py b/bsie/lib/pipeline.py
new file mode 100644
index 0000000..30fd6fd
--- /dev/null
+++ b/bsie/lib/pipeline.py
@@ -0,0 +1,138 @@
+
+# standard imports
+from collections import defaultdict
+import logging
+import typing
+
+# bsie imports
+from bsie.extractor import Extractor
+from bsie.reader import Reader
+from bsie.utils import bsfs, errors, node, ns
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Pipeline',
+ )
+
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+class Pipeline():
+ """Extraction pipeline to generate triples from files.
+
+ The Pipeline binds readers and extractors, and performs
+ the necessary operations to produce triples from a file.
+ It takes a best-effort approach to extract as many triples
+ as possible. Errors during the extraction are passed over
+ and reported to the log.
+
+ """
+
+ # combined extractor schemas.
+ _schema: bsfs.schema.Schema
+
+ # extractor -> reader mapping
+ _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]
+
+ def __init__(
+ self,
+ ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]
+ ):
+ # store core members
+ self._ext2rdr = ext2rdr
+ # compile schema from all extractors
+ self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)
+
+ def __str__(self) -> str:
+ return bsfs.typename(self)
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}(...)'
+
+ def __hash__(self) -> int:
+ return hash((type(self), self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self._schema == other._schema \
+ and self._ext2rdr == other._ext2rdr
+
+ @property
+ def schema(self) -> bsfs.schema.Schema:
+ """Return the pipeline's schema (combined from all extractors)."""
+ return self._schema
+
+ @property
+ def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
+ """Return the principal predicates that can be extracted."""
+ return iter({pred for ext in self._ext2rdr for pred in ext.principals})
+
+ def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
+ """Return the subset of the schema that supports the given *principals*."""
+ # materialize principals
+ principals = set(principals)
+ # collect and combine schemas from extractors
+ return bsfs.schema.Schema.Union({
+ ext.schema
+ for ext
+ in self._ext2rdr
+ if not set(ext.principals).isdisjoint(principals)
+ })
+
+ def __call__(
+ self,
+ path: bsfs.URI,
+ principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ """Extract triples from the file at *path*. Optionally, limit triples to *principals*."""
+ # get principals
+ principals = set(principals) if principals is not None else set(self.schema.predicates())
+
+ # get extractors
+ extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}
+
+ # corner-case short-cut
+ if len(extractors) == 0:
+ return
+
+ # get readers -> extractors mapping
+ rdr2ext = defaultdict(set)
+ for ext in extractors:
+ rdr = self._ext2rdr[ext]
+ rdr2ext[rdr].add(ext)
+
+ # create subject for file
+ subject = node.Node(ns.bsn.Entity,
+ ucid=bsfs.uuid.UCID.from_path(path),
+ )
+
+ # extract information
+ for rdr, extrs in rdr2ext.items():
+ try:
+ # get content
+ content = rdr(path) if rdr is not None else None
+ #logger.info('extracted %s from %s', rdr, path)
+
+ # apply extractors on this content
+ for ext in extrs:
+ try:
+ # get predicate/value tuples
+ yield from ext.extract(subject, content, principals)
+
+ except errors.ExtractorError as err:
+ # critical extractor failure.
+ logger.error('%s failed to extract triples from content: %s', ext, err)
+
+ except errors.UnsupportedFileFormatError:
+ # failed to read the file format. skip.
+ #logger.warning('%s could not process the file format of %s', rdr, err)
+ pass
+
+ except errors.ReaderError as err:
+ # failed to read any content. skip.
+ logger.error('%s failed to read content: %s', rdr, err)
+
+
+## EOF ##