diff options
Diffstat (limited to 'bsie/tools/pipeline.py')
-rw-r--r-- | bsie/tools/pipeline.py | 52 |
1 files changed, 34 insertions, 18 deletions
diff --git a/bsie/tools/pipeline.py b/bsie/tools/pipeline.py index 834bd99..52ce526 100644 --- a/bsie/tools/pipeline.py +++ b/bsie/tools/pipeline.py @@ -11,8 +11,6 @@ import typing # bsie imports from bsie import base -from bsie.utils.node import Node -from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename from bsie.utils import bsfs, node, ns # exports @@ -36,7 +34,7 @@ class Pipeline(): """ # combined extractor schemas. - schema: _schema.Schema + _schema: bsfs.schema.Schema # node prefix. _prefix: bsfs.Namespace @@ -53,7 +51,7 @@ class Pipeline(): self._prefix = prefix self._ext2rdr = ext2rdr # compile schema from all extractors - self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr) + self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr) def __str__(self) -> str: return bsfs.typename(self) @@ -62,29 +60,47 @@ class Pipeline(): return f'{bsfs.typename(self)}(...)' def __hash__(self) -> int: - return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) + return hash((type(self), self._prefix, self._schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values()))) def __eq__(self, other: typing.Any) -> bool: return isinstance(other, type(self)) \ - and self.schema == other.schema \ + and self._schema == other._schema \ and self._prefix == other._prefix \ and self._ext2rdr == other._ext2rdr - def predicates(self) -> typing.Iterator[_schema.Predicate]: - """Return the predicates that are extracted from a file.""" - return iter({pred for ext in self._ext2rdr for pred in ext.predicates()}) + @property + def schema(self) -> bsfs.schema.Schema: + """Return the pipeline's schema (combined from all extractors).""" + return self._schema + + @property + def principals(self) -> typing.Iterator[bsfs.schema.Predicate]: + """Return the principal predicates that can be extracted.""" + return iter({pred for ext in self._ext2rdr for pred in ext.principals}) + + def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema: + """Return the subset of the schema that supports the given *principals*.""" + # materialize principals + principals = set(principals) + # collect and combine schemas from extractors + return bsfs.schema.Schema.Union({ + ext.schema + for ext + in self._ext2rdr + if not set(ext.principals).isdisjoint(principals) + }) def __call__( self, - path: URI, - predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None, - ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]: - """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.""" - # get predicates - predicates = set(predicates) if predicates is not None else set(self.schema.predicates()) + path: bsfs.URI, + principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None, + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + """Extract triples from the file at *path*. Optionally, limit triples to *principals*.""" + # get principals + principals = set(principals) if principals is not None else set(self.schema.predicates()) # get extractors - extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)} + extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)} # corner-case short-cut if len(extractors) == 0: @@ -110,8 +126,8 @@ class Pipeline(): for ext in extrs: try: # get predicate/value tuples - for node, pred, value in ext.extract(subject, content, predicates): - yield node, pred, value + for subject, pred, value in ext.extract(subject, content, principals): + yield subject, pred, value except base.errors.ExtractorError as err: # critical extractor failure. |