aboutsummaryrefslogtreecommitdiffstats
path: root/bsfs/utils
diff options
context:
space:
mode:
Diffstat (limited to 'bsfs/utils')
-rw-r--r--bsfs/utils/__init__.py25
-rw-r--r--bsfs/utils/commons.py23
-rw-r--r--bsfs/utils/errors.py41
-rw-r--r--bsfs/utils/uri.py246
-rw-r--r--bsfs/utils/uuid.py108
5 files changed, 443 insertions, 0 deletions
diff --git a/bsfs/utils/__init__.py b/bsfs/utils/__init__.py
new file mode 100644
index 0000000..94680ee
--- /dev/null
+++ b/bsfs/utils/__init__.py
@@ -0,0 +1,25 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# inner-module imports
+from . import errors
+from .commons import typename
+from .uri import URI
+from .uuid import UUID, UCID
+
+# exports
+__all__ : typing.Sequence[str] = (
+ 'UCID',
+ 'URI',
+ 'UUID',
+ 'errors',
+ 'typename',
+ )
+
+## EOF ##
diff --git a/bsfs/utils/commons.py b/bsfs/utils/commons.py
new file mode 100644
index 0000000..bad2fe0
--- /dev/null
+++ b/bsfs/utils/commons.py
@@ -0,0 +1,23 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'typename',
+ )
+
+
+## code ##
+
+def typename(obj) -> str:
+ """Return the type name of *obj*."""
+ return type(obj).__name__
+
+
+## EOF ##
diff --git a/bsfs/utils/errors.py b/bsfs/utils/errors.py
new file mode 100644
index 0000000..c5e8e16
--- /dev/null
+++ b/bsfs/utils/errors.py
@@ -0,0 +1,41 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import typing
+
+# exports
+__all__: typing.Sequence[str] = (
+ )
+
+
+## code ##
+
+class _BSFSError(Exception):
+ """Generic bsfs error."""
+
+class SchemaError(_BSFSError):
+ """Generic schema errios."""
+
+class ConsistencyError(SchemaError):
+ """A requested operation is inconsistent with the schema."""
+
+class InstanceError(SchemaError):
+ """An instance affected by some operation is inconsistent with the schema."""
+
+class PermissionDeniedError(_BSFSError):
+ """An operation was aborted due to access control restrictions."""
+
+class ProgrammingError(_BSFSError):
+ """An assertion-like error that indicates a code-base issue."""
+
+class UnreachableError(ProgrammingError):
+ """Bravo, you've reached a point in code that should logically not be reachable."""
+
+class ConfigError(_BSFSError):
+ """User config issue."""
+
+## EOF ##
diff --git a/bsfs/utils/uri.py b/bsfs/utils/uri.py
new file mode 100644
index 0000000..84854a4
--- /dev/null
+++ b/bsfs/utils/uri.py
@@ -0,0 +1,246 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+import re
+import typing
+
+# constants
+RX_URI = re.compile(r'''
+ ^
+ (?:(?P<scheme>[^:/?#]+):)? # scheme, ://-delimited
+ (?://(?P<authority>[^/?#]*))? # authority (user@host:port), [/#?]-delimited
+ (?P<path>[^?#]*) # path, [#?]-delimited
+ (?:\?(?P<query>[^#]*))? # query, [#]-delimited
+ (?:\#(?P<fragment>.*))? # fragment, remaining characters
+ $
+ ''', re.VERBOSE + re.IGNORECASE)
+
+RX_HOST = re.compile(r'''
+ ^
+ (?:(?P<userinfo>[^@]*)@)? # userinfo
+ (?P<host>
+ (?:\[[^\]]+\]) | # IPv6 address
+ (?:[^:]+) # IPv4 address or regname
+ )
+ (?::(?P<port>\d*))? # port
+ $
+ ''', re.VERBOSE + re.IGNORECASE)
+
+# exports
+__all__: typing.Sequence[str] = (
+ 'URI',
+ )
+
+
+## code ##
+
+def _get_optional(
+ regexp: re.Pattern,
+ query: str,
+ grp: str
+ ) -> str:
+ """Return the regular expression *regexp*'s group *grp* of *query*
+ or raise a `ValueError` if the *query* doesn't match the expression.
+ """
+ parts = regexp.search(query)
+ if parts is not None:
+ if parts.group(grp) is not None:
+ return parts.group(grp)
+ raise ValueError(query)
+
+
+class URI(str):
+ """URI additions to built-in strings.
+
+ Provides properties to access the different components of an URI,
+ according to RFC 3986 (https://datatracker.ietf.org/doc/html/rfc3986).
+
+ Note that this class does not actually validate an URI but only offers
+ access to components of a *well-formed* URI. Use `urllib.parse` for
+ more advanced purposes.
+
+ """
+
+ def __new__(cls, value: str):
+ """Create a new URI instance.
+ Raises a `ValueError` if the (supposed) URI is malformatted.
+ """
+ if not cls.is_parseable(value):
+ raise ValueError(value)
+ return str.__new__(cls, value)
+
+ @staticmethod
+ def is_parseable(query: str) -> bool:
+ """Return True if the *query* can be decomposed into the URI components.
+
+ Note that a valid URI is always parseable, however, an invalid URI
+ might be parseable as well. The return value of this method makes
+ no claim about the validity of an URI!
+
+ """
+ # check uri
+ parts = RX_URI.match(query)
+ if parts is not None:
+ # check authority
+ authority = parts.group('authority')
+ if authority is None or RX_HOST.match(authority) is not None:
+ return True
+ # some check not passed
+ return False
+
+ @staticmethod
+ def compose(
+ path: str,
+ scheme: typing.Optional[str] = None,
+ authority: typing.Optional[str] = None,
+ user: typing.Optional[str] = None,
+ host: typing.Optional[str] = None,
+ port: typing.Optional[int] = None,
+ query: typing.Optional[str] = None,
+ fragment: typing.Optional[str] = None,
+ ):
+ """URI composition from components.
+
+ If the *host* argument is supplied, the authority is composed of *user*,
+ *host*, and *port* arguments, and the *authority* argument is ignored.
+ Note that if the *host* is an IPv6 address, it must be enclosed in brackets.
+ """
+ # strip whitespaces
+ path = path.strip()
+
+ # compose authority
+ if host is not None:
+ authority = ''
+ if user is not None:
+ authority += user + '@'
+ authority += host
+ if port is not None:
+ authority += ':' + str(port)
+
+ # ensure root on path
+ if path[0] != '/':
+ path = '/' + path
+
+ # compose uri
+ uri = ''
+ if scheme is not None:
+ uri += scheme + ':'
+ if authority is not None:
+ uri += '//' + authority
+ uri += path
+ if query is not None:
+ uri += '?' + query
+ if fragment is not None:
+ uri += '#' + fragment
+
+ # return as URI
+ return URI(uri)
+
+ @property
+ def scheme(self) -> str:
+ """Return the protocol/scheme part of the URI."""
+ return _get_optional(RX_URI, self, 'scheme')
+
+ @property
+ def authority(self) -> str:
+ """Return the authority part of the URI, including userinfo and port."""
+ return _get_optional(RX_URI, self, 'authority')
+
+ @property
+ def userinfo(self) -> str:
+ """Return the userinfo part of the URI."""
+ return _get_optional(RX_HOST, self.authority, 'userinfo')
+
+ @property
+ def host(self) -> str:
+ """Return the host part of the URI."""
+ return _get_optional(RX_HOST, self.authority, 'host')
+
+ @property
+ def port(self) -> int:
+ """Return the port part of the URI."""
+ return int(_get_optional(RX_HOST, self.authority, 'port'))
+
+ @property
+ def path(self) -> str:
+ """Return the path part of the URI."""
+ return _get_optional(RX_URI, self, 'path')
+
+ @property
+ def query(self) -> str:
+ """Return the query part of the URI."""
+ return _get_optional(RX_URI, self, 'query')
+
+ @property
+ def fragment(self) -> str:
+ """Return the fragment part of the URI."""
+ return _get_optional(RX_URI, self, 'fragment')
+
+ def get(self, component: str, default: typing.Optional[typing.Any] = None) -> typing.Optional[typing.Any]:
+ """Return the component or a default value."""
+ # check args
+ if component not in ('scheme', 'authority', 'userinfo', 'host',
+ 'port', 'path', 'query', 'fragment'):
+ raise ValueError(component)
+ try:
+ # return component's value
+ return getattr(self, component)
+ except ValueError:
+ # return the default value
+ return default
+
+
+ # overload composition methods
+
+ def __add__(self, *args) -> 'URI':
+ return URI(super().__add__(*args))
+
+ def join(self, *args) -> 'URI':
+ return URI(super().join(*args))
+
+ def __mul__(self, *args) -> 'URI':
+ return URI(super().__mul__(*args))
+
+ def __rmul__(self, *args) -> 'URI':
+ return URI(super().__rmul__(*args))
+
+
+ # overload casefold methods
+
+ def lower(self, *args) -> 'URI':
+ return URI(super().lower(*args))
+
+ def upper(self, *args) -> 'URI':
+ return URI(super().upper(*args))
+
+
+ # overload stripping methods
+
+ def strip(self, *args) -> 'URI':
+ return URI(super().strip(*args))
+
+ def lstrip(self, *args) -> 'URI':
+ return URI(super().lstrip(*args))
+
+ def rstrip(self, *args) -> 'URI':
+ return URI(super().rstrip(*args))
+
+
+ # overload formatting methods
+
+ def format(self, *args, **kwargs) -> 'URI':
+ return URI(super().format(*args, **kwargs))
+
+ def __mod__(self, *args) -> 'URI':
+ return URI(super().__mod__(*args))
+
+ def replace(self, *args) -> 'URI':
+ return URI(super().replace(*args))
+
+
+
+## EOF ##
diff --git a/bsfs/utils/uuid.py b/bsfs/utils/uuid.py
new file mode 100644
index 0000000..6366b18
--- /dev/null
+++ b/bsfs/utils/uuid.py
@@ -0,0 +1,108 @@
+"""
+
+Part of the BlackStar filesystem (bsfs) module.
+A copy of the license is provided with the project.
+Author: Matthias Baumgartner, 2022
+"""
+# imports
+from collections import abc
+import hashlib
+import os
+import platform
+import random
+import threading
+import time
+import typing
+import uuid
+
+# constants
+HASH = hashlib.sha256
+
+# exports
+__all__: typing.Sequence[str] = [
+ 'UCID',
+ 'UUID',
+ ]
+
+
+## code ##
+
+class UUID(abc.Iterator, abc.Callable): # type: ignore [misc] # abc.Callable "is an invalid base class"
+ """Generate 256-bit universally unique IDs.
+
+ This is a 'best-effort' kind of implementation that tries to ensure global
+ uniqueness, even tough actual uniqueness cannot be guaranteed.
+ The approach is different from python's uuid module (which implements
+ RFC 4122) in that it generates longer UUIDs and in that it cannot be
+ reconstructed whether two UUIDs were generated on the same system.
+
+ The ID is a cryptographic hash over several components:
+ * host
+ * system
+ * process
+ * thread
+ * random
+ * time
+ * cpu cycles
+ * content (if available)
+
+ """
+
+ # host identifier
+ host: str
+
+ # system identifier
+ system: str
+
+ # process identifier
+ process: str
+
+ # thread identifier
+ thread: str
+
+ def __init__(self, seed: typing.Optional[int] = None):
+ # initialize static components
+ self.host = str(uuid.getnode())
+ self.system = '-'.join(platform.uname())
+ self.process = str(os.getpid())
+ self.thread = str(threading.get_ident())
+ # initialize random component
+ random.seed(seed)
+
+ def __call__(self, content: typing.Optional[str] = None) -> str: # pylint: disable=arguments-differ
+ """Return a globally unique ID."""
+ # content component
+ content = str(content) if content is not None else ''
+ # time component
+ now = str(time.time())
+ # clock component
+ clk = str(time.perf_counter())
+ # random component
+ rnd = str(random.random())
+ # build the token from all available components
+ token = self.host + self.system + self.process + self.thread + rnd + now + clk + content
+ # return the token's hash
+ return HASH(token.encode('ascii', 'ignore')).hexdigest()
+
+ def __iter__(self) -> typing.Iterator[str]:
+ """Iterate indefinitely over universally unique IDs."""
+ return self
+
+ def __next__(self) -> str:
+ """Generate universally unique IDs."""
+ return self()
+
+
+class UCID():
+ """Generate 256-bit content IDs.
+
+ Effectively computes a cryptographic hash over the content.
+
+ """
+ @staticmethod
+ def from_path(path: str) -> str:
+ """Read the content from a file."""
+ with open(path, 'rb') as ifile:
+ return HASH(ifile.read()).hexdigest()
+
+## EOF ##