diff options
-rw-r--r-- | bsfs/__init__.py | 8 | ||||
-rw-r--r-- | bsfs/utils/__init__.py | 20 | ||||
-rw-r--r-- | bsfs/utils/commons.py | 23 | ||||
-rw-r--r-- | bsfs/utils/uri.py | 196 | ||||
-rw-r--r-- | test/__init__.py | 0 | ||||
-rw-r--r-- | test/utils/__init__.py | 0 | ||||
-rw-r--r-- | test/utils/test_commons.py | 31 | ||||
-rw-r--r-- | test/utils/test_uri.py | 171 |
8 files changed, 449 insertions, 0 deletions
diff --git a/bsfs/__init__.py b/bsfs/__init__.py
new file mode 100644
index 0000000..f5f5cbc
--- /dev/null
+++ b/bsfs/__init__.py
@@ -0,0 +1,8 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+
+## EOF ##
diff --git a/bsfs/utils/__init__.py b/bsfs/utils/__init__.py
new file mode 100644
index 0000000..56a9323
--- /dev/null
+++ b/bsfs/utils/__init__.py
@@ -0,0 +1,20 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from .commons import typename
+from .uri import URI
+
+# exports
+__all__: typing.Sequence[str] = (
+    'URI',
+    'typename',
+    )
+
+## EOF ##
diff --git a/bsfs/utils/commons.py b/bsfs/utils/commons.py
new file mode 100644
index 0000000..bad2fe0
--- /dev/null
+++ b/bsfs/utils/commons.py
@@ -0,0 +1,23 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# exports
+__all__: typing.Sequence[str] = (
+    'typename',
+    )
+
+
+## code ##
+
+def typename(obj) -> str:
+    """Return the type name of *obj*."""
+    return type(obj).__name__
+
+
+## EOF ##
diff --git a/bsfs/utils/uri.py b/bsfs/utils/uri.py
new file mode 100644
index 0000000..a56423a
--- /dev/null
+++ b/bsfs/utils/uri.py
@@ -0,0 +1,196 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import re
+import typing
+
+# constants
+RX_URI = re.compile(r'''
+    ^
+    (?:(?P<scheme>[^:/?#]+):)?      # scheme, [:]-delimited
+    (?://(?P<authority>[^/?#]*))?   # authority (user@host:port), [/#?]-delimited
+    (?P<path>[^?#]*)                # path, [#?]-delimited
+    (?:\?(?P<query>[^#]*))?         # query, [#]-delimited
+    (?:\#(?P<fragment>.*))?         # fragment, remaining characters
+    $
+    ''', re.VERBOSE | re.IGNORECASE)
+
+RX_HOST = re.compile(r'''
+    ^
+    (?:(?P<userinfo>[^@]*)@)?       # userinfo
+    (?P<host>
+        (?:\[[^\]]+\]) |            # IPv6 address
+        (?:[^:]+)                   # IPv4 address or regname
+    )
+    (?::(?P<port>\d*))?             # port
+    $
+    ''', re.VERBOSE | re.IGNORECASE)
+
+# exports
+__all__: typing.Sequence[str] = (
+    'URI',
+    )
+
+
+## code ##
+
+def _get_optional(
+        regexp: re.Pattern,
+        query: str,
+        grp: str
+        ) -> str:
+    """Return the group *grp* of the expression *regexp* applied to *query*.
+    Raise a `ValueError` if *query* doesn't match or if the group is unset.
+    """
+    parts = regexp.search(query)
+    if parts is not None:
+        if parts.group(grp) is not None:
+            return parts.group(grp)
+    raise ValueError(query)
+
+
+class URI(str):
+    """URI additions to built-in strings.
+
+    Provides properties to access the different components of a URI,
+    according to RFC 3986 (https://datatracker.ietf.org/doc/html/rfc3986).
+
+    Note that this class does not actually validate a URI but only offers
+    access to components of a *well-formed* URI. Use `urllib.parse` for
+    more advanced purposes.
+
+    """
+
+    def __new__(cls, value: str) -> 'URI':
+        """Create a new URI instance.
+        Raises a `ValueError` if the (supposed) URI is malformed.
+        """
+        if not cls.is_parseable(value):
+            raise ValueError(value)
+        return str.__new__(cls, value)
+
+    @staticmethod
+    def is_parseable(query: str) -> bool:
+        """Return True if the *query* can be decomposed into the URI components.
+
+        Note that a valid URI is always parseable, however, an invalid URI
+        might be parseable as well. The return value of this method makes
+        no claim about the validity of a URI!
+
+        """
+        # check uri
+        parts = RX_URI.match(query)
+        if parts is not None:
+            # check authority
+            authority = parts.group('authority')
+            if authority is None or RX_HOST.match(authority) is not None:
+                return True
+        # some check not passed
+        return False
+
+    @staticmethod
+    def compose(
+            path: str,
+            scheme: typing.Optional[str] = None,
+            authority: typing.Optional[str] = None,
+            user: typing.Optional[str] = None,
+            host: typing.Optional[str] = None,
+            port: typing.Optional[typing.Union[int, str]] = None,
+            query: typing.Optional[str] = None,
+            fragment: typing.Optional[str] = None,
+            ) -> 'URI':
+        """URI composition from components.
+
+        The *path* is stripped and rooted at '/' (an empty path becomes '/').
+        If *host* is given, the authority is built from *user*, *host* and *port*
+        (IPv6 hosts must be bracketed) and the *authority* argument is ignored.
+        """
+        # strip whitespaces
+        path = path.strip()
+
+        # compose authority
+        if host is not None:
+            authority = ''
+            if user is not None:
+                authority += user + '@'
+            authority += host
+            if port is not None:
+                authority += ':' + str(port)
+
+        # ensure root on path (startswith also copes with an empty path)
+        if not path.startswith('/'):
+            path = '/' + path
+
+        # compose uri
+        uri = ''
+        if scheme is not None:
+            uri += scheme + ':'
+        if authority is not None:
+            uri += '//' + authority
+        uri += path
+        if query is not None:
+            uri += '?' + query
+        if fragment is not None:
+            uri += '#' + fragment
+
+        # return as URI
+        return URI(uri)
+
+    @property
+    def scheme(self) -> str:
+        """Return the protocol/scheme part of the URI."""
+        return _get_optional(RX_URI, self, 'scheme')
+
+    @property
+    def authority(self) -> str:
+        """Return the authority part of the URI, including userinfo and port."""
+        return _get_optional(RX_URI, self, 'authority')
+
+    @property
+    def userinfo(self) -> str:
+        """Return the userinfo part of the URI."""
+        return _get_optional(RX_HOST, self.authority, 'userinfo')
+
+    @property
+    def host(self) -> str:
+        """Return the host part of the URI."""
+        return _get_optional(RX_HOST, self.authority, 'host')
+
+    @property
+    def port(self) -> int:
+        """Return the port part of the URI."""
+        return int(_get_optional(RX_HOST, self.authority, 'port'))
+
+    @property
+    def path(self) -> str:
+        """Return the path part of the URI."""
+        return _get_optional(RX_URI, self, 'path')
+
+    @property
+    def query(self) -> str:
+        """Return the query part of the URI."""
+        return _get_optional(RX_URI, self, 'query')
+
+    @property
+    def fragment(self) -> str:
+        """Return the fragment part of the URI."""
+        return _get_optional(RX_URI, self, 'fragment')
+
+    def get(self, component: str, default: typing.Optional[typing.Any] = None) -> typing.Optional[typing.Any]:
+        """Return the component or a default value."""
+        # check args
+        if component not in ('scheme', 'authority', 'userinfo', 'host',
+                             'port', 'path', 'query', 'fragment'):
+            raise ValueError(component)
+        try:
+            # return component's value
+            return getattr(self, component)
+        except ValueError:
+            # return the default value
+            return default
+
+## EOF ##
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/__init__.py
diff --git a/test/utils/__init__.py b/test/utils/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/test/utils/__init__.py
diff --git a/test/utils/test_commons.py b/test/utils/test_commons.py
new file mode 100644
index 0000000..ce73788
--- /dev/null
+++ b/test/utils/test_commons.py
@@ -0,0 +1,31 @@
+"""
+
+Part of the bsfs test suite.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import unittest
+
+# objects to test
+from bsfs.utils.commons import typename
+
+
+## code ##
+
+class TestCommons(unittest.TestCase):
+    def test_typename(self):
+        class Foo(): pass
+        self.assertEqual(typename(Foo()), 'Foo')
+        self.assertEqual(typename('hello'), 'str')
+        self.assertEqual(typename(123), 'int')
+        self.assertEqual(typename(None), 'NoneType')
+
+
+
+## main ##
+
+if __name__ == '__main__':
+    unittest.main()
+
+## EOF ##
diff --git a/test/utils/test_uri.py b/test/utils/test_uri.py
new file mode 100644
index 0000000..976e75d
--- /dev/null
+++ b/test/utils/test_uri.py
@@ -0,0 +1,171 @@
+"""
+
+Part of the bsfs test suite.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import unittest
+
+# objects to test
+from bsfs.utils.uri import URI
+
+
+## code ##
+
+class TestURI(unittest.TestCase):
+
+    def test_new(self):
+        # cannot create an unparseable URI
+        self.assertRaises(ValueError, URI, 'http://')
+        # returns URI otherwise
+        self.assertIsInstance(URI('http://user@www.example.com:1234/path0/path1?query#fragment'), URI)
+
+    def test_is_parseable(self):
+        # empty string is a parseable uri
+        self.assertTrue(URI.is_parseable(''))
+        # examples from the RFC are parseable
+        self.assertTrue(URI.is_parseable('foo://example.com:8042/over/there?name=ferret#nose'))
+        self.assertTrue(URI.is_parseable('urn:example:animal:ferret:nose'))
+        self.assertTrue(URI.is_parseable('mailto:fred@xample.com'))
+        self.assertTrue(URI.is_parseable('www.w3.org/Addressing/'))
+        self.assertTrue(URI.is_parseable('ftp://cnn.example.com&store=breaking_news@10.0.0.1/top_story.htm'))
+        self.assertTrue(URI.is_parseable('ftp://ftp.is.co.za/rfc/rfc1808.txt'))
+        self.assertTrue(URI.is_parseable('http://www.ietf.org/rfc/rfc2396.txt'))
+        self.assertTrue(URI.is_parseable('ldap://[2001:db8::7]/c=GB?objectClass?one'))
+        self.assertTrue(URI.is_parseable('mailto:John.Doe@example.com'))
+        self.assertTrue(URI.is_parseable('news:comp.infosystems.www.servers.unix'))
+        self.assertTrue(URI.is_parseable('tel:+1-816-555-1212'))
+        self.assertTrue(URI.is_parseable('telnet://192.0.2.16:80/'))
+        self.assertTrue(URI.is_parseable('urn:oasis:names:specification:docbook:dtd:xml:4.1.2'))
+
+        # uri cannot end with a scheme delimiter
+        self.assertFalse(URI.is_parseable('http://'))
+        # port must be a number
+        self.assertFalse(URI.is_parseable('http://example.com:foo/'))
+        # the double slash (//) implies an authority
+        self.assertFalse(URI.is_parseable('http:///path0/path1?query#fragment'))
+
+    def test_compose(self):
+        self.assertEqual(URI.compose('path'), '/path')
+        self.assertEqual(URI.compose('/path'), '/path') # leading slash is not repeated
+        self.assertEqual(URI.compose('path', scheme='scheme'), 'scheme:/path')
+        self.assertEqual(URI.compose('path', authority='authority'), '//authority/path')
+        self.assertEqual(URI.compose('path', host='host'), '//host/path')
+        self.assertEqual(URI.compose('path', user='user'), '/path') # user w/o host is ignored
+        self.assertEqual(URI.compose('path', host='host', user='user'), '//user@host/path')
+        self.assertEqual(URI.compose('path', port='port'), '/path') # port w/o host is ignored
+        self.assertEqual(URI.compose('path', host='host', port=1234), '//host:1234/path')
+        self.assertEqual(URI.compose('path', host='host', port='1234'), '//host:1234/path')
+        self.assertRaises(ValueError, URI.compose, 'path', host='host', port='foo') # port must be a number
+        self.assertEqual(URI.compose('path', host='host', user='foo', port='1234'), '//foo@host:1234/path')
+        self.assertEqual(URI.compose('path', query='query'), '/path?query')
+        self.assertEqual(URI.compose('path', fragment='fragment'), '/path#fragment')
+        self.assertEqual(URI.compose('path', 'scheme', 'authority', 'user', 'host', 1234, 'query', 'fragment'),
+            'scheme://user@host:1234/path?query#fragment')
+
+    def test_get(self):
+        # get returns the respective component
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('scheme'), 'http')
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('authority'), 'user@www.example.com:1234')
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('userinfo'), 'user')
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('host'), 'www.example.com')
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('port'), 1234)
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('path'), '/path0/path1')
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('query'), 'query')
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').get('fragment'), 'fragment')
+        # get returns a default value if the component is missing
+        class Foo(): pass
+        foo = Foo()
+        self.assertEqual(URI('//user@www.example.com:1234/path0/path1?query#fragment').get('scheme', foo), foo)
+        self.assertEqual(URI('/path0/path1?query#fragment').get('authority', foo), foo)
+        self.assertEqual(URI('http://www.example.com:1234/path0/path1?query#fragment').get('userinfo', foo), foo)
+        self.assertEqual(URI('/path0/path1?query#fragment').get('host', foo), foo)
+        self.assertEqual(URI('http://user@www.example.com/path0/path1?query#fragment').get('port', foo), foo)
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1#fragment').get('query', foo), foo)
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query').get('fragment', foo), foo)
+        # can only get components
+        self.assertRaises(ValueError, URI('').get, 'foobar')
+
+    def test_scheme(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').scheme, 'http')
+        self.assertEqual(URI('ftp://user@www.example.com:1234/path0/path1?query#fragment').scheme, 'ftp')
+        self.assertEqual(URI('myown://user@www.example.com:1234/path0/path1?query#fragment').scheme, 'myown')
+        # empty scheme
+        self.assertRaises(ValueError, getattr, URI('www.example.com/path0/path1?query#fragment'), 'scheme')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'scheme')
+
+    def test_authority(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').authority, 'user@www.example.com:1234')
+        # empty authority
+        self.assertRaises(ValueError, getattr, URI('http/path0/path1?query#fragment'), 'authority')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'authority')
+
+    def test_userinfo(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').userinfo, 'user')
+        # empty authority
+        self.assertRaises(ValueError, getattr, URI('http/path0/path1?query#fragment'), 'userinfo')
+        # empty userinfo
+        self.assertRaises(ValueError, getattr, URI('http://www.example.com:1234/path0/path1?query#fragment'), 'userinfo')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'userinfo')
+
+    def test_host(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').host, 'www.example.com')
+        # IPv4 host
+        self.assertEqual(URI('http://user@10.0.0.1:1234/path0/path1?query#fragment').host, '10.0.0.1')
+        # IPv6 host
+        self.assertEqual(URI('http://user@[::64]:1234/path0/path1?query#fragment').host, '[::64]')
+        # empty authority
+        self.assertRaises(ValueError, getattr, URI('http/path0/path1?query#fragment'), 'host')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'host')
+
+    def test_port(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').port, 1234)
+        # empty authority
+        self.assertRaises(ValueError, getattr, URI('http/path0/path1?query#fragment'), 'port')
+        # empty port
+        self.assertRaises(ValueError, getattr, URI('http://user@www.example.com/path0/path1?query#fragment'), 'port')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'port')
+
+    def test_path(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').path, '/path0/path1')
+        # empty path
+        self.assertEqual(URI('http://user@www.example.com:1234?query#fragment').path, '')
+        # empty URI
+        self.assertEqual(URI('').path, '')
+
+    def test_query(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').query, 'query')
+        # empty query
+        self.assertRaises(ValueError, getattr, URI('http://user@www.example.com:1234/path0/path1#fragment'), 'query')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'query')
+
+    def test_fragment(self):
+        # full URI
+        self.assertEqual(URI('http://user@www.example.com:1234/path0/path1?query#fragment').fragment, 'fragment')
+        # empty fragment
+        self.assertRaises(ValueError, getattr, URI('http://user@www.example.com:1234/path0/path1?query'), 'fragment')
+        # empty URI
+        self.assertRaises(ValueError, getattr, URI(''), 'fragment')
+
+
+
+## main ##
+
+if __name__ == '__main__':
+    unittest.main()
+
+## EOF ##