1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
|
"""
Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# standard imports
from collections import defaultdict
import logging
import typing
# bsie imports
from bsie.extractor import Extractor
from bsie.reader import Reader
from bsie.utils import bsfs, errors, node, ns
# exports
# Public API of this module: only the Pipeline class is exported.
__all__: typing.Sequence[str] = (
'Pipeline',
)
## code ##
# Module-level logger named after this module (standard logging convention).
logger = logging.getLogger(__name__)
class Pipeline():
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples
    as possible. Errors during the extraction are passed over
    and reported to the log.
    """

    # combined schema over all extractors.
    _schema: bsfs.schema.Schema

    # extractor -> reader mapping. A None reader means the extractor
    # receives no file content (content=None is passed to it).
    _ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]]

    def __init__(
            self,
            ext2rdr: typing.Dict[Extractor, typing.Optional[Reader]],
            ):
        """Initialize the pipeline from an extractor-to-reader mapping."""
        # store core members
        self._ext2rdr = ext2rdr
        # compile the combined schema from all extractors
        self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)

    def __str__(self) -> str:
        return bsfs.typename(self)

    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'

    def __hash__(self) -> int:
        # NOTE: hash a frozenset of the mapping's items so the hash is
        # independent of dict insertion order. __eq__ compares the dicts by
        # content (order-insensitive), so the previous order-sensitive
        # tuple(keys)/tuple(values) hashing could give different hashes for
        # equal pipelines, violating the hash/eq contract.
        return hash((type(self), self._schema, frozenset(self._ext2rdr.items())))

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self._schema == other._schema \
           and self._ext2rdr == other._ext2rdr

    @property
    def schema(self) -> bsfs.schema.Schema:
        """Return the pipeline's schema (combined from all extractors)."""
        return self._schema

    @property
    def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
        """Return the principal predicates that can be extracted."""
        # deduplicate principals across extractors before iterating
        return iter({pred for ext in self._ext2rdr for pred in ext.principals})

    def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
        """Return the subset of the schema that supports the given *principals*."""
        # materialize principals; the set is re-used for every extractor
        principals = set(principals)
        # combine the schemas of all extractors that can produce
        # at least one of the requested principals
        return bsfs.schema.Schema.Union({
            ext.schema
            for ext
            in self._ext2rdr
            if not set(ext.principals).isdisjoint(principals)
            })

    def __call__(
            self,
            path: bsfs.URI,
            principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*.

        Optionally, limit triples to *principals* (defaults to all predicates
        in the pipeline's schema). Reader and extractor failures are logged
        and skipped (best-effort extraction).
        """
        # determine the target principals
        principals = set(principals) if principals is not None else set(self.schema.predicates())
        # select the extractors that can produce any of the target principals
        extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}
        # corner-case short-cut: nothing to extract
        if len(extractors) == 0:
            return
        # group extractors by their reader so each reader processes
        # the file at most once
        rdr2ext = defaultdict(set)
        for ext in extractors:
            rdr2ext[self._ext2rdr[ext]].add(ext)
        # create the subject node that identifies the file
        subject = node.Node(ns.bsfs.File,
            ucid=bsfs.uuid.UCID.from_path(path),
            )
        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get the content; extractors without a reader receive None
                content = rdr(path) if rdr is not None else None
                # apply extractors on this content
                for ext in extrs:
                    try:
                        # produce (node, predicate, value) triples
                        yield from ext.extract(subject, content, principals)
                    except errors.ExtractorError as err:
                        # critical extractor failure; skip this extractor.
                        logger.error('%s failed to extract triples from content: %s', ext, err)
            except errors.UnsupportedFileFormatError as err:
                # the reader does not support this file format; skip it.
                # (previously silently swallowed; the commented-out log
                # statement referenced an unbound `err`.)
                logger.warning('%s could not process the file format of %s: %s', rdr, path, err)
            except errors.ReaderError as err:
                # failed to read any content; skip this reader.
                logger.error('%s failed to read content: %s', rdr, err)
## EOF ##
|