Flow

The flow components of python-weka-wrapper are not related to Weka’s KnowledgeFlow. Instead, they were inspired by the ADAMS workflow engine. It is a very simple workflow engine, aimed at automating tasks and easy to extend. Instead of linking operators with explicit connections, this flow uses a tree structure that implicitly defines how the data is processed.

Overview

A workflow component is called an actor. All actors are derived from the Actor class and come in four different kinds:

  • source actors generate data, but don’t consume any
  • transformer actors consume and generate data, similar to a filter
  • sink actors only consume data, e.g., displaying data or writing to file
  • control actors define how the data is passed around in a flow, e.g., branching

Data itself is passed around in Token containers.

Since the tree structure only provides 1-to-n connections, objects can also be stored internally in a flow using a simple dictionary (internal storage). Special actors store, retrieve, update and delete these objects.

To find out more about a specific actor and what parameters it offers (via the config dictionary property), use one of the following actor methods:

  • print_help() – outputs a description of the actor and its options on stdout
  • generate_help() – generates the help string output by print_help()

Printing the layout of a flow is very simple. Assuming you have a flow variable called myflow, you simply use the tree property to output the structure: print(myflow.tree)

All actors can also be serialized to and restored from JSON; simply use the json property to access or set the JSON representation.
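
For instance, a flow can be round-tripped through its JSON representation like this (a minimal sketch, relying only on the json property being readable and settable as described above):

from weka.flow.control import Flow

json_str = myflow.json      # serialize the flow to a JSON string
restored = Flow()
restored.json = json_str    # restore the structure from the JSON string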

Life cycle

The typical life-cycle of a flow (actually of any actor) can be described through the following method calls (a short sketch follows the list):

  1. setup() configures and checks the flow (returns an error message if it failed, None otherwise)
  2. execute() performs the actual execution of the actors (returns an error message if it failed, None otherwise)
  3. wrapup() finishes up the execution
  4. cleanup() destructively frees up memory
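
Put together, a typical run therefore looks like this (the same pattern appears in the full example further below; flow is an already assembled Flow object):

msg = flow.setup()
if msg is None:
    msg = flow.execute()
    if msg is not None:
        print("Error executing flow:\n" + msg)
else:
    print("Error setting up flow:\n" + msg)
flow.wrapup()    # finish up the execution
flow.cleanup()   # destructive, frees up memory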

Sources

The following source actors are available (a short usage sketch follows the list):

  • CombineStorage expands storage items in a format string and forwards the generated string
  • DataGenerator outputs artificial data
  • FileSupplier outputs predefined file names
  • ForLoop outputs integer tokens as defined by the loop setup (min, max, step)
  • GetStorageValue outputs a value from internal storage
  • ListFiles lists files/directories
  • LoadDatabase loads data from a database using an SQL query
  • Start is a dummy source that just triggers the execution of the actors that follow it
  • StringConstants simply outputs a list of predefined strings, one by one
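
As a minimal illustration, the following sketch wires a ForLoop source to a Console sink; note that the min, max and step config keys are assumptions derived from the loop setup description above:

from weka.flow.control import Flow
from weka.flow.source import ForLoop
from weka.flow.sink import Console

flow = Flow(name="for loop")

forloop = ForLoop()
forloop.config["min"] = 1     # assumed config keys, based on the
forloop.config["max"] = 10    # (min, max, step) description above
forloop.config["step"] = 1
flow.actors.append(forloop)

# prints each integer token on stdout
console = Console()
flow.actors.append(console)

msg = flow.setup()
if msg is None:
    msg = flow.execute()
flow.wrapup()
flow.cleanup()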

Transformers

The following transformers are available (a short sketch follows the list):

  • AttributeSelection performs attribute selection on a dataset and outputs an AttributeSelectionContainer
  • ClassSelector sets/unsets the class attribute of a dataset
  • Convert applies simple conversion schemes to the data passing through
  • Copy creates a deep copy of serializable Java objects
  • CrossValidate performs cross-validation on a classifier or clusterer
  • DeleteFile deletes files that match a regular expression
  • DeleteStorageValue deletes a value from internal storage
  • Evaluate evaluates a trained classifier/clusterer from internal storage on the data passing through
  • EvaluationSummary generates a summary from a classifier/clusterer evaluation object
  • Filter applies a Weka filter to the data passing through
  • InitStorageValue sets the initial value for an internal storage value
  • LoadDataset loads the data stored in the file received as input, using either an automatically determined loader or a user-specified one
  • MathExpression computes a numeric value from an expression and numeric input
  • ModelReader reads classifier/clusterer models from disk and forwards a ModelContainer
  • PassThrough is a dummy that just passes through the tokens
  • Predict applies a classifier/clusterer model (from a serialized file or from storage) to incoming Instance objects
  • RenameRelation updates the relation name of Instance/Instances objects
  • SetStorageValue stores the payload of the current token in internal storage
  • Train builds a classifier/clusterer/associator and passes on a ModelContainer
  • UpdateStorageValue applies an expression to update an internal storage value, e.g. incrementing a counter
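
For instance, training a classifier and saving the resulting model could look like the following sketch; it assumes that Train takes its algorithm via the setup config key (by analogy with CrossValidate in the example further below) and that ModelWriter takes its target file via an output config key, both of which are unverified assumptions:

from weka.classifiers import Classifier
from weka.flow.transformer import Train
from weka.flow.sink import ModelWriter

# trains a J48 classifier on the incoming dataset
train = Train()
train.config["setup"] = Classifier(classname="weka.classifiers.trees.J48")  # assumed key

# writes the model from the ModelContainer to disk
writer = ModelWriter()
writer.config["output"] = "/some/where/j48.model"  # assumed key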

Sinks

The following sinks are available:

  • ClassifierErrors displays the classifier errors obtained from an Evaluation object
  • Console just outputs a string representation of the object on stdout
  • DumpFile is similar to Console, but stores the string representation in a file
  • InstanceDumper dumps incoming Instance/Instances objects in a file
  • LinePlot displays an Instances object as line plot, using the internal format
  • MatrixPlot displays an Instances object as matrix plot
  • ModelWriter stores a trained model on disk
  • Null simply swallows any token (like /dev/null on Linux)
  • PRC plots a precision-recall curve from an Evaluation object
  • ROC plots a receiver operating characteristic (ROC) curve from an Evaluation object

Control actors

The following control actors define how data is passed around in a workflow:

  • Branch forwards the same input token to all of its sub-branches
  • ContainerValuePicker extracts a named value from a container, e.g. the Model from a ModelContainer
  • Flow is the outermost actor, which also handles the internal storage
  • Sequence executes its sub-actors sequentially, with the data generated by the previous being the input for the next one
  • Stop stops the execution of the flow
  • Tee tees off the current token and processes it separately in a sub-flow before continuing with the processing; an optional condition determines when a token gets tee’d off
  • Trigger executes its sub-actors whenever a token passes through (i.e., when the condition evaluates to True)

Conversions

The following conversion schemes can be used in conjunction with the Convert transformer (a short configuration sketch follows the list):

  • AnyToCommandline generates a command-line string from an object, e.g., a classifier
  • CommandlineToAny generates an object from a command-line string, e.g., a classifier setup
  • PassThrough is a dummy conversion that just passes through the data
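
As a sketch, configuring the Convert transformer with one of these schemes could look as follows; the setup config key (chosen by analogy with CrossValidate in the example below) and the weka.flow.conversion module path are assumptions:

from weka.flow.transformer import Convert
from weka.flow.conversion import CommandlineToAny  # assumed module path

convert = Convert()
convert.config["setup"] = CommandlineToAny()  # assumed config key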

Examples

Check out the examples available through the python-weka-wrapper-examples project on GitHub; the flow example scripts are located in the src/wekaexamples/flow sub-directory.

Below is a code snippet for building a flow that cross-validates a classifier on a dataset and outputs the evaluation summary and the ROC and PRC curves:

from weka.classifiers import Classifier
from weka.flow.control import Flow, Branch, Sequence
from weka.flow.source import FileSupplier
from weka.flow.transformer import LoadDataset, ClassSelector, CrossValidate, EvaluationSummary
from weka.flow.sink import Console, ClassifierErrors, ROC, PRC

flow = Flow(name="cross-validate classifier")

# source supplying the dataset file name
filesupplier = FileSupplier()
filesupplier.config["files"] = ["/some/where/iris.arff"]
flow.actors.append(filesupplier)

# loads the dataset from the incoming file name
loaddataset = LoadDataset()
flow.actors.append(loaddataset)

# selects the last attribute as the class attribute
select = ClassSelector()
select.config["index"] = "last"
flow.actors.append(select)

# cross-validates a J48 classifier on the dataset
cv = CrossValidate()
cv.config["setup"] = Classifier(classname="weka.classifiers.trees.J48")
flow.actors.append(cv)

# forwards the evaluation object to all sub-branches
branch = Branch()
flow.actors.append(branch)

# branch 1: outputs the evaluation summary on stdout
seqsum = Sequence()
seqsum.name = "summary"
branch.actors.append(seqsum)

summary = EvaluationSummary()
summary.config["title"] = "=== J48/iris ==="
summary.config["complexity"] = False
summary.config["matrix"] = True
seqsum.actors.append(summary)

console = Console()
seqsum.actors.append(console)

# branch 2: displays the classifier errors
seqerr = Sequence()
seqerr.name = "errors"
branch.actors.append(seqerr)

errors = ClassifierErrors()
errors.config["wait"] = False
seqerr.actors.append(errors)

# branch 3: plots the ROC curves for the three classes
seqroc = Sequence()
seqroc.name = "roc"
branch.actors.append(seqroc)

roc = ROC()
roc.config["wait"] = False
roc.config["class_index"] = [0, 1, 2]
seqroc.actors.append(roc)

# branch 4: plots the PRC curves for the three classes
seqprc = Sequence()
seqprc.name = "prc"
branch.actors.append(seqprc)

prc = PRC()
prc.config["wait"] = True
prc.config["class_index"] = [0, 1, 2]
seqprc.actors.append(prc)

# run the flow
msg = flow.setup()
if msg is None:
    msg = flow.execute()
    if msg is not None:
        print("Error executing flow:\n" + msg)
else:
    print("Error setting up flow:\n" + msg)
flow.wrapup()
flow.cleanup()

The following statement outputs the built flow tree:

print(flow.tree)

The above example produces the following output:

Flow 'cross-validate classifier'
|-FileSupplier [files: 1]
|-LoadDataset [incremental: False, custom: False, loader: weka.core.converters.ArffLoader]
|-ClassSelector [index: last]
|-CrossValidate [setup: weka.classifiers.trees.J48 -C 0.25 -M 2, folds: 10]
|-Branch
| |-Sequence 'summary'
| | |-EvaluationSummary [title: === J48/iris ===, complexity: False, matrix: True]
| | |-Console [prefix: '']
| |-Sequence 'errors'
| | |-ClassifierErrors [absolute: True, title: None, outfile: None, wait: False]
| |-Sequence 'roc'
| | |-ROC [classes: [0, 1, 2], title: None, outfile: None, wait: False]
| |-Sequence 'prc'
| | |-PRC [classes: [0, 1, 2], title: None, outfile: None, wait: True]

Extending

Adding additional flow components is quite easy (a sketch of a minimal custom transformer follows the list):

  • Choose the superclass, based on how the actor is supposed to behave:
    • source – weka.flow.source.Source
    • transformer – weka.flow.transformer.Transformer
    • sink – weka.flow.sink.Sink
  • add the new class with a constructor like this: def __init__(self, name=None, options=None)
  • add a description method that returns a string, describing what your actor does
  • add a fix_config method that ensures that all configuration options are present, along with help strings for them (see, e.g., transformer.ClassSelector)
  • if you want to output some additional info in the tree layout, implement a quickinfo method (see, e.g., transformer.ClassSelector)
  • override the setup method if you require additional checks (e.g., that a file actually exists) or setup steps (e.g., loading a model from disk); return None if everything is OK, otherwise the error message; don’t forget to call the super-method.
  • transformers and sinks can check the input data by overriding the check_input method
  • the actual execution or processing of input data happens in the do_execute method; return an error string if something failed, otherwise None; sources and transformers can append the generated data (wrapped in Token objects) to the self._output list
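
Below is a sketch of what a minimal custom transformer could look like, following the steps above; the weka.flow.base module path for Token and the self._input attribute holding the current input token are assumptions. Since this actor offers no options, fix_config and quickinfo are omitted.

from weka.flow.base import Token           # assumed module path for Token
from weka.flow.transformer import Transformer

class ToUpper(Transformer):

    def __init__(self, name=None, options=None):
        super(ToUpper, self).__init__(name=name, options=options)

    def description(self):
        return "Turns the string tokens passing through into upper case."

    def do_execute(self):
        # self._input is assumed to hold the current input token; the
        # result gets wrapped in a Token and appended to the output list
        self._output.append(Token(str(self._input.payload).upper()))
        return None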