aboutsummaryrefslogtreecommitdiffstats
path: root/bsie
diff options
context:
space:
mode:
Diffstat (limited to 'bsie')
-rw-r--r--bsie/extractor/image/face/__init__.py8
-rw-r--r--bsie/extractor/image/face/detect.py93
-rw-r--r--bsie/extractor/image/face/identify.py176
-rw-r--r--bsie/lib/naming_policy.py11
-rw-r--r--bsie/reader/face.py179
5 files changed, 467 insertions, 0 deletions
diff --git a/bsie/extractor/image/face/__init__.py b/bsie/extractor/image/face/__init__.py
new file mode 100644
index 0000000..f82424a
--- /dev/null
+++ b/bsie/extractor/image/face/__init__.py
@@ -0,0 +1,8 @@
+
+# standard imports
+import typing
+
+# exports
+__all__: typing.Sequence[str] = []
+
+## EOF ##
diff --git a/bsie/extractor/image/face/detect.py b/bsie/extractor/image/face/detect.py
new file mode 100644
index 0000000..94e3a61
--- /dev/null
+++ b/bsie/extractor/image/face/detect.py
@@ -0,0 +1,93 @@
+
+# standard imports
+import typing
+
+# external imports
+import torch
+from facenet_pytorch import MTCNN, InceptionResnetV1
+
+# bsie imports
+from bsie.utils import bsfs, node, ns
+
+# inner-module imports
+from ... import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'FaceDetect',
+ )
+
+
+## code ##
+
+bsf = ns.bsn.Face()
+
+class FaceDetect(base.Extractor):
+
+ CONTENT_READER = 'bsie.reader.face.FaceExtract'
+
+ def __init__(self):
+ # initialize parent with the schema
+ super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f'''
+ prefix bsf: <https://schema.bsfs.io/ie/Node/Face#>
+
+ bsn:Face rdfs:subClassOf bsfs:Node .
+
+ <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512>
+ rdfs:subClassOf bsa:Feature ;
+ bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ;
+ bsfs:dtype <https://schema.bsfs.io/core/dtype#f32>;
+ bsfs:dimension "512"^^xsd:integer .
+
+ bse:face rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range bsn:Face .
+
+ bsf:x rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:y rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:width rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:height rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:embedding rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ '''))
+
+ def extract(
+ self,
+ subject: node.Node,
+ content: dict,
+ principals: typing.Iterable[bsfs.schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ # check principals
+ if self.schema.predicate(ns.bse.face) not in principals:
+ # nothing to do; abort
+ return
+
+ for face in content:
+ fnode = node.Node(ns.bsn.Face, ucid=face['ucid'])
+ yield subject, ns.bse.face, fnode
+ yield fnode, bsf.x, face['x']
+ yield fnode, bsf.y, face['y']
+ yield fnode, bsf.width, face['width']
+ yield fnode, bsf.height, face['height']
+ yield fnode, bsf.embedding, face['embedding'].detach().cpu().numpy()
+
+## EOF ##
diff --git a/bsie/extractor/image/face/identify.py b/bsie/extractor/image/face/identify.py
new file mode 100644
index 0000000..152f113
--- /dev/null
+++ b/bsie/extractor/image/face/identify.py
@@ -0,0 +1,176 @@
+
+# standard imports
+import csv
+import typing
+
+# external imports
+from facenet_pytorch import MTCNN, InceptionResnetV1
+import numpy as np
+import torch
+
+# bsie imports
+from bsie.utils import bsfs, node, ns
+
+# inner-module imports
+from ... import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'FaceIdentify',
+ )
+
+
+## code ##
+
+bsf = ns.bsn.Face()
+
+class FaceIdentify(base.Extractor):
+
+ CONTENT_READER = 'bsie.reader.face.FaceExtract'
+
+ _restklasse: bsfs.URI
+ _thres: float
+ _device: torch.device
+ _restidx: int
+ _id2name: typing.Dict[int, str]
+ _embeds: torch.Tensor
+ _targets: torch.Tensor
+
+
+
+ # FIXME: This could be a bsfs maintenance function instead of a bsie function
+
+ def __init__(
+ self,
+ # FIXME: Initialize from bsfs storage instead of files
+ ref_embeds: str,
+ ref_mapping: str,
+ thres: float = 0.9,
+ cuda_device: str = 'cuda:0',
+ restklasse: str = 'https://example.com/user/anon',
+ ):
+ # initialize parent with the schema
+ super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f'''
+ bsn:Face rdfs:subClassOf bsfs:Node .
+ bsn:Person rdfs:subClassOf bsfs:Node .
+ <https://schema.bsfs.io/ie/Node/Face#depicts> rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range bsn:Person .
+ # FIXME: Entity -> Face?
+ bse:face rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range bsn:Face .
+ # FIXME: Face -> Embedding?
+ #<https://schema.bsfs.io/ie/Node/Face#embedding>
+ # rdfs:subClassOf bsfs:Predicate ;
+ # rdfs:domain bsn:Face ;
+ # rdfs:range <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> ;
+ # bsfs:unique "true"^^xsd:boolean .
+ #<https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512>
+ # rdfs:subClassOf bsa:Feature ;
+ # bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ;
+ # bsfs:dtype <https://schema.bsfs.io/core/dtype#f32>;
+ # bsfs:dimension "512"^^xsd:integer .
+
+ '''))
+ # store extra members
+ self._restklasse = bsfs.URI(restklasse)
+ self._thres = thres
+ # get face instances
+ self._device = torch.device(cuda_device if torch.cuda.is_available() else 'cpu')
+ with open(ref_embeds, 'rb') as ifile:
+ emb_with_trg = np.load(ifile)
+ targets, embeds = emb_with_trg[:, 0], emb_with_trg[:, 1:]
+ self._targets = torch.tensor(targets, dtype=torch.int32).to(self._device)
+ self._embeds = torch.tensor(embeds).to(self._device)
+ with open(ref_mapping, 'rt') as ifile:
+ mapping = [(int(idx), name) for name, idx in csv.reader(ifile)]
+ # ensure that the mapping is unique
+ ids, names = zip(*mapping)
+ if len(set(names)) != len(names):
+ raise Exception('people identifiers must be unique')
+ if len(set(ids)) != len(ids):
+ raise Exception('people indices must be unique')
+ # ensure that all targets are accounted for
+ if not {int(i) for i in self._targets.tolist()}.issubset(set(ids)):
+ raise Exception('all targets must be labelled')
+ # ensure and fetch the index of the restklasse
+ if self._restklasse not in names:
+ mapping.append((max(ids) + 1, self._restklasse))
+ # store mapping
+ self._restidx = [idx for idx, name in mapping if name == self._restklasse][0]
+ self._id2name = dict(mapping)
+ # discard the restklasse from the reference points
+ self._embeds = self._embeds[self._targets != self._restidx]
+ self._targets = self._targets[self._targets != self._restidx]
+
+ @property
+ def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
+ """Return the principal predicates, i.e., relations from/to the extraction subject."""
+ yield from super().principals
+ yield self.schema.predicate(bsf.depicts)
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}(N={len(self._embeds)}, restklasse={self._restklasse})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self._thres == other._thres \
+ and self._id2name == other._id2name \
+ and torch.equal(self._embeds, other._embeds) \
+ and torch.equal(self._targets, other._targets) \
+ and self._restklasse == other._restklasse \
+ and self._restidx == other._restidx
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(),
+ tuple(sorted(self._id2name.items())),
+ self._thres,
+ tuple(self._embeds.detach().cpu().numpy().reshape(-1).tolist()),
+ tuple(self._targets.detach().cpu().numpy().reshape(-1).tolist()),
+ self._restklasse,
+ self._restidx,
+ ))
+
+ def _classify(self, emb: torch.Tensor) -> torch.Tensor: # [Nx512] -> [N]
+ # nearest neighbour approach
+ dist = torch.cdist(emb, self._embeds) # pairwise distances
+ best = dist.argmin(dim=1) # idx of lowest distance, per row
+ labels = self._targets[best] # label (int) of nearest neighbour
+ acc = dist[range(len(best)), best] < self._thres # check if distance is below threshold
+ return [lbl.item() if cnd == True else self._restidx for cnd, lbl in zip(acc, labels)]
+
+ def extract(
+ self,
+ subject: node.Node,
+ content: typing.Any,
+ principals: typing.Iterable[bsfs.schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ # check principals
+ #if self.schema.predicate(bsf.depicts) not in principals:
+ if self.schema.predicate(ns.bse.face) not in principals:
+ # nothing to do; abort
+ return
+ # check content
+ if len(content) == 0:
+ return
+
+ # collect embeddings
+ emb = torch.vstack([face['embedding'] for face in content]).to(self._device)
+ # apply classifier
+ labels = self._classify(emb)
+ # walk through faces
+ for face, idx in zip(content, labels):
+ lbl = bsfs.URI(self._id2name[idx]) # label (uri) of nearest neighbour
+ if lbl == self._restklasse: # suppress
+ continue
+ pnode = node.Node(ns.bsn.Person, uri=lbl)
+ fnode = node.Node(ns.bsn.Face, ucid=face['ucid'])
+ # emit triple
+ yield fnode, self.schema.predicate(bsf.depicts), pnode
+ # FIXME: emit subject -> face -> fnode?
+ yield subject, self.schema.predicate(ns.bse.face), fnode
+ # FIXME: emit embedding?
+ #yield fnode, bsf.embedding, face['embedding']
+
+## EOF ##
diff --git a/bsie/lib/naming_policy.py b/bsie/lib/naming_policy.py
index ffef7d9..fbdbeb0 100644
--- a/bsie/lib/naming_policy.py
+++ b/bsie/lib/naming_policy.py
@@ -89,6 +89,8 @@ class DefaultNamingPolicy(NamingPolicy):
return self.name_preview(node)
if node.node_type == ns.bsn.Tag:
return self.name_tag(node)
+ if node.node_type == ns.bsn.Face:
+ return self.name_face(node)
raise errors.ProgrammingError(f'no naming policy available for {node.node_type}')
def name_entity(self, node: Node) -> Node:
@@ -127,4 +129,13 @@ class DefaultNamingPolicy(NamingPolicy):
node.uri = getattr(self._prefix.tag(), fragment)
return node
+ def name_face(self, node: Node) -> Node:
+ if 'ucid' in node.hints: # content id
+ fragment = node.hints['ucid']
+ else: # random name
+ fragment = self._uuid()
+ node.uri = getattr(self._prefix.face(), fragment)
+ return node
+
+
## EOF ##
diff --git a/bsie/reader/face.py b/bsie/reader/face.py
new file mode 100644
index 0000000..c5374e0
--- /dev/null
+++ b/bsie/reader/face.py
@@ -0,0 +1,179 @@
+
+# standard imports
+import operator
+import typing
+
+# external imports
+from facenet_pytorch import MTCNN, InceptionResnetV1
+import PIL.Image
+import torch
+
+# bsie imports
+from bsie.utils import bsfs, errors, node, ns
+
+# inner-module imports
+from . import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'FaceExtract',
+ )
+
+
+## code ##
+
+class FaceExtract(base.Reader):
+ """Extract faces and their feature vector from an image file."""
+
+ # Face patch size.
+ _target_size: int
+
+ # Lower bound on the detected face's probability.
+ _min_face_prob: float
+
+ # Face detector network.
+ _detector: MTCNN
+
+ # Face feature extractor network.
+ _embedder: InceptionResnetV1
+
+ def __init__(
+ self,
+ target_size: int = 1000,
+ min_face_size: int = 40,
+ min_face_prob: float = 0.992845,
+ cuda_device: str = 'cuda:0',
+ ext_face_size: int = 160,
+ thresholds: typing.Tuple[float, float, float] = [0.5, 0.6, 0.6],
+ factor: float = 0.709,
+ ):
+ # initialize
+ self._device = torch.device(cuda_device if torch.cuda.is_available() else 'cpu')
+ # initialize the face detection network
+ self._target_size = target_size
+ self._min_face_prob = min_face_prob
+ self._carghash = hash((min_face_size, ext_face_size, tuple(thresholds), factor))
+ self._detector = MTCNN(
+ min_face_size=min_face_size,
+ image_size=ext_face_size,
+ thresholds=thresholds,
+ factor=factor,
+ device=self._device,
+ keep_all=True,
+ ).to(self._device)
+ # initialize the face embedding netwrok
+ self._embedder = InceptionResnetV1('vggface2').to(self._device).eval()
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}({self._min_face_prob})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self._target_size == other._target_size \
+ and self._min_face_prob == other._min_face_prob \
+ and self._carghash == other._carghash
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(), self._target_size, self._min_face_prob, self._carghash))
+
+ @staticmethod
+ def preprocess(
+ img: PIL.Image.Image,
+ target_size: int,
+ rotate: typing.Union[bool, int] = True,
+ ) -> typing.Tuple[PIL.Image.Image, typing.Callable[[typing.Tuple[float, float]], typing.Tuple[float, float]]]:
+ """Preprocess an image. Return the image and a coordinate back-transformation function.
+ 1. Scale larger side to *target_size*
+ 2. Rotate by angle *rotate*, or auto-rotate if *rotate=None* (the default).
+ """
+ # FIXME: re-using reader.Image would cover more file formats!
+
+ # >>> from PIL import ExifTags
+ # >>> exif_ori = [k for k, tag in ExifTags.TAGS.items() if tag == 'Orientation']
+ # >>> exif_ori = exif_ori[0]
+ exif_ori = 274
+
+ # scale image
+ orig_size = img.size
+ if img.size[0] > img.size[1]: # landscape
+ img = img.resize((target_size, int(img.height / img.width * target_size)), reducing_gap=3)
+ elif img.size[0] < img.size[1]: # portrait
+ img = img.resize((int(img.width / img.height * target_size), target_size), reducing_gap=3)
+ else: # square
+ img = img.resize((
+ int(img.width / img.height * target_size),
+ int(img.width / img.height * target_size),
+ ), reducing_gap=3)
+
+ # get scale factors
+ sX = orig_size[0] / img.width
+ sY = orig_size[1] / img.height
+
+ # rotate image (if need be)
+ denorm = lambda xy: (sX*xy[0], sY*xy[1])
+ if rotate is not None:
+ # auto-rotate according to EXIF information
+ img_ori = img.getexif().get(exif_ori, None)
+ if img_ori == 3 or rotate == 180:
+ img = img.rotate(180, expand=True)
+ denorm = lambda xy: (orig_size[0] - sX*xy[0], orig_size[1] - sY*xy[1])
+ elif img_ori == 6 or rotate == 270:
+ img = img.rotate(270, expand=True)
+ denorm = lambda xy: (orig_size[0] - sX*xy[1], sY*xy[0])
+ elif img_ori == 8 or rotate == 90:
+ img = img.rotate(90, expand=True)
+ denorm = lambda xy: (sX*xy[1], orig_size[1] - sY*xy[0])
+
+ # return image and denormalization function
+ return img, denorm
+
+ def __call__(self, path: str) -> typing.Sequence[dict]:
+ try:
+ # open the image
+ img = PIL.Image.open(path)
+ # rotate and scale the image
+ img, denorm = self.preprocess(img, self._target_size)
+
+ # detect faces
+ boxes, probs = self._detector.detect(img)
+ if boxes is None: # no faces detected
+ return []
+ # ignore boxes with probability below threshold
+ boxes = [box for box, p in zip(boxes, probs) if p >= self._min_face_prob]
+ if len(boxes) == 0: # no faces detected
+ return []
+ # compute face embeddings
+ faces_img = self._detector.extract(img, boxes, None).to(self._device)
+ embeds = self._embedder(faces_img)
+
+ faces = []
+ for bbox, face, emb in zip(boxes, faces_img, embeds):
+ # face hash
+ ucid = bsfs.uuid.UCID.from_bytes(bytes(face.detach().cpu().numpy()))
+ # position / size
+ x0, y0 = denorm(bbox[:2])
+ x1, y1 = denorm(bbox[2:])
+ x, y = min(x0, x1), min(y0, y1)
+ width, height = max(x0, x1) - x, max(y0, y1) - y
+ # assembled
+ faces.append(dict(
+ ucid=ucid, # str
+ x=x, # float
+ y=y, # float
+ width=width, # float
+ height=height, # float
+ embedding=emb, # np.array
+ ))
+
+ return faces
+
+ except PIL.UnidentifiedImageError as err: # format not supported by PIL
+ raise errors.UnsupportedFileFormatError(path) from err
+ except IOError as err: # file not found and file open errors
+ raise errors.ReaderError(path) from err
+ except RuntimeError as err: # pytorch errors
+ raise errors.ReaderError(path) from err
+ except ValueError as err: # negative seek value
+ raise errors.ReaderError(path) from err
+
+## EOF ##