aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/utils/uri.py
diff options
context:
space:
mode:
authorMatthias Baumgartner <dev@igsor.net>2022-12-08 16:32:52 +0100
committerMatthias Baumgartner <dev@igsor.net>2022-12-08 16:32:52 +0100
commitbbfcee4fffc553b5dd08f37a79dd6ccddbf340f8 (patch)
treeca7fda37b4f01fe969b4f9b2497d900cbabc7d4e /bsfs/utils/uri.py
parent8c0fbff6e05e5e4afe5605925052f114f2b95ba2 (diff)
downloadbsfs-bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8.tar.gz
bsfs-bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8.tar.bz2
bsfs-bbfcee4fffc553b5dd08f37a79dd6ccddbf340f8.zip
uri and some utils
Diffstat (limited to 'bsfs/utils/uri.py')
-rw-r--r--bsfs/utils/uri.py196
1 files changed, 196 insertions, 0 deletions
diff --git a/bsfs/utils/uri.py b/bsfs/utils/uri.py
new file mode 100644
index 0000000..a56423a
--- /dev/null
+++ b/bsfs/utils/uri.py
@@ -0,0 +1,196 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import re
+import typing
+
+# constants
# Split a URI reference into scheme, authority, path, query, and fragment.
# The expression mirrors the component-splitting regex of RFC 3986, Appendix B,
# with named groups. It only decomposes a string, it does not validate it.
# Optional components that are absent yield None for their group; the path
# group always participates (possibly as the empty string).
RX_URI = re.compile(r'''
    ^
    (?:(?P<scheme>[^:/?#]+):)?          # scheme, ://-delimited
    (?://(?P<authority>[^/?#]*))?       # authority (user@host:port), [/#?]-delimited
    (?P<path>[^?#]*)                    # path, [#?]-delimited
    (?:\?(?P<query>[^#]*))?             # query, [#]-delimited
    (?:\#(?P<fragment>.*))?             # fragment, remaining characters
    $
    ''', re.VERBOSE | re.IGNORECASE)

# Split an authority string (the *authority* group of RX_URI) into userinfo,
# host, and port. The host is either a bracketed literal or a run of
# characters without a colon; in both cases at least one character is
# required, so the empty string does not match this expression.
# NOTE(review): this rejects empty authorities such as in 'file:///path' --
# confirm that this is intended.
RX_HOST = re.compile(r'''
    ^
    (?:(?P<userinfo>[^@]*)@)?           # userinfo
    (?P<host>
        (?:\[[^\]]+\]) |                # IPv6 address
        (?:[^:]+)                       # IPv4 address or regname
    )
    (?::(?P<port>\d*))?                 # port
    $
    ''', re.VERBOSE | re.IGNORECASE)
+
+# exports
# names exported by a star-import of this module
__all__: typing.Sequence[str] = ('URI', )
+
+
+## code ##
+
def _get_optional(
        regexp: re.Pattern,
        query: str,
        grp: str
        ) -> str:
    """Look up the named group *grp* after searching *query* with *regexp*.

    Returns the text captured by the group. A `ValueError` carrying the
    *query* is raised if the expression does not match at all, or if it
    matches but the group did not take part in the match.
    """
    found = regexp.search(query)
    # a group that did not participate is reported as None by the match
    # object, which is treated the same as a failed search
    value = None if found is None else found.group(grp)
    if value is None:
        raise ValueError(query)
    return value
