Top sales products

Description

This is a basic example using dsf. We will create a top sales product ranking from purchasing log data and extract top 3 from the ranking.

In this example, you can learn how to use command line tools and some of most basic of calculation components.

Flow

  • Creating project
  • Preparing data sources and files
  • Creating workflow
  • Defining tasks in workflow
  • Executing

Creating project

At first, we have to create project. In this example, We will use dsfg command to create project.

mkdir -p ~/Projects/top_sales_example

dsfg project \
    --name top_sales_example \
    --owner "Yuki Chiba" \
    --start-date 2017-08-01 \
    --root-dir ~/Projects/top_sales_example \
    --dwh-root ~/Projects/top_sales_example/dw

ls ~/Projects/top_sales_example
# > top_sales_example   dw  profile

Preparing data sources and files

After creating project, we can prepare data sources and data files.

Please download sample data from dummy_data.zip.

tar xf dummy_data.zip

ls dummy_data
# > dummy_customers.csv dummy_products.csv  dummy_transactions.csv

Extracted files have schema below:

customers.csv
name type
customer_id int
gender int
birthday datetime
products.csv
name type
product_id int
product_category_id int
transactions.csv
name type
shop_id int
customer_id int
datetime datetime
receipt_cd int
count int
unit_price int
product_id int
unit_cost float

To register schema to project, we have to edit top_sales_example/source/files.py.

top_sales_example/top_sales_example/source/files.py
from top_sales_example.source.filesource import FileSource
from abeja_dsf import IntIdSeries, TimestampSeries, IntValueSeries, ValueSeries
from abeja_dsf.core.io.loaders.csv import DSFCSVLoader

files = [
    FileSource('dummy_customers',
               DSFCSVLoader([
                   IntIdSeries('customer_id'),
                   IntIdSeries('gender'),
                   TimestampSeries('birthday')
               ]),
               ext='.csv'),
    FileSource('dummy_products',
               DSFCSVLoader([
                   IntIdSeries('product_id'),
                   IntIdSeries('product_category_id')
               ]),
               ext='.csv'),
    FileSource('dummy_transactions',
               DSFCSVLoader([
                   IntIdSeries('shop_id'),
                   IntIdSeries('customer_id'),
                   TimestampSeries('datetime'),
                   IntIdSeries('receipt_cd'),
                   IntValueSeries('count'),
                   IntValueSeries('unit_price'),
                   IntIdSeries('product_id'),
                   ValueSeries('unit_cost')
               ]),
               ext='.csv')]

def get_file(name: str) -> FileSource:
    for f in files:
        if f.name == name:
            return f

To make files ready to load, we have to put them into dw/top_sales_example/source/static directory.

