1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
"""
Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
from collections import defaultdict
import logging
import typing
# bsie imports
from bsie import base
from bsie.utils.node import Node
from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename
from bsie.utils import bsfs, node, ns
# exports
__all__: typing.Sequence[str] = (
'Pipeline',
)
## code ##
logger = logging.getLogger(__name__)
class Pipeline():
    """Extraction pipeline to generate triples from files.
    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples
    as possible. Errors during the extraction are passed over
    and reported to the log.
    """
    # combined extractor schemas.
    schema: _schema.Schema
    # node prefix, prepended to the file's content id to mint node URIs.
    _prefix: URI
    # extractor -> reader mapping. A None reader means the extractor
    # operates without file content.
    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
    def __init__(
            self,
            prefix: URI,
            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
            ):
        # store core members
        self._prefix = prefix
        self._ext2rdr = ext2rdr
        # compile the combined schema from all extractors
        self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr)
    def __str__(self) -> str:
        return bsfs.typename(self)
    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'
    def __hash__(self) -> int:
        # NOTE: fold the mapping into a frozenset of items so that two
        # pipelines whose dicts compare equal (dict equality ignores
        # insertion order) also hash equal. Hashing tuple(keys) and
        # tuple(values) separately would be order-sensitive and break
        # the __eq__/__hash__ contract.
        return hash((type(self), self._prefix, self.schema, frozenset(self._ext2rdr.items())))
    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self.schema == other.schema \
           and self._prefix == other._prefix \
           and self._ext2rdr == other._ext2rdr
    def predicates(self) -> typing.Iterator[_schema.Predicate]:
        """Return the predicates that are extracted from a file."""
        return iter({pred for ext in self._ext2rdr for pred in ext.predicates()})
    def __call__(
            self,
            path: URI,
            predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.

        Reader and extractor failures are logged and skipped (best-effort);
        this generator never raises on a failing reader or extractor.
        """
        # get predicates; default to everything the combined schema knows.
        predicates = set(predicates) if predicates is not None else set(self.schema.predicates())
        # select only the extractors that can produce any requested predicate.
        extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)}
        # corner-case short-cut: nothing to extract.
        if len(extractors) == 0:
            return
        # group selected extractors by their reader so that each reader
        # processes the file at most once.
        rdr2ext: typing.DefaultDict[
            typing.Optional[base.reader.Reader],
            typing.Set[base.extractor.Extractor]] = defaultdict(set)
        for ext in extractors:
            rdr2ext[self._ext2rdr[ext]].add(ext)
        # create subject for the file, derived from its content id.
        uuid = _uuid.UCID.from_path(path)
        subject = Node(ns.bsfs.Entity, self._prefix + uuid)
        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get content. A None reader means no content is needed.
                content = rdr(path) if rdr is not None else None
                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get subject/predicate/value triples.
                        # NOTE: the loop variable is deliberately not called
                        # 'node' -- that would shadow the module imported
                        # at file level.
                        for subj, pred, value in ext.extract(subject, content, predicates):
                            yield subj, pred, value
                    except base.errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)
            except base.errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)
## EOF ##
|