Module fontai.runners.stages
This module contains the definitions of high-level ML lifecycle stage classes; at the moment these are ingestion, preprocessing, training/scoring and deployment.
Expand source code
"""
This module contains the definitions of high-level ML lifecycle stage classes; at the moment these are ingestion, preprocessing, training/scoring and deployment.
"""
import logging
from collections import OrderedDict
from pathlib import Path
import typing
import traceback
import os
import string
import sys
import signal
import typing as t
from PIL import ImageFont
from numpy import ndarray
from strictyaml import as_document
from tensorflow import Tensor
import tensorflow as tf
from tensorflow.data import TFRecordDataset
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from fontai.config.preprocessing import Config as ProcessingConfig, ConfigHandler as ProcessingConfigHandler
from fontai.config.ingestion import Config as IngestionConfig, ConfigHandler as IngestionConfigHandler
from fontai.config.prediction import ModelFactory, TrainingConfig, Config as ScoringConfig, ConfigHandler as ScoringConfigHandler
from fontai.config.deployment import Config as DeploymentConfig, ConfigHandler as DeploymentConfigHandler, Grid
from fontai.prediction.input_processing import RecordPreprocessor
from fontai.preprocessing.mappings import PipelineFactory, BeamCompatibleWrapper, Writer, ZipToFontFiles
#from fontai.runners.base import MLPipelineTransform
from fontai.io.storage import BytestreamPath
from fontai.io.readers import ScrapperReader, ZipReader, TfrReader
from fontai.io.writers import TfrWriter, ZipWriter
from fontai.io.records import TfrWritable
from fontai.io.formats import InMemoryFile, InMemoryZipfile
from fontai.runners.base import ConfigurableTransform, IdentityTransform, FittableTransform
from fontai.deployment.plotters import AlphabetPlotter
import dash
import dash_core_components as dcc
import dash_html_components as html
from dash.dependencies import Input, Output
from numpy import array as np_array, clip as np_clip
import mlflow
# Public names exported by `from fontai.runners.stages import *`: the stage classes defined in this module.
__all__ = [
    "Ingestion",
    "Preprocessing",
    "Scoring",
    "Deployment"
]
# Module-level logger, named after this module (used e.g. by Scoring for progress and error messages).
logger = logging.getLogger(__name__)
class Ingestion(ConfigurableTransform, IdentityTransform):
    """Retrieves zipped font files. It takes a list of scrappers defined in `fontai.io.scrappers` from which it downloads files to storage.

    Attributes:
        config (IngestionConfig): Ingestion configuration object; None when instantiated without one.
    """

    def __init__(self, config: IngestionConfig = None):
        """
        Args:
            config (IngestionConfig, optional): Ingestion configuration object.
        """
        self.config = config

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses ingestion configuration files."""
        return IngestionConfigHandler()

    @classmethod
    def from_config_object(cls, config: IngestionConfig, **kwargs):
        """Instantiates the stage from a configuration object.

        Args:
            config (IngestionConfig): Ingestion configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.

        Returns:
            Ingestion: Instantiated stage.
        """
        return cls(config)

    @classmethod
    def run_from_config_object(cls, config: IngestionConfig, **kwargs):
        """Downloads the files exposed by the configured scrappers, extracts the individual font files
        from them and writes those to `config.output_path`.

        Args:
            config (IngestionConfig): Ingestion configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.
        """
        ingestor = cls.from_config_object(config)
        font_extractor = ZipToFontFiles()
        writer = ZipWriter(ingestor.config.output_path, config.max_output_file_size)
        try:
            for zipped_file in ScrapperReader(config.scrappers).get_files():
                for font_file in font_extractor.map(zipped_file):
                    writer.write(font_file)
        finally:
            # Close the writer even if scraping fails part-way, mirroring how Scoring closes its TfrWriter.
            # NOTE(review): assumes ZipWriter exposes close() like TfrWriter and flushes pending output there.
            writer.close()

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "ingestion"

    def transform_batch(self, input_path: str, output_path: str):
        """Not supported by the ingestion stage; always raises NotImplementedError."""
        raise NotImplementedError("This method is not implemented for ingestion.")
class Preprocessing(ConfigurableTransform):
    """
    Processes zipped font files and outputs Tensorflow records consisting of labeled images for ML consumption.
    """

    def __init__(
        self,
        output_record_class: type,
        charset: str,
        font_extraction_size: int,
        canvas_size: int,
        canvas_padding: int,
        output_array_size: int,
        beam_cmd_line_args: t.Optional[t.Sequence[str]] = ()):
        """
        Args:
            output_record_class (type): Output schema class, inheriting from TfrWritable
            charset (str): String with characters to be extracted
            font_extraction_size (int): Font size to use when extracting font images
            canvas_size (int): Image canvas size in which fonts will be extracted
            canvas_padding (int): Padding in the image extraction canvas
            output_array_size (int): Final character image size
            beam_cmd_line_args (t.Sequence[str], optional): Apache Beam command line arguments for distributed processing.
                Defaults to no arguments. The arguments are copied into a new list; None is stored unchanged.
        """
        # Copy into a fresh list rather than sharing a mutable default between instances.
        # None is kept as None because it is handed unchanged to PipelineOptions in run_from_config_object.
        self.beam_cmd_line_args = None if beam_cmd_line_args is None else list(beam_cmd_line_args)
        self.pipeline = PipelineFactory.create(
            output_record_class = output_record_class,
            charset = charset,
            font_extraction_size = font_extraction_size,
            canvas_size = canvas_size,
            canvas_padding = canvas_padding,
            output_array_size = output_array_size)

    @classmethod
    def from_config_object(cls, config: ProcessingConfig, **kwargs):
        """Instantiates the stage from a configuration object.

        Args:
            config (ProcessingConfig): Preprocessing configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.

        Returns:
            Preprocessing: Instantiated stage.
        """
        return cls(
            output_record_class = config.output_record_class,
            output_array_size = config.output_array_size,
            beam_cmd_line_args = config.beam_cmd_line_args,
            **config.font_to_array_config.dict())

    def transform(self, data):
        """Maps `data` through the preprocessing pipeline built at instantiation time."""
        return self.pipeline.map(data)

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses preprocessing configuration files."""
        return ProcessingConfigHandler()

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "preprocessing"

    @classmethod
    def run_from_config_object(cls, config: ProcessingConfig, **kwargs):
        """Runs an Apache Beam pipeline that reads every zipped source under `config.input_path`,
        extracts labeled examples from it and writes them as Tensorflow records to `config.output_path`.

        Args:
            config (ProcessingConfig): Preprocessing configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.
        """
        # set provisional value for CUDA env variable to prevent out of memory errors from the GPU; this occurs because the preprocessing code depends on (CPU-bound) Tensorflow functionality, which attempts to seize memory from the GPU automatically, but this throws an error when Beam uses multiple threads.
        visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
        try:
            output_path, input_path = config.output_path, config.input_path
            processor = cls.from_config_object(config)
            # if output is locally persisted, create parent folders
            if not BytestreamPath(output_path).is_url():
                Path(str(output_path)).mkdir(parents=True, exist_ok=True)
            pipeline_options = PipelineOptions(processor.beam_cmd_line_args)
            pipeline_options.view_as(SetupOptions).save_main_session = True
            with beam.Pipeline(options=pipeline_options) as p:
                # execute pipeline; the mapper is the one built by from_config_object from the same config
                (p
                 | 'create source list' >> beam.Create(BytestreamPath(input_path).list_sources()) #create list of sources as strings
                 | 'read zipped files' >> beam.Map(lambda filepath: InMemoryZipfile.from_bytestream_path(filepath)) #line needed to load files lazily and not overwhelm memory
                 | 'get labeled exampes from zip files' >> beam.ParDo(BeamCompatibleWrapper(mapper = processor.pipeline))
                 | "write to storage" >> beam.ParDo(Writer(TfrWriter(output_path, config.max_output_file_size))))
        finally:
            # restore the caller's environment even if the pipeline fails
            if visible_devices is None:
                os.environ.pop("CUDA_VISIBLE_DEVICES", None)
            else:
                os.environ["CUDA_VISIBLE_DEVICES"] = visible_devices
