"""

Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
from collections import defaultdict
import logging
import typing

# bsie imports
from bsie import base
from bsie.utils import ns
from bsie.utils.node import Node
from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename

# exports
__all__: typing.Sequence[str] = (
    'Pipeline',
    )

## code ##

logger = logging.getLogger(__name__)

class Pipeline:
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples as
    possible: errors during reading or extraction are logged and
    skipped rather than raised.
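
    Example (illustrative sketch; ``extractor`` and ``reader`` stand in
    for pre-built Extractor and Reader instances, typically assembled
    elsewhere):

        >>> pipeline = Pipeline(URI('http://example.com/file#'), {extractor: reader})
        >>> for node, pred, value in pipeline(URI('/path/to/some/file')):
        ...     print(node, pred, value)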

    """

    # combined extractor schemas.
    schema: _schema.Schema

    # node prefix.
    _prefix: URI

    # extractor -> reader mapping
    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]

    def __init__(
            self,
            prefix: URI,
            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
            ):
        # store core members
        self._prefix = prefix
        self._ext2rdr = ext2rdr
        # compile schema from all extractors
        self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr)

    def __str__(self) -> str:
        return typename(self)

    def __repr__(self) -> str:
        return f'{typename(self)}(...)'

    def __hash__(self) -> int:
        return hash((type(self), self._prefix, self.schema, tuple(self._ext2rdr), tuple(self._ext2rdr.values())))

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self.schema == other.schema \
           and self._prefix == other._prefix \
           and self._ext2rdr == other._ext2rdr

    def __call__(
            self,
            path: URI,
            predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *predicates*."""
        # get predicates
        predicates = set(predicates) if predicates is not None else set(self.schema.predicates())

        # get extractors
        extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)}

        # corner-case short-cut
        if not extractors:
            return

        # get readers -> extractors mapping
        rdr2ext = defaultdict(set)
        for ext in extractors:
            rdr = self._ext2rdr[ext]
            rdr2ext[rdr].add(ext)

        # create subject for file
        uuid = _uuid.UCID.from_path(path)
        subject = Node(ns.bsfs.Entity, self._prefix + uuid)

        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get content
                content = rdr(path) if rdr is not None else None

                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get predicate/value tuples
                        for node, pred, value in ext.extract(subject, content, predicates):
                            yield node, pred, value

                    except base.errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)

            except base.errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)


## EOF ##