🧰 Usage

Here we discuss all the necessary elements of setting up a Planchet processing job. If you are looking for a complete example, see the Quickstart. If you are in a hurry, skip to the Client section. For a quick overview of what is possible, take a look at the PlanchetClient object or at the Swagger API page, typically at http://localhost:5005/docs.

Jobs

The job is essentially a bag of metadata that controls the data management on the server side. It is created through a separate request before any processing starts. To set up a regular job you need a name, and you need to specify the reading and writing methods through the parameters. There are, however, some other types of jobs you may want to run, such as an error logging job, a reading job, or a repair job. Here is a list of all the parameters relevant to setting up a job (a request sketch follows the list):

  • name: the name of the job. If the job already exists, your initial request will fail.

  • reader_name: the name of the reader class, e.g. CsvReader.

  • writer_name: the name of the writer class, e.g. CsvWriter.

  • metadata: the metadata passed on to the reader and writer classes (see more below).

  • clean_start: restarts the job if it already exists.

  • mode: I/O mode for the job. Possible values: read, write, read-write.

  • cont: if true, resets the reader's iterator and allows all incomplete items to be served again.

  • token: if set, the job will require an authentication token.

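As a concrete illustration, here is a minimal sketch of creating a job directly through the HTTP API using the requests library. The parameter names come from the list above, but the split between query string and request body is an assumption; check the Swagger page for the exact contract.

import requests

# Hypothetical job set-up; adjust the host, port, and paths to your deployment.
params = {
    'name': 'my-first-job',
    'reader_name': 'CsvReader',
    'writer_name': 'CsvWriter',
    'clean_start': False,
    'mode': 'read-write',
}
metadata = {
    'input_file_path': '/data/input.csv',
    'output_file_path': '/data/output.csv',
}
# Assumption: metadata travels as the JSON body, the rest as query parameters.
response = requests.post('http://localhost:5005/scramble',
                         params=params, json=metadata)
response.raise_for_status()
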
Now that we know what the parameters of a job are, let’s consider the scenarios we mentioned above.

Reading job: if you are only interested in reading data from Planchet and not in storing any results, set the mode parameter to read, which removes the requirement to specify a writer.

Error logging job: if you just want to dump things through Planchet, such as errors, set the mode parameter to write and disable the reader.

Repair job: you will likely hit a case where a process is interrupted or the system crashes but you want to continue your processing. Normally, Planchet just gives you the next available items and ignores the ones it served but never received back. Making this smarter involves a lot of complications, so instead we handle it by running a repair job at the end using the cont parameter. A repair job resets the iterator and wipes all incomplete items from the log, then reads them again while skipping all the completed items. A sketch of all three job types follows.
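In client terms, the three scenarios look roughly like this. This is a sketch only: the regular start_job call shown further below is confirmed usage, while passing mode and cont as keyword arguments is an assumption based on the job parameters above.

from planchet import PlanchetClient

client = PlanchetClient('http://localhost:5005')
metadata = {'input_file_path': '/data/data.jsonl',
            'output_file_path': '/data/output.jsonl'}

# Reading job: read-only, so no writer is required (assumed keyword: mode).
client.start_job('reading-job', metadata, 'JsonlReader', mode='read')

# Error logging job: write-only, so no reader is required.
client.start_job('error-log-job', metadata, writer_name='JsonlWriter', mode='write')

# Repair job: re-serve everything that was served but never received back
# (assumed keyword: cont).
client.start_job('my-first-job', metadata, 'JsonlReader',
                 writer_name='JsonlWriter', cont=True)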

The data

You can currently process anything that can be read from and written to a CSV or a JSONL file. The main constraints are that the data points must be independent of one another and that the item sizes and the worker pool remain reasonable.

You can process data from/to other formats if you build your own reader and writer, as discussed in the advanced section.

In practical terms, the data is served in two formats based on whether it comes from a CSV or JSONL file:

# `client` is a connected PlanchetClient and `job_name` an existing job
# (see "The client" section below)
items = client.get(job_name, n_items)
for id_, item in items:
    print(item)
    # prints a list when reading from a CSV file
    # prints a dictionary (or a list) when reading from a JSONL file
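Because the item shape depends on the source format, downstream code often needs to normalise it. A small sketch; the 'text' key and the column index are hypothetical and depend on your data:

def get_text(item):
    # JSONL items arrive as parsed JSON, typically a dictionary; CSV items
    # arrive as a list of column values. Key and index here are hypothetical.
    if isinstance(item, dict):
        return item['text']
    return item[0]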

Readers & writers

Planchet currently supports CSV through the CsvReader and CsvWriter classes, and JSONL through the JsonlReader and JsonlWriter classes. You need to specify one of each pair as the name of your reader and writer in order to configure a job. You also need to provide shared metadata for the reader and the writer, which is essentially their configuration.

Currently, the following parameters are used:

  • input_file_path: path to the data file for the job (both formats)

  • output_file_path: path to the output file for the job (both formats)

  • chunk_size: size of the chunk to be read by the CSV reading iterator; you probably don’t need to worry about this one.

  • overwrite: if true, existing output files are overwritten; if false, output is appended to them.

Example

{
  "input_file_path": "/path/to/file",
  "output_file_path": "/path/to/output",
  "chunk_size": 100,
  "overwrite": False
}

The endpoints

/scramble: starts a job. Requires the name, reader_name, writer_name, and metadata parameters. Can be further parametrised with cont to make a repair job and mode to control whether the job is read-only, write-only, or read-write.

/serve: serves a batch of items from a job (job_name). The number of items depends on the batch_size.

/receive: receives a batch of processed items for a job (job_name), sent through the items parameter.

/mark_errors: marks the items of job job_name specified in ids as errors.

/delete: deletes job_name and all items associated with it. Does not clean the output file.

/clean: deletes all items associated with job_name.

/report: returns the status of job_name along with the number of completed items and the number of items currently in flight.

/health_check: checks if the service is healthy.
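For a quick smoke test you can hit the read-only endpoints directly. A sketch, assuming both are GET endpoints and that job_name is passed as a query parameter; the Swagger page has the authoritative signatures.

import requests

base = 'http://localhost:5005'

# Liveness probe.
print(requests.get(f'{base}/health_check').text)

# Progress of a job; passing job_name as a query parameter is an assumption.
print(requests.get(f'{base}/report', params={'job_name': 'regular-job'}).json())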

The client

It is possible to use Planchet by directly querying the API endpoints, but it is much more convenient to use the PlanchetClient object. This section briefly shows how to create and execute a regular job and how to change it into a repair job using the client. For a full description of all methods, refer to the source documentation.

Example

from planchet import PlanchetClient
from tqdm import tqdm

PLANCHET_HOST = '0.0.0.0'  # <-- CHANGE IF NEEDED
PLANCHET_PORT = 5005  # <-- CHANGE IF NEEDED

url = f'http://{PLANCHET_HOST}:{PLANCHET_PORT}'
client = PlanchetClient(url)

job_name = 'regular-job'
metadata = {
    'input_file_path': '/data/data.jsonl',
    'output_file_path': '/data/output.jsonl'
}

# make sure you don't use the clean_start option here
# to make this a REPAIR JOB, set --> cont=True
client.start_job(job_name, metadata, 'JsonlReader', writer_name='JsonlWriter')

# make sure the number of items is large enough to avoid blocking the server
n_items = 100
items = client.get(job_name, n_items)

while items:
    processed = []
    print('Processing item batch...')
    for id_, item in tqdm(items):
        item['hash'] = hash(item['text'])
        processed.append((id_, item))
    client.send(job_name, processed)
    items = client.get(job_name, n_items)
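If some items fail during processing, you can flag them with /mark_errors instead of silently dropping them. A variant of the loop above with error reporting; it assumes the client exposes a mark_errors method mirroring the endpoint.

while items:
    processed, failed = [], []
    for id_, item in tqdm(items):
        try:
            item['hash'] = hash(item['text'])
            processed.append((id_, item))
        except KeyError:
            failed.append(id_)  # item had no 'text' field
    client.send(job_name, processed)
    if failed:
        client.mark_errors(job_name, failed)  # assumed client method
    items = client.get(job_name, n_items)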