class Scoring(FittableTransform):
    """
    Trains a prediction model or uses one to score input data.

    Attributes:
        model (keras.Model): Scoring model
        CHARSET_OPTIONS (t.Dict): Dictionary from allowed charsets names to charsets
        training_config (TrainingConfig): training configuration wrapper
        charset (str): Characters under consideration, de-duplicated
        num_classes (int): Number of characters in `charset`
        charset_tensor (ndarray): Array with one UTF-8 encoded bytes entry per character in the current charset
    """

    CHARSET_OPTIONS = {
        "uppercase": string.ascii_letters[26::],
        "lowercase": string.ascii_letters[0:26],
        "digits": string.digits,
        "all": string.ascii_letters + string.digits
    }

    def __init__(
        self,
        model: tf.keras.Model,
        training_config: TrainingConfig = None,
        charset: str = "lowercase"):
        """
        Args:
            model (tf.keras.Model): Scoring model
            training_config (TrainingConfig, optional): Training configuration wrapper
            charset (str): charset to use for training and batch scoring. It must be one of 'lowercase', 'uppercase', 'digits' or 'all',
                or otherwise a string with all characters under consideration; duplicate characters in a custom string are dropped,
                keeping their first-occurrence order.
        """
        self.model = model
        self.training_config = training_config
        try:
            self.charset = self.CHARSET_OPTIONS[charset]
        except KeyError:
            logger.warning(f"Charset string is not one from {list(self.CHARSET_OPTIONS.keys())}; creating custom charset from provided string instead.")
            # dict.fromkeys de-duplicates deterministically (first-occurrence order). set() ordering depends on
            # per-process string hash randomisation, so separate training and scoring runs could disagree on it.
            # charset_tensor is passed to RecordPreprocessor and record scoring, so its order presumably defines labels.
            self.charset = "".join(dict.fromkeys(charset))
        self.num_classes = len(self.charset)
        self.charset_tensor = np_array([str.encode(x) for x in self.charset])

    def fit(self, data: TFRecordDataset):
        """Fits the scoring model with the passed data

        Args:
            data (TFRecordDataset): training data

        Returns:
            Scoring: Scoring with trained model

        Raises:
            ValueError: If training_config is None (not provided).
        """
        if self.training_config is None:
            raise ValueError("Training configuration not provided at instantiation time.")
        self.model.compile(
            loss = self.training_config.loss,
            optimizer = self.training_config.optimizer,
            metrics = self.training_config.metrics,
            run_eagerly=False)
        self.model.fit(
            data,
            steps_per_epoch = self.training_config.steps_per_epoch,
            epochs = self.training_config.epochs,
            callbacks = self.training_config.callbacks)
        return self

    def _to_shape(self, x: t.Union[ndarray, Tensor]):
        """Reshape single example to be transformed in-memory by the `transform` method.

        Rank-2 inputs get a leading batch axis and a trailing channel axis; rank-3 inputs get a leading
        batch axis; inputs of any other rank are returned unchanged.

        Args:
            x (t.Union[ndarray, Tensor]): Single input

        Returns:
            t.Union[ndarray, Tensor]: Reshaped input, of the same kind as `x`
        """
        rank = len(x.shape)
        if rank == 2:
            new_shape = (1,) + tuple(x.shape) + (1,)
        elif rank == 3:
            new_shape = (1,) + tuple(x.shape)
        else:
            return x
        # tf.Tensor has no .reshape method, so tensors must go through tf.reshape
        if isinstance(x, Tensor):
            return tf.reshape(x, new_shape)
        return x.reshape(new_shape)

    def transform(self, input_data: t.Union[ndarray, Tensor, TfrWritable]) -> t.Union[Tensor, ndarray, TfrWritable]:
        """
        Process a single instance

        Args:
            input_data (t.Union[ndarray, Tensor, TfrWritable]): Input instance

        Returns:
            t.Union[Tensor, ndarray, TfrWritable]: Scored example in the corresponding format, depending on the input type.

        Raises:
            TypeError: If input type is not allowed.
        """
        if isinstance(input_data, (ndarray, Tensor)):
            return self.model.predict(self._to_shape(input_data))
        elif isinstance(input_data, TfrWritable):
            return input_data.add_score(
                score = self.model.predict(self._to_shape(input_data.features)),
                charset_tensor=self.charset_tensor)
        else:
            raise TypeError("Input type is not one of ndarray, Tensor or TfrWritable")

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses scoring configuration files."""
        return ScoringConfigHandler()

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "scoring"

    @classmethod
    def from_config_object(cls, config: ScoringConfig, load_from_model_path = False):
        """Initialises a Scoring instance from a configuration object

        Args:
            config (ScoringConfig): Configuration object
            load_from_model_path (bool, optional): If True, the model is loaded from the model_path argument in the configuration object.

        Returns:
            Scoring: Instantiated Scoring object.
        """
        if load_from_model_path:
            model_class_name = config.model.__class__.__name__
            # dict -> YAML -> Model; only non-Sequential models need a custom_class entry
            input_dict = {"path": config.model_path}
            if model_class_name != "Sequential":
                input_dict["custom_class"] = model_class_name
            model_yaml = as_document(input_dict)
            message = f"load_from_model_path flag set to True; loading model of class {model_class_name} from {config.model_path}"
            logger.info(message)
            model = ModelFactory().from_yaml(model_yaml)
        else:
            model = config.model
        return cls(model = model, training_config = config.training_config, charset = config.charset)

    @classmethod
    def run_from_config_object(cls, config: ScoringConfig, load_from_model_path = False):
        """Scores every record under `config.input_path` and writes the scored records to `config.output_path`.

        Batches that fail to score are logged and skipped.

        Args:
            config (ScoringConfig): Configuration object
            load_from_model_path (bool, optional): If True, the model is loaded from `config.model_path`.
        """
        predictor = cls.from_config_object(config, load_from_model_path)
        data_fetcher = RecordPreprocessor(
            input_record_class = config.input_record_class,
            charset_tensor = predictor.charset_tensor,
            custom_filters = predictor.training_config.custom_filters,
            custom_mappers = predictor.training_config.custom_mappers)
        writer = TfrWriter(config.output_path)
        counter = 0
        try:
            files = TfrReader(config.input_path).get_files()
            data = data_fetcher.fetch(files, training_format=False, batch_size = predictor.training_config.batch_size)
            for features, labels, fontnames in data:
                try:
                    scores = predictor.transform(features)
                    scored_records = config.input_record_class.from_scored_batch(
                        features = features.numpy(),
                        labels = labels.numpy(),
                        fontnames = fontnames.numpy(),
                        scores = scores,
                        charset_tensor = predictor.charset_tensor)
                    for record in scored_records:
                        writer.write(record)
                        counter += 1
                except Exception:
                    logger.exception(f"Exception scoring batch with features: {features}. Full trace: {traceback.format_exc()}")
        finally:
            # always release the output file, even if reading the input fails
            writer.close()
        logger.info(f"Processed {counter} examples.")

    @classmethod
    def fit_from_config_object(cls, config: ScoringConfig, load_from_model_path = False, run_id: str = None):
        """Trains the configured model inside an MLFlow run and saves it to `config.model_path`.

        The configuration is logged as an MLFlow artifact under `run-configs/<n>.yaml`. While the model
        is training, SIGINT saves the current model to `config.model_path` and exits; the previous SIGINT
        handler is restored once training ends.

        Args:
            config (ScoringConfig): Configuration object
            load_from_model_path (bool, optional): If True, the model is loaded from `config.model_path`.
            run_id (str, optional): MLFlow run id to resume; a new run is started when None.
        """
        import tempfile

        with mlflow.start_run(run_id=run_id, nested=False) as run:
            logger.info(f"MLFlow run id: {run.info.run_id}")
            cfg_log_path = "run-configs"
            # check whether there are previous run configs in this MLFLow run
            client = mlflow.tracking.MlflowClient()
            n_previous_runs = len(client.list_artifacts(run.info.run_id, cfg_log_path))
            current_run = f"{n_previous_runs + 1}.yaml"
            # log config file from a private temporary directory so no file in the CWD is overwritten or deleted
            with tempfile.TemporaryDirectory() as tmp_dir:
                cfg_file = os.path.join(tmp_dir, current_run)
                with open(cfg_file, "w", encoding="utf-8") as f:
                    f.write(config.yaml.as_yaml())
                mlflow.log_artifact(cfg_file, cfg_log_path)
            #start keras autologging
            mlflow.tensorflow.autolog()
            # fetch data and fit model
            predictor = cls.from_config_object(config, load_from_model_path)
            files = TfrReader(config.input_path).get_files()
            data_fetcher = RecordPreprocessor(
                input_record_class = config.input_record_class,
                charset_tensor = predictor.charset_tensor,
                custom_filters = predictor.training_config.custom_filters,
                custom_mappers = predictor.training_config.custom_mappers)

            # implement graceful interruptions; installed only once `predictor` exists so the handler cannot hit a NameError
            def save_on_sigint(sig, frame):
                logger.info(f"Training stopped by SIGINT: saving current model to {config.model_path}")
                predictor.model.save(config.model_path)
                sys.exit(0)

            previous_handler = signal.signal(signal.SIGINT, save_on_sigint)
            try:
                predictor.fit(data_fetcher.fetch(files, training_format=True, batch_size=predictor.training_config.batch_size))
            finally:
                # signal.signal returns None when the previous handler was not installed from Python
                if previous_handler is not None:
                    signal.signal(signal.SIGINT, previous_handler)
            logger.info(f"Saving trained model to {config.model_path}")
            predictor.model.save(config.model_path)
class Deployment(ConfigurableTransform):
    """Deploys a small Dash app in which a fitted typeface generative model's style space can be explored.

    The page holds a 'Random' button, one slider per style-space dimension and an image of the font
    generated from the current slider values.
    """

    def __init__(
        self,
        model: tf.keras.Model,
        sampler: t.Callable,
        grid: Grid,
        plotter: AlphabetPlotter,
        charset_size: int):
        """
        Args:
            model (tf.keras.Model): Generative model
            sampler (t.Callable): Style vectors' sampling function; called as `sampler(size=grid.dim)`
            grid (Grid): Grid describing the style space's grid to explore
            plotter (AlphabetPlotter): Plotter object
            charset_size (int): Number of characters in font
        """
        self.model, self.sampler, self.grid = model, sampler, grid
        self.charset_size, self.plotter = charset_size, plotter
        # stylesheet handed to the Dash app
        self.external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

    def launch(self, **kwargs):
        """Builds the Dash app and starts its server.

        Args:
            **kwargs: Additional arguments passed to `app.run_server`
        """
        slider_ids = [f"slider-{k}" for k in range(self.grid.dim)]
        app = dash.Dash("fontai-deployment", external_stylesheets=self.external_stylesheets)

        # sliders span [lowest, largest] in grid.size steps and start at the midpoint
        lower, upper = self.grid.lowest, self.grid.largest
        slider_step = (upper - lower) / self.grid.size
        slider_start = (upper + lower) / 2
        sliders = [
            dcc.Slider(min=lower, max=upper, step=slider_step, value=slider_start, id=slider_id)
            for slider_id in slider_ids
        ]
        controls = html.Div(
            children=[html.Button('Random', id='button')] + sliders,
            style={'display': 'inline-block', 'width': '25%'})
        app.layout = html.Div(children=[controls, html.Img(id="font_figure")])

        def update_random(n_clicks):
            # draw a fresh style vector and clip it into the sliders' range
            sampled = self.sampler(size=self.grid.dim)
            clipped = np_clip(sampled, a_min=self.grid.lowest, a_max=self.grid.largest)
            return list(clipped)

        def update(*args):
            # one value per slider, in slider order, packed into a single-row style vector
            style_vector = np_array(args).reshape((1, -1))
            font = self.plotter.generate_font(model=self.model, style_vector=style_vector, charset_size=self.charset_size)
            figure = self.plotter.plot_font(font)
            return self.plotter.fig_to_str(figure)

        app.callback(
            [Output(slider_id, "value") for slider_id in slider_ids],
            Input("button", "n_clicks"))(update_random)
        app.callback(
            Output('font_figure', 'src'),
            [Input(slider_id, 'value') for slider_id in slider_ids])(update)

        app.run_server(**kwargs)

    def process(self, x):
        """Not supported by the deployment stage; always raises NotImplementedError."""
        raise NotImplementedError("This method is not implemented for deployment.")

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses deployment configuration files."""
        return DeploymentConfigHandler()

    @classmethod
    def from_config_object(cls, config: DeploymentConfig, **kwargs):
        """Instantiates the stage from a configuration object; extra keyword arguments are ignored."""
        return cls(
            model=config.model,
            sampler=config.sampler,
            grid=config.grid,
            plotter=config.plotter,
            charset_size=config.charset_size)

    @classmethod
    def run_from_config_object(cls, config: DeploymentConfig, **kwargs):
        """Builds the stage from `config` and launches the app with `config.dash_args`."""
        deployment = cls.from_config_object(config)
        deployment.launch(**config.dash_args)

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "deployment"

    def transform_batch(self, input_path: str, output_path: str):
        """Not supported by the deployment stage; always raises NotImplementedError."""
        raise NotImplementedError("This method is not implemented for deployment.")
