Turning a bunch of MIDI files into Parquet data

init_stats[source]

init_stats()

Resets reporting stats to zero.

report[source]

report(logger)

Reports current metrics, either to Spell or to a logger.

Untarring the file

The first step is to untar the file containing the MIDI files.

[source]

(*args:Any, mapped:bool=False, task_args:dict=None, upstream_tasks:Iterable[Any]=None, flow:Flow=None, **kwargs:Any)

A convenience Task for functionally creating Task instances with arbitrary callable run methods.

Args:

- fn (callable): the function to be the task's `run` method
- name (str, optional): the name of this task
- **kwargs: keyword arguments that will be passed to the Task
    constructor

Raises:

- ValueError: if the provided function violates signature requirements
    for Task run methods

Example:

task = FunctionTask(lambda x: x - 42, name="Subtract 42")

with Flow("My Flow") as f:
    result = task(42)

Partitioning the files in minibatches

Since the tar.gz file may contain a huge number of MIDI files, we'll partition them into minibatches that we can process in parallel.
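One way to do the partitioning (a sketch; the actual partition_files task may differ):

```python
from typing import List

def partition(files: List[str], partition_size: int) -> List[List[str]]:
    """Split files into consecutive minibatches of at most partition_size items."""
    return [files[i:i + partition_size]
            for i in range(0, len(files), partition_size)]
```

With `partition_size=1`, as in the tests below, 4 MIDI files yield 4 minibatches, which is why the logs show four mapped `process_and_write[i]` runs.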

[source]

Processing a minibatch

For each minibatch, we'll go through its MIDI files, parse them, and write them to a separate Parquet file.
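The per-minibatch work amounts to parsing each file into one row and collecting the rows into a DataFrame. In this sketch `parse_midi` is a stand-in for the real parser, which is not shown here:

```python
import pandas as pd

def parse_midi(path):
    # Stand-in: a real parser would read the MIDI file and return its note
    # data, or None for a malformed song.
    return {"pitches": ["7"], "durations": [0.5], "velocities": [110]}

def process_minibatch(paths):
    """Parse each MIDI file in the minibatch into one row of note data."""
    rows = []
    for p in paths:
        parsed = parse_midi(p)
        if parsed is None:
            continue  # would be counted as a malformed song
        rows.append(parsed)
    return pd.DataFrame(rows)
```

Each minibatch's DataFrame is then written to its own Parquet file, which is what makes the final merge step trivial.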

[source]

Merging the Parquet files

Once all the minibatches are written to separate Parquet files, merging them into a single dataset is trivial.

[source]

Putting everything together

Now we can build the ETL flow!

build_etl[source]

build_etl(cfg)

Builds the ETL flow.
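Ignoring the Prefect machinery, the data dependencies that build_etl wires up reduce to a simple pipeline. The step names mirror the tasks in the log output below; the callables are stand-ins, not the project's implementations:

```python
def run_etl(files, partition_size, process, combine):
    """Run the ETL's data dependencies as plain function composition."""
    # partition_files: split the file list into minibatches
    batches = [files[i:i + partition_size]
               for i in range(0, len(files), partition_size)]
    # process_and_write, mapped over minibatches
    part_files = [process(batch) for batch in batches]
    # combine_parquet_files: merge the per-minibatch outputs
    return combine(part_files)
```

In the real flow, Prefect's `task.map` runs the per-minibatch step in parallel instead of this sequential list comprehension.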

Testing the ETL

The ETL accepts as input a tar.gz file containing MIDI files:

# test
import time

from testing import test_eq, path
from omegaconf import OmegaConf
import fastparquet

tmp_path = "/tmp/neuralmusic_etl"

targz_path = path("data/midi.tar.gz")

dot_list = [f"tar_gz_path={targz_path}", f"outdir={tmp_path}", "partition_size=1"]
etl_cfg = OmegaConf.from_dotlist(dot_list)
flow = build_etl(etl_cfg)

init_stats()

started_at = time.time()
flow.run()

test_eq(4, total_songs)
test_eq(0, malformed_songs)
test_eq(4, valid_songs)
test_eq(4, valid_rows)


df = fastparquet.ParquetFile(tmp_path, verify=True).to_pandas()
test_eq(4, len(df))

# TODO: figure out order!
# test_eq(["7.11.2", "7", "7"], pitches[0:3])
# test_eq([1.75, 0.5, 0.5], durations[0:3])
# test_eq([110, 110, 110], velocities[0:3])
[2019-12-21 16:07:51,597] INFO - prefect.FlowRunner | Beginning Flow run for 'Neuralmusic Data ETL'
[2019-12-21 16:07:51,600] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-21 16:07:51,626] INFO - prefect.TaskRunner | Task 'untar_cmd': Starting task run...
[2019-12-21 16:07:51,643] INFO - prefect.TaskRunner | Task 'untar_cmd': finished task run for task with final state: 'Skipped'
[2019-12-21 16:07:51,665] INFO - prefect.TaskRunner | Task 'untar_task': Starting task run...
[2019-12-21 16:07:51,677] INFO - prefect.TaskRunner | Task 'untar_task': finished task run for task with final state: 'Skipped'
[2019-12-21 16:07:51,699] INFO - prefect.TaskRunner | Task 'partition_files': Starting task run...
[2019-12-21 16:07:51,702] INFO - prefect.Task: partition_files | Processing 4 MIDI files partitioned into groups of 1
[2019-12-21 16:07:51,719] INFO - prefect.TaskRunner | Task 'partition_files': finished task run for task with final state: 'Success'
[2019-12-21 16:07:51,734] INFO - prefect.TaskRunner | Task 'process_and_write': Starting task run...
[2019-12-21 16:07:51,753] INFO - prefect.TaskRunner | Task 'process_and_write[0]': Starting task run...
[2019-12-21 16:07:53,568] INFO - prefect.TaskRunner | Task 'process_and_write[0]': finished task run for task with final state: 'Success'
[2019-12-21 16:07:53,578] INFO - prefect.TaskRunner | Task 'process_and_write[1]': Starting task run...
[2019-12-21 16:07:55,107] INFO - prefect.TaskRunner | Task 'process_and_write[1]': finished task run for task with final state: 'Success'
[2019-12-21 16:07:55,113] INFO - prefect.TaskRunner | Task 'process_and_write[2]': Starting task run...
[2019-12-21 16:07:56,148] INFO - prefect.TaskRunner | Task 'process_and_write[2]': finished task run for task with final state: 'Success'
[2019-12-21 16:07:56,153] INFO - prefect.TaskRunner | Task 'process_and_write[3]': Starting task run...
[2019-12-21 16:07:56,849] INFO - prefect.TaskRunner | Task 'process_and_write[3]': finished task run for task with final state: 'Success'
[2019-12-21 16:07:56,858] INFO - prefect.TaskRunner | Task 'process_and_write': finished task run for task with final state: 'Mapped'
[2019-12-21 16:07:56,882] INFO - prefect.TaskRunner | Task 'combine_parquet_files': Starting task run...
[2019-12-21 16:07:56,911] INFO - prefect.TaskRunner | Task 'combine_parquet_files': finished task run for task with final state: 'Success'
[2019-12-21 16:07:56,914] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded

It also accepts a path to a folder with MIDI files:

# test
import time

from testing import test_eq, path
from omegaconf import OmegaConf
import fastparquet

tmp_path = "/tmp/neuralmusic_etl"

midi_path = path("data")

dot_list = [f"midi_path={midi_path}", f"outdir={tmp_path}", "partition_size=1"]
etl_cfg = OmegaConf.from_dotlist(dot_list)
flow = build_etl(etl_cfg)

init_stats()

started_at = time.time()
flow.run()

test_eq(4, total_songs)
test_eq(0, malformed_songs)
test_eq(4, valid_songs)
test_eq(4, valid_rows)


df = fastparquet.ParquetFile(tmp_path, verify=True).to_pandas()
test_eq(4, len(df))

# TODO: figure out order!
# test_eq(["7.11.2", "7", "7"], pitches[0:3])
# test_eq([1.75, 0.5, 0.5], durations[0:3])
# test_eq([110, 110, 110], velocities[0:3])
[2019-12-21 16:08:00,888] INFO - prefect.FlowRunner | Beginning Flow run for 'Neuralmusic Data ETL'
[2019-12-21 16:08:00,905] INFO - prefect.FlowRunner | Starting flow run.
[2019-12-21 16:08:00,926] INFO - prefect.TaskRunner | Task 'partition_files': Starting task run...
[2019-12-21 16:08:00,934] INFO - prefect.Task: partition_files | Processing 4 MIDI files partitioned into groups of 1
[2019-12-21 16:08:00,966] INFO - prefect.TaskRunner | Task 'partition_files': finished task run for task with final state: 'Success'
[2019-12-21 16:08:00,984] INFO - prefect.TaskRunner | Task 'process_and_write': Starting task run...
[2019-12-21 16:08:00,999] INFO - prefect.TaskRunner | Task 'process_and_write[0]': Starting task run...
[2019-12-21 16:08:01,764] INFO - prefect.TaskRunner | Task 'process_and_write[0]': finished task run for task with final state: 'Success'
[2019-12-21 16:08:01,772] INFO - prefect.TaskRunner | Task 'process_and_write[1]': Starting task run...
[2019-12-21 16:08:02,979] INFO - prefect.TaskRunner | Task 'process_and_write[1]': finished task run for task with final state: 'Success'
[2019-12-21 16:08:02,987] INFO - prefect.TaskRunner | Task 'process_and_write[2]': Starting task run...
[2019-12-21 16:08:04,316] INFO - prefect.TaskRunner | Task 'process_and_write[2]': finished task run for task with final state: 'Success'
[2019-12-21 16:08:04,323] INFO - prefect.TaskRunner | Task 'process_and_write[3]': Starting task run...
[2019-12-21 16:08:05,565] INFO - prefect.TaskRunner | Task 'process_and_write[3]': finished task run for task with final state: 'Success'
[2019-12-21 16:08:05,571] INFO - prefect.TaskRunner | Task 'process_and_write': finished task run for task with final state: 'Mapped'
[2019-12-21 16:08:05,618] INFO - prefect.TaskRunner | Task 'combine_parquet_files': Starting task run...
[2019-12-21 16:08:05,646] INFO - prefect.TaskRunner | Task 'combine_parquet_files': finished task run for task with final state: 'Success'
[2019-12-21 16:08:05,648] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded