aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/tools/pipeline.py
blob: 3d089939816106edaf81a8c657e9ff0bbe6189cf (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
"""

Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
from collections import defaultdict
import logging
import typing

# bsie imports
from bsie import base
from bsie.utils.node import Node
from bsie.utils.bsfs import schema as _schema, URI, uuid as _uuid, typename
from bsie.utils import bsfs, node, ns

# exports: the public API of this module consists of the Pipeline class only.
__all__: typing.Sequence[str] = (
    'Pipeline',
    )

## code ##

# Module-level logger. The Pipeline reports reader and extractor failures
# through it instead of raising them to the caller.
logger = logging.getLogger(__name__)

class Pipeline():
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples
    as possible. Errors during the extraction are passed over
    and reported to the log.

    """

    # combined extractor schemas.
    schema: _schema.Schema

    # node prefix.
    _prefix: URI

    # extractor -> reader mapping
    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]

    def __init__(
            self,
            prefix: URI,
            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
            ):
        """Create a pipeline from a node *prefix* and an extractor -> reader mapping.

        *prefix* is prepended to the identifier of every file node the pipeline
        creates. *ext2rdr* maps each extractor to the reader that supplies its
        content; a reader of None means the extractor is invoked with None as content.
        """
        # store core members
        self._prefix = prefix
        self._ext2rdr = ext2rdr
        # compile schema from all extractors
        self.schema = _schema.Schema.Union(ext.schema for ext in ext2rdr)

    def __str__(self) -> str:
        return bsfs.typename(self)

    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'

    def __hash__(self) -> int:
        """Return a hash that is consistent with `__eq__`."""
        # __eq__ compares the extractor -> reader mapping as a dict, i.e. without
        # regard to insertion order. Hashing the items as a frozenset keeps the
        # hash order-independent as well, so that equal pipelines hash equally.
        return hash((type(self), self._prefix, self.schema, frozenset(self._ext2rdr.items())))

    def __eq__(self, other: typing.Any) -> bool:
        """Return True if *other* is a pipeline with the same schema, prefix, and mapping."""
        return isinstance(other, type(self)) \
           and self.schema == other.schema \
           and self._prefix == other._prefix \
           and self._ext2rdr == other._ext2rdr

    def predicates(self) -> typing.Iterator[_schema.Predicate]:
        """Return the predicates that are extracted from a file."""
        # collect into a set first so that a predicate shared by several
        # extractors is reported only once.
        return iter({pred for ext in self._ext2rdr for pred in ext.predicates()})

    def __call__(
            self,
            path: URI,
            predicates: typing.Optional[typing.Iterable[_schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[Node, _schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *predicates*.

        If *predicates* is None, all predicates of the pipeline's schema are requested.
        Only extractors that provide at least one of the requested predicates are run.
        Each reader is invoked at most once per call and its content is shared by all
        extractors bound to it.

        Yields (node, predicate, value) triples as produced by the extractors.
        Reader and extractor errors are logged and skipped; they do not abort
        the extraction of the remaining triples.
        """
        # get predicates
        predicates = set(predicates) if predicates is not None else set(self.schema.predicates())

        # get extractors
        extractors = {ext for ext in self._ext2rdr if not set(ext.predicates()).isdisjoint(predicates)}

        # corner-case short-cut
        if not extractors:
            return

        # get readers -> extractors mapping
        rdr2ext = defaultdict(set)
        for ext in extractors:
            rdr2ext[self._ext2rdr[ext]].add(ext)

        # create subject for file
        ucid = bsfs.uuid.UCID.from_path(path)
        subject = node.Node(ns.bsfs.File, self._prefix + 'file#' + ucid)

        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get content
                content = rdr(path) if rdr is not None else None

                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get predicate/value tuples
                        # NOTE: the loop variable must not be named `node`. That would turn
                        # `node` into a function-local name and make the `node.Node(...)`
                        # call above fail with an UnboundLocalError.
                        for subj, pred, value in ext.extract(subject, content, predicates):
                            yield subj, pred, value

                    except base.errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)

            except base.errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)


## EOF ##