Classes
class Deployment (model: tensorflow.python.keras.engine.training.Model, sampler: Callable, grid: Grid, plotter: AlphabetPlotter, charset_size: int)
-
Deploys a small Dash app in which a fitted typeface generative model's style space can be explored
Args
model
:tf.keras.Model
- Generative model
sampler
:t.Callable
- Style vectors' sampling function
grid
:Grid
- Grid describing the style space's grid to explore
plotter
:AlphabetPlotter
- Plotter object
charset_size
:int
- Number of characters in font
Expand source code
class Deployment(ConfigurableTransform):
    """Deploys a small Dash app in which a fitted typeface generative model's style space can be explored.

    The page holds a 'Random' button, one slider per style-space dimension and an image of the font
    generated from the current slider values.
    """

    def __init__(
        self,
        model: tf.keras.Model,
        sampler: t.Callable,
        grid: Grid,
        plotter: AlphabetPlotter,
        charset_size: int):
        """
        Args:
            model (tf.keras.Model): Generative model
            sampler (t.Callable): Style vectors' sampling function; called as `sampler(size=grid.dim)`
            grid (Grid): Grid describing the style space's grid to explore
            plotter (AlphabetPlotter): Plotter object
            charset_size (int): Number of characters in font
        """
        self.model, self.sampler, self.grid = model, sampler, grid
        self.charset_size, self.plotter = charset_size, plotter
        # stylesheet handed to the Dash app
        self.external_stylesheets = ['https://codepen.io/chriddyp/pen/bWLwgP.css']

    def launch(self, **kwargs):
        """Builds the Dash app and starts its server.

        Args:
            **kwargs: Additional arguments passed to `app.run_server`
        """
        slider_ids = [f"slider-{k}" for k in range(self.grid.dim)]
        app = dash.Dash("fontai-deployment", external_stylesheets=self.external_stylesheets)

        # sliders span [lowest, largest] in grid.size steps and start at the midpoint
        lower, upper = self.grid.lowest, self.grid.largest
        slider_step = (upper - lower) / self.grid.size
        slider_start = (upper + lower) / 2
        sliders = [
            dcc.Slider(min=lower, max=upper, step=slider_step, value=slider_start, id=slider_id)
            for slider_id in slider_ids
        ]
        controls = html.Div(
            children=[html.Button('Random', id='button')] + sliders,
            style={'display': 'inline-block', 'width': '25%'})
        app.layout = html.Div(children=[controls, html.Img(id="font_figure")])

        def update_random(n_clicks):
            # draw a fresh style vector and clip it into the sliders' range
            sampled = self.sampler(size=self.grid.dim)
            clipped = np_clip(sampled, a_min=self.grid.lowest, a_max=self.grid.largest)
            return list(clipped)

        def update(*args):
            # one value per slider, in slider order, packed into a single-row style vector
            style_vector = np_array(args).reshape((1, -1))
            font = self.plotter.generate_font(model=self.model, style_vector=style_vector, charset_size=self.charset_size)
            figure = self.plotter.plot_font(font)
            return self.plotter.fig_to_str(figure)

        app.callback(
            [Output(slider_id, "value") for slider_id in slider_ids],
            Input("button", "n_clicks"))(update_random)
        app.callback(
            Output('font_figure', 'src'),
            [Input(slider_id, 'value') for slider_id in slider_ids])(update)

        app.run_server(**kwargs)

    def process(self, x):
        """Not supported by the deployment stage; always raises NotImplementedError."""
        raise NotImplementedError("This method is not implemented for deployment.")

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses deployment configuration files."""
        return DeploymentConfigHandler()

    @classmethod
    def from_config_object(cls, config: DeploymentConfig, **kwargs):
        """Instantiates the stage from a configuration object; extra keyword arguments are ignored."""
        return cls(
            model=config.model,
            sampler=config.sampler,
            grid=config.grid,
            plotter=config.plotter,
            charset_size=config.charset_size)

    @classmethod
    def run_from_config_object(cls, config: DeploymentConfig, **kwargs):
        """Builds the stage from `config` and launches the app with `config.dash_args`."""
        deployment = cls.from_config_object(config)
        deployment.launch(**config.dash_args)

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "deployment"

    def transform_batch(self, input_path: str, output_path: str):
        """Not supported by the deployment stage; always raises NotImplementedError."""
        raise NotImplementedError("This method is not implemented for deployment.")
Ancestors
- ConfigurableTransform
- Transform
- abc.ABC
Static methods
def get_stage_name()
-
Expand source code
@classmethod
def get_stage_name(cls):
    """Returns the stage's name."""
    stage_name = "deployment"
    return stage_name
