diff options
author | Matthias Baumgartner <dev@igsor.net> | 2023-04-05 17:16:14 +0200 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2023-04-05 17:16:14 +0200 |
commit | 63fe1d017e2fad8181e3ff47185b974304957d56 (patch) | |
tree | 868748fd54ae2648ba8deedef978d4a669bff564 | |
parent | af81318ae9311fd0b0e16949cef3cfaf7996970b (diff) | |
download | bsie-63fe1d017e2fad8181e3ff47185b974304957d56.tar.gz bsie-63fe1d017e2fad8181e3ff47185b974304957d56.tar.bz2 bsie-63fe1d017e2fad8181e3ff47185b974304957d56.zip |
IPTC tag extraction
-rw-r--r-- | bsie/extractor/image/iptc.py | 70 | ||||
-rw-r--r-- | bsie/lib/naming_policy.py | 15 | ||||
-rw-r--r-- | bsie/reader/exif.py | 21 | ||||
-rw-r--r-- | bsie/utils/namespaces.py | 2 | ||||
-rw-r--r-- | test/extractor/image/test_iptc.py | 69 | ||||
-rw-r--r-- | test/lib/test_naming_policy.py | 27 | ||||
-rw-r--r-- | test/reader/test_exif.py | 22 | ||||
-rw-r--r-- | test/reader/testimage_exif.jpg | bin | 719 -> 777 bytes |
8 files changed, 225 insertions, 1 deletions
diff --git a/bsie/extractor/image/iptc.py b/bsie/extractor/image/iptc.py new file mode 100644 index 0000000..195eff7 --- /dev/null +++ b/bsie/extractor/image/iptc.py @@ -0,0 +1,70 @@ + +# standard imports +import typing + +# bsie imports +from bsie.utils import bsfs, node, ns + +# inner-module imports +from .. import base + +# exports +__all__: typing.Sequence[str] = ( + 'Iptc', + ) + + +## code ## + +class Iptc(base.Extractor): + """Turn IPTC keywords into tags.""" + + CONTENT_READER = 'bsie.reader.exif.Iptc' + + def __init__(self): + super().__init__(bsfs.schema.from_string(base.SCHEMA_PREAMBLE + ''' + bsn:Tag rdfs:subClassOf bsfs:Node . + + bse:tag rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Entity ; + rdfs:range bsn:Tag . + + <https://schema.bsfs.io/ie/Node/Tag#label> rdfs:subClassOf bsfs:Predicate ; + rdfs:domain bsn:Tag ; + rdfs:range xsd:string ; + bsfs:unique "true"^^xsd:boolean . + + ''')) + self._callmap = { + self.schema.predicate(ns.bse.tag): self._keywords, + } + + def extract( + self, + subject: node.Node, + content: dict, + principals: typing.Iterable[bsfs.schema.Predicate], + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + for pred in principals: + # find callback + clbk = self._callmap.get(pred) + if clbk is None: + continue + # produce triples + yield from clbk(subject, content) + + def _keywords( + self, + subject: node.Node, + content: dict, + ) -> typing.Iterator[typing.Tuple[node.Node, bsfs.schema.Predicate, typing.Any]]: + if 'Iptc.Application2.Keywords' not in content: + return + for keyword in content['Iptc.Application2.Keywords']: + tag = node.Node(ns.bsn.Tag, label=keyword) + yield subject, self.schema.predicate(ns.bse.tag), tag + yield tag, self.schema.predicate(ns.bst.label), keyword + + + +## EOF ## diff --git a/bsie/lib/naming_policy.py b/bsie/lib/naming_policy.py index 9b9a45d..3e7c940 100644 --- a/bsie/lib/naming_policy.py +++ b/bsie/lib/naming_policy.py @@ -4,6 +4,9 @@ import abc import os 
import typing +# external imports +import urllib.parse + # bsie imports from bsie.utils import bsfs, errors, ns from bsie.utils.node import Node @@ -84,6 +87,8 @@ class DefaultNamingPolicy(NamingPolicy): return self.name_file(node) if node.node_type == ns.bsn.Preview: return self.name_preview(node) + if node.node_type == ns.bsn.Tag: + return self.name_tag(node) raise errors.ProgrammingError('no naming policy available for {node.node_type}') def name_file(self, node: Node) -> Node: @@ -112,4 +117,14 @@ class DefaultNamingPolicy(NamingPolicy): node.uri = getattr(self._prefix.preview(), fragment) return node + def name_tag(self, node: Node) -> Node: + # NOTE: Must ensure to produce the same name for that tags with the same label. + if 'label' in node.hints: # tag label + fragment = urllib.parse.quote(node.hints['label']) + else: # random name + fragment = self._uuid() + # FIXME: match to existing tags in bsfs storage! + node.uri = getattr(self._prefix.tag(), fragment) + return node + ## EOF ## diff --git a/bsie/reader/exif.py b/bsie/reader/exif.py index 2d0428b..7ec7574 100644 --- a/bsie/reader/exif.py +++ b/bsie/reader/exif.py @@ -17,6 +17,7 @@ MATCH_RULE = 'mime=image/jpeg' # exports __all__: typing.Sequence[str] = ( 'Exif', + 'Iptc', ) @@ -41,4 +42,24 @@ class Exif(base.Reader): except (TypeError, OSError, RuntimeError) as err: raise errors.ReaderError(path) from err + +class Iptc(base.Reader): + """Use pyexiv2 to read iptc metadata from image files.""" + + def __init__(self): + self._match = filematcher.parse(MATCH_RULE) + + def __call__(self, path: str) -> dict: + # perform quick checks first + if not self._match(path): + raise errors.UnsupportedFileFormatError(path) + + try: + # open the file + img = pyexiv2.Image(path) + # read metadata + return img.read_iptc() + except (TypeError, OSError, RuntimeError) as err: + raise errors.ReaderError(path) from err + ## EOF ## diff --git a/bsie/utils/namespaces.py b/bsie/utils/namespaces.py index 4a66048..9357253 100644 
--- a/bsie/utils/namespaces.py +++ b/bsie/utils/namespaces.py @@ -20,6 +20,7 @@ bsf = bsie.Literal.Array.Feature bsl = bsfs.Literal bsn = bsie.Node bsp = bsie.Node.Preview() +bst = bsie.Node.Tag() # export __all__: typing.Sequence[str] = ( @@ -32,6 +33,7 @@ __all__: typing.Sequence[str] = ( 'bsl', 'bsn', 'bsp', + 'bst', 'xsd', ) diff --git a/test/extractor/image/test_iptc.py b/test/extractor/image/test_iptc.py new file mode 100644 index 0000000..5fa763d --- /dev/null +++ b/test/extractor/image/test_iptc.py @@ -0,0 +1,69 @@ + +# standard imports +import unittest + +# bsie imports +from bsie.extractor import base +from bsie.utils import bsfs, node as _node, ns + +# objects to test +from bsie.extractor.image.iptc import Iptc + + +## code ## + +class TestIptc(unittest.TestCase): + + def test_eq(self): + # identical instances are equal + self.assertEqual(Iptc(), Iptc()) + self.assertEqual(hash(Iptc()), hash(Iptc())) + # comparison respects type + class Foo(): pass + self.assertNotEqual(Iptc(), Foo()) + self.assertNotEqual(hash(Iptc()), hash(Foo())) + self.assertNotEqual(Iptc(), 1234) + self.assertNotEqual(hash(Iptc()), hash(1234)) + self.assertNotEqual(Iptc(), None) + self.assertNotEqual(hash(Iptc()), hash(None)) + + def test_schema(self): + self.assertSetEqual({pred.uri for pred in Iptc().schema.predicates()}, { + ns.bsfs.Predicate, + ns.bse.tag, + ns.bst.label, + }) + + def test_extract(self): + ext = Iptc() + node = _node.Node(ns.bsfs.File, '') # Blank node + content = { + 'Iptc.Application2.Keywords': ['hello', 'world'], + 'Iptc.Application2.RecordVersion': '4', + } + # target tags + t_hello = _node.Node(ns.bsn.Tag, label='hello') + t_world = _node.Node(ns.bsn.Tag, label='world') + + # invalid principals are ignored + self.assertSetEqual(set(ext.extract(node, content, {ns.bse.filename})), set()) + # extract finds all relevant information + self.assertSetEqual(set(ext.extract(node, content, {ext.schema.predicate(ns.bse.tag)})), { + (node, 
ext.schema.predicate(ns.bse.tag), t_hello), + (node, ext.schema.predicate(ns.bse.tag), t_world), + (t_hello, ext.schema.predicate(ns.bst.label), 'hello'), + (t_world, ext.schema.predicate(ns.bst.label), 'world'), + }) + + # empty content is acceptable + self.assertSetEqual(set(ext.extract(node, {}, set(ext.principals))), set()) + # no principals is acceptable + self.assertSetEqual(set(ext.extract(node, content, set())), set()) + + +## main ## + +if __name__ == '__main__': + unittest.main() + +## EOF ## diff --git a/test/lib/test_naming_policy.py b/test/lib/test_naming_policy.py index c9b0cd2..b284fc0 100644 --- a/test/lib/test_naming_policy.py +++ b/test/lib/test_naming_policy.py @@ -31,6 +31,10 @@ class TestDefaultNamingPolicy(unittest.TestCase): self.assertEqual(policy.handle_node( Node(ns.bsn.Preview, ucid='abc123cba', size=123)).uri, URI('http://example.com/me/preview#abc123cba_s123')) + # processes bsn:Tag + self.assertEqual(policy.handle_node( + Node(ns.bsn.Tag, label='hello')).uri, + URI('http://example.com/me/tag#hello')) # raises an exception on unknown types self.assertRaises(errors.ProgrammingError, policy.handle_node, Node(ns.bsn.Invalid, ucid='abc123cba', size=123)) @@ -71,6 +75,29 @@ class TestDefaultNamingPolicy(unittest.TestCase): self.assertTrue(policy.name_preview( Node(ns.bsn.Preview, size=200)).uri.endswith('_s200')) + def test_name_tag(self): + # setup + policy = DefaultNamingPolicy('http://example.com', 'me') + # name_tag uses label + self.assertEqual(policy.name_tag( + Node(ns.bsn.Tag, label='hello')).uri, + URI('http://example.com/me/tag#hello')) + # name_tag matches the label + self.assertEqual( + policy.name_tag(Node(ns.bsn.Tag, label='world')), + policy.name_tag(Node(ns.bsn.Tag, label='world')), + ) + self.assertNotEqual( + policy.name_tag(Node(ns.bsn.Tag, label='hello')), + policy.name_tag(Node(ns.bsn.Tag, label='world')), + ) + # label can include characters that are not valid for an uri + self.assertEqual(policy.name_tag( + 
Node(ns.bsn.Preview, label='hello world { foo bar ] ')).uri, + URI('http://example.com/me/tag#hello%20world%20%7B%20foo%20bar%20%5D%20')) + # name_tag falls back to a random guid + self.assertTrue(policy.name_tag( + Node(ns.bsn.Tag,)).uri.startswith('http://example.com/me/tag#')) class TestNamingPolicyIterator(unittest.TestCase): diff --git a/test/reader/test_exif.py b/test/reader/test_exif.py index de6e801..1767f12 100644 --- a/test/reader/test_exif.py +++ b/test/reader/test_exif.py @@ -10,7 +10,7 @@ import pyexiv2 from bsie.utils import errors # objects to test -from bsie.reader.exif import Exif +from bsie.reader.exif import Exif, Iptc ## code ## @@ -44,6 +44,26 @@ class TestExif(unittest.TestCase): }) +class TestIptc(unittest.TestCase): + def test_call(self): + rdr = Iptc() + # discards non-image files + self.assertRaises(errors.UnsupportedFileFormatError, rdr, + os.path.join(os.path.dirname(__file__), 'invalid.doc')) + # raises on invalid image files + self.assertRaises(errors.UnsupportedFileFormatError, rdr, + os.path.join(os.path.dirname(__file__), 'invalid.jpg')) + # raises on invalid image files + pyexiv2.set_log_level(3) # suppress log message + self.assertRaises(errors.ReaderError, rdr, + os.path.join(os.path.dirname(__file__), 'testimage_exif_corrupted.jpg')) + # returns dict with exif info + self.assertDictEqual(rdr(os.path.join(os.path.dirname(__file__), 'testimage_exif.jpg')), { + 'Iptc.Application2.Keywords': ['hello', 'world'], + 'Iptc.Application2.RecordVersion': '4', + }) + + ## main ## if __name__ == '__main__': diff --git a/test/reader/testimage_exif.jpg b/test/reader/testimage_exif.jpg Binary files differ index a774bc2..bc331ac 100644 --- a/test/reader/testimage_exif.jpg +++ b/test/reader/testimage_exif.jpg |