aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2023-06-17 22:33:38 +0200
committerMatthias Baumgartner <dev@igsor.net>2023-06-17 22:33:38 +0200
commit6a51098412b220e3be90cc7fdd7dba6fb4a2f025 (patch)
tree43ca0c26e36768a6b6358ffa63fb49fae6704caf
parentf44ba0b30f924df54a80aaa7bafdf817e5ab1881 (diff)
downloadbsie-6a51098412b220e3be90cc7fdd7dba6fb4a2f025.tar.gz
bsie-6a51098412b220e3be90cc7fdd7dba6fb4a2f025.tar.bz2
bsie-6a51098412b220e3be90cc7fdd7dba6fb4a2f025.zip
face reader, face detection and identification extractors
-rw-r--r--.gitignore5
-rw-r--r--bsie/extractor/image/face/__init__.py8
-rw-r--r--bsie/extractor/image/face/detect.py93
-rw-r--r--bsie/extractor/image/face/identify.py176
-rw-r--r--bsie/lib/naming_policy.py11
-rw-r--r--bsie/reader/face.py179
-rw-r--r--setup.py4
-rw-r--r--test/extractor/image/face/__init__.py0
-rw-r--r--test/extractor/image/face/test_detect.py62
-rw-r--r--test/extractor/image/face/test_identify.py148
-rw-r--r--test/lib/test_naming_policy.py16
-rw-r--r--test/reader/test_face.py220
12 files changed, 922 insertions, 0 deletions
diff --git a/.gitignore b/.gitignore
index d2785ad..1cdad33 100644
--- a/.gitignore
+++ b/.gitignore
@@ -24,6 +24,11 @@ doc/build/
doc/source/api
# testing data
+test/extractor/image/face/*.csv
+test/extractor/image/face/*.jpg
+test/extractor/image/face/*.npy
+test/reader/faces-ivan.jpg
+test/reader/faces-noface.jpg
test/reader/image/testimage.nef*
test/reader/preview/testimage.nef*
diff --git a/bsie/extractor/image/face/__init__.py b/bsie/extractor/image/face/__init__.py
new file mode 100644
index 0000000..f82424a
--- /dev/null
+++ b/bsie/extractor/image/face/__init__.py
@@ -0,0 +1,8 @@
+
+# standard imports
+import typing
+
+# exports
+__all__: typing.Sequence[str] = []
+
+## EOF ##
diff --git a/bsie/extractor/image/face/detect.py b/bsie/extractor/image/face/detect.py
new file mode 100644
index 0000000..94e3a61
--- /dev/null
+++ b/bsie/extractor/image/face/detect.py
@@ -0,0 +1,93 @@
+
+# standard imports
+import typing
+
+# external imports
+import torch
+from facenet_pytorch import MTCNN, InceptionResnetV1
+
+# bsie imports
+from bsie.utils import bsfs, node, ns
+
+# inner-module imports
+from ... import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'FaceDetect',
+ )
+
+
+## code ##
+
+bsf = ns.bsn.Face()
+
+class FaceDetect(base.Extractor):
+
+ CONTENT_READER = 'bsie.reader.face.FaceExtract'
+
+ def __init__(self):
+ # initialize parent with the schema
+ super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f'''
+ prefix bsf: <https://schema.bsfs.io/ie/Node/Face#>
+
+ bsn:Face rdfs:subClassOf bsfs:Node .
+
+ <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512>
+ rdfs:subClassOf bsa:Feature ;
+ bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ;
+ bsfs:dtype <https://schema.bsfs.io/core/dtype#f32>;
+ bsfs:dimension "512"^^xsd:integer .
+
+ bse:face rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range bsn:Face .
+
+ bsf:x rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:y rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:width rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:height rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range xsd:float ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ bsf:embedding rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> ;
+ bsfs:unique "true"^^xsd:boolean .
+
+ '''))
+
+ def extract(
+ self,
+ subject: node.Node,
+ content: dict,
+ principals: typing.Iterable[bsfs.schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ # check principals
+ if self.schema.predicate(ns.bse.face) not in principals:
+ # nothing to do; abort
+ return
+
+ for face in content:
+ fnode = node.Node(ns.bsn.Face, ucid=face['ucid'])
+ yield subject, ns.bse.face, fnode
+ yield fnode, bsf.x, face['x']
+ yield fnode, bsf.y, face['y']
+ yield fnode, bsf.width, face['width']
+ yield fnode, bsf.height, face['height']
+ yield fnode, bsf.embedding, face['embedding'].detach().cpu().numpy()
+
+## EOF ##
diff --git a/bsie/extractor/image/face/identify.py b/bsie/extractor/image/face/identify.py
new file mode 100644
index 0000000..152f113
--- /dev/null
+++ b/bsie/extractor/image/face/identify.py
@@ -0,0 +1,176 @@
+
+# standard imports
+import csv
+import typing
+
+# external imports
+from facenet_pytorch import MTCNN, InceptionResnetV1
+import numpy as np
+import torch
+
+# bsie imports
+from bsie.utils import bsfs, node, ns
+
+# inner-module imports
+from ... import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'FaceIdentify',
+ )
+
+
+## code ##
+
+bsf = ns.bsn.Face()
+
+class FaceIdentify(base.Extractor):
+
+ CONTENT_READER = 'bsie.reader.face.FaceExtract'
+
+ _restklasse: bsfs.URI
+ _thres: float
+ _device: torch.device
+ _restidx: int
+ _id2name: typing.Dict[int, str]
+ _embeds: torch.Tensor
+ _targets: torch.Tensor
+
+
+
+ # FIXME: This could be a bsfs maintenance function instead of a bsie function
+
+ def __init__(
+ self,
+ # FIXME: Initialize from bsfs storage instead of files
+ ref_embeds: str,
+ ref_mapping: str,
+ thres: float = 0.9,
+ cuda_device: str = 'cuda:0',
+ restklasse: str = 'https://example.com/user/anon',
+ ):
+ # initialize parent with the schema
+ super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f'''
+ bsn:Face rdfs:subClassOf bsfs:Node .
+ bsn:Person rdfs:subClassOf bsfs:Node .
+ <https://schema.bsfs.io/ie/Node/Face#depicts> rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Face ;
+ rdfs:range bsn:Person .
+ # FIXME: Entity -> Face?
+ bse:face rdfs:subClassOf bsfs:Predicate ;
+ rdfs:domain bsn:Entity ;
+ rdfs:range bsn:Face .
+ # FIXME: Face -> Embedding?
+ #<https://schema.bsfs.io/ie/Node/Face#embedding>
+ # rdfs:subClassOf bsfs:Predicate ;
+ # rdfs:domain bsn:Face ;
+ # rdfs:range <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> ;
+ # bsfs:unique "true"^^xsd:boolean .
+ #<https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512>
+ # rdfs:subClassOf bsa:Feature ;
+ # bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ;
+ # bsfs:dtype <https://schema.bsfs.io/core/dtype#f32>;
+ # bsfs:dimension "512"^^xsd:integer .
+
+ '''))
+ # store extra members
+ self._restklasse = bsfs.URI(restklasse)
+ self._thres = thres
+ # get face instances
+ self._device = torch.device(cuda_device if torch.cuda.is_available() else 'cpu')
+ with open(ref_embeds, 'rb') as ifile:
+ emb_with_trg = np.load(ifile)
+ targets, embeds = emb_with_trg[:, 0], emb_with_trg[:, 1:]
+ self._targets = torch.tensor(targets, dtype=torch.int32).to(self._device)
+ self._embeds = torch.tensor(embeds).to(self._device)
+ with open(ref_mapping, 'rt') as ifile:
+ mapping = [(int(idx), name) for name, idx in csv.reader(ifile)]
+ # ensure that the mapping is unique
+ ids, names = zip(*mapping)
+ if len(set(names)) != len(names):
+ raise Exception('people identifiers must be unique')
+ if len(set(ids)) != len(ids):
+ raise Exception('people indices must be unique')
+ # ensure that all targets are accounted for
+ if not {int(i) for i in self._targets.tolist()}.issubset(set(ids)):
+ raise Exception('all targets must be labelled')
+ # ensure and fetch the index of the restklasse
+ if self._restklasse not in names:
+ mapping.append((max(ids) + 1, self._restklasse))
+ # store mapping
+ self._restidx = [idx for idx, name in mapping if name == self._restklasse][0]
+ self._id2name = dict(mapping)
+ # discard the restklasse from the reference points
+ self._embeds = self._embeds[self._targets != self._restidx]
+ self._targets = self._targets[self._targets != self._restidx]
+
+ @property
+ def principals(self) -> typing.Iterator[bsfs.schema.Predicate]:
+ """Return the principal predicates, i.e., relations from/to the extraction subject."""
+ yield from super().principals
+ yield self.schema.predicate(bsf.depicts)
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}(N={len(self._embeds)}, restklasse={self._restklasse})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self._thres == other._thres \
+ and self._id2name == other._id2name \
+ and torch.equal(self._embeds, other._embeds) \
+ and torch.equal(self._targets, other._targets) \
+ and self._restklasse == other._restklasse \
+ and self._restidx == other._restidx
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(),
+ tuple(sorted(self._id2name.items())),
+ self._thres,
+ tuple(self._embeds.detach().cpu().numpy().reshape(-1).tolist()),
+ tuple(self._targets.detach().cpu().numpy().reshape(-1).tolist()),
+ self._restklasse,
+ self._restidx,
+ ))
+
+ def _classify(self, emb: torch.Tensor) -> torch.Tensor: # [Nx512] -> [N]
+ # nearest neighbour approach
+ dist = torch.cdist(emb, self._embeds) # pairwise distances
+ best = dist.argmin(dim=1) # idx of lowest distance, per row
+ labels = self._targets[best] # label (int) of nearest neighbour
+ acc = dist[range(len(best)), best] < self._thres # check if distance is below threshold
+ return [lbl.item() if cnd == True else self._restidx for cnd, lbl in zip(acc, labels)]
+
+ def extract(
+ self,
+ subject: node.Node,
+ content: typing.Any,
+ principals: typing.Iterable[bsfs.schema.Predicate],
+ ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]:
+ # check principals
+ #if self.schema.predicate(bsf.depicts) not in principals:
+ if self.schema.predicate(ns.bse.face) not in principals:
+ # nothing to do; abort
+ return
+ # check content
+ if len(content) == 0:
+ return
+
+ # collect embeddings
+ emb = torch.vstack([face['embedding'] for face in content]).to(self._device)
+ # apply classifier
+ labels = self._classify(emb)
+ # walk through faces
+ for face, idx in zip(content, labels):
+ lbl = bsfs.URI(self._id2name[idx]) # label (uri) of nearest neighbour
+ if lbl == self._restklasse: # suppress
+ continue
+ pnode = node.Node(ns.bsn.Person, uri=lbl)
+ fnode = node.Node(ns.bsn.Face, ucid=face['ucid'])
+ # emit triple
+ yield fnode, self.schema.predicate(bsf.depicts), pnode
+ # FIXME: emit subject -> face -> fnode?
+ yield subject, self.schema.predicate(ns.bse.face), fnode
+ # FIXME: emit embedding?
+ #yield fnode, bsf.embedding, face['embedding']
+
+## EOF ##
diff --git a/bsie/lib/naming_policy.py b/bsie/lib/naming_policy.py
index ffef7d9..fbdbeb0 100644
--- a/bsie/lib/naming_policy.py
+++ b/bsie/lib/naming_policy.py
@@ -89,6 +89,8 @@ class DefaultNamingPolicy(NamingPolicy):
return self.name_preview(node)
if node.node_type == ns.bsn.Tag:
return self.name_tag(node)
+ if node.node_type == ns.bsn.Face:
+ return self.name_face(node)
raise errors.ProgrammingError(f'no naming policy available for {node.node_type}')
def name_entity(self, node: Node) -> Node:
@@ -127,4 +129,13 @@ class DefaultNamingPolicy(NamingPolicy):
node.uri = getattr(self._prefix.tag(), fragment)
return node
+ def name_face(self, node: Node) -> Node:
+ if 'ucid' in node.hints: # content id
+ fragment = node.hints['ucid']
+ else: # random name
+ fragment = self._uuid()
+ node.uri = getattr(self._prefix.face(), fragment)
+ return node
+
+
## EOF ##
diff --git a/bsie/reader/face.py b/bsie/reader/face.py
new file mode 100644
index 0000000..c5374e0
--- /dev/null
+++ b/bsie/reader/face.py
@@ -0,0 +1,179 @@
+
+# standard imports
+import operator
+import typing
+
+# external imports
+from facenet_pytorch import MTCNN, InceptionResnetV1
+import PIL.Image
+import torch
+
+# bsie imports
+from bsie.utils import bsfs, errors, node, ns
+
+# inner-module imports
+from . import base
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'FaceExtract',
+ )
+
+
+## code ##
+
+class FaceExtract(base.Reader):
+ """Extract faces and their feature vector from an image file."""
+
+ # Face patch size.
+ _target_size: int
+
+ # Lower bound on the detected face's probability.
+ _min_face_prob: float
+
+ # Face detector network.
+ _detector: MTCNN
+
+ # Face feature extractor network.
+ _embedder: InceptionResnetV1
+
+ def __init__(
+ self,
+ target_size: int = 1000,
+ min_face_size: int = 40,
+ min_face_prob: float = 0.992845,
+ cuda_device: str = 'cuda:0',
+ ext_face_size: int = 160,
+ thresholds: typing.Tuple[float, float, float] = [0.5, 0.6, 0.6],
+ factor: float = 0.709,
+ ):
+ # initialize
+ self._device = torch.device(cuda_device if torch.cuda.is_available() else 'cpu')
+ # initialize the face detection network
+ self._target_size = target_size
+ self._min_face_prob = min_face_prob
+ self._carghash = hash((min_face_size, ext_face_size, tuple(thresholds), factor))
+ self._detector = MTCNN(
+ min_face_size=min_face_size,
+ image_size=ext_face_size,
+ thresholds=thresholds,
+ factor=factor,
+ device=self._device,
+ keep_all=True,
+ ).to(self._device)
+ # initialize the face embedding netwrok
+ self._embedder = InceptionResnetV1('vggface2').to(self._device).eval()
+
+ def __repr__(self) -> str:
+ return f'{bsfs.typename(self)}({self._min_face_prob})'
+
+ def __eq__(self, other: typing.Any) -> bool:
+ return super().__eq__(other) \
+ and self._target_size == other._target_size \
+ and self._min_face_prob == other._min_face_prob \
+ and self._carghash == other._carghash
+
+ def __hash__(self) -> int:
+ return hash((super().__hash__(), self._target_size, self._min_face_prob, self._carghash))
+
+ @staticmethod
+ def preprocess(
+ img: PIL.Image.Image,
+ target_size: int,
+ rotate: typing.Union[bool, int] = True,
+ ) -> typing.Tuple[PIL.Image.Image, typing.Callable[[typing.Tuple[float, float]], typing.Tuple[float, float]]]:
+ """Preprocess an image. Return the image and a coordinate back-transformation function.
+ 1. Scale larger side to *target_size*
+ 2. Rotate by angle *rotate*, or auto-rotate if *rotate=None* (the default).
+ """
+ # FIXME: re-using reader.Image would cover more file formats!
+
+ # >>> from PIL import ExifTags
+ # >>> exif_ori = [k for k, tag in ExifTags.TAGS.items() if tag == 'Orientation']
+ # >>> exif_ori = exif_ori[0]
+ exif_ori = 274
+
+ # scale image
+ orig_size = img.size
+ if img.size[0] > img.size[1]: # landscape
+ img = img.resize((target_size, int(img.height / img.width * target_size)), reducing_gap=3)
+ elif img.size[0] < img.size[1]: # portrait
+ img = img.resize((int(img.width / img.height * target_size), target_size), reducing_gap=3)
+ else: # square
+ img = img.resize((
+ int(img.width / img.height * target_size),
+ int(img.width / img.height * target_size),
+ ), reducing_gap=3)
+
+ # get scale factors
+ sX = orig_size[0] / img.width
+ sY = orig_size[1] / img.height
+
+ # rotate image (if need be)
+ denorm = lambda xy: (sX*xy[0], sY*xy[1])
+ if rotate is not None:
+ # auto-rotate according to EXIF information
+ img_ori = img.getexif().get(exif_ori, None)
+ if img_ori == 3 or rotate == 180:
+ img = img.rotate(180, expand=True)
+ denorm = lambda xy: (orig_size[0] - sX*xy[0], orig_size[1] - sY*xy[1])
+ elif img_ori == 6 or rotate == 270:
+ img = img.rotate(270, expand=True)
+ denorm = lambda xy: (orig_size[0] - sX*xy[1], sY*xy[0])
+ elif img_ori == 8 or rotate == 90:
+ img = img.rotate(90, expand=True)
+ denorm = lambda xy: (sX*xy[1], orig_size[1] - sY*xy[0])
+
+ # return image and denormalization function
+ return img, denorm
+
+ def __call__(self, path: str) -> typing.Sequence[dict]:
+ try:
+ # open the image
+ img = PIL.Image.open(path)
+ # rotate and scale the image
+ img, denorm = self.preprocess(img, self._target_size)
+
+ # detect faces
+ boxes, probs = self._detector.detect(img)
+ if boxes is None: # no faces detected
+ return []
+ # ignore boxes with probability below threshold
+ boxes = [box for box, p in zip(boxes, probs) if p >= self._min_face_prob]
+ if len(boxes) == 0: # no faces detected
+ return []
+ # compute face embeddings
+ faces_img = self._detector.extract(img, boxes, None).to(self._device)
+ embeds = self._embedder(faces_img)
+
+ faces = []
+ for bbox, face, emb in zip(boxes, faces_img, embeds):
+ # face hash
+ ucid = bsfs.uuid.UCID.from_bytes(bytes(face.detach().cpu().numpy()))
+ # position / size
+ x0, y0 = denorm(bbox[:2])
+ x1, y1 = denorm(bbox[2:])
+ x, y = min(x0, x1), min(y0, y1)
+ width, height = max(x0, x1) - x, max(y0, y1) - y
+ # assembled
+ faces.append(dict(
+ ucid=ucid, # str
+ x=x, # float
+ y=y, # float
+ width=width, # float
+ height=height, # float
+ embedding=emb, # np.array
+ ))
+
+ return faces
+
+ except PIL.UnidentifiedImageError as err: # format not supported by PIL
+ raise errors.UnsupportedFileFormatError(path) from err
+ except IOError as err: # file not found and file open errors
+ raise errors.ReaderError(path) from err
+ except RuntimeError as err: # pytorch errors
+ raise errors.ReaderError(path) from err
+ except ValueError as err: # negative seek value
+ raise errors.ReaderError(path) from err
+
+## EOF ##
diff --git a/setup.py b/setup.py
index b1f5b2c..c556568 100644
--- a/setup.py
+++ b/setup.py
@@ -8,6 +8,10 @@ extras = {
# image feature extractors
'numpy',
],
+ 'face': [
+ 'facenet_pytorch',
+ 'torch',
+ ],
'preview': [
# preview readers
'preview_generator', # also depends on some system packages
diff --git a/test/extractor/image/face/__init__.py b/test/extractor/image/face/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/extractor/image/face/__init__.py
diff --git a/test/extractor/image/face/test_detect.py b/test/extractor/image/face/test_detect.py
new file mode 100644
index 0000000..92375a2
--- /dev/null
+++ b/test/extractor/image/face/test_detect.py
@@ -0,0 +1,62 @@
+
+# standard imports
+import contextlib
+import io
+import os
+import requests
+import unittest
+
+# bsie imports
+from bsie.extractor import base
+from bsie.reader.face import FaceExtract
+from bsie.utils import bsfs, node as _node, ns
+
+# objects to test
+from bsie.extractor.image.face.detect import FaceDetect, bsf
+
+
+## code ##
+
+class TestFaceDetect(unittest.TestCase):
+ def setUp(self):
+ # download test image
+ target = os.path.join(os.path.dirname(__file__), 'testface1.jpg')
+ if not os.path.exists(target):
+ with open(target, 'wb') as ofile:
+ ans = requests.get('https://www.bsfs.io/testdata/iepahGee1uch5ahr3ic1.jpg')
+ ofile.write(ans.content)
+
+ def test_extract(self):
+ with contextlib.redirect_stderr(io.StringIO()): # NOTE: hide warnings from facenet_pytorch
+ # setup
+ rdr = FaceExtract()
+ ext = FaceDetect()
+ subject = _node.Node(ns.bsfs.Entity)
+ content = rdr(os.path.join(os.path.dirname(__file__), 'testface1.jpg'))
+ principals = set(ext.principals)
+ face = _node.Node(ns.bsn.Face, ucid='2a7203c1515e0caa66a7461452c0b4552f1433a613cb3033e59ed2361790ad45')
+ triples = list(ext.extract(subject, content, principals))
+ # principals is bse:face
+ self.assertSetEqual(principals, {ext.schema.predicate(ns.bse.face)})
+ # check triples
+ self.assertIn((subject, ns.bse.face, face), triples)
+ self.assertIn((face, bsf.x, 575.4721153898192), triples)
+ self.assertIn((face, bsf.y, 265.3955625), triples)
+ self.assertIn((face, bsf.width, 626.3928904791771), triples)
+ self.assertIn((face, bsf.height,858.6870625), triples)
+ # check embedding
+ emb = [o for s, p, o in triples if s == face and p == bsf.embedding]
+ self.assertEqual(len(emb), 1)
+ self.assertAlmostEqual(emb[0].sum(), -1.9049968)
+ # no triples on principal mismatch
+ self.assertListEqual(list(ext.extract(subject, content, set())), [])
+ # no triples on no content
+ self.assertListEqual(list(ext.extract(subject, [], principals)), [])
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##
diff --git a/test/extractor/image/face/test_identify.py b/test/extractor/image/face/test_identify.py
new file mode 100644
index 0000000..dde41db
--- /dev/null
+++ b/test/extractor/image/face/test_identify.py
@@ -0,0 +1,148 @@
+
+# standard imports
+import contextlib
+import io
+import os
+import unittest
+
+# external imports
+import requests
+
+# bsie imports
+from bsie.extractor import base
+from bsie.reader.face import FaceExtract
+from bsie.utils import bsfs, node as _node, ns
+
+# objects to test
+from bsie.extractor.image.face.identify import FaceIdentify, bsf
+
+
+## code ##
+
+def fetch(source, target):
+ target = os.path.join(os.path.dirname(__file__), target)
+ if not os.path.exists(target):
+ with open(target, 'wb') as ofile:
+ ans = requests.get(source)
+ ofile.write(ans.content)
+
+class TestFaceIdentify(unittest.TestCase):
+ def setUp(self):
+ # download test images
+ fetch('https://www.bsfs.io/testdata/iepahGee1uch5ahr3ic1.jpg', 'testface1.jpg')
+ fetch('https://www.bsfs.io/testdata/Woayiesae8eiL9aivoba.jpg', 'testface2.jpg')
+ fetch('https://www.bsfs.io/testdata/ATiagheiduth4So5ohxi.jpg', 'testface3.jpg')
+ # download reference vectors
+ fetch('https://www.bsfs.io/testdata/aetie3foo0faiDaiBahk.npy', 'ref_embeds.npy')
+ fetch('https://www.bsfs.io/testdata/uopoS8gei8Phiek3shei.npy', 'ref_embeds_alt1.npy')
+ fetch('https://www.bsfs.io/testdata/Otoo7ain6Ied2Iep2ein.npy', 'ref_embeds_alt2.npy')
+ fetch('https://www.bsfs.io/testdata/ie0keriChafahroeRo7i.npy', 'ref_embeds_extra.npy')
+ fetch('https://www.bsfs.io/testdata/phoophui3teeni4hieKu.csv', 'ref_mapping.csv')
+ fetch('https://www.bsfs.io/testdata/Quit4Wum8ael7Zeis4ei.csv', 'ref_mapping_alt.csv')
+ fetch('https://www.bsfs.io/testdata/Angu5cioVei5pohgh0aa.csv', 'ref_mapping_id_reuse.csv')
+ fetch('https://www.bsfs.io/testdata/ooshooK1bai5Queengae.csv', 'ref_mapping_name_reuse.csv')
+ fetch('https://www.bsfs.io/testdata/eixuepah3Ronge7oe4qu.csv', 'ref_mapping_restklasse.csv')
+
+ def test_essentials(self):
+ # setup
+ pth_embeds = os.path.join(os.path.dirname(__file__), 'ref_embeds.npy')
+ pth_embeds_alt1 = os.path.join(os.path.dirname(__file__), 'ref_embeds_alt1.npy')
+ pth_embeds_alt2 = os.path.join(os.path.dirname(__file__), 'ref_embeds_alt2.npy')
+ pth_mapping = os.path.join(os.path.dirname(__file__), 'ref_mapping.csv')
+ pth_mapping_alt = os.path.join(os.path.dirname(__file__), 'ref_mapping_alt.csv')
+ restklasse = 'https://example.com/user/fake_anon'
+ ext = FaceIdentify(pth_embeds, pth_mapping)
+ # string conversion returns class name
+ self.assertEqual(str(ext), 'FaceIdentify')
+ # representation respects number of embeddings
+ self.assertEqual(repr(ext), 'FaceIdentify(N=2, restklasse=https://example.com/user/anon)')
+ # representation respects restklasse
+ self.assertEqual(repr(FaceIdentify(pth_embeds, pth_mapping, restklasse=restklasse)),
+ 'FaceIdentify(N=2, restklasse=https://example.com/user/fake_anon)')
+ # identity
+ self.assertEqual(ext, FaceIdentify(pth_embeds, pth_mapping))
+ self.assertEqual(hash(ext), hash(FaceIdentify(pth_embeds, pth_mapping))) # FIXME!
+ # comparison respects embeddings
+ self.assertNotEqual(ext, FaceIdentify(pth_embeds_alt1, pth_mapping))
+ self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds_alt1, pth_mapping)))
+ self.assertNotEqual(ext, FaceIdentify(pth_embeds_alt2, pth_mapping))
+ self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds_alt2, pth_mapping)))
+ # comparison respects mappings
+ self.assertNotEqual(ext, FaceIdentify(pth_embeds, pth_mapping_alt))
+ self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds, pth_mapping_alt)))
+ # comparison respects threshold
+ self.assertNotEqual(ext, FaceIdentify(pth_embeds, pth_mapping, thres=0.1))
+ self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds, pth_mapping, thres=0.1)))
+ # comparison respects restklasse
+ self.assertNotEqual(ext, FaceIdentify(pth_embeds, pth_mapping, restklasse=restklasse))
+ self.assertNotEqual(hash(ext),
+ hash(FaceIdentify(pth_embeds, pth_mapping, restklasse=restklasse)))
+
+ def test_construct(self):
+ pth_embeds = os.path.join(os.path.dirname(__file__), 'ref_embeds.npy')
+ pth_mapping = os.path.join(os.path.dirname(__file__), 'ref_mapping.csv')
+ # valid construction
+ self.assertIsInstance(FaceIdentify(pth_embeds, pth_mapping), FaceIdentify)
+ # restklasse may be part of the mapping
+ ext = FaceIdentify(pth_embeds, os.path.join(os.path.dirname(__file__), 'ref_mapping_restklasse.csv'))
+ self.assertIsInstance(ext, FaceIdentify)
+ self.assertEqual(ext._restidx, 1)
+ # pass invalid mapping (name re-use)
+ self.assertRaises(Exception, FaceIdentify, pth_embeds,
+ os.path.join(os.path.dirname(__file__), 'ref_mapping_name_reuse.csv'))
+ # pass invalid mapping (id re-use)
+ self.assertRaises(Exception, FaceIdentify, pth_embeds,
+ os.path.join(os.path.dirname(__file__), 'ref_mapping_id_reuse.csv'))
+ # pass invalid embeds (extra embeddings)
+ self.assertRaises(Exception, FaceIdentify,
+ os.path.join(os.path.dirname(__file__), 'ref_embeds_extra.npy'),
+ pth_mapping)
+
+ def test_extract(self):
+ with contextlib.redirect_stderr(io.StringIO()): # NOTE: hide warnings from facenet_pytorch
+ # setup
+ rdr = FaceExtract()
+ ext = FaceIdentify(
+ os.path.join(os.path.dirname(__file__), 'ref_embeds.npy'),
+ os.path.join(os.path.dirname(__file__), 'ref_mapping.csv'),
+ )
+ subject = _node.Node(ns.bsfs.Entity)
+ content = rdr(os.path.join(os.path.dirname(__file__), 'testface1.jpg'))
+ principals = set(ext.principals)
+ face = _node.Node(ns.bsn.Face, ucid='2a7203c1515e0caa66a7461452c0b4552f1433a613cb3033e59ed2361790ad45')
+ person = _node.Node(ns.bsn.Person, uri='https://example.com/user/Angelina_Jolie')
+ triples = list(ext.extract(subject, content, principals))
+ # principls is bse:face, bsf:depicts
+ self.assertSetEqual(set(ext.principals), {
+ ext.schema.predicate(ns.bse.face),
+ ext.schema.predicate(bsf.depicts)
+ })
+ # produces two triples ...
+ self.assertEqual(len(triples), 2)
+ # ... one if at least one person was identified
+ self.assertIn((subject, ext.schema.predicate(ns.bse.face), face), triples)
+ # ... one for each identified person
+ self.assertIn((face, ext.schema.predicate(bsf.depicts), person), triples)
+ # produces no triples if no person was identified
+ content = rdr(os.path.join(os.path.dirname(__file__), 'testface2.jpg'))
+ self.assertListEqual(list(ext.extract(subject, content, principals)), [])
+ # identifies the correct person despite somewhat similar options
+ content = rdr(os.path.join(os.path.dirname(__file__), 'testface3.jpg'))
+ face = _node.Node(ns.bsn.Face, ucid='f61fac01ef686ee05805afef1e7a10ba54c30dc1aa095d9e77d79ccdfeb40dc5')
+ triples = list(ext.extract(subject, content, principals))
+ self.assertEqual(len(triples), 2)
+ person = _node.Node(ns.bsn.Person, uri='https://example.com/user/Paul_Rudd')
+ self.assertIn((subject, ext.schema.predicate(ns.bse.face), face), triples)
+ self.assertIn((face, ext.schema.predicate(bsf.depicts), person), triples)
+ # no triples on principal mismatch
+ self.assertListEqual(list(ext.extract(subject, content, set())), [])
+ # no triples on no content
+ self.assertListEqual(list(ext.extract(subject, [], principals)), [])
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##
diff --git a/test/lib/test_naming_policy.py b/test/lib/test_naming_policy.py
index 09fd6f6..a078fbd 100644
--- a/test/lib/test_naming_policy.py
+++ b/test/lib/test_naming_policy.py
@@ -35,6 +35,10 @@ class TestDefaultNamingPolicy(unittest.TestCase):
self.assertEqual(policy.handle_node(
Node(ns.bsn.Tag, label='hello')).uri,
URI('http://example.com/me/tag#hello'))
+ # processes bsn:Face
+ self.assertEqual(policy.handle_node(
+ Node(ns.bsn.Face, ucid='hello')).uri,
+ URI('http://example.com/me/face#hello'))
# raises an exception on unknown types
self.assertRaises(errors.ProgrammingError, policy.handle_node,
Node(ns.bsn.Invalid, ucid='abc123cba', size=123))
@@ -99,6 +103,18 @@ class TestDefaultNamingPolicy(unittest.TestCase):
self.assertTrue(policy.name_tag(
Node(ns.bsn.Tag,)).uri.startswith('http://example.com/me/tag#'))
+ def test_name_face(self):
+ # setup
+ policy = DefaultNamingPolicy('http://example.com', 'me')
+ # name_face uses ucid
+ self.assertEqual(policy.name_face(
+ Node(ns.bsn.Face, ucid='hello_world')).uri,
+ URI('http://example.com/me/face#hello_world'))
+ # name_face falls back to a random guid
+ self.assertTrue(policy.name_face(
+ Node(ns.bsn.Face)).uri.startswith('http://example.com/me/face#'))
+
+
class TestNamingPolicyIterator(unittest.TestCase):
def test_call(self): # NOTE: We test NamingPolicy.__call__ here
diff --git a/test/reader/test_face.py b/test/reader/test_face.py
new file mode 100644
index 0000000..f462853
--- /dev/null
+++ b/test/reader/test_face.py
@@ -0,0 +1,220 @@
+
+# standard imports
+import contextlib
+import io
+import os
+import unittest
+
+# external imports
+import requests
+import PIL.Image
+
+# bsie imports
+from bsie.utils import errors
+
+# objects to test
+from bsie.reader.face import FaceExtract
+
+
+## code ##
+
+def fetch(source, target):
+ target = os.path.join(os.path.dirname(__file__), target)
+ if not os.path.exists(target):
+ with open(target, 'wb') as ofile:
+ ans = requests.get(source)
+ ofile.write(ans.content)
+
+class TestFaceExtract(unittest.TestCase):
+ def setUp(self):
+ # download test image w/o face
+ fetch('https://www.bsfs.io/testdata/Quiejoore1ahxa9jahma.jpg', 'faces-noface.jpg')
+ # download test image w/ face
+ fetch('https://www.bsfs.io/testdata/ONekai7Ohphooch3aege.jpg', 'faces-ivan.jpg')
+
+ def test_essentials(self):
+ # repr respects min_face_prob
+ self.assertEqual(repr(FaceExtract(min_face_prob=1.0)), 'FaceExtract(1.0)')
+ self.assertEqual(repr(FaceExtract(min_face_prob=0.5)), 'FaceExtract(0.5)')
+ # repr respects type
+ class Foo(FaceExtract): pass
+ self.assertEqual(repr(Foo(min_face_prob=0.5)), 'Foo(0.5)')
+
+ # comparison respects type
+ class Foo(): pass
+ self.assertNotEqual(FaceExtract(), 1234)
+ self.assertNotEqual(hash(FaceExtract()), hash(1234))
+ self.assertNotEqual(FaceExtract(), 'hello')
+ self.assertNotEqual(hash(FaceExtract()), hash('hello'))
+ self.assertNotEqual(FaceExtract(), Foo())
+ self.assertNotEqual(hash(FaceExtract()), hash(Foo()))
+ # comparison respects constructor arguments (except cuda_device)
+ self.assertEqual(FaceExtract(), FaceExtract())
+ self.assertEqual(hash(FaceExtract()), hash(FaceExtract()))
+ self.assertNotEqual(FaceExtract(), FaceExtract(target_size=10))
+ self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(target_size=10)))
+ self.assertNotEqual(FaceExtract(), FaceExtract(min_face_size=10))
+ self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(min_face_size=10)))
+ self.assertNotEqual(FaceExtract(), FaceExtract(min_face_prob=1.))
+ self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(min_face_prob=1.)))
+ self.assertNotEqual(FaceExtract(), FaceExtract(ext_face_size=100))
+ self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(ext_face_size=100)))
+ self.assertNotEqual(FaceExtract(), FaceExtract(thresholds=[0.1,0.1,0.1]))
+ self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(thresholds=[0.1,0.1,0.1])))
+ self.assertNotEqual(FaceExtract(), FaceExtract(factor=1.))
+ self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(factor=1.)))
+ # comparison ignores cuda_device
+ self.assertEqual(FaceExtract(), FaceExtract(cuda_device='cuda:123'))
+ self.assertEqual(hash(FaceExtract()), hash(FaceExtract(cuda_device='cuda:123')))
+
+ def test_preprocess(self):
+ testpath = os.path.join(os.path.dirname(__file__), 'faces-noface.jpg')
+ with PIL.Image.open(testpath) as img:
+ self.assertEqual(img.size, (199, 148))
+ # landscape, downscale, no rotation
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, None)
+ self.assertEqual(img.size, (100, 74))
+ self.assertEqual(denorm((10,10)), (10*1.99, 10*2.0))
+ # landscape, upscale, no rotation
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, None)
+ self.assertEqual(img.size, (398, 296))
+ self.assertEqual(denorm((10,10)), (10*0.5, 10*0.5))
+ # landscape, downscale, 90cw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, 90)
+ self.assertEqual(img.size, (74, 100))
+ self.assertEqual(denorm((10,10)), (10.0*1.99, 64*2.0))
+ # landscape, upscale, 90cw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, 90)
+ self.assertEqual(img.size, (296, 398))
+ self.assertEqual(denorm((10,10)), (10*0.5, 286*0.5))
+ # landscape, downscale, 90ccw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, 270)
+ self.assertEqual(img.size, (74, 100))
+ self.assertEqual(denorm((10,10)), (90*1.99, 10*2.0))
+ # landscape, upscale, 90ccw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, 270)
+ self.assertEqual(img.size, (296, 398))
+ self.assertEqual(denorm((10,10)), (388*0.5, 10*0.5))
+ # landscape, downscale, 180
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, 180)
+ self.assertEqual(img.size, (100, 74))
+ self.assertEqual(denorm((10,10)), (90*1.99, 64*2.0))
+ # landscape, upscale, 180
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, 180)
+ self.assertEqual(img.size, (398, 296))
+ self.assertEqual(denorm((10,10)), (388*0.5, 286*0.5))
+ # portrait, downscale, no rotation
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, None)
+ self.assertEqual(img.size, (74, 100))
+ self.assertEqual(denorm((10,10)), (10*2.0, 10*1.99))
+ # portrait, upscale, no rotation
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, None)
+ self.assertEqual(img.size, (296, 398))
+ self.assertEqual(denorm((10,10)), (10*0.5, 10*0.5))
+ # portrait, downscale, 90cw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, 90)
+ self.assertEqual(img.size, (100, 74))
+ self.assertEqual(denorm((10,10)), (10.0*2.0, 90*1.99))
+ # portrait, upscale, 90cw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, 90)
+ self.assertEqual(img.size, (398, 296))
+ self.assertEqual(denorm((10,10)), (10*0.5, 388*0.5))
+ # portrait, downscale, 90ccw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, 270)
+ self.assertEqual(img.size, (100, 74))
+ self.assertEqual(denorm((10,10)), (64*2.0, 10*1.99))
+ # portrait, upscale, 90ccw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, 270)
+ self.assertEqual(img.size, (398, 296))
+ self.assertEqual(denorm((10,10)), (286*0.5, 10*0.5))
+ # portrait, downscale, 180
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, 180)
+ self.assertEqual(img.size, (74, 100))
+ self.assertEqual(denorm((10,10)), (64*2.0, 90*1.99))
+ # portrait, upscale, 180
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, 180)
+ self.assertEqual(img.size, (296, 398))
+ self.assertEqual(denorm((10,10)), (286*0.5, 388*0.5))
+
+ # square image
+ testpath = os.path.join(os.path.dirname(__file__), 'faces-ivan.jpg')
+ with PIL.Image.open(testpath) as img:
+ self.assertEqual(img.size, (561, 561))
+ # square, downscale, no rotation
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, None)
+ self.assertEqual(img.size, (51, 51))
+ self.assertEqual(denorm((10,10)), (10*11, 10*11))
+ # square, upscale, no rotation
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, None)
+ self.assertEqual(img.size, (1122, 1122))
+ self.assertEqual(denorm((10,10)), (10*0.5, 10*0.5))
+ # square, downscale, 90cw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, 90)
+ self.assertEqual(img.size, (51, 51))
+ self.assertEqual(denorm((10,10)), (10.0*11, 41*11))
+ # square, upscale, 90cw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, 90)
+ self.assertEqual(img.size, (1122, 1122))
+ self.assertEqual(denorm((10,10)), (10*0.5, 1112*0.5))
+ # square, downscale, 90ccw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, 270)
+ self.assertEqual(img.size, (51, 51))
+ self.assertEqual(denorm((10,10)), (41*11, 10*11))
+ # square, upscale, 90ccw
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, 270)
+ self.assertEqual(img.size, (1122, 1122))
+ self.assertEqual(denorm((10,10)), (1112*0.5, 10*0.5))
+ # square, downscale, 180
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, 180)
+ self.assertEqual(img.size, (51, 51))
+ self.assertEqual(denorm((10,10)), (41*11, 41*11))
+ # square, upscale, 180
+ img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, 180)
+ self.assertEqual(img.size, (1122, 1122))
+ self.assertEqual(denorm((10,10)), (1112*0.5, 1112*0.5))
+
+ def test_call(self):
+ with contextlib.redirect_stderr(io.StringIO()): # NOTE: hide warnings from facenet_pytorch
+ rdr = FaceExtract()
+ # discards non-image files
+ self.assertRaises(errors.UnsupportedFileFormatError, rdr,
+ __file__)
+ # raises on invalid image
+ self.assertRaises(errors.UnsupportedFileFormatError, rdr,
+ os.path.join(os.path.dirname(__file__), 'testimage_exif_corrupted.jpg'))
+ # raises on missing file
+ self.assertRaises(errors.ReaderError, rdr,
+ os.path.join(os.path.dirname(__file__), 'invalid.jpg'))
+
+ # may return empty list
+ self.assertListEqual(FaceExtract(min_face_prob=1)(
+ os.path.join(os.path.dirname(__file__), 'faces-noface.jpg')), [])
+ self.assertListEqual(FaceExtract(min_face_prob=1)(
+ os.path.join(os.path.dirname(__file__), 'faces-ivan.jpg')), [])
+ # returns faces
+ faces = rdr(os.path.join(os.path.dirname(__file__), 'faces-ivan.jpg'))
+ # check if face was detected
+ self.assertEqual(len(faces), 1)
+ # check ucid
+ self.assertSetEqual({f['ucid'] for f in faces}, {
+ '926dc1684dd453aa2c3c8daf1c82ecf918514ef0de416b6b842235c23bec32ee',
+ })
+ # check embedding
+ for face in faces:
+ self.assertEqual(face['embedding'].shape, (512, ))
+ # check bbox
+ self.assertAlmostEqual(faces[0]['x'], 275.8, 2)
+ self.assertAlmostEqual(faces[0]['y'], 91.67, 2)
+ self.assertAlmostEqual(faces[0]['width'], 50.5, 2)
+ self.assertAlmostEqual(faces[0]['height'], 65.42, 2)
+
+ # FIXME: RuntimeError
+ # FIXME: ValueError
+
+
+## main ##
+
+if __name__ == '__main__':
+ unittest.main()
+
+## EOF ##