Methods
def launch(self, **kwargs)
-
Launches a Dash app
Args
**kwargs
- Additional arguments passed to Dash app
Expand source code
def launch(self, **kwargs):
    """Builds the Dash app and starts its server.

    Args:
        **kwargs: Additional arguments passed to `app.run_server`
    """
    slider_ids = [f"slider-{k}" for k in range(self.grid.dim)]
    app = dash.Dash("fontai-deployment", external_stylesheets=self.external_stylesheets)

    # sliders span [lowest, largest] in grid.size steps and start at the midpoint
    lower, upper = self.grid.lowest, self.grid.largest
    slider_step = (upper - lower) / self.grid.size
    slider_start = (upper + lower) / 2
    sliders = [
        dcc.Slider(min=lower, max=upper, step=slider_step, value=slider_start, id=slider_id)
        for slider_id in slider_ids
    ]
    controls = html.Div(
        children=[html.Button('Random', id='button')] + sliders,
        style={'display': 'inline-block', 'width': '25%'})
    app.layout = html.Div(children=[controls, html.Img(id="font_figure")])

    def update_random(n_clicks):
        # draw a fresh style vector and clip it into the sliders' range
        sampled = self.sampler(size=self.grid.dim)
        clipped = np_clip(sampled, a_min=self.grid.lowest, a_max=self.grid.largest)
        return list(clipped)

    def update(*args):
        # one value per slider, in slider order, packed into a single-row style vector
        style_vector = np_array(args).reshape((1, -1))
        font = self.plotter.generate_font(model=self.model, style_vector=style_vector, charset_size=self.charset_size)
        figure = self.plotter.plot_font(font)
        return self.plotter.fig_to_str(figure)

    app.callback(
        [Output(slider_id, "value") for slider_id in slider_ids],
        Input("button", "n_clicks"))(update_random)
    app.callback(
        Output('font_figure', 'src'),
        [Input(slider_id, 'value') for slider_id in slider_ids])(update)

    app.run_server(**kwargs)
