author     Matthias Baumgartner <dev@igsor.net>    2022-11-25 14:59:17 +0100
committer  Matthias Baumgartner <dev@igsor.net>    2022-11-25 14:59:17 +0100
commit     a294bbe0622911bcd6df37c38865a4c0eb290593 (patch)
tree       f038ed8d4f04c63991939e13e61ae170de4e2c57 /bsie
parent     9389c741bdbbca9adbff6099d440706cd63deac4 (diff)
parent     3e6a69ce7f109f0fd4352507ad60d58d4cbd24a7 (diff)
Merge branch 'mb/tools' into develop
Diffstat (limited to 'bsie')
-rw-r--r--  bsie/base/errors.py                  20
-rw-r--r--  bsie/base/extractor.py               63
-rw-r--r--  bsie/base/reader.py                  17
-rw-r--r--  bsie/extractor/generic/constant.py   26
-rw-r--r--  bsie/extractor/generic/path.py       44
-rw-r--r--  bsie/extractor/generic/stat.py       38
-rw-r--r--  bsie/reader/path.py                   7
-rw-r--r--  bsie/reader/stat.py                  10
-rw-r--r--  bsie/tools/__init__.py               20
-rw-r--r--  bsie/tools/builder.py               217
-rw-r--r--  bsie/tools/pipeline.py              121
-rw-r--r--  bsie/utils/bsfs.py                    5
-rw-r--r--  bsie/utils/namespaces.py              3
-rw-r--r--  bsie/utils/node.py                   20
14 files changed, 525 insertions, 86 deletions
diff --git a/bsie/base/errors.py b/bsie/base/errors.py
index f86ffb2..760351f 100644
--- a/bsie/base/errors.py
+++ b/bsie/base/errors.py
@@ -8,15 +8,29 @@ Author: Matthias Baumgartner, 2022
import typing
# exports
-__all__: typing.Sequence[str] = []
+__all__: typing.Sequence[str] = (
+ 'BuilderError',
+ 'ExtractorError',
+ 'LoaderError',
+ 'ReaderError',
+ )
## code ##
-class _BSIE_Error(Exception):
+class _BSIEError(Exception):
"""Generic BSIE error."""
-class ReaderError(_BSIE_Error):
+class BuilderError(_BSIEError):
+ """The Builder failed to create an instance."""
+
+class LoaderError(BuilderError):
+ """Failed to load a module or class."""
+
+class ExtractorError(_BSIEError):
+ """The Extractor failed to process the given content."""
+
+class ReaderError(_BSIEError):
"""The Reader failed to read the given file."""
## EOF ##
diff --git a/bsie/base/extractor.py b/bsie/base/extractor.py
index ea43925..2fc4f18 100644
--- a/bsie/base/extractor.py
+++ b/bsie/base/extractor.py
@@ -8,16 +8,40 @@ Author: Matthias Baumgartner, 2022
import abc
import typing
-# inner-module imports
-from . import reader
+# bsie imports
from bsie.utils import node
-from bsie.utils.bsfs import URI, typename
+from bsie.utils.bsfs import schema as _schema, typename
# exports
__all__: typing.Sequence[str] = (
'Extractor',
)
+# constants
+
+# essential definitions typically used in extractor schemas.
+# NOTE: The definition here is only for convenience; each Extractor must apply it explicitly, if so desired.
+SCHEMA_PREAMBLE = '''
+ # common external prefixes
+ prefix owl: <http://www.w3.org/2002/07/owl#>
+ prefix rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>
+ prefix rdfs: <http://www.w3.org/2000/01/rdf-schema#>
+ prefix xsd: <http://www.w3.org/2001/XMLSchema#>
+ prefix schema: <http://schema.org/>
+
+ # common bsfs prefixes
+ prefix bsfs: <http://bsfs.ai/schema/>
+ prefix bse: <http://bsfs.ai/schema/Entity#>
+
+ # essential nodes
+ bsfs:Entity rdfs:subClassOf bsfs:Node .
+
+ # common definitions
+ xsd:string rdfs:subClassOf bsfs:Literal .
+ xsd:integer rdfs:subClassOf bsfs:Literal .
+
+ '''
+
## code ##
@@ -25,7 +49,13 @@ class Extractor(abc.ABC):
"""Produce (node, predicate, value)-triples from some content."""
# what type of content is expected (i.e. reader subclass).
- CONTENT_READER: typing.Optional[typing.Type[reader.Reader]] = None
+ CONTENT_READER: typing.Optional[str] = None
+
+ # extractor schema.
+ schema: _schema.Schema
+
+ def __init__(self, schema: _schema.Schema):
+ self.schema = schema
def __str__(self) -> str:
return typename(self)
@@ -33,17 +63,32 @@ class Extractor(abc.ABC):
def __repr__(self) -> str:
return f'{typename(self)}()'
- @abc.abstractmethod
- def schema(self) -> str:
- """Return the schema (predicates and nodes) produced by this Extractor."""
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self.CONTENT_READER == other.CONTENT_READER \
+ and self.schema == other.schema
+
+ def __hash__(self) -> int:
+ return hash((type(self), self.CONTENT_READER, self.schema))
+
+ def predicates(self) -> typing.Iterator[_schema.Predicate]:
+ """Return the predicates that may be part of extracted triples."""
+ # NOTE: Some predicates in the schema might not occur in actual triples,
+ # but are defined due to predicate class hierarchy. E.g., bsfs:Predicate
+ # is part of every schema but should not be used in triples.
+        # Announcing all predicates might not be the most efficient way, but it
+        # is the safest one. Concrete extractors that produce additional
+        # predicates (e.g. auxiliary nodes with their own predicates) should
+        # override this method to only include the principal predicates.
+ return self.schema.predicates()
@abc.abstractmethod
def extract(
self,
subject: node.Node,
content: typing.Any,
- predicates: typing.Iterable[URI],
- ) -> typing.Iterator[typing.Tuple[node.Node, URI, typing.Any]]:
+ predicates: typing.Iterable[_schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, _schema.Predicate, typing.Any]]:
"""Return (node, predicate, value) triples."""
## EOF ##
diff --git a/bsie/base/reader.py b/bsie/base/reader.py
index f29e451..b7eabf7 100644
--- a/bsie/base/reader.py
+++ b/bsie/base/reader.py
@@ -12,12 +12,11 @@ Author: Matthias Baumgartner, 2022
import abc
import typing
-# inner-module imports
+# bsie imports
from bsie.utils.bsfs import URI, typename
# exports
__all__: typing.Sequence[str] = (
- 'Aggregator',
'Reader',
)
@@ -27,20 +26,20 @@ __all__: typing.Sequence[str] = (
class Reader(abc.ABC):
"""Read and return some content from a file."""
- # In what data structure content is returned
- CONTENT_TYPE = typing.Union[typing.Any]
- # NOTE: Child classes must also assign a typing.Union even if there's
- # only one options
-
def __str__(self) -> str:
return typename(self)
def __repr__(self) -> str:
return f'{typename(self)}()'
- # FIXME: How about using contexts instead of calls?
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self))
+
+ def __hash__(self) -> int:
+ return hash(type(self))
+
@abc.abstractmethod
- def __call__(self, path: URI) -> CONTENT_TYPE:
+ def __call__(self, path: URI) -> typing.Any:
"""Return some content of the file at *path*.
Raises a `ReaderError` if the reader cannot make sense of the file format.
"""
diff --git a/bsie/extractor/generic/constant.py b/bsie/extractor/generic/constant.py
index e243131..7da792a 100644
--- a/bsie/extractor/generic/constant.py
+++ b/bsie/extractor/generic/constant.py
@@ -7,9 +7,9 @@ Author: Matthias Baumgartner, 2022
# imports
import typing
-# inner-module imports
+# bsie imports
from bsie.base import extractor
-from bsie.utils.bsfs import URI
+from bsie.utils.bsfs import URI, schema as _schema
from bsie.utils.node import Node
# exports
@@ -25,26 +25,32 @@ class Constant(extractor.Extractor):
CONTENT_READER = None
+ # predicate/value pairs to be produced.
+ _tuples: typing.Tuple[typing.Tuple[_schema.Predicate, typing.Any], ...]
+
def __init__(
self,
schema: str,
tuples: typing.Iterable[typing.Tuple[URI, typing.Any]],
):
- self._schema = schema
- self._tuples = tuples
- # FIXME: use schema instance for predicate checking
- #self._tuples = [(pred, value) for pred, value in tuples if pred in schema]
+ super().__init__(_schema.Schema.from_string(extractor.SCHEMA_PREAMBLE + schema))
+ # NOTE: Raises a KeyError if the predicate is not part of the schema
+ self._tuples = tuple((self.schema.predicate(p_uri), value) for p_uri, value in tuples)
# FIXME: use schema instance for value checking
- def schema(self) -> str:
- return self._schema
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self._tuples == other._tuples
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(), self._tuples))
def extract(
self,
subject: Node,
content: None,
- predicates: typing.Iterable[URI],
- ) -> typing.Iterator[typing.Tuple[Node, URI, typing.Any]]:
+ predicates: typing.Iterable[_schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
for pred, value in self._tuples:
if pred in predicates:
yield subject, pred, value
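
A minimal usage sketch, not part of the patch: constructing a Constant extractor under the new interface. The bse:author predicate, its schema statement, and the value are illustrative only; the prefixes (bse, bsfs, rdfs, xsd, schema, owl) come from extractor.SCHEMA_PREAMBLE, which Constant prepends itself.

    from bsie.extractor.generic.constant import Constant
    from bsie.utils import ns

    # hypothetical predicate definition, mirroring the pattern used by Path/Stat
    const = Constant(
        schema='''
            bse:author rdfs:subClassOf bsfs:Predicate ;
                rdfs:domain bsfs:Entity ;
                rdfs:range xsd:string ;
                rdfs:label "Author"^^xsd:string ;
                schema:description "Author of the entity."^^xsd:string ;
                owl:maxCardinality "INF"^^xsd:number .
            ''',
        # predicates that are not part of the schema raise a KeyError here
        tuples=[(ns.bse.author, 'Jane Doe')],
    )
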
diff --git a/bsie/extractor/generic/path.py b/bsie/extractor/generic/path.py
index c39bbd2..f346f97 100644
--- a/bsie/extractor/generic/path.py
+++ b/bsie/extractor/generic/path.py
@@ -8,11 +8,10 @@ Author: Matthias Baumgartner, 2022
import os
import typing
-# inner-module imports
+# bsie imports
from bsie.base import extractor
from bsie.utils import node, ns
-from bsie.utils.bsfs import URI
-import bsie.reader.path
+from bsie.utils.bsfs import schema
# exports
__all__: typing.Sequence[str] = (
@@ -25,32 +24,33 @@ __all__: typing.Sequence[str] = (
class Path(extractor.Extractor):
"""Extract information from file's path."""
- CONTENT_READER = bsie.reader.path.Path
+ CONTENT_READER = 'bsie.reader.path.Path'
- def __init__(self):
- self.__callmap = {
- ns.bse.filename: self.__filename,
- }
+ # mapping from predicate to handler function.
+ _callmap: typing.Dict[schema.Predicate, typing.Callable[[str], typing.Any]]
- def schema(self) -> str:
- return '''
- bse:filename a bsfs:Predicate ;
+ def __init__(self):
+ super().__init__(schema.Schema.from_string(extractor.SCHEMA_PREAMBLE + '''
+ bse:filename rdfs:subClassOf bsfs:Predicate ;
rdfs:domain bsfs:Entity ;
rdfs:range xsd:string ;
- rdf:label "File name"^^xsd:string ;
+ rdfs:label "File name"^^xsd:string ;
schema:description "Filename of entity in some filesystem."^^xsd:string ;
owl:maxCardinality "INF"^^xsd:number .
- '''
+ '''))
+ self._callmap = {
+ self.schema.predicate(ns.bse.filename): self.__filename,
+ }
def extract(
self,
subject: node.Node,
- content: CONTENT_READER.CONTENT_TYPE,
- predicates: typing.Iterable[URI],
- ) -> typing.Iterator[typing.Tuple[node.Node, URI, typing.Any]]:
+ content: str,
+ predicates: typing.Iterable[schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, schema.Predicate, typing.Any]]:
for pred in predicates:
# find callback
- clbk = self.__callmap.get(pred)
+ clbk = self._callmap.get(pred)
if clbk is None:
continue
# get value
@@ -60,11 +60,15 @@ class Path(extractor.Extractor):
# produce triple
yield subject, pred, value
- def __filename(self, path: str) -> str:
+ def __filename(self, path: str) -> typing.Optional[str]:
try:
return os.path.basename(path)
- except Exception:
- # FIXME: some kind of error reporting (e.g. logging)
+ except Exception: # some error, skip.
+ # FIXME: some kind of error reporting (e.g. logging)?
+ # Options: (a) Fail silently (current); (b) Skip and report to log;
+ # (c) Raise ExtractorError (aborts extraction); (d) separate content type
+ # checks from basename errors (report content type errors, skip basename
+ # errors)
return None
## EOF ##
diff --git a/bsie/extractor/generic/stat.py b/bsie/extractor/generic/stat.py
index d74369c..7088c0a 100644
--- a/bsie/extractor/generic/stat.py
+++ b/bsie/extractor/generic/stat.py
@@ -5,14 +5,13 @@ A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
+import os
import typing
-# inner-module imports
+# bsie imports
from bsie.base import extractor
from bsie.utils import node, ns
-from bsie.utils.bsfs import URI
-import bsie.reader.stat
-
+from bsie.utils.bsfs import schema as _schema
# exports
__all__: typing.Sequence[str] = (
@@ -25,32 +24,33 @@ __all__: typing.Sequence[str] = (
class Stat(extractor.Extractor):
"""Extract information from the file system."""
- CONTENT_READER = bsie.reader.stat.Stat
+ CONTENT_READER = 'bsie.reader.stat.Stat'
- def __init__(self):
- self.__callmap = {
- ns.bse.filesize: self.__filesize,
- }
+ # mapping from predicate to handler function.
+ _callmap: typing.Dict[_schema.Predicate, typing.Callable[[os.stat_result], typing.Any]]
- def schema(self) -> str:
- return '''
- bse:filesize a bsfs:Predicate ;
+ def __init__(self):
+ super().__init__(_schema.Schema.from_string(extractor.SCHEMA_PREAMBLE + '''
+ bse:filesize rdfs:subClassOf bsfs:Predicate ;
rdfs:domain bsfs:Entity ;
rdfs:range xsd:integer ;
- rdf:label "File size"^^xsd:string ;
+ rdfs:label "File size"^^xsd:string ;
schema:description "File size of entity in some filesystem."^^xsd:string ;
owl:maxCardinality "INF"^^xsd:number .
- '''
+ '''))
+ self._callmap = {
+ self.schema.predicate(ns.bse.filesize): self.__filesize,
+ }
def extract(
self,
subject: node.Node,
- content: CONTENT_READER.CONTENT_TYPE,
- predicates: typing.Iterable[URI],
- ) -> typing.Iterator[typing.Tuple[node.Node, URI, typing.Any]]:
+ content: os.stat_result,
+ predicates: typing.Iterable[_schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, _schema.Predicate, typing.Any]]:
for pred in predicates:
# find callback
- clbk = self.__callmap.get(pred)
+ clbk = self._callmap.get(pred)
if clbk is None:
continue
# get value
@@ -60,7 +60,7 @@ class Stat(extractor.Extractor):
# produce triple
yield subject, pred, value
- def __filesize(self, content: CONTENT_READER.CONTENT_TYPE) -> int:
+ def __filesize(self, content: os.stat_result) -> typing.Optional[int]:
"""Return the file size."""
try:
return content.st_size
diff --git a/bsie/reader/path.py b/bsie/reader/path.py
index d27c664..d60f187 100644
--- a/bsie/reader/path.py
+++ b/bsie/reader/path.py
@@ -5,10 +5,9 @@ A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
-import os
import typing
-# inner-module imports
+# bsie imports
from bsie.base import reader
# exports
@@ -22,9 +21,7 @@ __all__: typing.Sequence[str] = (
class Path(reader.Reader):
"""Return the path."""
- CONTENT_TYPE = typing.Union[str]
-
- def __call__(self, path: str) -> CONTENT_TYPE:
+ def __call__(self, path: str) -> str:
return path
diff --git a/bsie/reader/stat.py b/bsie/reader/stat.py
index f0b83fb..592d912 100644
--- a/bsie/reader/stat.py
+++ b/bsie/reader/stat.py
@@ -8,7 +8,7 @@ Author: Matthias Baumgartner, 2022
import os
import typing
-# inner-module imports
+# bsie imports
from bsie.base import reader, errors
# exports
@@ -22,13 +22,11 @@ __all__: typing.Sequence[str] = (
class Stat(reader.Reader):
"""Read and return the filesystem's stat infos."""
- CONTENT_TYPE = typing.Union[os.stat_result]
-
- def __call__(self, path: str) -> CONTENT_TYPE:
+ def __call__(self, path: str) -> os.stat_result:
try:
return os.stat(path)
- except Exception:
- raise errors.ReaderError(path)
+ except Exception as err:
+ raise errors.ReaderError(path) from err
## EOF ##
diff --git a/bsie/tools/__init__.py b/bsie/tools/__init__.py
new file mode 100644
index 0000000..8ca9620
--- /dev/null
+++ b/bsie/tools/__init__.py
@@ -0,0 +1,20 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from . import builder
+from . import pipeline
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'builder',
+ 'pipeline',
+ )
+
+## EOF ##
diff --git a/bsie/tools/builder.py b/bsie/tools/builder.py
new file mode 100644
index 0000000..8f7a410
--- /dev/null
+++ b/bsie/tools/builder.py
@@ -0,0 +1,217 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import importlib
+import logging
+import typing
+
+# bsie imports
+from bsie import base
+from bsie.base import errors
+from bsie.utils.bsfs import URI, typename
+
+# inner-module imports
+from . import pipeline
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'ExtractorBuilder',
+ 'PipelineBuilder',
+ 'ReaderBuilder',
+ )
+
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+def _safe_load(module_name: str, class_name: str):
+ """Get a class from a module. Raise BuilderError if anything goes wrong."""
+ try:
+ # load the module
+ module = importlib.import_module(module_name)
+ except Exception as err:
+ # cannot import module
+ raise errors.LoaderError(f'cannot load module {module_name}') from err
+
+ try:
+ # get the class from the module
+ cls = getattr(module, class_name)
+ except Exception as err:
+ # cannot find the class
+ raise errors.LoaderError(f'cannot load class {class_name} from module {module_name}') from err
+
+ return cls
+
+
+def _unpack_name(name):
+ """Split a name into its module and class component (dot-separated)."""
+ if not isinstance(name, str):
+ raise TypeError(name)
+ if '.' not in name:
+ raise ValueError('name must be a qualified class name.')
+ module_name, class_name = name[:name.rfind('.')], name[name.rfind('.')+1:]
+ if module_name == '':
+ raise ValueError('name must be a qualified class name.')
+ return module_name, class_name
+
+
+class ReaderBuilder():
+ """Build `bsie.base.reader.Reader` instances.
+
+ Readers are defined via their qualified class name
+ (e.g., bsie.reader.path.Path) and optional keyword
+ arguments that are passed to the constructor via
+ the *kwargs* argument (name as key, kwargs as value).
+    The ReaderBuilder caches previously built reader
+    instances, since a given reader is always built with
+    identical keyword arguments.
+
+ """
+
+ # keyword arguments
+ kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]
+
+ # cached readers
+ cache: typing.Dict[str, base.reader.Reader]
+
+ def __init__(self, kwargs: typing.Dict[str, typing.Dict[str, typing.Any]]):
+ self.kwargs = kwargs
+ self.cache = {}
+
+ def build(self, name: str) -> base.reader.Reader:
+ """Return an instance for the qualified class name."""
+ # return cached instance
+ if name in self.cache:
+ return self.cache[name]
+
+ # check name and get module/class components
+ module_name, class_name = _unpack_name(name)
+
+ # import reader class
+ cls = _safe_load(module_name, class_name)
+
+ # get kwargs
+ kwargs = self.kwargs.get(name, {})
+ if not isinstance(kwargs, dict):
+ raise TypeError(f'expected a kwargs dict, found {typename(kwargs)}')
+
+ try: # build, cache, and return instance
+ obj = cls(**kwargs)
+ # cache instance
+ self.cache[name] = obj
+ # return instance
+ return obj
+
+ except Exception as err:
+ raise errors.BuilderError(f'failed to build reader {name} due to {typename(err)}: {err}') from err
+
+
+class ExtractorBuilder():
+    """Build `bsie.base.extractor.Extractor` instances.
+
+ It is permissible to build multiple instances of the same extractor
+ (typically with different arguments), hence the ExtractorBuilder
+ receives a list of build specifications. Each specification is
+    a dict with a single key (the extractor's qualified class
+    name) whose value is a dict of keyword arguments.
+ Example: [{'bsie.extractor.generic.path.Path': {}}, ]
+
+ """
+
+ # build specifications
+ specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]
+
+ def __init__(self, specs: typing.List[typing.Dict[str, typing.Dict[str, typing.Any]]]):
+ self.specs = specs
+
+ def __iter__(self) -> typing.Iterator[int]:
+        """Iterate over the indices of the extractor specifications."""
+ return iter(range(len(self.specs)))
+
+ def build(self, index: int) -> base.extractor.Extractor:
+ """Return an instance of the n'th extractor (n=*index*)."""
+ # get build instructions
+ specs = self.specs[index]
+
+        # check specs structure. expecting [{name: {kwargs}}]
+ if not isinstance(specs, dict):
+ raise TypeError(f'expected a dict, found {typename(specs)}')
+ if len(specs) != 1:
+ raise TypeError(f'expected a dict of length one, found {len(specs)}')
+
+ # get name and args from specs
+ name = next(iter(specs.keys()))
+ kwargs = specs[name]
+
+ # check kwargs structure
+ if not isinstance(kwargs, dict):
+ raise TypeError(f'expected a dict, found {typename(kwargs)}')
+
+ # check name and get module/class components
+ module_name, class_name = _unpack_name(name)
+
+ # import extractor class
+ cls = _safe_load(module_name, class_name)
+
+ try: # build and return instance
+ return cls(**kwargs)
+
+ except Exception as err:
+ raise errors.BuilderError(f'failed to build extractor {name} due to {typename(err)}: {err}') from err
+
+
+class PipelineBuilder():
+ """Build `bsie.tools.pipeline.Pipeline` instances."""
+
+ def __init__(
+ self,
+ prefix: URI,
+ reader_builder: ReaderBuilder,
+ extractor_builder: ExtractorBuilder,
+ ):
+ self.prefix = prefix
+ self.rbuild = reader_builder
+ self.ebuild = extractor_builder
+
+ def build(self) -> pipeline.Pipeline:
+ """Return a Pipeline instance."""
+ ext2rdr = {}
+
+ for eidx in self.ebuild:
+ # build extractor
+ try:
+ ext = self.ebuild.build(eidx)
+
+ except errors.LoaderError as err: # failed to load extractor; skip
+ logger.error('failed to load extractor: %s', err)
+ continue
+
+ except errors.BuilderError as err: # failed to build instance; skip
+ logger.error(str(err))
+ continue
+
+ try:
+ # get reader required by extractor
+ if ext.CONTENT_READER is not None:
+ rdr = self.rbuild.build(ext.CONTENT_READER)
+ else:
+ rdr = None
+ # store extractor
+ ext2rdr[ext] = rdr
+
+ except errors.LoaderError as err: # failed to load reader
+ logger.error('failed to load reader: %s', err)
+
+ except errors.BuilderError as err: # failed to build reader
+ logger.error(str(err))
+
+ return pipeline.Pipeline(self.prefix, ext2rdr)
+
+
+
+## EOF ##
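
A minimal wiring sketch, not part of the patch, assuming the generic extractors above and a placeholder prefix URI:

    from bsie.tools.builder import ExtractorBuilder, PipelineBuilder, ReaderBuilder
    from bsie.utils.bsfs import URI

    # per-reader constructor kwargs, keyed by qualified class name (none needed here)
    rbuild = ReaderBuilder(kwargs={})

    # one spec per extractor instance: {qualified class name: constructor kwargs}
    ebuild = ExtractorBuilder(specs=[
        {'bsie.extractor.generic.path.Path': {}},
        {'bsie.extractor.generic.stat.Stat': {}},
    ])

    # prefix under which entity URIs are minted (placeholder value)
    builder = PipelineBuilder(URI('http://example.com/entity#'), rbuild, ebuild)
    pipeline = builder.build()

Extractors or readers that fail to load or build are skipped and logged, so build() always returns a Pipeline.
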
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py
new file mode 100644
index 0000000..8e1c992
--- /dev/null
+++ b/bsie/tools/pipeline.py
@@ -0,0 +1,121 @@
+"""
+
+Part of the bsie module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+from collections import defaultdict
+import logging
+import typing
+
+# bsie imports
+from bsie import base
+from bsie.utils import ns
+from bsie.utils.node import Node
+from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Pipeline',
+ )
+
+## code ##
+
+logger = logging.getLogger(__name__)
+
+class Pipeline():
+ """Extraction pipeline to generate triples from files.
+
+ The Pipeline binds readers and extractors, and performs
+ the necessary operations to produce triples from a file.
+ It takes a best-effort approach to extract as many triples
+    as possible. Errors during extraction are caught
+    and reported to the log.
+
+ """
+
+ # combined extractor schemas.
+ schema: _schema.Schema
+
+ # node prefix.
+ _prefix: URI
+
+ # extractor -> reader mapping
+ _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
+
+ def __init__(
+ self,
+ prefix: URI,
+ ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
+ ):
+ # store core members
+ self._prefix = prefix
+ self._ext2rdr = ext2rdr
+ # compile schema from all extractors
+ self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr)
+
+ def __str__(self) -> str:
+ return typename(self)
+
+ def __repr__(self) -> str:
+ return f'{typename(self)}(...)'
+
+ def __hash__(self) -> int:
+ return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, type(self)) \
+ and self.schema == other.schema \
+ and self._prefix == other._prefix \
+ and self._ext2rdr == other._ext2rdr
+
+ def __call__(
+ self,
+ path: URI,
+ predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
+ ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
+ """Extract triples from the file at *path*. Optionally, limit triples to *predicates*."""
+ # get predicates
+ predicates = set(predicates) if predicates is not None else set(self.schema.predicates())
+
+ # get extractors
+ extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)}
+
+ # corner-case short-cut
+ if len(extractors) == 0:
+ return
+
+ # get readers -> extractors mapping
+ rdr2ext = defaultdict(set)
+ for ext in extractors:
+ rdr = self._ext2rdr[ext]
+ rdr2ext[rdr].add(ext)
+
+ # create subject for file
+ uuid = _uuid.UCID.from_path(path)
+ subject = Node(ns.bsfs.Entity, self._prefix + uuid)
+
+ # extract information
+ for rdr, extrs in rdr2ext.items():
+ try:
+ # get content
+ content = rdr(path) if rdr is not None else None
+
+ # apply extractors on this content
+ for ext in extrs:
+ try:
+ # get predicate/value tuples
+ for node, pred, value in ext.extract(subject, content, predicates):
+ yield node, pred, value
+
+ except base.errors.ExtractorError as err:
+ # critical extractor failure.
+ logger.error('%s failed to extract triples from content: %s', ext, err)
+
+ except base.errors.ReaderError as err:
+ # failed to read any content. skip.
+ logger.error('%s failed to read content: %s', rdr, err)
+
+
+## EOF ##
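
A usage sketch for the pipeline built above, not part of the patch; the file path is a placeholder:

    from bsie.utils import ns

    # extract every triple the combined schema supports
    for node, pred, value in pipeline('/tmp/example.txt'):
        print(node, pred, value)

    # or limit extraction to selected predicates from the pipeline's schema
    wanted = [pipeline.schema.predicate(ns.bse.filename)]
    for node, pred, value in pipeline('/tmp/example.txt', predicates=wanted):
        print(node, pred, value)
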
diff --git a/bsie/utils/bsfs.py b/bsie/utils/bsfs.py
index 1ae657c..a4b7626 100644
--- a/bsie/utils/bsfs.py
+++ b/bsie/utils/bsfs.py
@@ -8,14 +8,17 @@ Author: Matthias Baumgartner, 2022
import typing
# bsfs imports
+from bsfs import schema
from bsfs.namespace import Namespace
-from bsfs.utils import URI, typename
+from bsfs.utils import URI, typename, uuid
# exports
__all__: typing.Sequence[str] = (
'Namespace',
'URI',
+ 'schema',
'typename',
+ 'uuid',
)
## EOF ##
diff --git a/bsie/utils/namespaces.py b/bsie/utils/namespaces.py
index 67ccc71..13be96b 100644
--- a/bsie/utils/namespaces.py
+++ b/bsie/utils/namespaces.py
@@ -7,13 +7,14 @@ Author: Matthias Baumgartner, 2022
# imports
import typing
-# bsie imports
+# inner-module imports
from . import bsfs as _bsfs
# constants
bse = _bsfs.Namespace('http://bsfs.ai/schema/Entity#')
bsfs = _bsfs.Namespace('http://bsfs.ai/schema/')
bsm = _bsfs.Namespace('http://bsfs.ai/schema/meta#')
+xsd = _bsfs.Namespace('http://www.w3.org/2001/XMLSchema#')
# export
__all__: typing.Sequence[str] = (
diff --git a/bsie/utils/node.py b/bsie/utils/node.py
index 60863a4..c9c494f 100644
--- a/bsie/utils/node.py
+++ b/bsie/utils/node.py
@@ -7,12 +7,12 @@ Author: Matthias Baumgartner, 2022
# imports
import typing
-# inner-module imports
-from bsie.utils.bsfs import URI
+# bsie imports
+from bsie.utils.bsfs import URI, typename
# exports
__all__: typing.Sequence[str] = (
- 'Node'
+ 'Node',
)
@@ -36,4 +36,18 @@ class Node():
self.node_type = URI(node_type)
self.uri = URI(uri)
+ def __eq__(self, other: typing.Any) -> bool:
+ return isinstance(other, Node) \
+ and other.node_type == self.node_type \
+ and other.uri == self.uri
+
+ def __hash__(self) -> int:
+ return hash((type(self), self.node_type, self.uri))
+
+ def __str__(self) -> str:
+ return f'{typename(self)}({self.node_type}, {self.uri})'
+
+ def __repr__(self) -> str:
+ return f'{typename(self)}({self.node_type}, {self.uri})'
+
## EOF ##