This is a basic example of using dsf. We will build a product ranking by total sales from purchase log data and extract the top 3 products from that ranking.
In this example, you will learn how to use the command-line tools and some of the most basic calculation components.
First, we have to create a project. We will use the dsfg command to create it.
mkdir -p ~/Projects/top_sales_example
dsfg project \
--name top_sales_example \
--owner "Yuki Chiba" \
--start-date 2017-08-01 \
--root-dir ~/Projects/top_sales_example \
--dwh-root ~/Projects/top_sales_example/dw
ls ~/Projects/top_sales_example
# > top_sales_example dw profile
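After creating the project, move into the project root (the commands below use paths relative to it) and prepare the data sources and data files.
cd ~/Projects/top_sales_example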
Please download the sample data from dummy_data.zip and extract it.
unzip dummy_data.zip
ls dummy_data
# > dummy_customers.csv dummy_products.csv dummy_transactions.csv
The extracted files have the following schemas.

dummy_customers.csv:

column | type |
---|---|
customer_id | int |
gender | int |
birthday | datetime |

dummy_products.csv:

column | type |
---|---|
product_id | int |
product_category_id | int |

dummy_transactions.csv:

column | type |
---|---|
shop_id | int |
customer_id | int |
datetime | datetime |
receipt_cd | int |
count | int |
unit_price | int |
product_id | int |
unit_cost | float |
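To make the transaction schema concrete, here is a minimal sketch that converts raw CSV rows into typed values column by column. dsf itself parses the files through DSFCSVLoader; this code is independent of dsf. It assumes (neither is stated in this tutorial) that the file has a header row and that timestamps are ISO-8601 strings such as 2017-08-01 10:15:00. TRANSACTION_SCHEMA and parse_rows are hypothetical names.

```python
import csv
import io
from datetime import datetime

# Column types of dummy_transactions.csv, taken from the schema table above.
TRANSACTION_SCHEMA = {
    "shop_id": int,
    "customer_id": int,
    "datetime": datetime.fromisoformat,  # assumption: ISO-8601 timestamps
    "receipt_cd": int,
    "count": int,
    "unit_price": int,
    "product_id": int,
    "unit_cost": float,
}


def parse_rows(text: str, schema: dict) -> list:
    """Convert each CSV row (assumed to follow a header line) to typed values."""
    reader = csv.DictReader(io.StringIO(text))
    return [{col: schema[col](row[col]) for col in schema} for row in reader]
```

For example, a row 1,10,2017-08-01 10:15:00,500,2,300,7,120.5 becomes a dict whose count is the int 2 and whose datetime is a datetime object.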
To register these schemas with the project, edit top_sales_example/source/files.py:
from top_sales_example.source.filesource import FileSource
from abeja_dsf import IntIdSeries, TimestampSeries, IntValueSeries, ValueSeries
from abeja_dsf.core.io.loaders.csv import DSFCSVLoader

# One FileSource per data file; the series list follows the schema tables above.
files = [
    FileSource('dummy_customers',
               DSFCSVLoader([
                   IntIdSeries('customer_id'),
                   IntIdSeries('gender'),
                   TimestampSeries('birthday')
               ]),
               ext='.csv'),
    FileSource('dummy_products',
               DSFCSVLoader([
                   IntIdSeries('product_id'),
                   IntIdSeries('product_category_id')
               ]),
               ext='.csv'),
    FileSource('dummy_transactions',
               DSFCSVLoader([
                   IntIdSeries('shop_id'),
                   IntIdSeries('customer_id'),
                   TimestampSeries('datetime'),
                   IntIdSeries('receipt_cd'),
                   IntValueSeries('count'),
                   IntValueSeries('unit_price'),
                   IntIdSeries('product_id'),
                   ValueSeries('unit_cost')
               ]),
               ext='.csv'),
]


def get_file(name: str) -> FileSource:
    # Return the registered source with this name (None if there is none).
    for f in files:
        if f.name == name:
            return f
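get_file above scans the list and implicitly returns None for an unknown name, so a typo such as get_file('dummy_customer') only fails later, with an AttributeError, when get_static_file_input() is called on None. A minimal sketch of a stricter, dict-based lookup is shown below. FileSourceStub is a hypothetical stand-in for FileSource (the real class also takes a loader and ext); only its name attribute is used here.

```python
from dataclasses import dataclass


@dataclass
class FileSourceStub:
    """Hypothetical stand-in for FileSource; only the name matters here."""
    name: str


files = [FileSourceStub("dummy_customers"),
         FileSourceStub("dummy_products"),
         FileSourceStub("dummy_transactions")]

# Index the sources by name once instead of scanning the list on every call.
_by_name = {f.name: f for f in files}


def get_file(name: str) -> FileSourceStub:
    try:
        return _by_name[name]
    except KeyError:
        # Fail immediately and list the valid names.
        raise KeyError(f"unknown file source {name!r}; "
                       f"expected one of {sorted(_by_name)}") from None
```

The trade-off is that names must be unique; the original loop silently returns the first match if two sources share a name.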
To make the files ready to load, move them into the dw/top_sales_example/source/static directory:
mv dummy_data/* dw/top_sales_example/source/static
In this example, we will prepare three workflows:

- load_master: loads dummy_customers and dummy_products.
- load_receipt: loads dummy_transactions.
- analysis_product_top_sales: calculates the top sales product ranking.

We will use the dsfg command to create them:
dsfg workflow \
--project-name top_sales_example \
--root-dir ~/Projects/top_sales_example \
--namespace load \
--workflow-name master
dsfg workflow \
--project-name top_sales_example \
--root-dir ~/Projects/top_sales_example \
--namespace load \
--workflow-name receipt
dsfg workflow \
--project-name top_sales_example \
--root-dir ~/Projects/top_sales_example \
--namespace analysis.product \
--workflow-name top_sales
cat top_sales_example/catalogue.py
# ># workflow
# >from top_sales_example.project import top_sales_example_project
# >
# >
# >from top_sales_example.workflow.analysis.product.top_sales import AnalysisProductTopSales
# >analysis_product_top_sales = top_sales_example_project.workflow('analysis_product_top_sales', AnalysisProductTopSales)
# >
# >
# >from top_sales_example.workflow.load.master import LoadMaster
# >load_master = top_sales_example_project.workflow('load_master', LoadMaster)
# >
# >
# >from top_sales_example.workflow.load.receipt import LoadReceipt
# >load_receipt = top_sales_example_project.workflow('load_receipt', LoadReceipt)
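Judging from this catalogue.py output, dsfg derives three names from --namespace and --workflow-name: the module path, the class name (CamelCase of all parts), and the catalogue entry (all parts joined with underscores). The hypothetical helper below reproduces that convention as inferred from the output above; it is not part of dsfg and may not cover every case dsfg handles.

```python
def workflow_names(project: str, namespace: str, workflow: str) -> dict:
    """Derive module, class and catalogue names as catalogue.py shows them."""
    parts = namespace.split(".") + [workflow]
    snake = "_".join(parts)  # e.g. analysis_product_top_sales
    return {
        "module": ".".join([project, "workflow"] + parts),
        "class": "".join(p.capitalize() for p in snake.split("_")),
        "catalogue": snake,
    }
```

For example, workflow_names('top_sales_example', 'analysis.product', 'top_sales') gives the module top_sales_example.workflow.analysis.product.top_sales and the class AnalysisProductTopSales, matching the catalogue entry above.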
After creating the workflows, we will edit them to define their tasks.
To define the tasks of the load_master workflow, edit the LoadMaster class in top_sales_example/workflow/load/master.py:
import abeja_dsf
from abeja_dsf.batch.batch_workflow import DSFBatchWorkflow
from abeja_dsf.core.calculation.support_types.lazy_parameter import DSFLazyParameter


class LoadMaster(DSFBatchWorkflow):
    start_date = abeja_dsf.config.top_sales_example.start_date

    def _def_wp(self):
        super(LoadMaster, self)._def_wp()

    def _setup_wp(self):
        super(LoadMaster, self)._setup_wp()

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        from top_sales_example.source.files import get_file
        # loaders for the static master files registered in files.py
        self.customers_loader = get_file('dummy_customers').get_static_file_input()
        self.products_loader = get_file('dummy_products').get_static_file_input()

    def _setup_tasks(self):
        # one loader task per master file
        load_customers = self.loader('load_customers', self.customers_loader)
        load_products = self.loader('load_products', self.products_loader)
We added the loaders in the _setup_loaders method using top_sales_example.source.files.get_file, and the loader task definitions in the _setup_tasks method using self.loader.
To define the tasks of the load_receipt workflow, edit the LoadReceipt class in top_sales_example/workflow/load/receipt.py:
import abeja_dsf
from abeja_dsf.batch.batch_workflow import DSFBatchWorkflow
from abeja_dsf.core.calculation.support_types.lazy_parameter import DSFLazyParameter


class LoadReceipt(DSFBatchWorkflow):
    start_date = abeja_dsf.config.top_sales_example.start_date

    def _def_wp(self):
        super(LoadReceipt, self)._def_wp()

    def _setup_wp(self):
        super(LoadReceipt, self)._setup_wp()

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        from top_sales_example.source.files import get_file
        # loader for the static transaction file registered in files.py
        self.transaction_loader = get_file('dummy_transactions').get_static_file_input()

    def _setup_tasks(self):
        # load_transaction is later required by analysis_product_top_sales
        load_transaction = self.loader('load_transaction', self.transaction_loader)
The code above is very similar to the LoadMaster class.
To define the tasks of the analysis_product_top_sales workflow, edit the AnalysisProductTopSales class in top_sales_example/workflow/analysis/product/top_sales.py:
import abeja_dsf
from abeja_dsf.batch.batch_workflow import DSFBatchWorkflow
from abeja_dsf.core.calculation.support_types.lazy_parameter import DSFLazyParameter


# The class name must match the import in catalogue.py.
class AnalysisProductTopSales(DSFBatchWorkflow):
    start_date = abeja_dsf.config.top_sales_example.start_date

    def _def_wp(self):
        super(AnalysisProductTopSales, self)._def_wp()

    def _setup_wp(self):
        super(AnalysisProductTopSales, self)._setup_wp()

    def _setup_loaders(self, execution_date: DSFLazyParameter):
        # no loaders: the input comes from the load_receipt workflow
        pass

    def _setup_tasks(self):
        # load_receipt is defined in catalogue.py
        from top_sales_example.catalogue import load_receipt
        load_transaction = self.require(load_receipt, 'load_transaction')

        from abeja_dsf_opkg_df.components.aggregate import DSFDFAggregate
        from abeja_dsf_opkg_math.components.op import DSFMathOp
        from abeja_dsf_opkg_math.support_types.column_value import ColVal
        from abeja_dsf.core.calculation.data_type.dataframe import ColDef
        from abeja_dsf_opkg_df.components.aggregate import AggRule
        from abeja_dsf_opkg_df.support_types.aggregation_methods import agg_sum
        from abeja_dsf_opkg_df.components.rank import DSFDFRank

        # calculate sales as (sales = unit_price * count)
        sales = self.task('sales', DSFMathOp(DSFMathOp.Arguments(
            expression=ColVal('unit_price') * ColVal('count'),
            output=ColDef('sales', 'float', 'quantity')
        )))

        # calculate total_sales as (total_sales = sum of sales) for each product
        total_sales = self.task('total_sales', DSFDFAggregate(DSFDFAggregate.Arguments(
            rules=[AggRule('sales', agg_sum, 'total_sales')],
            keys=['product_id']
        )))

        # rank products by total_sales (ascending=False: highest total_sales is rank 1)
        total_sales_rank = self.task('total_sales_rank', DSFDFRank(DSFDFRank.Arguments(
            column_name='total_sales', output_column_name='total_sales_rank',
            method='first', ascending=False
        )))

        # extract top 3
        from abeja_dsf_opkg_df.components.filter import DSFDFFilter
        top3 = self.task('top3', DSFDFFilter(DSFDFFilter.Arguments(
            expression=ColVal('total_sales_rank') <= 3.0
        )))

        # connect the tasks: load_transaction -> sales -> total_sales -> total_sales_rank -> top3
        sales(load_transaction)
        total_sales(sales)
        total_sales_rank(total_sales)
        top3(total_sales_rank)

        from abeja_dsf.core.io.protocol_loader_io import DSFProtocolLoaderOut
        from abeja_dsf.core.io.protocols.file import DSFFileProtocol
        from abeja_dsf.core.io.loaders.csv import DSFCSVLoader

        # output result: write top3 to top3.csv as CSV
        self.terminal('write_top3',
                      DSFProtocolLoaderOut(
                          DSFFileProtocol('top3.csv', 'w'),
                          DSFCSVLoader()),
                      [top3])
The code above creates its tasks with three methods: self.require imports a task from another workflow, self.task defines a calculation task, and self.terminal exports the result of a task. The calls near the end, such as sales(load_transaction), connect each task to the task whose output it consumes.
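The pattern of defining tasks first and then connecting them with calls such as sales(load_transaction) builds a dependency graph that is evaluated when the workflow runs. The toy Task class below is a hypothetical illustration of that pattern in plain Python, not dsf's implementation: calling a task records its upstream tasks, and run() evaluates upstreams first, caching each result by task name.

```python
from typing import Callable, List


class Task:
    """Toy task: a named function plus the upstream tasks feeding it."""

    def __init__(self, name: str, fn: Callable):
        self.name = name
        self.fn = fn
        self.upstreams: List["Task"] = []

    def __call__(self, *upstreams: "Task") -> "Task":
        # Calling a task only records its inputs; nothing is computed yet.
        self.upstreams.extend(upstreams)
        return self

    def run(self, cache=None):
        # Evaluate upstream tasks first (each at most once), then this task.
        cache = {} if cache is None else cache
        if self.name not in cache:
            inputs = [u.run(cache) for u in self.upstreams]
            cache[self.name] = self.fn(*inputs)
        return cache[self.name]


# Usage: wire load -> double -> total, then evaluate the last task.
load = Task("load", lambda: [1, 2, 3])
double = Task("double", lambda xs: [2 * x for x in xs])
total = Task("total", lambda xs: sum(xs))
double(load)
total(double)
```

Calling total.run() then evaluates load, double and total in that order and returns 12.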
The table below describes all the tasks.
task name | component | description |
---|---|---|
load_transaction | - | importing from load_receipt workflow |
sales | DSFMathOp | calculating sales = unit_price * count |
total_sales | DSFDFAggregate | calculating total_sales = sum(sales) for each product |
total_sales_rank | DSFDFRank | ranking products by total_sales in descending order |
top3 | DSFDFFilter | keeping only products with total_sales_rank <= 3 (1 is the highest rank) |
write_top3 | - | exporting top3 data to file as csv |
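To check what these tasks compute, the same steps can be written in plain Python over a list of transaction rows. This is a sketch independent of dsf, and the function name top_sales is hypothetical. It assumes method='first' means what it does in pandas' DataFrame.rank: ties get distinct ranks in row order. Here rows are in first-appearance order of product_id, which may differ from the row order the aggregate step produces.

```python
from collections import defaultdict


def top_sales(transactions: list, n: int = 3) -> list:
    # sales = unit_price * count per row; total_sales = sum per product_id
    totals = defaultdict(float)
    for row in transactions:
        totals[row["product_id"]] += row["unit_price"] * row["count"]
    # total_sales_rank: descending; sorted() is stable even with reverse=True,
    # so ties keep their row order (like method='first')
    ranked = sorted(totals.items(), key=lambda kv: kv[1], reverse=True)
    # top3: keep products whose rank (1 = highest) is <= n
    return [
        {"product_id": pid, "total_sales": total, "total_sales_rank": rank}
        for rank, (pid, total) in enumerate(ranked, start=1)
        if rank <= n
    ]
```

With made-up rows where products 1 and 3 both total 300, product 2 totals 50 and product 4 totals 30, the result is products 1, 3 and 2 with ranks 1, 2 and 3.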
Finally, we will execute the workflows we created.
Before executing them, we have to create an execflow with the dsfg command:
dsfg execflow \
--project-name top_sales_example
cat execflow_all
# >
# >analysis_product_top_sales
# >
# >load_master
# >
# >load_receipt
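We also have to fix the execution order of the workflows. Edit execflow_all so that the workflows run in the order below, because analysis_product_top_sales requires the load_transaction task of load_receipt:
load_master
load_receipt
analysis_product_top_sales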
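The order matters because each workflow must run after the workflows it requires. As the project grows, a valid order can be derived with Python's graphlib (Python 3.9+). In the sketch below the dependency map is written by hand from this tutorial's self.require calls (dsf does not produce it). parse_execflow assumes one workflow name per line with blank lines ignored, as in the cat output above. All function names are hypothetical.

```python
from graphlib import TopologicalSorter

# Hand-written map: workflow -> workflows it requires (from self.require calls).
DEPENDENCIES = {
    "load_master": set(),
    "load_receipt": set(),
    "analysis_product_top_sales": {"load_receipt"},
}


def execution_order(deps: dict) -> list:
    """Return an order in which every workflow follows its requirements."""
    return list(TopologicalSorter(deps).static_order())


def parse_execflow(text: str) -> list:
    """Read workflow names from execflow text, skipping blank lines."""
    return [line.strip() for line in text.splitlines() if line.strip()]


def respects(order: list, deps: dict) -> bool:
    """Check that a (hand-edited) order runs requirements first."""
    position = {name: i for i, name in enumerate(order)}
    return all(position[req] < position[wf]
               for wf, reqs in deps.items() for req in reqs)
```

Applied to the generated execflow_all, which lists analysis_product_top_sales first, respects returns False; the edited order above passes.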
Now everything is ready.
Let’s execute the top_sales_example project with the dsfrun command!
# set up the environment with the profile file generated by dsfg project
source profile
dsfrun project --project top_sales_example --flow execflow_all --date 2017-08-01