aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/tools/pipeline.py
blob: 20e8ddf7d6ef96fb992a0a2541b9bddd9dea666b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
"""

Part of the bsie module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
from collections import defaultdict
import logging
import typing

# bsie imports
from bsie import base
from bsie.utils import bsfs, node, ns

# public names of this module
__all__: typing.Sequence[str] = ('Pipeline', )

# suffix that Pipeline appends to the namespace prefix it is given
FILE_PREFIX = 'file#'

## code ##

# module-level logger, named after this module
logger = logging.getLogger(__name__)

class Pipeline():
    """Extraction pipeline to generate triples from files.

    The Pipeline binds readers and extractors, and performs
    the necessary operations to produce triples from a file.
    It takes a best-effort approach to extract as many triples
    as possible. Errors during the extraction are passed over
    and reported to the log.

    """

    # combined extractor schemas.
    _schema: bsfs.schema.Schema

    # node prefix (the given prefix, extended by FILE_PREFIX).
    _prefix: bsfs.Namespace

    # extractor -> reader mapping. A reader of None means that the
    # extractor is invoked with None as its content.
    _ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]

    def __init__(
            self,
            prefix: bsfs.Namespace,
            ext2rdr: typing.Dict[base.extractor.Extractor, typing.Optional[base.reader.Reader]]
            ):
        """Create a pipeline.

        *prefix* is the namespace under which file nodes are created
        (FILE_PREFIX is appended to it). *ext2rdr* maps each extractor
        to the reader that supplies its content, or to None if the
        extractor needs no reader.
        """
        # store core members
        self._prefix = prefix + FILE_PREFIX
        self._ext2rdr = ext2rdr
        # compile schema from all extractors
        self._schema = bsfs.schema.Schema.Union(ext.schema for ext in ext2rdr)

    def __str__(self) -> str:
        return bsfs.typename(self)

    def __repr__(self) -> str:
        return f'{bsfs.typename(self)}(...)'

    def __hash__(self) -> int:
        # __eq__ compares the extractor -> reader mapping as a dict, i.e. without
        # regard to insertion order. Hash the (extractor, reader) pairs as a
        # frozenset so that equal pipelines always produce equal hashes.
        return hash((
            type(self),
            self._prefix,
            self._schema,
            frozenset(self._ext2rdr.items()),
            ))

    def __eq__(self, other: typing.Any) -> bool:
        return isinstance(other, type(self)) \
           and self._schema == other._schema \
           and self._prefix == other._prefix \
           and self._ext2rdr == other._ext2rdr

    @property
    def schema(self) -> bsfs.schema.Schema:
        """Return the pipeline's schema (combined from all extractors)."""
        return self._schema

    @property
    def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
        """Return the principal predicates that can be extracted."""
        # collect into a set first so that each predicate is reported only once
        return iter({pred for ext in self._ext2rdr for pred in ext.principals})

    def subschema(self, principals: typing.Iterable[bsfs.schema.Predicate]) -> bsfs.schema.Schema:
        """Return the subset of the schema that supports the given *principals*."""
        # materialize principals
        principals = set(principals)
        # collect and combine schemas from extractors that share
        # at least one principal with the request
        return bsfs.schema.Schema.Union({
            ext.schema
            for ext
            in self._ext2rdr
            if not set(ext.principals).isdisjoint(principals)
            })

    def __call__(
            self,
            path: bsfs.URI,
            principals: typing.Optional[typing.Iterable[bsfs.schema.Predicate]] = None,
            ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
        """Extract triples from the file at *path*. Optionally, limit triples to *principals*.

        Yields (node, predicate, value) triples as produced by the extractors.
        If *principals* is None, all predicates of the pipeline's schema are
        requested. Only extractors that share at least one principal with the
        request are run.

        A ReaderError is logged and skips the extractors bound to that reader.
        An ExtractorError is logged and skips only the failing extractor.
        """
        # get principals
        principals = set(principals) if principals is not None else set(self.schema.predicates())

        # get extractors
        extractors = {ext for ext in self._ext2rdr if not set(ext.principals).isdisjoint(principals)}

        # corner-case short-cut
        if not extractors:
            return

        # get readers -> extractors mapping, so that each reader runs only once per file
        rdr2ext = defaultdict(set)
        for ext in extractors:
            rdr = self._ext2rdr[ext]
            rdr2ext[rdr].add(ext)

        # create subject for file
        uuid = bsfs.uuid.UCID.from_path(path)
        subject = node.Node(ns.bsfs.File, self._prefix[uuid])

        # extract information
        for rdr, extrs in rdr2ext.items():
            try:
                # get content
                content = rdr(path) if rdr is not None else None

                # apply extractors on this content
                for ext in extrs:
                    try:
                        # get predicate/value tuples.
                        # The emitted node is bound to its own name so that the file's
                        # subject is never overwritten: every extractor must receive
                        # the file node, whatever node an earlier triple referred to.
                        for subj, pred, value in ext.extract(subject, content, principals):
                            yield subj, pred, value

                    except base.errors.ExtractorError as err:
                        # critical extractor failure.
                        logger.error('%s failed to extract triples from content: %s', ext, err)

            except base.errors.ReaderError as err:
                # failed to read any content. skip.
                logger.error('%s failed to read content: %s', rdr, err)


## EOF ##