Package net.pricefx.formulaengine
Interface DistCalcFormulaContext
All Superinterfaces: DistFormulaContext
DistCalcFormulaContext is a Groovy API exposed through the 'dist' binding when running a
DistributedCalculation DL (PA DataLoad job). A DL of this type runs in multiple stages:
- Initialization: In this stage, the job executes the elements of the logic with
calculation context 'calculation-init', as well as those that do not have a
calculation context set. This step, executed on the master node/pod, should
create the items/batches (DLCIs) that are to be calculated, in parallel, in the
next stage. It does not make sense to build caches here, as the worker nodes/pods
that calculate the batches do not have access to them.
See addOrUpdateCalcItem(Object, Object, Object) for sample code.
- Calculation: In this stage, the job executes the elements of the logic with
calculation context equal to 'calculation' or null. A number of worker
nodes/pods are each given one calculation item (DLCI) at a time, and this
subset of the logic is evaluated once per item.
The calculation results are stored in the calculation item itself (see
DistFormulaContext.getCalcItem()), while the rows that were generated (see
getDataLoader()) are uploaded to the DL's Target table directly (see
DistFormulaContext.getLoadMode()).
It is highly recommended that no dependencies are assumed or created between item
calculations, as there is no determined order of execution.
In this step, caches can be built up to benefit the calculation, ideally limited to
the scope of an individual calculation item. For example, if there is a calculation
item for each ProductGroup, then product-related caches can be limited to that
ProductGroup. If caches are to be preserved across item calculations, they can be
put in the api.global map, but be mindful that a worker node/pod does not know
upfront which items it will be processing.
Note: globals are preserved on a given worker, not shared across workers.
- Summary: In this stage, the job executes the elements of the logic with
calculation context equal to 'calculation-summary' or null.
This step, executed on the master node/pod, is performed after all calculation
items have been processed by the workers. Typically it is used to produce
metrics about the calculation, or to do some validation of the results (see also
getResultTable()). Note that the globals from the Calculation step are *not*
available in this context.
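The caching advice for the Calculation stage can be illustrated with a minimal sketch. It assumes one calculation item per ProductGroup whose scope is a plain map carrying the group name. The `api` and `dist` objects below are hypothetical stand-ins so the snippet runs outside the engine, and `loadProducts` is an invented placeholder for whatever query would populate the cache.

```groovy
// Hypothetical stand-ins for the engine-provided bindings, so that the sketch
// runs on its own. In a real logic element, 'api' and 'dist' already exist.
def api = [global: [:]]
def dist = [calcItem: [Key1: "productGroup", Key2: "PG-A", scope: [productGroup: "PG-A"]]]

// Hypothetical loader standing in for the query that would fetch the products
// of a single ProductGroup.
def loadProducts = { String group ->
    [(group + "-001"): [cost: 10.0], (group + "-002"): [cost: 12.5]]
}

// Read the calculation item once and reuse the reference.
def calcItem = dist.calcItem
def productGroup = calcItem.scope.productGroup

// Globals are kept per worker and are not shared across workers. A worker does
// not know upfront which items it will receive, so the cache is filled lazily,
// one ProductGroup at a time.
if (api.global.productCaches == null) {
    api.global.productCaches = [:]
}
def productCaches = api.global.productCaches
if (!productCaches.containsKey(productGroup)) {
    productCaches[productGroup] = loadProducts(productGroup)
}
def products = productCaches[productGroup]

println "Cached ${products.size()} products for ${productGroup}"
```

Keying the cache by ProductGroup keeps each entry limited to the scope of one calculation item, while still letting a worker reuse an entry if it happens to process the same group again.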
-
Method Summary
Modifier and Type | Method | Description
 | addOrUpdateCalcItem(Object key1, Object key2, Object scope) | Creates a new or updates an existing calculation item (entry in the Model CalcItems PP) for a given (key1, key2) combination.
int | deleteCalcItems(Filter... filters) | Deprecated. As calcItems are now created on JST level, i.e. for each run of the distributed DL, deleting previously created calcItems is no longer needed.
List<?> | findCalcItems(Filter filter) | Deprecated. Provided for backwards compatibility only.
DatamartContext.DataLoader | getDataLoader() | Provides access to the DataLoader instance that allows adding rows to the Result table linked to the DL.
DatamartContext.Table | getResultTable() | A Distributed Calculation stores the calculated rows in a table specific to the DL, and separate from the target table.
Methods inherited from interface DistFormulaContext
getAllocationFields, getCalcItem, getIncLoadDate, getLoadMode, getTarget, getTargetFields, isIncremental
-
Method Details
-
getResultTable
DatamartContext.Table getResultTable()
A Distributed Calculation stores the calculated rows in a table specific to the DL, separate from the target table. Only when the calculation is successful is this data uploaded to the target, in the manner defined by the LoadMode. It may be useful to have access to the calculated rows, for example in calculation-summary elements of the logic, to distill summary metrics or perform validation.
- Returns:
- The table in which the DL stores the calculated rows. The table can be used as a source to a Groovy query.
-
addOrUpdateCalcItem
Creates a new or updates an existing calculation item (entry in the Model CalcItems PP) for a given (key1, key2) combination.
Sample code:
    // always recreate the CalcItems as the number of batches may vary
    // from one run to another
    // (deleteCalcItems is deprecated: calcItems are now created per run,
    // so this call is only needed in older logics)
    dist.deleteCalcItems()
    def ctx = api.datamartContext
    // 'source' is assumed to be defined elsewhere in the logic
    def batchFilters = ctx.batchFilters(source, null, 200000)
    batchFilters.eachWithIndex { filter, i ->
        dist.addOrUpdateCalcItem("batch", i, filter)
    }
    return "Created ${batchFilters.size()} batches"
Note: Often, filters based on fields with actual business meaning are more suited to defining batches. If, for example, data can be partitioned by a product or customer attribute, then caches can also benefit from this partitioning scheme, in order to limit their sizes.
- Parameters:
key1 - Key1 of the calculation item. Corresponds to the pre-12 Key1 field (upper case K).
key2 - Key2 of the calculation item. Corresponds to the pre-12 Key2 field (upper case K).
scope - A JSON-serializable scope of the calculation item. Usually a Filter object. This field corresponds to the pre-12 Value field, which like Key1 and Key2 can still be used for backwards compatibility, but new or updated logics should reference scope instead.
- Returns:
- The map representation of the calculation item just added or updated.
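The note on partitioning by a business attribute can be sketched as follows, as an illustration only. `StubDist` is a hypothetical stand-in for the 'dist' binding so the snippet runs outside the engine, the list of product groups is invented, and a plain map is assumed as the JSON-serializable scope where a real logic would usually pass a Filter object.

```groovy
// Hypothetical stand-in for the 'dist' binding so the sketch runs on its own;
// it only records the items it is given and returns their map representation.
class StubDist {
    Map items = [:]

    Map addOrUpdateCalcItem(Object key1, Object key2, Object scope) {
        def item = [Key1: key1, Key2: key2, scope: scope]
        // the same (key1, key2) combination overwrites the existing entry
        items.put([key1, key2], item)
        return item
    }
}
def dist = new StubDist()

// Hypothetical list of product groups; in a real logic this would come from a query.
def productGroups = ["Beverages", "Dairy", "Snacks"]

productGroups.each { pg ->
    // The scope must be JSON-serializable. A plain map is used here as an
    // assumption; the sample code above passes a Filter object instead.
    dist.addOrUpdateCalcItem("productGroup", pg, [field: "ProductGroup", value: pg])
}

println "Created ${dist.items.size()} calculation items"
```

With one item per ProductGroup, any cache built during the Calculation stage only needs to hold the data of that group.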
-
findCalcItems
List<?> findCalcItems(Filter filter)
Deprecated. Provided for backwards compatibility only. It is discouraged to configure behaviour where the processing of a calculation item (batch) depends on other calculation items.
- Parameters:
filter -
- Returns:
- List of maps, representing the calculation items in scope of the provided filter
-
deleteCalcItems
int deleteCalcItems(Filter... filters)
Deprecated. As calcItems are now created on JST level, i.e. for each run of the distributed DL, it is no longer necessary to delete previously created calcItems, as there will be none. This method is still included so as not to break existing logics, where it was common to delete the calcItems in the 'init' stage of the job.
- Parameters:
filters - Optional filters, for when not all calcItems are to be deleted
- Returns:
- The number of items deleted
-
getDataLoader
DatamartContext.DataLoader getDataLoader()
Provides access to the DataLoader instance that allows adding rows to the Result table linked to the DL.
Sample code:
    def ctx = api.datamartContext
    def target = ctx.getFieldCollection(dist.target)
    // the DataLoader that adds rows to the Result table
    def loader = dist.dataLoader
    // best to avoid calling dist.calcItem multiple times,
    // to avoid multiple serializations of the object
    def calcItem = dist.calcItem
    // a value that allows us to identify which calculation item generated
    // a given row in the Result table
    def calcItemKey = calcItem.Key2
    def filter = api.filterFromMap(calcItem.scope)
    // 'source' is assumed to be defined elsewhere in the logic
    def batch = ctx.newQuery(source, false)
        .selectAll(true)
        .where(filter)
    // consumeData can be used as well but only when the closure does not run
    // additional PA queries, as this would result in nested query executions,
    // which is not allowed.
    def result = ctx.streamQuery(batch)
    while (result.next()) {
        def row = result.get()
        // mapping logic assumed to be implemented in the FieldMapping element.
        // all the key fields need to be populated!
        def targetRow = FieldMapping.map(row)
        // CalcItem is a standard field in the Result table,
        // useful for testing and debugging
        targetRow.CalcItem = calcItemKey
        loader.addRow(targetRow)
    }
- Returns:
-