aboutsummaryrefslogtreecommitdiffstats
path: root/bsie/extractor/image
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
committerMatthias Baumgartner <dev@igsor.net>2023-03-05 19:22:58 +0100
commita35b33f4f1ddcf6f1bb8ab0f41b87bf2b847f11d (patch)
treefb220da28bb7248ebf37ce09af5de88f2c1aaad4 /bsie/extractor/image
parent7582c280ad5324a2f0427999911c7e7abc14a6ab (diff)
parentaf81318ae9311fd0b0e16949cef3cfaf7996970b (diff)
downloadbsie-release.tar.gz
bsie-release.tar.bz2
bsie-release.zip
Merge branch 'develop'HEADv0.23.03releasemain
Diffstat (limited to 'bsie/extractor/image')
-rw-r--r--bsie/extractor/image/__init__.py8
-rw-r--r--bsie/extractor/image/colors_spatial.py150
-rw-r--r--bsie/extractor/image/photometrics.py211
3 files changed, 369 insertions, 0 deletions
diff --git a/bsie/extractor/image/__init__.py b/bsie/extractor/image/__init__.py
new file mode 100644
index 0000000..f82424a
--- /dev/null
+++ b/bsie/extractor/image/__init__.py
@@ -0,0 +1,8 @@
+
+# standard imports
+import typing
+
+# exports
+__all__: typing.Sequence[str] = []
+
+## EOF ##
diff --git a/bsie/extractor/image/colors_spatial.py b/bsie/extractor/image/colors_spatial.py
new file mode 100644
index 0000000..e6661a9
--- /dev/null
+++ b/bsie/extractor/image/colors_spatial.py
@@ -0,0 +1,150 @@
+"""Spatial color features.
+"""
+# standard imports
+import typing
+
+# external imports
+import PIL.Image
+import numpy as np
+
+# bsie imports
+from bsie.utils import bsfs, node, ns
+
+# inner-module imports
+from .. import base
+
+# constants
+FEATURE_NAME = ns.bsf.ColorsSpatial()
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'ColorsSpatial',
+ )
+
+
+## code ##
+
+class ColorsSpatial(base.Extractor):
+ """Determine dominant colors of subregions in the image.
+
+ Computes the domiant color of increasingly smaller subregions of the image.
+ """
+
+ CONTENT_READER = 'bsie.reader.image.Image'
+
+ # Initial subregion width.
+ width: int
+
+ # Initial subregion height.
+ height: int
+
+ # Decrement exponent.
+ exp: float
+
+ # Principal predicate's URI.
+ _predicate_name: bsfs.URI
+
+ def __init__(
+ self,
+ width: int = 32,
+ height: int = 32,
+ exp: float = 4.,
+ ):
+ # instance identifier
+ uuid = bsfs.uuid.UCID.from_dict({
+ 'width': width,
+ 'height': height,
+ 'exp': exp,
+ })
+ # determine symbol names
+ instance_name = getattr(FEATURE_NAME, uuid)
+ predicate_name = getattr(ns.bse, 'colors_spatial_' + uuid)
+ # get vector dimension
+ dimension = self.dimension(width, height, exp)
+ # initialize parent with the schema
+ super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f'''
+ <{FEATURE_NAME}> rdfs:subClassOf bsa:Feature ;
+ # annotations
+ rdfs:label "Spatially dominant colors"^^xsd:string ;
+ schema:description "Domiant colors of subregions in an image."^^xsd:string ;
+ bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ;
+ bsfs:dtype xsd:integer .
+
+ <{instance_name}> rdfs:subClassOf <{FEATURE_NAME}> ;
+ bsfs:dimension "{dimension}"^^xsd:integer ;
+ # annotations
+ <{FEATURE_NAME}/args#width> "{width}"^^xsd:integer ;
+ <{FEATURE_NAME}/args#height> "{height}"^^xsd:integer ;
+ <{FEATURE_NAME}/args#exp> "{exp}"^^xsd:float .
+
+ <{predicate_name}> rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range <{instance_name}> ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ '''))
+ # assign extra members
+ self.width = width
+ self.height = height
+ self.exp = exp
+ self._predicate_name = predicate_name
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}({self.width}, {self.height}, {self.exp})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self.width == other.width \
+ and self.height == other.height \
+ and self.exp == other.exp
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(), self.width, self.height, self.exp))
+
+ @staticmethod
+ def dimension(width: int, height: int, exp: float) -> int:
+ """Return the feature vector dimension."""
+ # FIXME: replace with a proper formula
+ dim = 0
+ while width >= 1 and height >= 1:
+ dim += width * height
+ width = np.floor(width / exp)
+ height = np.floor(height / exp)
+ dim *= 3 # per band
+ return int(dim)
+
+ def extract(
+ self,
+ subject: node.Node,
+ content: PIL.Image.Image,
+ principals: typing.Iterable[bsfs.schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ # check principals
+ if self.schema.predicate(self._predicate_name) not in principals:
+ # nothing to do; abort
+ return
+
+ # convert to HSV
+ content = content.convert('HSV')
+
+ # get dimensions
+ width, height = self.width, self.height
+ num_bands = len(content.getbands()) # it's three since we converted to HSV before
+
+ features = []
+ while width >= 1 and height >= 1:
+ # downsample
+ img = content.resize((width, height), resample=PIL.Image.Resampling.BOX)
+ # feature vector
+ features.append(
+ np.array(img.getdata()).reshape((width * height, num_bands)))
+ # iterate
+ width = int(np.floor(width / self.exp))
+ height = int(np.floor(height / self.exp))
+
+ # combine bands and convert features to tuple
+ value = tuple(np.vstack(features).reshape(-1))
+ # return triple with feature vector as value
+ yield subject, self.schema.predicate(self._predicate_name), value
+
+## EOF ##
diff --git a/bsie/extractor/image/photometrics.py b/bsie/extractor/image/photometrics.py
new file mode 100644
index 0000000..42eb3c8
--- /dev/null
+++ b/bsie/extractor/image/photometrics.py
@@ -0,0 +1,211 @@
+
+# standard imports
+from fractions import Fraction
+import typing
+
+# bsie imports
+from bsie.utils import bsfs, node, ns
+
+# inner-module imports
+from .. import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'Exif',
+ )
+
+
+## code ##
+
+def _gps_to_dec(coords: typing.Tuple[float, float, float]) -> float:
+ """Convert GPS coordinates from exif to float."""
+ # unpack args
+ deg, min, sec = coords # pylint: disable=redefined-builtin # min
+ # convert to float
+ deg = float(Fraction(deg))
+ min = float(Fraction(min))
+ sec = float(Fraction(sec))
+
+ if float(sec) > 0:
+ # format is deg+min+sec
+ return (float(deg) * 3600 + float(min) * 60 + float(sec)) / 3600
+ # format is deg+min
+ return float(deg) + float(min) / 60
+
+
+class Exif(base.Extractor):
+ """Extract information from EXIF/IPTC tags of an image file."""
+
+ CONTENT_READER = 'bsie.reader.exif.Exif'
+
+ def __init__(self):
+ super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + '''
+ #bse:t_capture rdfs:subClassOf bsfs:Predicate ;
+ # rdfs:domain bsn:Entity ;
+ # rdfs:range xsd:float ;
+ # bsfs:unique "true"^^xsd:boolean .
+ bse:exposure rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:aperture rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:iso rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:integer ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:focal_length rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:width rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:integer ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:height rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:integer ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:orientation rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:integer ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:orientation_label rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:string ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:altitude rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:latitude rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+ bse:longitude rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+ '''))
+ # initialize mapping from predicate to callback
+ self._callmap = {
+ #self.schema.predicate(ns.bse.t_capture): self._date,
+ self.schema.predicate(ns.bse.exposure): self._exposure,
+ self.schema.predicate(ns.bse.aperture): self._aperture,
+ self.schema.predicate(ns.bse.iso): self._iso,
+ self.schema.predicate(ns.bse.focal_length): self._focal_length,
+ self.schema.predicate(ns.bse.width): self._width,
+ self.schema.predicate(ns.bse.height): self._height,
+ self.schema.predicate(ns.bse.orientation): self._orientation,
+ self.schema.predicate(ns.bse.orientation_label): self._orientation_label,
+ self.schema.predicate(ns.bse.altitude): self._altitude,
+ self.schema.predicate(ns.bse.latitude): self._latitude,
+ self.schema.predicate(ns.bse.longitude): self._longitude,
+ }
+
+ def extract(
+ self,
+ subject: node.Node,
+ content: dict,
+ principals: typing.Iterable[bsfs.schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ for pred in principals:
+ # find callback
+ clbk = self._callmap.get(pred)
+ if clbk is None:
+ continue
+ # get value
+ value = clbk(content)
+ if value is None:
+ continue
+ # produce triple
+ yield subject, pred, value
+
+ #def _date(self, content: dict): # FIXME: Return type annotation
+ # date_keys = (
+ # 'Exif.Photo.DateTimeOriginal',
+ # 'Exif.Photo.DateTimeDigitized',
+ # 'Exif.Image.DateTime',
+ # )
+ # for key in date_keys:
+ # if key in content:
+ # dt = content[key].value
+ # if dt.tzinfo is None:
+ # dt = dt.replace(tzinfo=ttime.NoTimeZone)
+ # return dt
+ # return None
+
+
+ ## photometrics
+
+ def _exposure(self, content: dict) -> typing.Optional[float]:
+ if 'Exif.Photo.ExposureTime' in content:
+ return 1.0 / float(Fraction(content['Exif.Photo.ExposureTime']))
+ return None
+
+ def _aperture(self, content: dict) -> typing.Optional[float]:
+ if 'Exif.Photo.FNumber' in content:
+ return float(Fraction(content['Exif.Photo.FNumber']))
+ return None
+
+ def _iso(self, content: dict) -> typing.Optional[int]:
+ if 'Exif.Photo.ISOSpeedRatings' in content:
+ return int(content['Exif.Photo.ISOSpeedRatings'])
+ return None
+
+ def _focal_length(self, content: dict) -> typing.Optional[float]:
+ if 'Exif.Photo.FocalLength' in content:
+ return float(Fraction(content['Exif.Photo.FocalLength']))
+ return None
+
+
+ ## image dimensions
+
+ def _width(self, content: dict) -> typing.Optional[int]:
+ # FIXME: consider orientation!
+ if 'Exif.Photo.PixelXDimension' in content:
+ return int(content['Exif.Photo.PixelXDimension'])
+ return None
+
+ def _height(self, content: dict) -> typing.Optional[int]:
+ # FIXME: consider orientation!
+ if 'Exif.Photo.PixelYDimension' in content:
+ return int(content['Exif.Photo.PixelYDimension'])
+ return None
+
+ def _orientation(self, content: dict) -> typing.Optional[int]:
+ if 'Exif.Image.Orientation' in content:
+ return int(content['Exif.Image.Orientation'])
+ return None
+
+ def _orientation_label(self, content: dict) -> typing.Optional[str]:
+ width = self._width(content)
+ height = self._height(content)
+ ori = self._orientation(content)
+ if width is not None and height is not None and ori is not None:
+ if ori <= 4:
+ return 'landscape' if width >= height else 'portrait'
+ return 'portrait' if width >= height else 'landscape'
+ return None
+
+
+ ## location
+
+ def _altitude(self, content: dict) -> typing.Optional[float]:
+ if 'Exif.GPSInfo.GPSAltitude' in content:
+ return float(Fraction(content['Exif.GPSInfo.GPSAltitude']))
+ return None
+
+ def _latitude(self, content: dict) -> typing.Optional[float]:
+ if 'Exif.GPSInfo.GPSLatitude' in content:
+ return _gps_to_dec(content['Exif.GPSInfo.GPSLatitude'].split())
+ return None
+
+ def _longitude(self, content: dict) -> typing.Optional[float]:
+ if 'Exif.GPSInfo.GPSLongitude' in content:
+ return _gps_to_dec(content['Exif.GPSInfo.GPSLongitude'].split())
+ return None
+
+## EOF ##