Create First Project

Create your first project using the command-line tool

You can create a project with the dsfg command, which is installed automatically with dsf_core.

If you can’t find the dsfg command, please check the installation guide again.

dsfg project \
    --name {project name} \
    --owner {your name} \
    --start-date {project start date by iso format} \
    --root-dir {your project directory} \
    --dwh-root {your data store directory}
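
For example, a hypothetical sales-analysis project could be created like this (every value below is a placeholder; replace each one with your own):

dsfg project \
    --name sales_analysis \
    --owner alice \
    --start-date 2020-04-01 \
    --root-dir ~/projects \
    --dwh-root ~/dsf_data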

For more details about projects, please check here.

Set up your data source

Next, register your data source with the project. This section focuses on using a CSV file.

Please edit source/filesource.py in the generated project directory.

from abeja_dsf import IntIdSeries, TimestampSeries, IntValueSeries, ValueSeries
from abeja_dsf.core.io.loaders.csv import DSFCSVLoader
# NOTE: FileSource is used below but was missing from the original snippet's
# imports; the import path here is an assumption, so adjust it if your
# generated project imports it from somewhere else.
from abeja_dsf import FileSource

files = [
    FileSource('some_data',
               DSFCSVLoader(schema=[
                   IntIdSeries('integer_id_column'),
                   TimestampSeries('timestamp_column'),
                   IntValueSeries('integer_value_column'),
                   ValueSeries('float_value_column')]),
               ext='.csv')
]

Please customize the schema passed to DSFCSVLoader according to your data.
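
For illustration, a CSV file matching the schema above would look something like this (the header names correspond to the series definitions; the rows are made-up sample data):

integer_id_column,timestamp_column,integer_value_column,float_value_column
1,2020-04-01T00:00:00,10,1.5
2,2020-04-01T01:00:00,20,2.5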

For more information about schemas, please check here.

Set up your data file

After registering your data source, place your data file in the source data store.

cp some_file.csv {your data store directory}/{project name}/source/static/{file source name}.csv
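
For example, with the hypothetical names used earlier in this guide (data store root ~/dsf_data, project sales_analysis, file source some_data):

cp some_file.csv ~/dsf_data/sales_analysis/source/static/some_data.csv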

Create analysis workflow

To create a workflow, use the dsfg command again.

dsfg workflow \
    --project-name {project name} \
    --root-dir {your project directory} \
    --namespace {workflow namespace, separated by .} \
    --workflow-name {workflow name}
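
Continuing the hypothetical example (all names below are placeholders):

dsfg workflow \
    --project-name sales_analysis \
    --root-dir ~/projects \
    --namespace sales.daily \
    --workflow-name top_products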

This command creates a new workflow class and a new launcher script in your project directory, and it also updates catalogue.py automatically.

For more details about workflows, please check here.

Set up a loader to load your data source

To set up the loader, edit the _setup_loaders method in the workflow class file in your workflow directory.

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        from {project name}.source.files import get_file
        # register a loader for the 'some_data' file source declared in filesource.py
        self.some_data_loader = get_file('some_data').get_static_file_input()
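
If your workflow uses several data sources, register one loader attribute per source. A minimal sketch, assuming a hypothetical project named sales_analysis and a hypothetical second file source named 'other_data' also declared in filesource.py:

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        # 'sales_analysis' is the hypothetical project name used in this guide
        from sales_analysis.source.files import get_file
        # one loader attribute per registered file source
        self.some_data_loader = get_file('some_data').get_static_file_input()
        # 'other_data' is hypothetical; it must match a FileSource name
        self.other_data_loader = get_file('other_data').get_static_file_input()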

Set up analysis tasks

Now you can write analysis tasks in your workflow class.

Let’s edit the _setup_tasks method.

    def _setup_tasks(self):
        load_some_data = self.loader('load_some_data', self.some_data_loader)

        from abeja_dsf_opkg_df.components.aggregate import DSFDFAggregate, AggRule
        from abeja_dsf_opkg_math.components.op import DSFMathOp
        from abeja_dsf_opkg_math.support_types.column_value import ColVal
        from abeja_dsf.core.calculation.data_type.dataframe import ColDef
        from abeja_dsf_opkg_df.support_types.aggregation_methods import agg_sum
        from abeja_dsf_opkg_df.components.rank import DSFDFRank
        from abeja_dsf_opkg_df.components.filter import DSFDFFilter

        # calculate sales as `sales = unit_price * count`
        sales = self.task('sales', DSFMathOp(DSFMathOp.Arguments(
            expression=ColVal('unit_price') * ColVal('count'),
            output=ColDef('sales', 'float', 'quantity')
        )))

        # calculate total_sales as `total_sales = sum of sales` for each product
        total_sales = self.task('total_sales', DSFDFAggregate(DSFDFAggregate.Arguments(
            rules=[AggRule('sales', agg_sum, 'total_sales')],
            keys=['product_id']
        )))

        # calculate ranking from total_sales
        total_sales_rank = self.task('total_sales_rank', DSFDFRank(DSFDFRank.Arguments(
            column_name='total_sales', output_column_name='total_sales_rank',
            method='first', ascending=False
        )))

        # extract top 3
        top3 = self.task('top3', DSFDFFilter(DSFDFFilter.Arguments(
            expression=ColVal('total_sales_rank') <= 3.0
        )))

        # set dependency relationships: each task consumes the output of the
        # task it is called with
        sales(load_some_data)
        total_sales(sales)
        total_sales_rank(total_sales)
        top3(total_sales_rank)

        from abeja_dsf.core.io.protocol_loader_io import DSFProtocolLoaderOut
        from abeja_dsf.core.io.protocols.file import DSFFileProtocol
        from abeja_dsf.core.io.loaders.csv import DSFCSVLoader

        # output result
        self.terminal('write_top3',
                      DSFProtocolLoaderOut(
                          DSFFileProtocol('top3.csv', 'w'),
                          DSFCSVLoader()),
                      [top3])

This example workflow loads the data and extracts the top three best-selling products.

The example demonstrates how to:

  • load some_data using the self.loader method
  • import the required components and support types
  • create calculation tasks using the self.task method
  • describe each calculation with a calculation component such as DSFDFAggregate or DSFMathOp
  • define task dependencies using the task(required_task) syntax

For more detail about tasks, please check here, and for more about calculation components, please check here.

Execute workflow

To execute the workflow, use the dsfrun workflow command.

# configure some environment variables
source profile

# and execute
dsfrun workflow \
    --project {project name} \
    --workflow {workflow full name} \
    --date {execution date by iso format}
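
For instance, with the hypothetical names from the earlier steps (the full-name format namespace.workflow_name is an assumption; verify the actual name as described below):

dsfrun workflow \
    --project sales_analysis \
    --workflow sales.daily.top_products \
    --date 2020-04-01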

To check the workflow’s full name, please see catalogue.py.

For more detail about the dsfrun command, please check here.


That completes the quick start. Enjoy data analysis with DSF, and explore the rest of the documentation to learn more.