def process(self, x)
-
Expand source code
def process(self, x):
    """Not supported by the deployment stage; always raises NotImplementedError."""
    message = "This method is not implemented for deployment."
    raise NotImplementedError(message)
def transform_batch(self, input_path: str, output_path: str)
-
Expand source code
def transform_batch(self, input_path: str, output_path: str):
    """Not supported by the deployment stage; always raises NotImplementedError."""
    message = "This method is not implemented for deployment."
    raise NotImplementedError(message)
Inherited members
class Ingestion (config: Config = None)
-
Retrieves zipped font files. It takes a list of scrappers defined in
fontai.io.scrappers
from which it downloads files to storage.
Expand source code
class Ingestion(ConfigurableTransform, IdentityTransform):
    """Retrieves zipped font files. It takes a list of scrappers defined in `fontai.io.scrappers` from which it downloads files to storage.

    Attributes:
        config (IngestionConfig): Ingestion configuration object; None when instantiated without one.
    """

    def __init__(self, config: IngestionConfig = None):
        """
        Args:
            config (IngestionConfig, optional): Ingestion configuration object.
        """
        self.config = config

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses ingestion configuration files."""
        return IngestionConfigHandler()

    @classmethod
    def from_config_object(cls, config: IngestionConfig, **kwargs):
        """Instantiates the stage from a configuration object.

        Args:
            config (IngestionConfig): Ingestion configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.

        Returns:
            Ingestion: Instantiated stage.
        """
        return cls(config)

    @classmethod
    def run_from_config_object(cls, config: IngestionConfig, **kwargs):
        """Downloads the files exposed by the configured scrappers, extracts the individual font files
        from them and writes those to `config.output_path`.

        Args:
            config (IngestionConfig): Ingestion configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.
        """
        ingestor = cls.from_config_object(config)
        font_extractor = ZipToFontFiles()
        writer = ZipWriter(ingestor.config.output_path, config.max_output_file_size)
        try:
            for zipped_file in ScrapperReader(config.scrappers).get_files():
                for font_file in font_extractor.map(zipped_file):
                    writer.write(font_file)
        finally:
            # Close the writer even if scraping fails part-way, mirroring how Scoring closes its TfrWriter.
            # NOTE(review): assumes ZipWriter exposes close() like TfrWriter and flushes pending output there.
            writer.close()

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "ingestion"

    def transform_batch(self, input_path: str, output_path: str):
        """Not supported by the ingestion stage; always raises NotImplementedError."""
        raise NotImplementedError("This method is not implemented for ingestion.")
Ancestors
Static methods
def get_stage_name()
-
Expand source code
@classmethod
def get_stage_name(cls):
    """Returns the stage's name."""
    stage_name = "ingestion"
    return stage_name
Methods
def transform_batch(self, input_path: str, output_path: str)
-
Expand source code
def transform_batch(self, input_path: str, output_path: str):
    """Not supported by the ingestion stage; always raises NotImplementedError."""
    message = "This method is not implemented for ingestion."
    raise NotImplementedError(message)
