Interface DistCalcFormulaContext

All Superinterfaces:
DistFormulaContext

public interface DistCalcFormulaContext extends DistFormulaContext
DistCalcFormulaContext is a Groovy API exposed through the 'dist' binding when running a DistributedCalculation DL (PA DataLoad job). A DL of this type runs in multiple stages:

  • Initialization: In this stage, the job executes the elements of the logic with calculation context 'calculation-init', as well as those that do not have a calculation context set. This step, executed on the master node/pod, should create the items/batches (DLCIs) that are to be calculated, in parallel, in the next stage. It does not make sense to build caches here, as the worker nodes/pods that calculate the batches do not have access to them. See addOrUpdateCalcItem(Object, Object, Object) for sample code.
  • Calculation: In this stage, the job executes the elements of the logic with calculation context equal to 'calculation' or null. A number of worker nodes/pods are each given one calculation item (DLCI) at a time, and that sub-section of the logic is evaluated once per item. The calculation results are stored in the calculation item itself - see DistFormulaContext.getCalcItem(), while the rows that were generated (see getDataLoader()) are uploaded to the DL's Target table directly (see DistFormulaContext.getLoadMode()). It is highly recommended that no dependencies are assumed or created between item calculations, as there is no determined order of execution. In this step, caches can be built up to benefit the calculation, ideally limited to the scope of an individual calculation item. For example, if there is a calculation item for each ProductGroup, then product-related caches can be limited to that ProductGroup (see the sketch after this overview). If caches are to be preserved across item calculations, they can be put in the api.global map, but be mindful that a worker node/pod does not know upfront which items it will be processing. Note: globals are preserved on a given worker, not shared across workers.
  • Summary: In this stage, the job executes the elements of the logic with calculation context equal to 'calculation-summary' or null. This step, executed on the master node/pod, is performed after all calculation items have been processed by the workers. Typically it is used to produce metrics about the calculation, or to do some validation of the results - see also getResultTable(). Note that the globals from the Calculation step are *not* available in this context.

When all the stages have completed successfully, and the logic did not return an error for any of the calculation items, the data in the Result table is uploaded to the target, in line with the DL's LoadMode. In all other cases, the target remains unmodified, and the rows in the Result table, as well as the calculation results on item level, can be inspected to understand the reason for the job failure. Profiler breakdowns are also available on calculation item level, as are calculation messages.
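For illustration, here is a minimal, hedged sketch of a 'calculation' element applying the per-item caching pattern described above. It assumes the calculation items were created with Key2 holding a ProductGroup value and scope holding a filter for that group; the cache names are hypothetical:

 def calcItem = dist.calcItem
 def group = calcItem.Key2

 // filter to select this item's rows from the source
 def filter = api.filterFromMap(calcItem.scope)

 // api.global survives across item calculations on this worker, but a worker
 // does not know upfront which items it will process, so rebuild per group
 if (api.global.cachedGroup != group) {
     api.global.cachedGroup = group
     api.global.productCache = [:]   // hypothetical per-ProductGroup cache
 }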
  • Method Details

    • getResultTable

      DatamartContext.Table getResultTable()
      A Distributed Calculation stores the calculated rows in a table specific to the DL, separate from the target table. Only when the calculation is successful is this data uploaded to the target, in the manner defined by the LoadMode. It may be useful to have access to the calculated rows, for example in calculation-summary elements of the logic, to distill summary metrics or perform validation, as in the sample below.
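      Sample code (a hedged sketch of a 'calculation-summary' element; the COUNT aggregation expression and the rowCount alias are assumptions, while CalcItem is the standard field in the Result table - see getDataLoader()):
 def ctx = api.datamartContext

 // rollup query over the Result table, counting rows per calculation item
 def q = ctx.newQuery(dist.resultTable, true)
            .select("CalcItem")
            .select("COUNT(CalcItem)", "rowCount")

 def counts = [:]
 def result = ctx.streamQuery(q)
 while (result.next()) {
      def row = result.get()
      counts[row.CalcItem] = row.rowCount
 }
 return counts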
      Returns:
      The table in which the DL stores the calculated rows. The table can be used as a source to a groovy query.
    • addOrUpdateCalcItem

      Object addOrUpdateCalcItem(Object key1, Object key2, Object scope)
      Creates a new or updates an existing calculation item (entry in the Model CalcItems PP) for a given (key1, key2) combination.

      Sample code:
 // recreate the CalcItems, as the number of batches may vary from
 // one run to another; note that deleteCalcItems() is deprecated and
 // no longer strictly needed - see its description below
 dist.deleteCalcItems()
      
       def ctx = api.datamartContext
       def batchFilters = ctx.batchFilters(source, null, 200000)
      
       batchFilters.eachWithIndex{ filter,i ->
         dist.addOrUpdateCalcItem("batch", i, filter)
       }
       return "Created ${batchFilters.size()} batches"
       
      Note: Often, filters based on fields with actual business meaning are better suited to defining batches. If, for example, data can be partitioned by a product or customer attribute, then caches can also benefit from this partitioning scheme, in order to limit their sizes, as sketched below.
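      A hedged sketch of such attribute-based batching, assuming the source table has a ProductGroup dimension and reusing the source binding from the sample above (Filter.equal and the rollup query follow the standard DatamartContext API):
 def ctx = api.datamartContext

 // a rollup query over a single selected field yields its distinct values
 def q = ctx.newQuery(source, true).select("ProductGroup")

 int count = 0
 def result = ctx.streamQuery(q)
 while (result.next()) {
      def group = result.get().ProductGroup
      dist.addOrUpdateCalcItem("ProductGroup", group,
                               Filter.equal("ProductGroup", group))
      count++
 }
 return "Created ${count} calculation items"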
      Parameters:
      key1 - Key1 of the calculation item. Corresponds to the pre-12 Key1 fields (upper case K)
      key2 - Key2 of the calculation item. Corresponds to the pre-12 Key2 fields (upper case K)
      scope - A JSON-serializable scope of the calculation item, usually a Filter object. This field corresponds to the pre-12 Value field, which, like Key1 and Key2, can still be used for backwards compatibility; new or updated logics should reference scope instead.
      Returns:
      The map representation of the calculation item just added or updated.
    • findCalcItems

      @Deprecated List<?> findCalcItems(Filter filter)
      Deprecated.
      Provided for backwards compatibility only. It is discouraged to configure behaviour where the processing of a calculation item (batch) is dependent on some other calculation item(s).
      Parameters:
      filter - The filter that selects the calculation items to return
      Returns:
      List of maps, representing the calculation items in scope of the provided filter
    • deleteCalcItems

      @Deprecated int deleteCalcItems(Filter... filters)
      Deprecated.
      As calcItems are now created on JST level, i.e. for each run of the distributed DL, it is no longer needed to delete previously created calcItems, as there will be none. This method is still included so as not to break existing logics, in which it was common to delete the calcItems in the 'init' stage of the job.
      Parameters:
      filters - Optional filters when not all calcItems are to be deleted
      Returns:
      The number of items deleted
    • getDataLoader

      DatamartContext.DataLoader getDataLoader()
      Provides access to the DataLoader instance that allows adding rows to the Result table linked to the DL.

      Sample code:
 def ctx = api.datamartContext
 def target = ctx.getFieldCollection(dist.target)
 def loader = dist.dataLoader  // the DataLoader documented here
      
       // best to avoid calling dist.calcItem multiple times,
       // to avoid multiple serializations of the object
       def calcItem = dist.calcItem
      
       // a value that allows us to identify which calculation item generated
       // a given row in the Result table
 def calcItemKey = calcItem.Key2
      
 def filter = api.filterFromMap(calcItem.scope)
       def batch = ctx.newQuery(source, false)
                    .selectAll(true)
                    .where(filter)
      
       // consumeData can be used as well but only when the closure does not run
       // additional PA queries, as this would result in nested query executions,
       // which is not allowed.
       def result = ctx.streamQuery(batch)
       while (result.next()) {
            def row = result.get()
      
      // mapping logic assumed to be implemented in the FieldMapping element.
            // all the key fields need to be populated!
            def targetRow = FieldMapping.map(row)
      
            // CalcItem is a standard field in the Result table,
            // useful for testing and debugging
            targetRow.CalcItem = calcItemKey
            loader.addRow(targetRow)
       }
       
      Returns:
      The DataLoader used by the logic to add rows to the DL's Result table.