diff options
author | Matthias Baumgartner <dev@igsor.net> | 2022-12-08 16:32:52 +0100 |
---|---|---|
committer | Matthias Baumgartner <dev@igsor.net> | 2022-12-08 16:32:52 +0100 |
commit | bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8 (patch) | |
tree | ca7fda37b4f01fe969b4f9b2497d900cbabc7d4e /bsfs/utils/uri.py | |
parent | 8c0fbff6e05e5e4afe5605925052f114f2b95ba2 (diff) | |
download | bsfs-bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8.tar.gz bsfs-bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8.tar.bz2 bsfs-bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8.zip |
uri and some utils
Diffstat (limited to 'bsfs/utils/uri.py')
-rw-r--r-- | bsfs/utils/uri.py | 196 |
1 files changed, 196 insertions, 0 deletions
"""URI string type with access to the individual URI components.

Part of the BlackStar filesystem (bsfs) module.
A copy of the license is provided with the project.
Author: Matthias Baumgartner, 2022
"""
# imports
import re
import typing

# constants

# Decomposes a URI into scheme, authority, path, query, and fragment.
# All components but the path are optional; the path may be empty.
RX_URI = re.compile(r'''
    ^
    (?:(?P<scheme>[^:/?#]+):)?    # scheme, :-delimited
    (?://(?P<authority>[^/?#]*))? # authority (user@host:port), [/#?]-delimited
    (?P<path>[^?#]*)              # path, [#?]-delimited
    (?:\?(?P<query>[^#]*))?       # query, [#]-delimited
    (?:\#(?P<fragment>.*))?       # fragment, remaining characters
    $
    ''', re.VERBOSE | re.IGNORECASE)

# Decomposes a non-empty authority into userinfo, host, and port.
# The host is mandatory here, userinfo and port are optional.
RX_HOST = re.compile(r'''
    ^
    (?:(?P<userinfo>[^@]*)@)?     # userinfo
    (?P<host>
        (?:\[[^\]]+\]) |          # IPv6 address
        (?:[^:]+)                 # IPv4 address or regname
    )
    (?::(?P<port>\d*))?           # port
    $
    ''', re.VERBOSE | re.IGNORECASE)

# exports
__all__: typing.Sequence[str] = (
    'URI',
    )


## code ##

def _get_optional(
        regexp: re.Pattern,
        query: str,
        grp: str
        ) -> str:
    """Return the regular expression *regexp*'s group *grp* of *query*.

    Raises a `ValueError` (with *query* as argument) if the *query* doesn't
    match the expression, or if it matches but the group *grp* did not
    participate in the match (i.e., the optional component is absent).
    """
    parts = regexp.search(query)
    value = None if parts is None else parts.group(grp)
    if value is None:
        raise ValueError(query)
    return value


class URI(str):
    """URI additions to built-in strings.

    Provides properties to access the different components of an URI,
    according to RFC 3986 (https://datatracker.ietf.org/doc/html/rfc3986).
    Each component property raises a `ValueError` if the respective
    component is not present in the URI; use `get` to retrieve a default
    value instead.

    Note that this class does not actually validate an URI but only offers
    access to components of a *well-formed* URI. Use `urllib.parse` for
    more advanced purposes.

    """

    def __new__(cls, value: str) -> 'URI':
        """Create a new URI instance.
        Raises a `ValueError` if the (supposed) URI is malformatted.
        """
        if not cls.is_parseable(value):
            raise ValueError(value)
        return str.__new__(cls, value)

    @staticmethod
    def is_parseable(query: str) -> bool:
        """Return True if the *query* can be decomposed into the URI components.

        Note that a valid URI is always parseable, however, an invalid URI
        might be parseable as well. The return value of this method makes
        no claim about the validity of an URI!

        """
        # check uri
        parts = RX_URI.match(query)
        if parts is None:
            return False
        # check authority
        authority = parts.group('authority')
        if not authority:
            # The authority is either absent or empty. An empty authority
            # (as in 'file:///path') is well-formed; it simply has no
            # userinfo, host, or port to decompose.
            return True
        return RX_HOST.match(authority) is not None

    @staticmethod
    def compose(
            path: str,
            scheme: typing.Optional[str] = None,
            authority: typing.Optional[str] = None,
            user: typing.Optional[str] = None,
            host: typing.Optional[str] = None,
            port: typing.Optional[int] = None,
            query: typing.Optional[str] = None,
            fragment: typing.Optional[str] = None,
            ) -> 'URI':
        """URI composition from components.

        If the *host* argument is supplied, the authority is composed of *user*,
        *host*, and *port* arguments, and the *authority* argument is ignored.
        Note that if the *host* is an IPv6 address, it must be enclosed in brackets.

        Surrounding whitespace is stripped from the *path*, and a leading
        slash is prepended if missing. An empty *path* hence becomes '/'.

        Raises a `ValueError` if the composed string is not parseable as URI.
        """
        # strip whitespaces
        path = path.strip()

        # compose authority
        if host is not None:
            authority = host
            if user is not None:
                authority = user + '@' + authority
            if port is not None:
                authority = authority + ':' + str(port)

        # ensure root on path
        # NOTE: startswith also covers the empty path, where indexing would fail.
        if not path.startswith('/'):
            path = '/' + path

        # compose uri
        parts = []
        if scheme is not None:
            parts.append(scheme + ':')
        if authority is not None:
            parts.append('//' + authority)
        parts.append(path)
        if query is not None:
            parts.append('?' + query)
        if fragment is not None:
            parts.append('#' + fragment)

        # return as URI
        return URI(''.join(parts))

    @property
    def scheme(self) -> str:
        """Return the protocol/scheme part of the URI.
        Raises a `ValueError` if the URI has no scheme.
        """
        return _get_optional(RX_URI, self, 'scheme')

    @property
    def authority(self) -> str:
        """Return the authority part of the URI, including userinfo and port.
        Raises a `ValueError` if the URI has no authority.
        """
        return _get_optional(RX_URI, self, 'authority')

    @property
    def userinfo(self) -> str:
        """Return the userinfo part of the URI.
        Raises a `ValueError` if the URI has no authority or no userinfo.
        """
        return _get_optional(RX_HOST, self.authority, 'userinfo')

    @property
    def host(self) -> str:
        """Return the host part of the URI.
        Raises a `ValueError` if the URI has no authority or no host.
        """
        return _get_optional(RX_HOST, self.authority, 'host')

    @property
    def port(self) -> int:
        """Return the port part of the URI.
        Raises a `ValueError` if the URI has no authority or no port
        (this includes an empty port, as in 'host:').
        """
        return int(_get_optional(RX_HOST, self.authority, 'port'))

    @property
    def path(self) -> str:
        """Return the path part of the URI."""
        return _get_optional(RX_URI, self, 'path')

    @property
    def query(self) -> str:
        """Return the query part of the URI.
        Raises a `ValueError` if the URI has no query.
        """
        return _get_optional(RX_URI, self, 'query')

    @property
    def fragment(self) -> str:
        """Return the fragment part of the URI.
        Raises a `ValueError` if the URI has no fragment.
        """
        return _get_optional(RX_URI, self, 'fragment')

    def get(self, component: str, default: typing.Optional[typing.Any] = None) -> typing.Optional[typing.Any]:
        """Return the component or a default value.

        The *component* must be one of 'scheme', 'authority', 'userinfo',
        'host', 'port', 'path', 'query', or 'fragment'; a `ValueError` is
        raised otherwise. The *default* is returned if the component is
        not present in the URI.
        """
        # check args
        if component not in ('scheme', 'authority', 'userinfo', 'host',
                             'port', 'path', 'query', 'fragment'):
            raise ValueError(component)
        try:
            # return component's value
            return getattr(self, component)
        except ValueError:
            # the component is absent; return the default value
            return default

## EOF ##