mv dummy_data/* dw/top_sales_example/source/static

Creating workflow

In this example, we will prepare three workflow:

  • load_master_workflow to load dummy_customers and dummy_products.
  • load_receipt_workflow to load dummy_transactions.
  • analysis_product_top_sales to calculate top sales product ranking.

We will use dsfg command to create them.

dsfg workflow \
    --project-name top_sales_example \
    --root-dir ~/Projects/top_sales_example \
    --namespace load \
    --workflow-name master

dsfg workflow \
    --project-name top_sales_example \
    --root-dir ~/Projects/top_sales_example \
    --namespace load \
    --workflow-name receipt

dsfg workflow \
    --project-name top_sales_example \
    --root-dir ~/Projects/top_sales_example \
    --namespace analysis.product \
    --workflow-name top_sales

cat top_sales_example/catalogue.py
# ># workflow
# >from top_sales_example.project import top_sales_example_project
# >
# >
# >from top_sales_example.workflow.analysis.product.top_sales import AnalysisProductTopSales
# >analysis_product_top_sales = top_sales_example_project.workflow('analysis_product_top_sales', AnalysisProductTopSales)
# >
# >
# >from top_sales_example.workflow.load.master import LoadMaster
# >load_master = top_sales_example_project.workflow('load_master', LoadMaster)
# >
# >
# >from top_sales_example.workflow.load.receipt import LoadReceipt
# >load_receipt = top_sales_example_project.workflow('load_receipt', LoadReceipt)

Defining tasks in workflow

After creating workfow, we will edit them to define tasks.

load_master_workflow

To define tasks in load_master_workflow, we have edit LoadMaster class in top_sales_example/workflow/load/master.py.

top_sales_example/workflow/load/master.py
import abeja_dsf
from abeja_dsf.batch.batch_workflow import DSFBatchWorkflow
from abeja_dsf.core.calculation.support_types.lazy_parameter import DSFLazyParameter


class LoadMaster(DSFBatchWorkflow):
    start_date = abeja_dsf.config.top_sales_example.start_date

    def _def_wp(self):
        super(LoadMaster, self)._def_wp()

    def _setup_wp(self):
        super(LoadMaster, self)._setup_wp()

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        from top_sales_example.source.files import get_file
        self.customers_loader = get_file('dummy_customers').get_static_file_input()
        self.products_loader = get_file('dummy_products').get_static_file_input()

    def _setup_tasks(self):
        load_customers = self.loader('load_customers', self.customers_loader)
        load_products = self.loader('load_products', self.products_loader)

We added loaders in _setup_loaders method using top_sales_example.source.files.get_file.

And We added loader task definitions in _setup_tasks method using self.loader.

load_receipt_workflow

To define tasks in load_receipt_workflow, we have edit LoadReceipt class in top_sales_example/workflow/load/receipt.py.

top_sales_example/workflow/load/master.py
import abeja_dsf
from abeja_dsf.batch.batch_workflow import DSFBatchWorkflow
from abeja_dsf.core.calculation.support_types.lazy_parameter import DSFLazyParameter


class LoadReceipt(DSFBatchWorkflow):
    start_date = abeja_dsf.config.top_sales_example.start_date

    def _def_wp(self):
        super(LoadReceipt, self)._def_wp()

    def _setup_wp(self):
        super(LoadReceipt, self)._setup_wp()

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        from top_sales_example.source.files import get_file
        self.transaction_loader = get_file('dummy_transactions').get_static_file_input()

    def _setup_tasks(self):
        load_transaction = self.loader('load_transaction', self.transaction_loader)

The code above is very similar as LoadMaster class.

analysis_product_top_sales_workflow

To define tasks in analysis_product_top_sales_workflow, we have edit AnalysisProductTopSales class in top_sales_example/workflow/analysis/product/top_sales.py.

top_sales_example/workflow/analysis/product/top_sales.py
import abeja_dsf
from abeja_dsf.batch.batch_workflow import DSFBatchWorkflow
from abeja_dsf.core.calculation.support_types.lazy_parameter import DSFLazyParameter

class AnalysisProductRanking(DSFBatchWorkflow):
    start_date = abeja_dsf.config.top_sales_example.start_date

    def _def_wp(self):
        super(AnalysisProductRanking, self)._def_wp()

    def _setup_wp(self):
        super(AnalysisProductRanking, self)._setup_wp()

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        pass

    def _setup_tasks(self):
        from top_sales_example.project import load_receipt
        load_transaction = self.require(load_receipt, 'load_transaction')

        from abeja_dsf_opkg_df.components.aggregate import DSFDFAggregate
        from abeja_dsf_opkg_math.components.op import DSFMathOp
        from abeja_dsf_opkg_math.support_types.column_value import ColVal
        from abeja_dsf.core.calculation.data_type.dataframe import ColDef
        from abeja_dsf_opkg_df.components.aggregate import AggRule
        from abeja_dsf_opkg_df.support_types.aggregation_methods import agg_sum
        from abeja_dsf_opkg_df.components.rank import DSFDFRank

        # calculate sales as (sales = unit_price * count)
        sales = self.task('sales', DSFMathOp(DSFMathOp.Arguments(
            expression=ColVal('unit_price') * ColVal('count'),
            output=ColDef('sales', 'float', 'quantity')
        )))

        # calculate total_sales as (total_sales = sum of sales) for each product
        total_sales = self.task('total_sales', DSFDFAggregate(DSFDFAggregate.Arguments(
            rules=[AggRule('sales', agg_sum, 'total_sales')],
            keys=['product_id']
        )))

        # calculate ranking from total_sales
        total_sales_rank = self.task('total_sales_rank', DSFDFRank(DSFDFRank.Arguments(
            column_name='total_sales', output_column_name='total_sales_rank',
            method='first', ascending=False
        )))

        # extract top 3
        from abeja_dsf_opkg_df.components.filter import DSFDFFilter
        top3 = self.task('top3', DSFDFFilter(DSFDFFilter.Arguments(
            expression=ColVal('total_sales_rank') <= 3.0
        )))

        sales(load_transaction)
        total_sales(sales)
        total_sales_rank(total_sales)
        top3(total_sales_rank)

        from abeja_dsf.core.io.protocol_loader_io import DSFProtocolLoaderOut
        from abeja_dsf.core.io.protocols.file import DSFFileProtocol
        from abeja_dsf.core.io.loaders.csv import DSFCSVLoader

        # output result
        self.terminal('write_top3',
                      DSFProtocolLoaderOut(
                          DSFFileProtocol('top3.csv', 'w'),
                          DSFCSVLoader()),
                      [top3])

In the code above, there are many tasks that created by self.task, self.require, self.terminal.

You can import tasks in other workflow by using self.require, and you can define calculation task by using self.task.

And you can export result of task by using self.terminal.

The table below describe all tasks.

task name component description
load_transaction - importing from load_receipt workflow
sales DSFMathOp calculating sales = unit_price * count
total_sales DSFDFAggregate calculating total_sales = sum(sales) for each product
total_sales_rank DSFDFRank ranking product with total_sales
top3 DSFDFFilter filtering out products which have lower rank than 3 (1 is highest rank)
write_top3 - exporting top3 data to file as csv

Executing

At last, we will execute workflow that we created.

Before executing we have to create execflow by using dsfg command.

dsfg execflow \
    --project-name top_sales_example

cat execflow_all
# >
# >analysis_product_top_sales
# >
# >load_master
# >
# >load_receipt

And we also have to fix execution order of workflow.

execflow_all
load_master
load_receipt
analysis_product_top_sales

Now, we have done all things to execute. Let’s execute top_sales_example project with dsfrun command!

# for some environment setup
source profile

dsfrun project --project top_sales_example --flow execflow_all --date 2017-08-01