Inherited members
class Preprocessing (output_record_class: type, charset: str, font_extraction_size: int, canvas_size: int, canvas_padding: int, output_array_size: int, beam_cmd_line_args: List[str] = [])
-
Processes zipped font files and outputs Tensorflow records consisting of labeled images for ML consumption.
Args
output_record_class
:type
- Output schema class, inheriting from TfrWritable
charset
:str
- String with characters to be extracted
font_extraction_size
:int
- Font size to use when extracting font images
canvas_size
:int
- Image canvas size in which fonts will be extracted
canvas_padding
:int
- Padding in the image extraction canvas
output_array_size
:int
- Final character image size
beam_cmd_line_args
:t.List[str]
, optional
- List of Apache Beam command line arguments for distributed processing
Expand source code
class Preprocessing(ConfigurableTransform):
    """
    Processes zipped font files and outputs Tensorflow records consisting of labeled images for ML consumption.
    """

    def __init__(
        self,
        output_record_class: type,
        charset: str,
        font_extraction_size: int,
        canvas_size: int,
        canvas_padding: int,
        output_array_size: int,
        beam_cmd_line_args: t.Optional[t.Sequence[str]] = ()):
        """
        Args:
            output_record_class (type): Output schema class, inheriting from TfrWritable
            charset (str): String with characters to be extracted
            font_extraction_size (int): Font size to use when extracting font images
            canvas_size (int): Image canvas size in which fonts will be extracted
            canvas_padding (int): Padding in the image extraction canvas
            output_array_size (int): Final character image size
            beam_cmd_line_args (t.Sequence[str], optional): Apache Beam command line arguments for distributed processing.
                Defaults to no arguments. The arguments are copied into a new list; None is stored unchanged.
        """
        # Copy into a fresh list rather than sharing a mutable default between instances.
        # None is kept as None because it is handed unchanged to PipelineOptions in run_from_config_object.
        self.beam_cmd_line_args = None if beam_cmd_line_args is None else list(beam_cmd_line_args)
        self.pipeline = PipelineFactory.create(
            output_record_class = output_record_class,
            charset = charset,
            font_extraction_size = font_extraction_size,
            canvas_size = canvas_size,
            canvas_padding = canvas_padding,
            output_array_size = output_array_size)

    @classmethod
    def from_config_object(cls, config: ProcessingConfig, **kwargs):
        """Instantiates the stage from a configuration object.

        Args:
            config (ProcessingConfig): Preprocessing configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.

        Returns:
            Preprocessing: Instantiated stage.
        """
        return cls(
            output_record_class = config.output_record_class,
            output_array_size = config.output_array_size,
            beam_cmd_line_args = config.beam_cmd_line_args,
            **config.font_to_array_config.dict())

    def transform(self, data):
        """Maps `data` through the preprocessing pipeline built at instantiation time."""
        return self.pipeline.map(data)

    @classmethod
    def get_config_parser(cls):
        """Returns the handler that parses preprocessing configuration files."""
        return ProcessingConfigHandler()

    @classmethod
    def get_stage_name(cls):
        """Returns the stage's name."""
        return "preprocessing"

    @classmethod
    def run_from_config_object(cls, config: ProcessingConfig, **kwargs):
        """Runs an Apache Beam pipeline that reads every zipped source under `config.input_path`,
        extracts labeled examples from it and writes them as Tensorflow records to `config.output_path`.

        Args:
            config (ProcessingConfig): Preprocessing configuration object
            **kwargs: Ignored; accepted for interface compatibility with the other stages.
        """
        # set provisional value for CUDA env variable to prevent out of memory errors from the GPU; this occurs because the preprocessing code depends on (CPU-bound) Tensorflow functionality, which attempts to seize memory from the GPU automatically, but this throws an error when Beam uses multiple threads.
        visible_devices = os.getenv("CUDA_VISIBLE_DEVICES")
        os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
        try:
            output_path, input_path = config.output_path, config.input_path
            processor = cls.from_config_object(config)
            # if output is locally persisted, create parent folders
            if not BytestreamPath(output_path).is_url():
                Path(str(output_path)).mkdir(parents=True, exist_ok=True)
            pipeline_options = PipelineOptions(processor.beam_cmd_line_args)
            pipeline_options.view_as(SetupOptions).save_main_session = True
            with beam.Pipeline(options=pipeline_options) as p:
                # execute pipeline; the mapper is the one built by from_config_object from the same config
                (p
                 | 'create source list' >> beam.Create(BytestreamPath(input_path).list_sources()) #create list of sources as strings
                 | 'read zipped files' >> beam.Map(lambda filepath: InMemoryZipfile.from_bytestream_path(filepath)) #line needed to load files lazily and not overwhelm memory
                 | 'get labeled exampes from zip files' >> beam.ParDo(BeamCompatibleWrapper(mapper = processor.pipeline))
                 | "write to storage" >> beam.ParDo(Writer(TfrWriter(output_path, config.max_output_file_size))))
        finally:
            # restore the caller's environment even if the pipeline fails
            if visible_devices is None:
                os.environ.pop("CUDA_VISIBLE_DEVICES", None)
            else:
                os.environ["CUDA_VISIBLE_DEVICES"] = visible_devices
Ancestors
- ConfigurableTransform
- Transform
- abc.ABC
Static methods
def get_stage_name()
-
Expand source code
@classmethod
def get_stage_name(cls):
    """Returns the stage's name."""
    stage_name = "preprocessing"
    return stage_name
Inherited members
class Scoring (model: tensorflow.python.keras.engine.training.Model, training_config: TrainingConfig = None, charset: str = 'lowercase')
-
Trains a prediction model or uses one to score input data.
Attributes
model
:keras.Model
- Scoring model
CHARSET_OPTIONS
:t.Dict
- Dictionary from allowed charsets names to charsets
training_config
:TrainingConfig
- training configuration wrapper
charset_tensor
:numpy.ndarray
- Array with one UTF-8 encoded bytes entry per character in the current charset
Args
model
:tf.keras.Model
- Scoring model
training_config
:TrainingConfig
, optional
- Training configuration wrapper
charset
:str
- charset to use for training and batch scoring. It must be one of 'lowercase', 'uppercase', 'digits' or 'all', or otherwise a string with all characters under consideration (duplicate characters are removed)
Expand source code
class Scoring(FittableTransform):
    """
    Trains a prediction model or uses one to score input data.

    Attributes:
        model (keras.Model): Scoring model
        CHARSET_OPTIONS (t.Dict): Dictionary mapping allowed charset names to charsets
        training_config (TrainingConfig): training configuration wrapper
        charset (str): characters under consideration, in the order used to build charset_tensor
        num_classes (int): number of characters in the charset
        charset_tensor: array (built with ``np_array``) with one bytes-encoded entry per character in the current charset
    """

    CHARSET_OPTIONS = {
        "uppercase": string.ascii_letters[26::],
        "lowercase": string.ascii_letters[0:26],
        "digits": string.digits,
        "all": string.ascii_letters + string.digits,
    }

    def __init__(
        self,
        model: tf.keras.Model,
        training_config: TrainingConfig = None,
        charset: str = "lowercase"):
        """
        Args:
            model (tf.keras.Model): Scoring model
            training_config (TrainingConfig, optional): Training configuration wrapper
            charset (str): charset to use for training and batch scoring. It must be one of 'lowercase', 'uppercase', 'digits' or 'all', or otherwise a string with all characters under consideration; in the latter case duplicated characters are dropped, keeping first-occurrence order.
        """
        self.model = model
        self.training_config = training_config

        try:
            self.charset = self.CHARSET_OPTIONS[charset]
        except KeyError:
            logger.warning(f"Charset string is not one from {list(self.CHARSET_OPTIONS.keys())}; creating custom charset from provided string instead.")
            # Deduplicate while keeping first-occurrence order. Iterating a set of str
            # is not stable across interpreter runs (hash randomisation), which would
            # change the character -> class index mapping between training and scoring.
            self.charset = "".join(dict.fromkeys(charset))

        self.num_classes = len(self.charset)
        self.charset_tensor = np_array([str.encode(x) for x in self.charset])

    def fit(self, data: TFRecordDataset):
        """Fits the scoring model with the passed data

        Args:
            data (TFRecordDataset): training data

        Returns:
            Scoring: Scoring with trained model

        Raises:
            ValueError: If training_config is None (not provided).
        """
        if self.training_config is None:
            raise ValueError("Training configuration not provided at instantiation time.")

        self.model.compile(
            loss = self.training_config.loss,
            optimizer = self.training_config.optimizer,
            metrics = self.training_config.metrics,
            run_eagerly=False)

        self.model.fit(
            data,
            steps_per_epoch = self.training_config.steps_per_epoch,
            epochs = self.training_config.epochs,
            callbacks = self.training_config.callbacks)

        return self

    def _to_shape(self, x: t.Union[ndarray, Tensor]):
        """Reshape single example to be transformed in-memory by the `transform` method.

        A rank-2 input gets a leading and a trailing axis of size 1; a rank-3 input
        gets a leading axis of size 1; inputs of any other rank are returned unchanged.

        Args:
            x (t.Union[ndarray, Tensor]): Single input

        Returns:
            t.Union[ndarray, Tensor]: Reshaped input, of the same type as ``x``
        """
        rank = len(x.shape)
        if rank == 2:
            new_shape = (1,) + tuple(x.shape) + (1,)
        elif rank == 3:
            new_shape = (1,) + tuple(x.shape)
        else:
            return x
        # tf.Tensor has no `.reshape` method, so Tensors go through tf.reshape
        if isinstance(x, Tensor):
            return tf.reshape(x, new_shape)
        return x.reshape(new_shape)

    def transform(self, input_data: t.Union[ndarray, Tensor, TfrWritable]) -> t.Union[Tensor, ndarray, TfrWritable]:
        """
        Process a single instance

        Args:
            input_data (t.Union[ndarray, Tensor, TfrWritable]): Input instance

        Returns:
            t.Union[Tensor, ndarray, TfrWritable]: Scored example in the corresponding format, depending on the input type.

        Raises:
            TypeError: If input type is not allowed.
        """
        if isinstance(input_data, (ndarray, Tensor)):
            return self.model.predict(self._to_shape(input_data))
        elif isinstance(input_data, TfrWritable):
            return input_data.add_score(
                score = self.model.predict(self._to_shape(input_data.features)),
                charset_tensor=self.charset_tensor)
        else:
            raise TypeError("Input type is not one of ndarray, Tensor or TfrWritable")

    @classmethod
    def get_config_parser(cls):
        return ScoringConfigHandler()

    @classmethod
    def get_stage_name(cls):
        return "scoring"

    @classmethod
    def from_config_object(cls, config: ScoringConfig, load_from_model_path = False):
        """Initialises a Scoring instance from a configuration object

        Args:
            config (ScoringConfig): Configuration object
            load_from_model_path (bool, optional): If True, the model is loaded from the model_path argument in the configuration object; otherwise ``config.model`` is used as is.

        Returns:
            Scoring: Instantiated Scoring object (an instance of ``cls``).
        """
        if load_from_model_path:
            # The class name of the configured model is forwarded as `custom_class`
            # (omitted for plain Sequential models) - presumably so ModelFactory can
            # rebuild the right class when loading; verify against ModelFactory.
            model_class_name = config.model.__class__.__name__
            # dict -> YAML -> Model
            input_dict = {"path": config.model_path}
            if model_class_name != "Sequential":
                input_dict["custom_class"] = model_class_name
            model_yaml = as_document(input_dict)
            message = f"load_from_model_path flag set to True; loading model of class {model_class_name} from {config.model_path}"
            logger.info(message)
            model = ModelFactory().from_yaml(model_yaml)
        else:
            model = config.model

        return cls(model = model, training_config = config.training_config, charset = config.charset)

    @classmethod
    def run_from_config_object(cls, config: ScoringConfig, load_from_model_path = False):
        """Score every record found under ``config.input_path`` and write the scored records to ``config.output_path``.

        Batches whose scoring raises are logged and skipped; the output writer is
        always closed, even if reading the input fails.

        Args:
            config (ScoringConfig): Configuration object
            load_from_model_path (bool, optional): If True, the model is loaded from ``config.model_path``.
        """
        predictor = cls.from_config_object(config, load_from_model_path)

        data_fetcher = RecordPreprocessor(
            input_record_class = config.input_record_class,
            charset_tensor = predictor.charset_tensor,
            custom_filters = predictor.training_config.custom_filters,
            custom_mappers = predictor.training_config.custom_mappers)

        writer = TfrWriter(config.output_path)
        counter = 0
        try:
            files = TfrReader(config.input_path).get_files()
            data = data_fetcher.fetch(files, training_format=False, batch_size = predictor.training_config.batch_size)

            for features, labels, fontnames in data:
                try:
                    scores = predictor.transform(features)
                    scored_records = config.input_record_class.from_scored_batch(
                        features = features.numpy(),
                        labels = labels.numpy(),
                        fontnames = fontnames.numpy(),
                        scores = scores,
                        charset_tensor = predictor.charset_tensor)
                    for record in scored_records:
                        writer.write(record)
                        counter += 1
                except Exception:
                    logger.exception(f"Exception scoring batch with features: {features}. Full trace: {traceback.format_exc()}")
        finally:
            # release the output file even when input reading/iteration fails
            writer.close()

        logger.info(f"Processed {counter} examples.")

    @classmethod
    def fit_from_config_object(cls, config: ScoringConfig, load_from_model_path = False, run_id: str = None):
        """Train a model from a configuration object, tracking the run with MLFlow.

        The configuration YAML is logged as an artifact under ``run-configs/``, Keras
        autologging is enabled, and the trained model is saved to ``config.model_path``.
        A SIGINT received meanwhile saves the current model (if it has already been
        instantiated) and exits; the previous SIGINT handler is restored afterwards.

        Args:
            config (ScoringConfig): Configuration object
            load_from_model_path (bool, optional): If True, the initial model is loaded from ``config.model_path``.
            run_id (str, optional): passed to ``mlflow.start_run``.
        """
        # Bound before the handler is installed so an early SIGINT does not hit an
        # unassigned closure variable (NameError).
        predictor = None

        # implement graceful interruptions
        def save_on_sigint(sig, frame):
            if predictor is None:
                logger.info("Training stopped by SIGINT before the model was instantiated: nothing to save")
            else:
                predictor.model.save(config.model_path)
                logger.info(f"Training stopped by SIGINT: saving current model to {config.model_path}")
            sys.exit(0)

        previous_handler = signal.signal(signal.SIGINT, save_on_sigint)
        try:
            # start MLFlow run
            with mlflow.start_run(run_id=run_id, nested=False) as run:
                logger.info(f"MLFlow run id: {run.info.run_id}")
                cfg_log_path = "run-configs"

                # check whether there are previous run configs in this MLFLow run
                client = mlflow.tracking.MlflowClient()
                n_previous_runs = len(client.list_artifacts(run.info.run_id, cfg_log_path))
                current_run = f"{n_previous_runs + 1}.yaml"

                # log config file
                with open(current_run, "w") as f:
                    f.write(config.yaml.as_yaml())
                try:
                    mlflow.log_artifact(current_run, cfg_log_path)
                finally:
                    os.remove(current_run)

                # start keras autologging
                mlflow.tensorflow.autolog()

                # fetch data and fit model
                predictor = cls.from_config_object(config, load_from_model_path)
                data = TfrReader(config.input_path).get_files()
                data_fetcher = RecordPreprocessor(
                    input_record_class = config.input_record_class,
                    charset_tensor = predictor.charset_tensor,
                    custom_filters = predictor.training_config.custom_filters,
                    custom_mappers = predictor.training_config.custom_mappers)

                predictor.fit(data_fetcher.fetch(data, training_format=True, batch_size=predictor.training_config.batch_size))

                logger.info(f"Saving trained model to {config.model_path}")
                predictor.model.save(config.model_path)
        finally:
            # restore whatever SIGINT handler was active before this call
            if previous_handler is not None:
                signal.signal(signal.SIGINT, previous_handler)
