We can create a project with the dsfg command, which is installed automatically together with dsf_core.
If you can't find the dsfg command, please check our installation guide again.
dsfg project \
--name {project name} \
--owner {your name} \
--start-date {project start date in ISO format} \
--root-dir {your project directory} \
--dwh-root {your data store directory}
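For example, a filled-in invocation might look like the following (all names and paths here are hypothetical placeholders; substitute your own):

dsfg project \
--name sales_analysis \
--owner alice \
--start-date 2024-01-01 \
--root-dir ~/work/projects \
--dwh-root ~/work/dwh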
For more details about projects, please check here.
Next, register your data source with the project. This document focuses on using a CSV file.
Please edit source/filesource.py in the generated project directory.
from abeja_dsf import IntIdSeries, TimestampSeries, IntValueSeries, ValueSeries
from abeja_dsf import FileSource  # the import path for FileSource may differ in your DSF version
from abeja_dsf.core.io.loaders.csv import DSFCSVLoader

files = [
    FileSource('some_data',
               DSFCSVLoader(schema=[
                   IntIdSeries('integer_id_column'),
                   TimestampSeries('timestamp_column'),
                   IntValueSeries('integer_value_column'),
                   ValueSeries('float_value_column')]),
               ext='.csv')
]
Please customize the schema passed to DSFCSVLoader according to your data.
For more information about schemas, please check here.
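As a sketch, a FileSource entry for the transaction data used in the workflow example later in this guide might look like the following. The source name 'transactions' and the purchased_at column are hypothetical; product_id, count, and unit_price match the columns referenced in that workflow example:

files = [
    FileSource('transactions',
               DSFCSVLoader(schema=[
                   IntIdSeries('product_id'),        # integer product ID
                   TimestampSeries('purchased_at'),  # hypothetical purchase timestamp
                   IntValueSeries('count'),          # units sold per transaction
                   ValueSeries('unit_price')]),      # price per unit (float)
               ext='.csv')
]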
After registering your data source, put your file into the source data store.
cp some_file.csv {your data store directory}/{project name}/source/static/{file source name}.csv
The file name must match the first argument of FileSource,
and the extension must match its ext argument (the third argument of FileSource).
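For the 'some_data' source defined above, a concrete command might look like this (the source file name and directories are hypothetical):

cp transactions_2024.csv ~/work/dwh/sales_analysis/source/static/some_data.csv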
To create a workflow, we can use the dsfg command again.
dsfg workflow \
--project-name {project name} \
--root-dir {your project directory} \
--namespace {workflow namespace, separated by .} \
--workflow-name {workflow name}
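A filled-in example (all values are hypothetical):

dsfg workflow \
--project-name sales_analysis \
--root-dir ~/work/projects \
--namespace analysis.sales \
--workflow-name top3_products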
This command creates a new workflow class and a new launcher script in your project directory,
and it also updates catalogue.py automatically.
For more details about workflows, please check here.
To set up the loader, please edit the _setup_loaders method in the workflow class file in your workflow directory.
def _setup_loaders(self, execution_date: DSFLazyParameter):
    from {project name}.source.files import get_file
    self.some_data_loader = get_file('some_data').get_static_file_input()
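If you registered more than one file source, the same pattern repeats; a minimal sketch assuming a second source named 'other_data' was also registered in source/filesource.py:

def _setup_loaders(self, execution_date: DSFLazyParameter):
    from {project name}.source.files import get_file
    self.some_data_loader = get_file('some_data').get_static_file_input()
    # hypothetical second source, registered the same way as 'some_data'
    self.other_data_loader = get_file('other_data').get_static_file_input()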
Now you can write analysis tasks in your workflow class.
Let's edit the _setup_tasks method.
def _setup_tasks(self):
    from abeja_dsf.core.calculation.data_type.dataframe import ColDef
    from abeja_dsf.core.io.loaders.csv import DSFCSVLoader
    from abeja_dsf.core.io.protocol_loader_io import DSFProtocolLoaderOut
    from abeja_dsf.core.io.protocols.file import DSFFileProtocol
    from abeja_dsf_opkg_df.components.aggregate import AggRule, DSFDFAggregate
    from abeja_dsf_opkg_df.components.filter import DSFDFFilter
    from abeja_dsf_opkg_df.components.rank import DSFDFRank
    from abeja_dsf_opkg_df.support_types.aggregation_methods import agg_sum
    from abeja_dsf_opkg_math.components.op import DSFMathOp
    from abeja_dsf_opkg_math.support_types.column_value import ColVal

    # load the source data registered in _setup_loaders
    load_some_data = self.loader('load_some_data', self.some_data_loader)

    # calculate sales as `sales = unit_price * count`
    sales = self.task('sales', DSFMathOp(DSFMathOp.Arguments(
        expression=ColVal('unit_price') * ColVal('count'),
        output=ColDef('sales', 'float', 'quantity')
    )))

    # calculate total_sales as `total_sales = sum of sales` for each product
    total_sales = self.task('total_sales', DSFDFAggregate(DSFDFAggregate.Arguments(
        rules=[AggRule('sales', agg_sum, 'total_sales')],
        keys=['product_id']
    )))

    # calculate a ranking from total_sales
    total_sales_rank = self.task('total_sales_rank', DSFDFRank(DSFDFRank.Arguments(
        column_name='total_sales', output_column_name='total_sales_rank',
        method='first', ascending=False
    )))

    # extract the top 3
    top3 = self.task('top3', DSFDFFilter(DSFDFFilter.Arguments(
        expression=ColVal('total_sales_rank') <= 3.0
    )))

    # declare the dependency relationships between tasks
    sales(load_some_data)
    total_sales(sales)
    total_sales_rank(total_sales)
    top3(total_sales_rank)

    # output the result to top3.csv
    self.terminal('write_top3',
                  DSFProtocolLoaderOut(
                      DSFFileProtocol('top3.csv', 'w'),
                      DSFCSVLoader()),
                  [top3])
This is an example workflow that loads data and extracts the top 3 best-selling products.
The example demonstrates:
- the self.loader method
- the self.task method
- calculation components such as DSFDFAggregate and DSFMathOp
- the task(required_task) syntax for declaring dependencies

For more details about tasks, please check here, and for more about calculation components, please check here.
To execute the workflow, we can use the dsfrun workflow command.
# configure some environment variables
source profile
# and execute
dsfrun workflow \
--project {project_name} \
--workflow {workflow full name} \
--date {execution date in ISO format}
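A filled-in example (the workflow full name here is hypothetical; check catalogue.py for yours, as noted below):

source profile
dsfrun workflow \
--project sales_analysis \
--workflow analysis.sales.top3_products \
--date 2024-01-01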
To check the workflow's full name, please see catalogue.py.
For more details about the dsfrun command, please check here.
That completes the quick start. Enjoy data analysis with DSF, and explore the rest of the documentation to learn more.