"""Extract information from the file system, such as filesize. Part of the bsie module. A copy of the license is provided with the project. Author: Matthias Baumgartner, 2022 """ # imports import os import typing # bsie imports from bsie.base import extractor from bsie.utils.bsfs import schema as _schema from bsie.utils import bsfs, node, ns # exports __all__: typing.Sequence[str] = ( 'Stat', ) ## code ## class Stat(extractor.Extractor): """Extract information from the file system.""" CONTENT_READER = 'bsie.reader.stat.Stat' # mapping from predicate to handler function. _callmap: typing.Dict[bsfs.schema.Predicate, typing.Callable[[os.stat_result], typing.Any]] def __init__(self): super().__init__(bsfs.schema.Schema.from_string(extractor.SCHEMA_PREAMBLE + ''' bse:filesize rdfs:subClassOf bsfs:Predicate ; rdfs:domain bsfs:Entity ; rdfs:range xsd:integer ; rdfs:label "File size"^^xsd:string ; schema:description "File size of entity in some filesystem."^^xsd:string ; bsfs:unique "false"^^xsd:boolean . ''')) self._callmap = { self.schema.predicate(ns.bse.filesize): self.__filesize, } def extract( self, subject: node.Node, content: os.stat_result, predicates: typing.Iterable[_schema.Predicate], ) -> typing.Iterator[typing.Tuple[node.Node, _schema.Predicate, typing.Any]]: for pred in predicates: # find callback clbk = self._callmap.get(pred) if clbk is None: continue # get value value = clbk(content) if value is None: continue # produce triple yield subject, pred, value def __filesize(self, content: os.stat_result) -> typing.Optional[int]: """Return the file size.""" try: return content.st_size except Exception: # FIXME: some kind of error reporting (e.g. logging) return None ## EOF ##