Module fontai.io.storage
This module provides an abstraction fo the storage layer in order to read or write bytestreams to different media. Currently, read/writes are supported for local storage and GCS, and reads are also supported for URLs
Expand source code
"""This module provides an abstraction fo the storage layer in order to read or write bytestreams to different media. Currently, read/writes are supported for local storage and GCS, and reads are also supported for URLs
"""
from __future__ import annotations
from pathlib import Path
import io
import zipfile
import sys
import re
import typing as t
import requests
from abc import ABC, abstractmethod
import logging
from apache_beam.io.gcp.gcsio import GcsIO
from numpy import ndarray
logger = logging.getLogger(__name__)
class BytestreamHandler(ABC):
"""This class provides an interface to underlying storage media
"""
@abstractmethod
def read(self, path: str) -> bytes:
"""Reads the byte contents from a file
Args:
path (str): path to file
Returns:
bytes object
"""
pass
@abstractmethod
def write(self, path: str, content: bytes) -> None:
"""Writes a bytestream to storage
Args:
path (str): Description
content (bytes): Description
"""
pass
@abstractmethod
def list_sources(self, path: str) -> t.Generator[str, None, None]:
"""List files in the folder that path points to
Args:
path (str): Target folder
Returns:
A generator containing string paths to all sources inside target folder
"""
pass
class LocalBytestreamHandler(BytestreamHandler):
"""Class to interface with local storage
"""
def read(self, path: str) -> bytes:
path = Path(path)
if path.is_file():
#return InMemoryBytestream(name=path.name, content=path.read_bytes())
return path.read_bytes()
else:
raise Exception(f"Path ({str(path)}) does not point to file")
#raise Exception(f"An error occurred while trying to read {str(path)}: {e}")
def write(self, path: str, content: bytes) -> None:
path = Path(path)
Path(path.parent).mkdir(parents=True, exist_ok=True)
path.write_bytes(content)
def list_sources(self, path: str):
path = Path(path)
contents = path.iterdir()
return (str(content) for content in contents if content.is_file())
class GcsBytestreamHandler(BytestreamHandler):
"""Class to interface with Google Cloud Storage.
"""
def read(self, url: str) -> bytes:
gcs_file = GcsIO().open(url,mode="r")
content = gcs_file.read()
gcs_file.close()
return content
def write(self, url: str, content: bytes) -> None:
gcs_file = GcsIO().open(url,mode="w")
gcs_file.write(content)
gcs_file.close()
def list_sources(self,url: str) -> t.List[str]:
#url = self.as_str(url)
raw_list = list(GcsIO().list_prefix(url).keys())
return (elem for elem in raw_list if Path(elem) != Path(url))
class UrlBytestreamHandler(BytestreamHandler):
"""Class to download files from URLs
"""
def read(self, url: str) ->bytes:
r = requests.get(url, stream=True)
bf = io.BytesIO()
with io.BytesIO() as bf:
for chunk in r.iter_content(chunk_size=1024*1024):
bf.write(chunk)
content = bf.getvalue()
return content
def write(self, url: str, content: bytes):
raise NotImplementedError("Bytestreams cannot be written to a url address")
def list_sources(self,url: str) -> t.List[str]:
raise NotImplementedError("Bytestreams cannot be listed from a url address")
class BytestreamHandlerFactory(object):
"""Factory method that determines the appropriate file handler class based on the string path
"""
allowed_prefixes = {
"gs://": GcsBytestreamHandler,
"https://": UrlBytestreamHandler,
"http://": UrlBytestreamHandler
}
@classmethod
def create(cls, path: str):
"""Creates an appropriate BystreamHandler instance for the storage medium referenced in path; defaults to local storage if no match is found for remote storage media.
Args:
path (str): Path to storage location
Returns:
BytestreamHandler: storage interface for the matched storage medium.
"""
for prefix in cls.allowed_prefixes:
if prefix in path:
return cls.allowed_prefixes[prefix]()
return LocalBytestreamHandler()
class BytestreamPath(object):
"""
Data reader/writer class that abstracts the underlying storage location. Supports local and GCS storage and downloadable URLs
"""
def __init__(self, source_str: str):
"""
Args:
source_str (str): target storage location
"""
self.string = str(source_str)
#extensions
self.is_gcs = "gs://" in self.string
self.is_http = "http://" in self.string
self.is_https = "https://" in self.string
self.handler = BytestreamHandlerFactory.create(self.string)
@property
def filename(self):
if self.is_url():
filename = self.string.split("/")[-1]
else:
filename = Path(self.string).name
return filename
def is_url(self):
"""Returns a boolean
"""
return self.is_gcs or self.is_http or self.is_https
def extend_url_path(self, suffix: str) -> BytestreamPath:
"""Appends a suffix to the instance's storage path
Args:
suffix (str): suffix. Usually a filename.
Returns:
BytestreamPath: BytestreamPath pointing to the suffixed storage path
Raises:
ValueError: If no remote storage is matched to the instance's path
"""
def extend(preffix, string, suffix):
suffixed = string.replace(preffix,"") + "/" + suffix
suffixed = re.sub("/+","/",suffixed)
return BytestreamPath(preffix + suffixed)
if self.is_gcs:
return extend("gs://", self.string, suffix)
elif self.is_http:
return extend("http://", self.string, suffix)
elif self.is_https:
return extend("https://", self.string, suffix)
else:
raise ValueError("url does not match any valid preffix.")
def read_bytes(self) -> bytes:
"""
Reads bystream from the path
Returns :
the file's bytestream
"""
return self.handler.read(self.string)
def write_bytes(self,content: bytes) -> None:
"""
Writes bytestream to path
"""
self.handler.write(self.string, content)
def list_sources(self) -> t.Generator[BytestreamPath, None, None]:
"""
List files (but not dirs) in the folder given by the instance's storage path
Returns a generator of BytestreamPath objects corresponding to each source file.
"""
for elem in self.handler.list_sources(self.string):
yield BytestreamPath(elem)
def __truediv__(self, path: str) -> BytestreamPath:
if not isinstance(path, str):
raise TypeError("path must be a string")
elif self.is_url():
return self.extend_url_path(path)
else:
return BytestreamPath(str(Path(self.string) / path))
def __str__(self):
return self.string
Classes
class BytestreamHandler
-
This class provides an interface to underlying storage media
Expand source code
class BytestreamHandler(ABC): """This class provides an interface to underlying storage media """ @abstractmethod def read(self, path: str) -> bytes: """Reads the byte contents from a file Args: path (str): path to file Returns: bytes object """ pass @abstractmethod def write(self, path: str, content: bytes) -> None: """Writes a bytestream to storage Args: path (str): Description content (bytes): Description """ pass @abstractmethod def list_sources(self, path: str) -> t.Generator[str, None, None]: """List files in the folder that path points to Args: path (str): Target folder Returns: A generator containing string paths to all sources inside target folder """ pass
Ancestors
- abc.ABC
Subclasses
Methods
def list_sources(self, path: str) ‑> Generator[str, None, None]
-
List files in the folder that path points to
Args
path
:str
- Target folder
Returns
A generator containing string paths to all sources inside target folder
Expand source code
@abstractmethod def list_sources(self, path: str) -> t.Generator[str, None, None]: """List files in the folder that path points to Args: path (str): Target folder Returns: A generator containing string paths to all sources inside target folder """ pass
def read(self, path: str) ‑> bytes
-
Reads the byte contents from a file
Args
path
:str
- path to file
Returns
bytes object
Expand source code
@abstractmethod def read(self, path: str) -> bytes: """Reads the byte contents from a file Args: path (str): path to file Returns: bytes object """ pass
def write(self, path: str, content: bytes) ‑> None
-
Writes a bytestream to storage
Args
path
:str
- Description
content
:bytes
- Description
Expand source code
@abstractmethod def write(self, path: str, content: bytes) -> None: """Writes a bytestream to storage Args: path (str): Description content (bytes): Description """ pass
class BytestreamHandlerFactory
-
Factory method that determines the appropriate file handler class based on the string path
Expand source code
class BytestreamHandlerFactory(object): """Factory method that determines the appropriate file handler class based on the string path """ allowed_prefixes = { "gs://": GcsBytestreamHandler, "https://": UrlBytestreamHandler, "http://": UrlBytestreamHandler } @classmethod def create(cls, path: str): """Creates an appropriate BystreamHandler instance for the storage medium referenced in path; defaults to local storage if no match is found for remote storage media. Args: path (str): Path to storage location Returns: BytestreamHandler: storage interface for the matched storage medium. """ for prefix in cls.allowed_prefixes: if prefix in path: return cls.allowed_prefixes[prefix]() return LocalBytestreamHandler()
Class variables
var allowed_prefixes
Static methods
def create(path: str)
-
Creates an appropriate BystreamHandler instance for the storage medium referenced in path; defaults to local storage if no match is found for remote storage media.
Args
path
:str
- Path to storage location
Returns
BytestreamHandler
- storage interface for the matched storage medium.
Expand source code
@classmethod def create(cls, path: str): """Creates an appropriate BystreamHandler instance for the storage medium referenced in path; defaults to local storage if no match is found for remote storage media. Args: path (str): Path to storage location Returns: BytestreamHandler: storage interface for the matched storage medium. """ for prefix in cls.allowed_prefixes: if prefix in path: return cls.allowed_prefixes[prefix]() return LocalBytestreamHandler()
class BytestreamPath (source_str: str)
-
Data reader/writer class that abstracts the underlying storage location. Supports local and GCS storage and downloadable URLs
Args
source_str
:str
- target storage location
Expand source code
class BytestreamPath(object): """ Data reader/writer class that abstracts the underlying storage location. Supports local and GCS storage and downloadable URLs """ def __init__(self, source_str: str): """ Args: source_str (str): target storage location """ self.string = str(source_str) #extensions self.is_gcs = "gs://" in self.string self.is_http = "http://" in self.string self.is_https = "https://" in self.string self.handler = BytestreamHandlerFactory.create(self.string) @property def filename(self): if self.is_url(): filename = self.string.split("/")[-1] else: filename = Path(self.string).name return filename def is_url(self): """Returns a boolean """ return self.is_gcs or self.is_http or self.is_https def extend_url_path(self, suffix: str) -> BytestreamPath: """Appends a suffix to the instance's storage path Args: suffix (str): suffix. Usually a filename. Returns: BytestreamPath: BytestreamPath pointing to the suffixed storage path Raises: ValueError: If no remote storage is matched to the instance's path """ def extend(preffix, string, suffix): suffixed = string.replace(preffix,"") + "/" + suffix suffixed = re.sub("/+","/",suffixed) return BytestreamPath(preffix + suffixed) if self.is_gcs: return extend("gs://", self.string, suffix) elif self.is_http: return extend("http://", self.string, suffix) elif self.is_https: return extend("https://", self.string, suffix) else: raise ValueError("url does not match any valid preffix.") def read_bytes(self) -> bytes: """ Reads bystream from the path Returns : the file's bytestream """ return self.handler.read(self.string) def write_bytes(self,content: bytes) -> None: """ Writes bytestream to path """ self.handler.write(self.string, content) def list_sources(self) -> t.Generator[BytestreamPath, None, None]: """ List files (but not dirs) in the folder given by the instance's storage path Returns a generator of BytestreamPath objects corresponding to each source file. """ for elem in self.handler.list_sources(self.string): yield BytestreamPath(elem) def __truediv__(self, path: str) -> BytestreamPath: if not isinstance(path, str): raise TypeError("path must be a string") elif self.is_url(): return self.extend_url_path(path) else: return BytestreamPath(str(Path(self.string) / path)) def __str__(self): return self.string
Instance variables
var filename
-
Expand source code
@property def filename(self): if self.is_url(): filename = self.string.split("/")[-1] else: filename = Path(self.string).name return filename
Methods
def extend_url_path(self, suffix: str) ‑> BytestreamPath
-
Appends a suffix to the instance's storage path
Args
suffix
:str
- suffix. Usually a filename.
Returns
BytestreamPath
- BytestreamPath pointing to the suffixed storage path
Raises
ValueError
- If no remote storage is matched to the instance's path
Expand source code
def extend_url_path(self, suffix: str) -> BytestreamPath: """Appends a suffix to the instance's storage path Args: suffix (str): suffix. Usually a filename. Returns: BytestreamPath: BytestreamPath pointing to the suffixed storage path Raises: ValueError: If no remote storage is matched to the instance's path """ def extend(preffix, string, suffix): suffixed = string.replace(preffix,"") + "/" + suffix suffixed = re.sub("/+","/",suffixed) return BytestreamPath(preffix + suffixed) if self.is_gcs: return extend("gs://", self.string, suffix) elif self.is_http: return extend("http://", self.string, suffix) elif self.is_https: return extend("https://", self.string, suffix) else: raise ValueError("url does not match any valid preffix.")
def is_url(self)
-
Returns a boolean
Expand source code
def is_url(self): """Returns a boolean """ return self.is_gcs or self.is_http or self.is_https
def list_sources(self) ‑> Generator[BytestreamPath, None, None]
-
List files (but not dirs) in the folder given by the instance's storage path
Returns a generator of BytestreamPath objects corresponding to each source file.
Expand source code
def list_sources(self) -> t.Generator[BytestreamPath, None, None]: """ List files (but not dirs) in the folder given by the instance's storage path Returns a generator of BytestreamPath objects corresponding to each source file. """ for elem in self.handler.list_sources(self.string): yield BytestreamPath(elem)
def read_bytes(self) ‑> bytes
-
Reads bystream from the path
Returns : the file's bytestream
Expand source code
def read_bytes(self) -> bytes: """ Reads bystream from the path Returns : the file's bytestream """ return self.handler.read(self.string)
def write_bytes(self, content: bytes) ‑> None
-
Writes bytestream to path
Expand source code
def write_bytes(self,content: bytes) -> None: """ Writes bytestream to path """ self.handler.write(self.string, content)
class GcsBytestreamHandler
-
Class to interface with Google Cloud Storage.
Expand source code
class GcsBytestreamHandler(BytestreamHandler): """Class to interface with Google Cloud Storage. """ def read(self, url: str) -> bytes: gcs_file = GcsIO().open(url,mode="r") content = gcs_file.read() gcs_file.close() return content def write(self, url: str, content: bytes) -> None: gcs_file = GcsIO().open(url,mode="w") gcs_file.write(content) gcs_file.close() def list_sources(self,url: str) -> t.List[str]: #url = self.as_str(url) raw_list = list(GcsIO().list_prefix(url).keys()) return (elem for elem in raw_list if Path(elem) != Path(url))
Ancestors
- BytestreamHandler
- abc.ABC
Inherited members
class LocalBytestreamHandler
-
Class to interface with local storage
Expand source code
class LocalBytestreamHandler(BytestreamHandler): """Class to interface with local storage """ def read(self, path: str) -> bytes: path = Path(path) if path.is_file(): #return InMemoryBytestream(name=path.name, content=path.read_bytes()) return path.read_bytes() else: raise Exception(f"Path ({str(path)}) does not point to file") #raise Exception(f"An error occurred while trying to read {str(path)}: {e}") def write(self, path: str, content: bytes) -> None: path = Path(path) Path(path.parent).mkdir(parents=True, exist_ok=True) path.write_bytes(content) def list_sources(self, path: str): path = Path(path) contents = path.iterdir() return (str(content) for content in contents if content.is_file())
Ancestors
- BytestreamHandler
- abc.ABC
Inherited members
class UrlBytestreamHandler
-
Class to download files from URLs
Expand source code
class UrlBytestreamHandler(BytestreamHandler): """Class to download files from URLs """ def read(self, url: str) ->bytes: r = requests.get(url, stream=True) bf = io.BytesIO() with io.BytesIO() as bf: for chunk in r.iter_content(chunk_size=1024*1024): bf.write(chunk) content = bf.getvalue() return content def write(self, url: str, content: bytes): raise NotImplementedError("Bytestreams cannot be written to a url address") def list_sources(self,url: str) -> t.List[str]: raise NotImplementedError("Bytestreams cannot be listed from a url address")
Ancestors
- BytestreamHandler
- abc.ABC
Inherited members