Ancestors
Class variables
var CHARSET_OPTIONS
Static methods
def from_config_object(config: Config, load_from_model_path=False)
-
Initialises a Scoring instance from a configuration object
Args
config
:ScoringConfig
- Configuration object
load_from_model_path
:bool, optional
- If True, the model is loaded from the model_path argument in the configuration object.
Returns
Scoring
- Instantiated Scoring object.
Expand source code
@classmethod
def from_config_object(cls, config: ScoringConfig, load_from_model_path = False):
    """Initialises a Scoring instance from a configuration object

    Args:
        config (ScoringConfig): Configuration object
        load_from_model_path (bool, optional): If True, the model is loaded from the model_path argument in the configuration object; otherwise ``config.model`` is used as is.

    Returns:
        Scoring: Instantiated Scoring object (an instance of ``cls``).
    """
    if load_from_model_path:
        # The class name of the configured model is forwarded as `custom_class`
        # (omitted for plain Sequential models) - presumably so ModelFactory can
        # rebuild the right class when loading; verify against ModelFactory.
        model_class_name = config.model.__class__.__name__
        # dict -> YAML -> Model
        input_dict = {"path": config.model_path}
        if model_class_name != "Sequential":
            input_dict["custom_class"] = model_class_name
        model_yaml = as_document(input_dict)
        message = f"load_from_model_path flag set to True; loading model of class {model_class_name} from {config.model_path}"
        logger.info(message)
        model = ModelFactory().from_yaml(model_yaml)
    else:
        model = config.model

    return cls(model = model, training_config = config.training_config, charset = config.charset)
def get_stage_name()
-
Expand source code
@classmethod
def get_stage_name(cls):
    """Return the identifier string of the scoring stage."""
    stage_name = "scoring"
    return stage_name
Methods
def fit(self, data: tensorflow.python.data.ops.readers.TFRecordDatasetV2)
-
Fits the scoring model with the passed data
Args
data
:TFRecordDataset
- training data
Returns
Scoring
- Scoring with trained model
Raises
ValueError
- If training_config is None (not provided).
Expand source code
def fit(self, data: TFRecordDataset):
    """Train the wrapped model on ``data`` using the stored training configuration.

    The model is compiled with the configured loss, optimizer and metrics (graph
    mode, ``run_eagerly=False``) and then fitted with the configured steps per
    epoch, number of epochs and callbacks.

    Args:
        data (TFRecordDataset): training data

    Returns:
        Scoring: this same instance, now holding the trained model

    Raises:
        ValueError: If training_config is None (not provided).
    """
    cfg = self.training_config
    if cfg is None:
        raise ValueError("Training configuration not provided at instantiation time.")

    compile_kwargs = dict(
        loss=cfg.loss,
        optimizer=cfg.optimizer,
        metrics=cfg.metrics,
        run_eagerly=False,
    )
    self.model.compile(**compile_kwargs)

    self.model.fit(
        data,
        steps_per_epoch=cfg.steps_per_epoch,
        epochs=cfg.epochs,
        callbacks=cfg.callbacks,
    )
    return self
def transform(self, input_data: Union[numpy.ndarray, tensorflow.python.framework.ops.Tensor, TfrWritable]) ‑> Union[numpy.ndarray, tensorflow.python.framework.ops.Tensor, TfrWritable]
-
Process a single instance
Args
input_data
:t.Union[ndarray, Tensor, TfrWritable]
- Input instance
Returns
t.Union[Tensor, ndarray, TfrWritable]
- Scored example in the corresponding format, depending on the input type.
Raises
TypeError
- If input type is not allowed.
Expand source code
def transform(self, input_data: t.Union[ndarray, Tensor, TfrWritable]) -> t.Union[Tensor, ndarray, TfrWritable]:
    """Score a single instance.

    Raw arrays/tensors are reshaped via ``_to_shape`` and passed straight to
    ``model.predict``; record objects have their ``features`` scored and the
    result attached through ``add_score``.

    Args:
        input_data (t.Union[ndarray, Tensor, TfrWritable]): Input instance

    Returns:
        t.Union[Tensor, ndarray, TfrWritable]: Scored example in the corresponding format, depending on the input type.

    Raises:
        TypeError: If input type is not allowed.
    """
    is_raw_array = isinstance(input_data, (ndarray, Tensor))
    if is_raw_array:
        batch = self._to_shape(input_data)
        return self.model.predict(batch)

    if not isinstance(input_data, TfrWritable):
        raise TypeError("Input type is not one of ndarray, Tensor or TfrWritable")

    record_scores = self.model.predict(self._to_shape(input_data.features))
    return input_data.add_score(score=record_scores, charset_tensor=self.charset_tensor)
Inherited members