diff options
author | Matthias Baumgartner <dev@igsor.net> | 2023-06-17 22:33:38 +0200 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2023-06-17 22:33:38 +0200 |
commit | 6a51098412b220e3be90cc7fdd7dba6fb4a2f025 (patch) | |
tree | 43ca0c26e36768a6b6358ffa63fb49fae6704caf | |
parent | f44ba0b30f924df54a80aaa7bafdf817e5ab1881 (diff) | |
download | bsie-6a51098412b220e3be90cc7fdd7dba6fb4a2f025.tar.gz bsie-6a51098412b220e3be90cc7fdd7dba6fb4a2f025.tar.bz2 bsie-6a51098412b220e3be90cc7fdd7dba6fb4a2f025.zip |
face reader, face detection and identification extractors
-rw-r--r-- | .gitignore | 5 | ||||
-rw-r--r-- | bsie/extractor/image/face/__init__.py | 8 | ||||
-rw-r--r-- | bsie/extractor/image/face/detect.py | 93 | ||||
-rw-r--r-- | bsie/extractor/image/face/identify.py | 176 | ||||
-rw-r--r-- | bsie/lib/naming_policy.py | 11 | ||||
-rw-r--r-- | bsie/reader/face.py | 179 | ||||
-rw-r--r-- | setup.py | 4 | ||||
-rw-r--r-- | test/extractor/image/face/__init__.py | 0 | ||||
-rw-r--r-- | test/extractor/image/face/test_detect.py | 62 | ||||
-rw-r--r-- | test/extractor/image/face/test_identify.py | 148 | ||||
-rw-r--r-- | test/lib/test_naming_policy.py | 16 | ||||
-rw-r--r-- | test/reader/test_face.py | 220 |
12 files changed, 922 insertions, 0 deletions
@@ -24,6 +24,11 @@ doc/build/ doc/source/api # testing data +test/extractor/image/face/*.csv +test/extractor/image/face/*.jpg +test/extractor/image/face/*.npy +test/reader/faces-ivan.jpg +test/reader/faces-noface.jpg test/reader/image/testimage.nef* test/reader/preview/testimage.nef* diff --git a/bsie/extractor/image/face/__init__.py b/bsie/extractor/image/face/__init__.py new file mode 100644 index 0000000..f82424a --- /dev/null +++ b/bsie/extractor/image/face/__init__.py @@ -0,0 +1,8 @@ + +# standard imports +import typing + +# exports +__all__: typing.Sequence[str] = [] + +## EOF ## diff --git a/bsie/extractor/image/face/detect.py b/bsie/extractor/image/face/detect.py new file mode 100644 index 0000000..94e3a61 --- /dev/null +++ b/bsie/extractor/image/face/detect.py @@ -0,0 +1,93 @@ + +# standard imports +import typing + +# external imports +import torch +from facenet_pytorch import MTCNN, InceptionResnetV1 + +# bsie imports +from bsie.utils import bsfs, node, ns + +# inner-module imports +from ... import base + +# exports +__all__: typing.Sequence[str] = ( + 'FaceDetect', + ) + + +## code ## + +bsf = ns.bsn.Face() + +class FaceDetect(base.Extractor): + + CONTENT_READER = 'bsie.reader.face.FaceExtract' + + def __init__(self): + # initialize parent with the schema + super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f''' + prefix bsf: <https://schema.bsfs.io/ie/Node/Face#> + + bsn:Face rdfs:subClassOf bsfs:Node . + + <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> + rdfs:subClassOf bsa:Feature ; + bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ; + bsfs:dtype <https://schema.bsfs.io/core/dtype#f32>; + bsfs:dimension "512"^^xsd:integer . + + bse:face rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Entity ; + rdfs:range bsn:Face . + + bsf:x rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Face ; + rdfs:range xsd:float ; + bsfs:unique "true"^^xsd:boolean . 
+ + bsf:y rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Face ; + rdfs:range xsd:float ; + bsfs:unique "true"^^xsd:boolean . + + bsf:width rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Face ; + rdfs:range xsd:float ; + bsfs:unique "true"^^xsd:boolean . + + bsf:height rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Face ; + rdfs:range xsd:float ; + bsfs:unique "true"^^xsd:boolean . + + bsf:embedding rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Face ; + rdfs:range <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> ; + bsfs:unique "true"^^xsd:boolean . + + ''')) + + def extract( + self, + subject: node.Node, + content: dict, + principals: typing.Iterable[bsfs.schema.Predicate], + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + # check principals + if self.schema.predicate(ns.bse.face) not in principals: + # nothing to do; abort + return + + for face in content: + fnode = node.Node(ns.bsn.Face, ucid=face['ucid']) + yield subject, ns.bse.face, fnode + yield fnode, bsf.x, face['x'] + yield fnode, bsf.y, face['y'] + yield fnode, bsf.width, face['width'] + yield fnode, bsf.height, face['height'] + yield fnode, bsf.embedding, face['embedding'].detach().cpu().numpy() + +## EOF ## diff --git a/bsie/extractor/image/face/identify.py b/bsie/extractor/image/face/identify.py new file mode 100644 index 0000000..152f113 --- /dev/null +++ b/bsie/extractor/image/face/identify.py @@ -0,0 +1,176 @@ + +# standard imports +import csv +import typing + +# external imports +from facenet_pytorch import MTCNN, InceptionResnetV1 +import numpy as np +import torch + +# bsie imports +from bsie.utils import bsfs, node, ns + +# inner-module imports +from ... 
import base + +# exports +__all__: typing.Sequence[str] = ( + 'FaceIdentify', + ) + + +## code ## + +bsf = ns.bsn.Face() + +class FaceIdentify(base.Extractor): + + CONTENT_READER = 'bsie.reader.face.FaceExtract' + + _restklasse: bsfs.URI + _thres: float + _device: torch.device + _restidx: int + _id2name: typing.Dict[int, str] + _embeds: torch.Tensor + _targets: torch.Tensor + + + + # FIXME: This could be a bsfs maintenance function instead of a bsie function + + def __init__( + self, + # FIXME: Initialize from bsfs storage instead of files + ref_embeds: str, + ref_mapping: str, + thres: float = 0.9, + cuda_device: str = 'cuda:0', + restklasse: str = 'https://example.com/user/anon', + ): + # initialize parent with the schema + super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + f''' + bsn:Face rdfs:subClassOf bsfs:Node . + bsn:Person rdfs:subClassOf bsfs:Node . + <https://schema.bsfs.io/ie/Node/Face#depicts> rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Face ; + rdfs:range bsn:Person . + # FIXME: Entity -> Face? + bse:face rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Entity ; + rdfs:range bsn:Face . + # FIXME: Face -> Embedding? + #<https://schema.bsfs.io/ie/Node/Face#embedding> + # rdfs:subClassOf bsfs:Predicate ; + # rdfs:domain bsn:Face ; + # rdfs:range <https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> ; + # bsfs:unique "true"^^xsd:boolean . + #<https://schema.bsfs.io/ie/Literal/Array/Feature/Face#resnet512> + # rdfs:subClassOf bsa:Feature ; + # bsfs:distance <https://schema.bsfs.io/core/distance#euclidean> ; + # bsfs:dtype <https://schema.bsfs.io/core/dtype#f32>; + # bsfs:dimension "512"^^xsd:integer . 
+ + ''')) + # store extra members + self._restklasse = bsfs.URI(restklasse) + self._thres = thres + # get face instances + self._device = torch.device(cuda_device if torch.cuda.is_available() else 'cpu') + with open(ref_embeds, 'rb') as ifile: + emb_with_trg = np.load(ifile) + targets, embeds = emb_with_trg[:, 0], emb_with_trg[:, 1:] + self._targets = torch.tensor(targets, dtype=torch.int32).to(self._device) + self._embeds = torch.tensor(embeds).to(self._device) + with open(ref_mapping, 'rt') as ifile: + mapping = [(int(idx), name) for name, idx in csv.reader(ifile)] + # ensure that the mapping is unique + ids, names = zip(*mapping) + if len(set(names)) != len(names): + raise Exception('people identifiers must be unique') + if len(set(ids)) != len(ids): + raise Exception('people indices must be unique') + # ensure that all targets are accounted for + if not {int(i) for i in self._targets.tolist()}.issubset(set(ids)): + raise Exception('all targets must be labelled') + # ensure and fetch the index of the restklasse + if self._restklasse not in names: + mapping.append((max(ids) + 1, self._restklasse)) + # store mapping + self._restidx = [idx for idx, name in mapping if name == self._restklasse][0] + self._id2name = dict(mapping) + # discard the restklasse from the reference points + self._embeds = self._embeds[self._targets != self._restidx] + self._targets = self._targets[self._targets != self._restidx] + + @property + def principals(self) -> typing.Iterator[bsfs.schema.Predicate]: + """Return the principal predicates, i.e., relations from/to the extraction subject.""" + yield from super().principals + yield self.schema.predicate(bsf.depicts) + + def __repr__(self) -> str: + return f'{bsfs.typename(self)}(N={len(self._embeds)}, restklasse={self._restklasse})' + + def __eq__(self, other: typing.Any) -> bool: + return super().__eq__(other) \ + and self._thres == other._thres \ + and self._id2name == other._id2name \ + and torch.equal(self._embeds, other._embeds) \ + 
and torch.equal(self._targets, other._targets) \ + and self._restklasse == other._restklasse \ + and self._restidx == other._restidx + + def __hash__(self) -> int: + return hash((super().__hash__(), + tuple(sorted(self._id2name.items())), + self._thres, + tuple(self._embeds.detach().cpu().numpy().reshape(-1).tolist()), + tuple(self._targets.detach().cpu().numpy().reshape(-1).tolist()), + self._restklasse, + self._restidx, + )) + + def _classify(self, emb: torch.Tensor) -> torch.Tensor: # [Nx512] -> [N] + # nearest neighbour approach + dist = torch.cdist(emb, self._embeds) # pairwise distances + best = dist.argmin(dim=1) # idx of lowest distance, per row + labels = self._targets[best] # label (int) of nearest neighbour + acc = dist[range(len(best)), best] < self._thres # check if distance is below threshold + return [lbl.item() if cnd == True else self._restidx for cnd, lbl in zip(acc, labels)] + + def extract( + self, + subject: node.Node, + content: typing.Any, + principals: typing.Iterable[bsfs.schema.Predicate], + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + # check principals + #if self.schema.predicate(bsf.depicts) not in principals: + if self.schema.predicate(ns.bse.face) not in principals: + # nothing to do; abort + return + # check content + if len(content) == 0: + return + + # collect embeddings + emb = torch.vstack([face['embedding'] for face in content]).to(self._device) + # apply classifier + labels = self._classify(emb) + # walk through faces + for face, idx in zip(content, labels): + lbl = bsfs.URI(self._id2name[idx]) # label (uri) of nearest neighbour + if lbl == self._restklasse: # suppress + continue + pnode = node.Node(ns.bsn.Person, uri=lbl) + fnode = node.Node(ns.bsn.Face, ucid=face['ucid']) + # emit triple + yield fnode, self.schema.predicate(bsf.depicts), pnode + # FIXME: emit subject -> face -> fnode? + yield subject, self.schema.predicate(ns.bse.face), fnode + # FIXME: emit embedding? 
+ #yield fnode, bsf.embedding, face['embedding'] + +## EOF ## diff --git a/bsie/lib/naming_policy.py b/bsie/lib/naming_policy.py index ffef7d9..fbdbeb0 100644 --- a/bsie/lib/naming_policy.py +++ b/bsie/lib/naming_policy.py @@ -89,6 +89,8 @@ class DefaultNamingPolicy(NamingPolicy): return self.name_preview(node) if node.node_type == ns.bsn.Tag: return self.name_tag(node) + if node.node_type == ns.bsn.Face: + return self.name_face(node) raise errors.ProgrammingError(f'no naming policy available for {node.node_type}') def name_entity(self, node: Node) -> Node: @@ -127,4 +129,13 @@ class DefaultNamingPolicy(NamingPolicy): node.uri = getattr(self._prefix.tag(), fragment) return node + def name_face(self, node: Node) -> Node: + if 'ucid' in node.hints: # content id + fragment = node.hints['ucid'] + else: # random name + fragment = self._uuid() + node.uri = getattr(self._prefix.face(), fragment) + return node + + ## EOF ## diff --git a/bsie/reader/face.py b/bsie/reader/face.py new file mode 100644 index 0000000..c5374e0 --- /dev/null +++ b/bsie/reader/face.py @@ -0,0 +1,179 @@ + +# standard imports +import operator +import typing + +# external imports +from facenet_pytorch import MTCNN, InceptionResnetV1 +import PIL.Image +import torch + +# bsie imports +from bsie.utils import bsfs, errors, node, ns + +# inner-module imports +from . import base + +# exports +__all__: typing.Sequence[str] = ( + 'FaceExtract', + ) + + +## code ## + +class FaceExtract(base.Reader): + """Extract faces and their feature vector from an image file.""" + + # Face patch size. + _target_size: int + + # Lower bound on the detected face's probability. + _min_face_prob: float + + # Face detector network. + _detector: MTCNN + + # Face feature extractor network. 
+ _embedder: InceptionResnetV1 + + def __init__( + self, + target_size: int = 1000, + min_face_size: int = 40, + min_face_prob: float = 0.992845, + cuda_device: str = 'cuda:0', + ext_face_size: int = 160, + thresholds: typing.Tuple[float, float, float] = [0.5, 0.6, 0.6], + factor: float = 0.709, + ): + # initialize + self._device = torch.device(cuda_device if torch.cuda.is_available() else 'cpu') + # initialize the face detection network + self._target_size = target_size + self._min_face_prob = min_face_prob + self._carghash = hash((min_face_size, ext_face_size, tuple(thresholds), factor)) + self._detector = MTCNN( + min_face_size=min_face_size, + image_size=ext_face_size, + thresholds=thresholds, + factor=factor, + device=self._device, + keep_all=True, + ).to(self._device) + # initialize the face embedding network + self._embedder = InceptionResnetV1('vggface2').to(self._device).eval() + + def __repr__(self) -> str: + return f'{bsfs.typename(self)}({self._min_face_prob})' + + def __eq__(self, other: typing.Any) -> bool: + return super().__eq__(other) \ + and self._target_size == other._target_size \ + and self._min_face_prob == other._min_face_prob \ + and self._carghash == other._carghash + + def __hash__(self) -> int: + return hash((super().__hash__(), self._target_size, self._min_face_prob, self._carghash)) + + @staticmethod + def preprocess( + img: PIL.Image.Image, + target_size: int, + rotate: typing.Union[bool, int] = True, + ) -> typing.Tuple[PIL.Image.Image, typing.Callable[[typing.Tuple[float, float]], typing.Tuple[float, float]]]: + """Preprocess an image. Return the image and a coordinate back-transformation function. + 1. Scale larger side to *target_size* + 2. Rotate by angle *rotate*, or auto-rotate from EXIF if *rotate=True* (the default); *rotate=None* skips rotation. + """ + # FIXME: re-using reader.Image would cover more file formats! 
+ + # >>> from PIL import ExifTags + # >>> exif_ori = [k for k, tag in ExifTags.TAGS.items() if tag == 'Orientation'] + # >>> exif_ori = exif_ori[0] + exif_ori = 274 + + # scale image + orig_size = img.size + if img.size[0] > img.size[1]: # landscape + img = img.resize((target_size, int(img.height / img.width * target_size)), reducing_gap=3) + elif img.size[0] < img.size[1]: # portrait + img = img.resize((int(img.width / img.height * target_size), target_size), reducing_gap=3) + else: # square + img = img.resize(( + int(img.width / img.height * target_size), + int(img.width / img.height * target_size), + ), reducing_gap=3) + + # get scale factors + sX = orig_size[0] / img.width + sY = orig_size[1] / img.height + + # rotate image (if need be) + denorm = lambda xy: (sX*xy[0], sY*xy[1]) + if rotate is not None: + # auto-rotate according to EXIF information + img_ori = img.getexif().get(exif_ori, None) + if img_ori == 3 or rotate == 180: + img = img.rotate(180, expand=True) + denorm = lambda xy: (orig_size[0] - sX*xy[0], orig_size[1] - sY*xy[1]) + elif img_ori == 6 or rotate == 270: + img = img.rotate(270, expand=True) + denorm = lambda xy: (orig_size[0] - sX*xy[1], sY*xy[0]) + elif img_ori == 8 or rotate == 90: + img = img.rotate(90, expand=True) + denorm = lambda xy: (sX*xy[1], orig_size[1] - sY*xy[0]) + + # return image and denormalization function + return img, denorm + + def __call__(self, path: str) -> typing.Sequence[dict]: + try: + # open the image + img = PIL.Image.open(path) + # rotate and scale the image + img, denorm = self.preprocess(img, self._target_size) + + # detect faces + boxes, probs = self._detector.detect(img) + if boxes is None: # no faces detected + return [] + # ignore boxes with probability below threshold + boxes = [box for box, p in zip(boxes, probs) if p >= self._min_face_prob] + if len(boxes) == 0: # no faces detected + return [] + # compute face embeddings + faces_img = self._detector.extract(img, boxes, None).to(self._device) + embeds = 
self._embedder(faces_img) + + faces = [] + for bbox, face, emb in zip(boxes, faces_img, embeds): + # face hash + ucid = bsfs.uuid.UCID.from_bytes(bytes(face.detach().cpu().numpy())) + # position / size + x0, y0 = denorm(bbox[:2]) + x1, y1 = denorm(bbox[2:]) + x, y = min(x0, x1), min(y0, y1) + width, height = max(x0, x1) - x, max(y0, y1) - y + # assembled + faces.append(dict( + ucid=ucid, # str + x=x, # float + y=y, # float + width=width, # float + height=height, # float + embedding=emb, # np.array + )) + + return faces + + except PIL.UnidentifiedImageError as err: # format not supported by PIL + raise errors.UnsupportedFileFormatError(path) from err + except IOError as err: # file not found and file open errors + raise errors.ReaderError(path) from err + except RuntimeError as err: # pytorch errors + raise errors.ReaderError(path) from err + except ValueError as err: # negative seek value + raise errors.ReaderError(path) from err + +## EOF ## @@ -8,6 +8,10 @@ extras = { # image feature extractors 'numpy', ], + 'face': [ + 'facenet_pytorch', + 'torch', + ], 'preview': [ # preview readers 'preview_generator', # also depends on some system packages diff --git a/test/extractor/image/face/__init__.py b/test/extractor/image/face/__init__.py new file mode 100644 index 0000000..e69de29 --- /dev/null +++ b/test/extractor/image/face/__init__.py diff --git a/test/extractor/image/face/test_detect.py b/test/extractor/image/face/test_detect.py new file mode 100644 index 0000000..92375a2 --- /dev/null +++ b/test/extractor/image/face/test_detect.py @@ -0,0 +1,62 @@ + +# standard imports +import contextlib +import io +import os +import requests +import unittest + +# bsie imports +from bsie.extractor import base +from bsie.reader.face import FaceExtract +from bsie.utils import bsfs, node as _node, ns + +# objects to test +from bsie.extractor.image.face.detect import FaceDetect, bsf + + +## code ## + +class TestFaceDetect(unittest.TestCase): + def setUp(self): + # download test image 
+ target = os.path.join(os.path.dirname(__file__), 'testface1.jpg') + if not os.path.exists(target): + with open(target, 'wb') as ofile: + ans = requests.get('https://www.bsfs.io/testdata/iepahGee1uch5ahr3ic1.jpg') + ofile.write(ans.content) + + def test_extract(self): + with contextlib.redirect_stderr(io.StringIO()): # NOTE: hide warnings from facenet_pytorch + # setup + rdr = FaceExtract() + ext = FaceDetect() + subject = _node.Node(ns.bsfs.Entity) + content = rdr(os.path.join(os.path.dirname(__file__), 'testface1.jpg')) + principals = set(ext.principals) + face = _node.Node(ns.bsn.Face, ucid='2a7203c1515e0caa66a7461452c0b4552f1433a613cb3033e59ed2361790ad45') + triples = list(ext.extract(subject, content, principals)) + # principals is bse:face + self.assertSetEqual(principals, {ext.schema.predicate(ns.bse.face)}) + # check triples + self.assertIn((subject, ns.bse.face, face), triples) + self.assertIn((face, bsf.x, 575.4721153898192), triples) + self.assertIn((face, bsf.y, 265.3955625), triples) + self.assertIn((face, bsf.width, 626.3928904791771), triples) + self.assertIn((face, bsf.height,858.6870625), triples) + # check embedding + emb = [o for s, p, o in triples if s == face and p == bsf.embedding] + self.assertEqual(len(emb), 1) + self.assertAlmostEqual(emb[0].sum(), -1.9049968) + # no triples on principal mismatch + self.assertListEqual(list(ext.extract(subject, content, set())), []) + # no triples on no content + self.assertListEqual(list(ext.extract(subject, [], principals)), []) + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## diff --git a/test/extractor/image/face/test_identify.py b/test/extractor/image/face/test_identify.py new file mode 100644 index 0000000..dde41db --- /dev/null +++ b/test/extractor/image/face/test_identify.py @@ -0,0 +1,148 @@ + +# standard imports +import contextlib +import io +import os +import unittest + +# external imports +import requests + +# bsie imports +from bsie.extractor import base +from 
bsie.reader.face import FaceExtract +from bsie.utils import bsfs, node as _node, ns + +# objects to test +from bsie.extractor.image.face.identify import FaceIdentify, bsf + + +## code ## + +def fetch(source, target): + target = os.path.join(os.path.dirname(__file__), target) + if not os.path.exists(target): + with open(target, 'wb') as ofile: + ans = requests.get(source) + ofile.write(ans.content) + +class TestFaceIdentify(unittest.TestCase): + def setUp(self): + # download test images + fetch('https://www.bsfs.io/testdata/iepahGee1uch5ahr3ic1.jpg', 'testface1.jpg') + fetch('https://www.bsfs.io/testdata/Woayiesae8eiL9aivoba.jpg', 'testface2.jpg') + fetch('https://www.bsfs.io/testdata/ATiagheiduth4So5ohxi.jpg', 'testface3.jpg') + # download reference vectors + fetch('https://www.bsfs.io/testdata/aetie3foo0faiDaiBahk.npy', 'ref_embeds.npy') + fetch('https://www.bsfs.io/testdata/uopoS8gei8Phiek3shei.npy', 'ref_embeds_alt1.npy') + fetch('https://www.bsfs.io/testdata/Otoo7ain6Ied2Iep2ein.npy', 'ref_embeds_alt2.npy') + fetch('https://www.bsfs.io/testdata/ie0keriChafahroeRo7i.npy', 'ref_embeds_extra.npy') + fetch('https://www.bsfs.io/testdata/phoophui3teeni4hieKu.csv', 'ref_mapping.csv') + fetch('https://www.bsfs.io/testdata/Quit4Wum8ael7Zeis4ei.csv', 'ref_mapping_alt.csv') + fetch('https://www.bsfs.io/testdata/Angu5cioVei5pohgh0aa.csv', 'ref_mapping_id_reuse.csv') + fetch('https://www.bsfs.io/testdata/ooshooK1bai5Queengae.csv', 'ref_mapping_name_reuse.csv') + fetch('https://www.bsfs.io/testdata/eixuepah3Ronge7oe4qu.csv', 'ref_mapping_restklasse.csv') + + def test_essentials(self): + # setup + pth_embeds = os.path.join(os.path.dirname(__file__), 'ref_embeds.npy') + pth_embeds_alt1 = os.path.join(os.path.dirname(__file__), 'ref_embeds_alt1.npy') + pth_embeds_alt2 = os.path.join(os.path.dirname(__file__), 'ref_embeds_alt2.npy') + pth_mapping = os.path.join(os.path.dirname(__file__), 'ref_mapping.csv') + pth_mapping_alt = os.path.join(os.path.dirname(__file__), 
'ref_mapping_alt.csv') + restklasse = 'https://example.com/user/fake_anon' + ext = FaceIdentify(pth_embeds, pth_mapping) + # string conversion returns class name + self.assertEqual(str(ext), 'FaceIdentify') + # representation respects number of embeddings + self.assertEqual(repr(ext), 'FaceIdentify(N=2, restklasse=https://example.com/user/anon)') + # representation respects restklasse + self.assertEqual(repr(FaceIdentify(pth_embeds, pth_mapping, restklasse=restklasse)), + 'FaceIdentify(N=2, restklasse=https://example.com/user/fake_anon)') + # identity + self.assertEqual(ext, FaceIdentify(pth_embeds, pth_mapping)) + self.assertEqual(hash(ext), hash(FaceIdentify(pth_embeds, pth_mapping))) # FIXME! + # comparison respects embeddings + self.assertNotEqual(ext, FaceIdentify(pth_embeds_alt1, pth_mapping)) + self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds_alt1, pth_mapping))) + self.assertNotEqual(ext, FaceIdentify(pth_embeds_alt2, pth_mapping)) + self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds_alt2, pth_mapping))) + # comparison respects mappings + self.assertNotEqual(ext, FaceIdentify(pth_embeds, pth_mapping_alt)) + self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds, pth_mapping_alt))) + # comparison respects threshold + self.assertNotEqual(ext, FaceIdentify(pth_embeds, pth_mapping, thres=0.1)) + self.assertNotEqual(hash(ext), hash(FaceIdentify(pth_embeds, pth_mapping, thres=0.1))) + # comparison respects restklasse + self.assertNotEqual(ext, FaceIdentify(pth_embeds, pth_mapping, restklasse=restklasse)) + self.assertNotEqual(hash(ext), + hash(FaceIdentify(pth_embeds, pth_mapping, restklasse=restklasse))) + + def test_construct(self): + pth_embeds = os.path.join(os.path.dirname(__file__), 'ref_embeds.npy') + pth_mapping = os.path.join(os.path.dirname(__file__), 'ref_mapping.csv') + # valid construction + self.assertIsInstance(FaceIdentify(pth_embeds, pth_mapping), FaceIdentify) + # restklasse may be part of the mapping + ext = 
FaceIdentify(pth_embeds, os.path.join(os.path.dirname(__file__), 'ref_mapping_restklasse.csv')) + self.assertIsInstance(ext, FaceIdentify) + self.assertEqual(ext._restidx, 1) + # pass invalid mapping (name re-use) + self.assertRaises(Exception, FaceIdentify, pth_embeds, + os.path.join(os.path.dirname(__file__), 'ref_mapping_name_reuse.csv')) + # pass invalid mapping (id re-use) + self.assertRaises(Exception, FaceIdentify, pth_embeds, + os.path.join(os.path.dirname(__file__), 'ref_mapping_id_reuse.csv')) + # pass invalid embeds (extra embeddings) + self.assertRaises(Exception, FaceIdentify, + os.path.join(os.path.dirname(__file__), 'ref_embeds_extra.npy'), + pth_mapping) + + def test_extract(self): + with contextlib.redirect_stderr(io.StringIO()): # NOTE: hide warnings from facenet_pytorch + # setup + rdr = FaceExtract() + ext = FaceIdentify( + os.path.join(os.path.dirname(__file__), 'ref_embeds.npy'), + os.path.join(os.path.dirname(__file__), 'ref_mapping.csv'), + ) + subject = _node.Node(ns.bsfs.Entity) + content = rdr(os.path.join(os.path.dirname(__file__), 'testface1.jpg')) + principals = set(ext.principals) + face = _node.Node(ns.bsn.Face, ucid='2a7203c1515e0caa66a7461452c0b4552f1433a613cb3033e59ed2361790ad45') + person = _node.Node(ns.bsn.Person, uri='https://example.com/user/Angelina_Jolie') + triples = list(ext.extract(subject, content, principals)) + # principals are bse:face, bsf:depicts + self.assertSetEqual(set(ext.principals), { + ext.schema.predicate(ns.bse.face), + ext.schema.predicate(bsf.depicts) + }) + # produces two triples ... + self.assertEqual(len(triples), 2) + # ... one if at least one person was identified + self.assertIn((subject, ext.schema.predicate(ns.bse.face), face), triples) + # ... 
one for each identified person + self.assertIn((face, ext.schema.predicate(bsf.depicts), person), triples) + # produces no triples if no person was identified + content = rdr(os.path.join(os.path.dirname(__file__), 'testface2.jpg')) + self.assertListEqual(list(ext.extract(subject, content, principals)), []) + # identifies the correct person despite somewhat similar options + content = rdr(os.path.join(os.path.dirname(__file__), 'testface3.jpg')) + face = _node.Node(ns.bsn.Face, ucid='f61fac01ef686ee05805afef1e7a10ba54c30dc1aa095d9e77d79ccdfeb40dc5') + triples = list(ext.extract(subject, content, principals)) + self.assertEqual(len(triples), 2) + person = _node.Node(ns.bsn.Person, uri='https://example.com/user/Paul_Rudd') + self.assertIn((subject, ext.schema.predicate(ns.bse.face), face), triples) + self.assertIn((face, ext.schema.predicate(bsf.depicts), person), triples) + # no triples on principal mismatch + self.assertListEqual(list(ext.extract(subject, content, set())), []) + # no triples on no content + self.assertListEqual(list(ext.extract(subject, [], principals)), []) + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## diff --git a/test/lib/test_naming_policy.py b/test/lib/test_naming_policy.py index 09fd6f6..a078fbd 100644 --- a/test/lib/test_naming_policy.py +++ b/test/lib/test_naming_policy.py @@ -35,6 +35,10 @@ class TestDefaultNamingPolicy(unittest.TestCase): self.assertEqual(policy.handle_node( Node(ns.bsn.Tag, label='hello')).uri, URI('http://example.com/me/tag#hello')) + # processes bsn:Face + self.assertEqual(policy.handle_node( + Node(ns.bsn.Face, ucid='hello')).uri, + URI('http://example.com/me/face#hello')) # raises an exception on unknown types self.assertRaises(errors.ProgrammingError, policy.handle_node, Node(ns.bsn.Invalid, ucid='abc123cba', size=123)) @@ -99,6 +103,18 @@ class TestDefaultNamingPolicy(unittest.TestCase): self.assertTrue(policy.name_tag( Node(ns.bsn.Tag,)).uri.startswith('http://example.com/me/tag#')) 
+ def test_name_face(self): + # setup + policy = DefaultNamingPolicy('http://example.com', 'me') + # name_face uses ucid + self.assertEqual(policy.name_face( + Node(ns.bsn.Face, ucid='hello_world')).uri, + URI('http://example.com/me/face#hello_world')) + # name_face falls back to a random guid + self.assertTrue(policy.name_face( + Node(ns.bsn.Face)).uri.startswith('http://example.com/me/face#')) + + class TestNamingPolicyIterator(unittest.TestCase): def test_call(self): # NOTE: We test NamingPolicy.__call__ here diff --git a/test/reader/test_face.py b/test/reader/test_face.py new file mode 100644 index 0000000..f462853 --- /dev/null +++ b/test/reader/test_face.py @@ -0,0 +1,220 @@ + +# standard imports +import contextlib +import io +import os +import unittest + +# external imports +import requests +import PIL.Image + +# bsie imports +from bsie.utils import errors + +# objects to test +from bsie.reader.face import FaceExtract + + +## code ## + +def fetch(source, target): + target = os.path.join(os.path.dirname(__file__), target) + if not os.path.exists(target): + with open(target, 'wb') as ofile: + ans = requests.get(source) + ofile.write(ans.content) + +class TestFaceExtract(unittest.TestCase): + def setUp(self): + # download test image w/o face + fetch('https://www.bsfs.io/testdata/Quiejoore1ahxa9jahma.jpg', 'faces-noface.jpg') + # download test image w/ face + fetch('https://www.bsfs.io/testdata/ONekai7Ohphooch3aege.jpg', 'faces-ivan.jpg') + + def test_essentials(self): + # repr respects min_face_prob + self.assertEqual(repr(FaceExtract(min_face_prob=1.0)), 'FaceExtract(1.0)') + self.assertEqual(repr(FaceExtract(min_face_prob=0.5)), 'FaceExtract(0.5)') + # repr respects type + class Foo(FaceExtract): pass + self.assertEqual(repr(Foo(min_face_prob=0.5)), 'Foo(0.5)') + + # comparison respects type + class Foo(): pass + self.assertNotEqual(FaceExtract(), 1234) + self.assertNotEqual(hash(FaceExtract()), hash(1234)) + self.assertNotEqual(FaceExtract(), 'hello') + 
self.assertNotEqual(hash(FaceExtract()), hash('hello')) + self.assertNotEqual(FaceExtract(), Foo()) + self.assertNotEqual(hash(FaceExtract()), hash(Foo())) + # comparison respects constructor arguments (except cuda_device) + self.assertEqual(FaceExtract(), FaceExtract()) + self.assertEqual(hash(FaceExtract()), hash(FaceExtract())) + self.assertNotEqual(FaceExtract(), FaceExtract(target_size=10)) + self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(target_size=10))) + self.assertNotEqual(FaceExtract(), FaceExtract(min_face_size=10)) + self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(min_face_size=10))) + self.assertNotEqual(FaceExtract(), FaceExtract(min_face_prob=1.)) + self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(min_face_prob=1.))) + self.assertNotEqual(FaceExtract(), FaceExtract(ext_face_size=100)) + self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(ext_face_size=100))) + self.assertNotEqual(FaceExtract(), FaceExtract(thresholds=[0.1,0.1,0.1])) + self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(thresholds=[0.1,0.1,0.1]))) + self.assertNotEqual(FaceExtract(), FaceExtract(factor=1.)) + self.assertNotEqual(hash(FaceExtract()), hash(FaceExtract(factor=1.))) + # comparison ignores cuda_device + self.assertEqual(FaceExtract(), FaceExtract(cuda_device='cuda:123')) + self.assertEqual(hash(FaceExtract()), hash(FaceExtract(cuda_device='cuda:123'))) + + def test_preprocess(self): + # Exercise FaceExtract.preprocess on two fixture images stored next to this file: + # faces-noface.jpg (199x148, also rotated to portrait) and faces-ivan.jpg (561x561). + # Each case checks the size of the returned image and the value that the returned + # denorm callable yields for the point (10, 10). + # Expected denorm values are written as <coordinate> * <scale factor> + # (e.g. 1.99 = 199/100 and 2.0 = 148/74 for the first case). + # NOTE(review): the 2nd and 3rd arguments look like a target size and a rotation in + # degrees (the case comments call 90 "cw" and 270 "ccw") - confirm against FaceExtract.preprocess. + testpath = os.path.join(os.path.dirname(__file__), 'faces-noface.jpg') + with PIL.Image.open(testpath) as img: + self.assertEqual(img.size, (199, 148)) + # landscape, downscale, no rotation + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, None) + self.assertEqual(img.size, (100, 74)) + self.assertEqual(denorm((10,10)), (10*1.99, 10*2.0)) + # landscape, upscale, no rotation + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, None) + self.assertEqual(img.size, (398, 296)) + self.assertEqual(denorm((10,10)),
(10*0.5, 10*0.5)) + # landscape, downscale, 90cw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, 90) + self.assertEqual(img.size, (74, 100)) + self.assertEqual(denorm((10,10)), (10.0*1.99, 64*2.0)) + # landscape, upscale, 90cw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, 90) + self.assertEqual(img.size, (296, 398)) + self.assertEqual(denorm((10,10)), (10*0.5, 286*0.5)) + # landscape, downscale, 90ccw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, 270) + self.assertEqual(img.size, (74, 100)) + self.assertEqual(denorm((10,10)), (90*1.99, 10*2.0)) + # landscape, upscale, 90ccw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, 270) + self.assertEqual(img.size, (296, 398)) + self.assertEqual(denorm((10,10)), (388*0.5, 10*0.5)) + # landscape, downscale, 180 + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 100, 180) + self.assertEqual(img.size, (100, 74)) + self.assertEqual(denorm((10,10)), (90*1.99, 64*2.0)) + # landscape, upscale, 180 + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 398, 180) + self.assertEqual(img.size, (398, 296)) + self.assertEqual(denorm((10,10)), (388*0.5, 286*0.5)) + # portrait, downscale, no rotation + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, None) + self.assertEqual(img.size, (74, 100)) + self.assertEqual(denorm((10,10)), (10*2.0, 10*1.99)) + # portrait, upscale, no rotation + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, None) + self.assertEqual(img.size, (296, 398)) + self.assertEqual(denorm((10,10)), (10*0.5, 10*0.5)) + # portrait, downscale, 90cw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, 90) + self.assertEqual(img.size, (100, 74)) + self.assertEqual(denorm((10,10)), (10.0*2.0, 90*1.99)) + # portrait, upscale, 90cw + img, denorm =
FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, 90) + self.assertEqual(img.size, (398, 296)) + self.assertEqual(denorm((10,10)), (10*0.5, 388*0.5)) + # portrait, downscale, 90ccw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, 270) + self.assertEqual(img.size, (100, 74)) + self.assertEqual(denorm((10,10)), (64*2.0, 10*1.99)) + # portrait, upscale, 90ccw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, 270) + self.assertEqual(img.size, (398, 296)) + self.assertEqual(denorm((10,10)), (286*0.5, 10*0.5)) + # portrait, downscale, 180 + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 100, 180) + self.assertEqual(img.size, (74, 100)) + self.assertEqual(denorm((10,10)), (64*2.0, 90*1.99)) + # portrait, upscale, 180 + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath).rotate(90, expand=True), 398, 180) + self.assertEqual(img.size, (296, 398)) + self.assertEqual(denorm((10,10)), (286*0.5, 388*0.5)) + + # square image + testpath = os.path.join(os.path.dirname(__file__), 'faces-ivan.jpg') + with PIL.Image.open(testpath) as img: + self.assertEqual(img.size, (561, 561)) + # square, downscale, no rotation + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, None) + self.assertEqual(img.size, (51, 51)) + self.assertEqual(denorm((10,10)), (10*11, 10*11)) + # square, upscale, no rotation + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, None) + self.assertEqual(img.size, (1122, 1122)) + self.assertEqual(denorm((10,10)), (10*0.5, 10*0.5)) + # square, downscale, 90cw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, 90) + self.assertEqual(img.size, (51, 51)) + self.assertEqual(denorm((10,10)), (10.0*11, 41*11)) + # square, upscale, 90cw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, 90) + self.assertEqual(img.size, (1122, 1122)) +
self.assertEqual(denorm((10,10)), (10*0.5, 1112*0.5)) + # square, downscale, 90ccw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, 270) + self.assertEqual(img.size, (51, 51)) + self.assertEqual(denorm((10,10)), (41*11, 10*11)) + # square, upscale, 90ccw + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, 270) + self.assertEqual(img.size, (1122, 1122)) + self.assertEqual(denorm((10,10)), (1112*0.5, 10*0.5)) + # square, downscale, 180 + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 51, 180) + self.assertEqual(img.size, (51, 51)) + self.assertEqual(denorm((10,10)), (41*11, 41*11)) + # square, upscale, 180 + img, denorm = FaceExtract.preprocess(PIL.Image.open(testpath), 1122, 180) + self.assertEqual(img.size, (1122, 1122)) + self.assertEqual(denorm((10,10)), (1112*0.5, 1112*0.5)) + + def test_call(self): + # Exercise calling a FaceExtract instance on file paths: a non-image file and a + # corrupted image must raise UnsupportedFileFormatError, a missing file must raise + # ReaderError, both fixture images must yield an empty list when min_face_prob=1, + # and faces-ivan.jpg must yield exactly one face with the expected ucid, + # a (512,)-shaped embedding and the expected bounding box. + with contextlib.redirect_stderr(io.StringIO()): # NOTE: hide warnings from facenet_pytorch + rdr = FaceExtract() + # discards non-image files + self.assertRaises(errors.UnsupportedFileFormatError, rdr, + __file__) + # raises on invalid image + self.assertRaises(errors.UnsupportedFileFormatError, rdr, + os.path.join(os.path.dirname(__file__), 'testimage_exif_corrupted.jpg')) + # raises on missing file + self.assertRaises(errors.ReaderError, rdr, + os.path.join(os.path.dirname(__file__), 'invalid.jpg')) + + # may return empty list + self.assertListEqual(FaceExtract(min_face_prob=1)( + os.path.join(os.path.dirname(__file__), 'faces-noface.jpg')), []) + self.assertListEqual(FaceExtract(min_face_prob=1)( + os.path.join(os.path.dirname(__file__), 'faces-ivan.jpg')), []) + # returns faces + faces = rdr(os.path.join(os.path.dirname(__file__), 'faces-ivan.jpg')) + # check if face was detected + self.assertEqual(len(faces), 1) + # check ucid + self.assertSetEqual({f['ucid'] for f in faces}, { + '926dc1684dd453aa2c3c8daf1c82ecf918514ef0de416b6b842235c23bec32ee', + }) + # check embedding + for face in faces: +
self.assertEqual(face['embedding'].shape, (512, )) + # check bbox (assertAlmostEqual with places=2: difference must round to zero at 2 decimals) + self.assertAlmostEqual(faces[0]['x'], 275.8, 2) + self.assertAlmostEqual(faces[0]['y'], 91.67, 2) + self.assertAlmostEqual(faces[0]['width'], 50.5, 2) + self.assertAlmostEqual(faces[0]['height'], 65.42, 2) + + # FIXME: RuntimeError + # FIXME: ValueError + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## |