+
+
class URI(str):
    """URI additions to built-in strings.

    Provides properties to access the different components of an URI,
    according to RFC 3986 (https://datatracker.ietf.org/doc/html/rfc3986).

    Note that this class does not actually validate an URI but only offers
    access to components of a *well-formed* URI. Use `urllib.parse` for
    more advanced purposes.

    The component properties raise a `ValueError` if the respective
    component is not present. Use `get` to receive a default value instead.

    """

    def __new__(cls, value: str):
        """Create a new URI instance.
        Raises a `ValueError` if the (supposed) URI is malformatted.
        """
        if not cls.is_parseable(value):
            raise ValueError(value)
        return str.__new__(cls, value)

    @staticmethod
    def is_parseable(query: str) -> bool:
        """Return True if the *query* can be decomposed into the URI components.

        The *query* is parseable if it matches `RX_URI` and its authority is
        either absent or matches `RX_HOST`. An authority that is present but
        empty (e.g., in 'file:///path') does not match `RX_HOST` and thus
        makes the *query* unparseable.
        NOTE(review): confirm that rejecting empty authorities is intended.

        Note that an invalid URI might be parseable as well. The return value
        of this method makes no claim about the validity of an URI!

        """
        # check uri
        parts = RX_URI.match(query)
        if parts is not None:
            # check authority
            authority = parts.group('authority')
            if authority is None or RX_HOST.match(authority) is not None:
                return True
        # some check not passed
        return False

    @staticmethod
    def compose(
            path: str,
            scheme: typing.Optional[str] = None,
            authority: typing.Optional[str] = None,
            user: typing.Optional[str] = None,
            host: typing.Optional[str] = None,
            port: typing.Optional[int] = None,
            query: typing.Optional[str] = None,
            fragment: typing.Optional[str] = None,
            ) -> 'URI':
        """URI composition from components.

        If the *host* argument is supplied, the authority is composed of *user*,
        *host*, and *port* arguments, and the *authority* argument is ignored.
        Note that if the *host* is an IPv6 address, it must be enclosed in brackets.

        Surrounding whitespace is stripped from the *path*, and a leading
        slash is prepended if the *path* lacks one. An empty *path* hence
        becomes the root path '/'.

        Returns the composed `URI`. Raises a `ValueError` if the composed
        string is not parseable (see `is_parseable`).
        """
        # strip whitespaces
        path = path.strip()

        # compose authority
        if host is not None:
            authority = ''
            if user is not None:
                authority += user + '@'
            authority += host
            if port is not None:
                authority += ':' + str(port)

        # ensure root on path
        # startswith (instead of indexing path[0]) also covers the empty path
        if not path.startswith('/'):
            path = '/' + path

        # compose uri
        uri = ''
        if scheme is not None:
            uri += scheme + ':'
        if authority is not None:
            uri += '//' + authority
        uri += path
        if query is not None:
            uri += '?' + query
        if fragment is not None:
            uri += '#' + fragment

        # return as URI
        return URI(uri)

    @property
    def scheme(self) -> str:
        """Return the protocol/scheme part of the URI.
        Raises a `ValueError` if the URI has no scheme.
        """
        return _get_optional(RX_URI, self, 'scheme')

    @property
    def authority(self) -> str:
        """Return the authority part of the URI, including userinfo and port.
        Raises a `ValueError` if the URI has no authority.
        """
        return _get_optional(RX_URI, self, 'authority')

    @property
    def userinfo(self) -> str:
        """Return the userinfo part of the URI.
        Raises a `ValueError` if the URI has no authority or no userinfo.
        """
        return _get_optional(RX_HOST, self.authority, 'userinfo')

    @property
    def host(self) -> str:
        """Return the host part of the URI.
        Raises a `ValueError` if the URI has no authority.
        """
        return _get_optional(RX_HOST, self.authority, 'host')

    @property
    def port(self) -> int:
        """Return the port part of the URI.
        Raises a `ValueError` if the URI has no authority or no port, or
        if the port is empty (a colon without digits).
        """
        # the port group may match the empty string, on which int() raises
        # a ValueError as well
        return int(_get_optional(RX_HOST, self.authority, 'port'))

    @property
    def path(self) -> str:
        """Return the path part of the URI. The path may be empty."""
        return _get_optional(RX_URI, self, 'path')

    @property
    def query(self) -> str:
        """Return the query part of the URI.
        Raises a `ValueError` if the URI has no query.
        """
        return _get_optional(RX_URI, self, 'query')

    @property
    def fragment(self) -> str:
        """Return the fragment part of the URI.
        Raises a `ValueError` if the URI has no fragment.
        """
        return _get_optional(RX_URI, self, 'fragment')

    def get(self, component: str, default: typing.Optional[typing.Any] = None) -> typing.Optional[typing.Any]:
        """Return the component or a default value.

        The *component* must be one of 'scheme', 'authority', 'userinfo',
        'host', 'port', 'path', 'query', or 'fragment'; a `ValueError` is
        raised otherwise. The *default* is returned if the component's
        property raises a `ValueError`, i.e., if the component is not
        present in the URI.
        """
        # check args
        if component not in ('scheme', 'authority', 'userinfo', 'host',
                             'port', 'path', 'query', 'fragment'):
            raise ValueError(component)
        try:
            # return component's value
            return getattr(self, component)
        except ValueError:
            # return the default value
            return default
+
+## EOF ##