Interface PipelineStage


public interface PipelineStage
The atomic building block of QueryApi queries.

A PipelineStage can be seen as a data processing node that

  1. gets data from a source: either a Tables.Table or a PipelineStage
  2. transforms it
  3. provides the transformed data as output which can be used either as input for another pipeline stage or directly fetched using stream(Function)

A PipelineStage can be instantiated via QueryApi.source(Tables.Table, List, Expression), QueryApi.source(Tables.Table, List) or QueryApi.source(Tables.Table) methods.

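Once instantiated, another data transformation stage can be applied to its output, such as a join (innerJoin, leftOuterJoin, crossJoin), a set operation (union, unionAll, intersect, intersectAll, except, exceptAll), aggregateBy, aggregate, filter, sortBy, take, distinct, addColumns, selectColumns or removeColumns.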

Each of those methods returns a PipelineStage so that multiple stages can be chained to form a data transformation pipeline.

Once all stages are specified, the stream(Function) method can be used to query and fetch data from the database.
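Conceptually, the chained methods only describe stages, and the database is queried when stream(Function) is called. The plain-Groovy sketch below models that idea in memory. It is an illustration under stated assumptions, not the QueryApi implementation: the ModelPipeline class and its methods are hypothetical, rows are modeled as maps, and the real API turns the recorded stages into one database query (the one returned by queryString()).

```groovy
// Hypothetical in-memory model of a pipeline; NOT the QueryApi implementation,
// which turns the recorded stages into a single database query instead.
class ModelPipeline {
    private final List<Map> source
    private final List<Closure> stages

    ModelPipeline(List<Map> source, List<Closure> stages = []) {
        this.source = source
        this.stages = stages
    }

    // Builder methods only record a stage and return a new pipeline: nothing is evaluated yet.
    ModelPipeline filter(Closure condition) {
        new ModelPipeline(source, stages + [{ List<Map> rows -> rows.findAll(condition) }])
    }

    ModelPipeline addColumns(Closure columns) {
        new ModelPipeline(source, stages + [{ List<Map> rows -> rows.collect { it + columns(it) } }])
    }

    // Only stream() runs the recorded stages, in order, and hands the final rows to the consumer.
    def stream(Closure consumer) {
        consumer(stages.inject(source) { List<Map> rows, Closure stage -> stage(rows) })
    }
}

def rows = [[sku: 'MB-0001', Cost: 10], [sku: 'MB-0002', Cost: null]]
def result = new ModelPipeline(rows)
        .filter { it.Cost != null }
        .addColumns { [CostX2: it.Cost * 2] }
        .stream { it.toList() }
```

In this model each builder call returns a new object, so a partially built pipeline can be reused as the starting point of several others.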

Here is an example of a data pipeline chaining an innerJoin, a filter and finally an aggregation stage.


 def date = new Date()
 def qapi = api.queryApi()
 def products = qapi.tables().products()
 def pExtCost = qapi.tables().productExtensionRows("Cost")
 return qapi.source(products, [products.sku(), products.ProductGroup])
     // joins the "Cost" product extension, keeping its validity columns for the filter stage
     .innerJoin(
         pExtCost,
         { prev -> [pExtCost.Cost, pExtCost.Currency, pExtCost.ValidFrom, pExtCost.ValidTo] },
         { prev -> pExtCost.sku().equal(prev.sku) }
     )
     // retains only the cost entries valid at the given date
     .filter { prev ->
         qapi.exprs().and(
             prev.ValidFrom.lessOrEqual(date),
             prev.ValidTo.greaterOrEqual(date)
         )
     }
     // computes the average cost by ProductGroup and Currency
     .aggregateBy(
         { prev -> [prev.ProductGroup, prev.Currency] },
         { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
     )
     // fetches the result and does something with it
     .stream { it.each { row ->
         // do something with the row
     }}
 
Since:
14.0 - Caribou Lou
  • Method Details

    • innerJoin

      PipelineStage innerJoin(Tables.Table table, Function<Tables.Columns,Collection<? extends Selectable>> selectables, Function<Tables.Columns,Expression> joinCriteria)
      Adds an innerJoin stage that joins this PipelineStage to another Tables.Table.

      As an SQL join:

      • the list of columns or expressions to be joined to the previous pipeline stage should be provided by the selectables parameter.
      • the joining criteria, as a boolean expression, should be provided by the joinCriteria parameter.
        ⚠ It is strongly recommended that the joining conditions use all fields of the database primary index.
      Both of those parameters should be constructed via a function that gets as argument an accessor to the previous stage columns.

      Here is an example of a simple innerJoin:

      
       def date = new Date()
       def qapi = api.queryApi()
       def products = qapi.tables().products()
       def pExtCost =  qapi.tables().productExtensionRows("Cost")
       return qapi.source(products, [products.sku(), products.ProductGroup])
           // joins the "Cost" product extension
           .innerJoin(
               pExtCost,
               { prev -> [pExtCost.Cost, pExtCost.Currency]},
               { prev -> qapi.exprs().and(
                   pExtCost.sku().equal(prev.sku),
                   pExtCost.ValidFrom.lessOrEqual(date),
                   pExtCost.ValidTo.greaterOrEqual(date)
               )}
            )
            // computes the average cost by ProductGroup and Currency
           .aggregateBy(
                { prev -> [prev.ProductGroup, prev.Currency]},
                { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")]}
            )
            // fetches the result and does something with it
           .stream { it.each { row ->
               // do something with the row
            }}
       
      Parameters:
      table - the Table to be joined to
      selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
      joinCriteria - a function getting the reference to this stage output columns and returning the joining criteria
      Returns:
      a PipelineStage as a result of applying this innerJoin stage
      Since:
      14.0 - Caribou Lou
    • innerJoin

      Adds an innerJoin stage that joins this PipelineStage to another PipelineStage.

      As an SQL join:

      • the list of columns or expressions to be joined to the previous pipeline stage should be provided by the selectables parameter.
      • the joining criteria, as a boolean expression, should be provided by the joinCriteria parameter.
        ⚠ It is strongly recommended that the joining conditions use all fields of the database primary index.
      Both of those parameters should be constructed via a function that gets as arguments an accessor to the previous stage columns (left) and an accessor to the joined pipeline columns (right).

      Here is an example of a simple innerJoin between two pipelines:

      
       def qapi = api.queryApi()
       def products = qapi.tables().products()
       def pExtCost = qapi.tables().productExtensionRows("Cost")

       // average cost by ProductGroup, computed in its own pipeline
       def avgCostByCateg = qapi.source(pExtCost, [pExtCost.ProductGroup, pExtCost.Cost])
                                .aggregateBy(
                                    { prev -> [prev.ProductGroup] },
                                    { prev -> [prev.ProductGroup, qapi.exprs().avg(prev.Cost).as("avgCost")] }
                                )

       return qapi.source(products, [products.sku(), products.ProductGroup])
           // joins the avgCostByCateg pipeline on ProductGroup
           .innerJoin(
               avgCostByCateg,
               { left, right -> [right.avgCost] },
               { left, right -> left.ProductGroup.equal(right.ProductGroup) }
           )
           // fetches the result and does something with it
           .stream { it.each { row ->
               // do something with the row
           }}
       
      Parameters:
      pipeline - the PipelineStage to be joined to
      selectables - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the list of columns or expressions to be added to this stage output.
      joinCriteria - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the joining criteria
      Returns:
      a PipelineStage as a result of applying this innerJoin stage
      Since:
      15.1 - Southside
    • leftOuterJoin

      PipelineStage leftOuterJoin(Tables.Table table, Function<Tables.Columns,Collection<? extends Selectable>> selectables, Function<Tables.Columns,Expression> joinCriteria)
      Adds a leftOuterJoin stage that joins this PipelineStage to another Tables.Table.
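The leftOuterJoin stages are documented here by their signatures only. Assuming they follow standard SQL LEFT OUTER JOIN semantics, the difference from innerJoin is that a row of this stage with no matching row in the joined table is kept, with null values in the joined columns. The plain-Groovy sketch below models this in memory; the leftOuterJoinRows helper, the sample rows and the column names are hypothetical, and no QueryApi call is involved.

```groovy
// Hypothetical in-memory model of LEFT OUTER JOIN semantics (not the QueryApi implementation).
// left/right: rows as maps; rightColumns: columns appended from the right side;
// on: the join criteria, evaluated for each (left row, right row) pair.
List<Map> leftOuterJoinRows(List<Map> left, List<Map> right,
                            List<String> rightColumns, Closure on) {
    left.collectMany { Map l ->
        List<Map> matches = right.findAll { Map r -> on(l, r) }
        if (matches) {
            // one output row per matching right row, exactly like an inner join
            return matches.collect { Map r -> l + r.subMap(rightColumns) }
        }
        // no match: the left row is kept and the right-hand columns are null
        return [l + rightColumns.collectEntries { [it, null] }]
    }
}

def products = [[sku: 'MB-0001', ProductGroup: 'A'], [sku: 'MB-0002', ProductGroup: 'B']]
def costs    = [[sku: 'MB-0001', Cost: 10.0, Currency: 'EUR']]
def joined   = leftOuterJoinRows(products, costs, ['Cost']) { l, r -> l.sku == r.sku }
```

With innerJoin semantics, MB-0002 would simply be absent from the output.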
      Parameters:
      table - the Table to be joined to
      selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
      joinCriteria - a function getting the reference to this stage output columns and returning the joining criteria
      Returns:
      a PipelineStage as a result of applying this leftOuterJoin stage
      Since:
      14.0 - Caribou Lou
    • leftOuterJoin

      Adds a leftOuterJoin stage that joins this PipelineStage to another PipelineStage.
      Parameters:
      pipeline - the PipelineStage to be joined to
      selectables - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the list of columns or expressions to be added to this stage output.
      joinCriteria - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the joining criteria
      Returns:
      a PipelineStage as a result of applying this leftOuterJoin stage
      Since:
      15.1 - Southside
    • crossJoin

      PipelineStage crossJoin(Tables.Table table, Function<Tables.Columns,Collection<? extends Selectable>> selectables)
      Adds a crossJoin stage that joins this PipelineStage to another Tables.Table.
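A crossJoin takes no join criteria. Assuming standard SQL CROSS JOIN semantics, every row of this stage is paired with every row of the joined table, so the output size is the product of the two input sizes and can grow very quickly. The hypothetical plain-Groovy helper below models this in memory; it is not the QueryApi implementation and the sample rows are made up.

```groovy
// Hypothetical in-memory model of CROSS JOIN semantics: no join criteria,
// every left row is paired with every right row (output size = left.size() * right.size()).
List<Map> crossJoinRows(List<Map> left, List<Map> right, List<String> rightColumns) {
    left.collectMany { Map l ->
        right.collect { Map r -> l + r.subMap(rightColumns) }
    }
}

def skus       = [[sku: 'MB-0001'], [sku: 'MB-0002']]
def currencies = [[Currency: 'EUR'], [Currency: 'USD']]
def combos     = crossJoinRows(skus, currencies, ['Currency'])
```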
      Parameters:
      table - the Table to be joined to
      selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
      Returns:
      a PipelineStage as a result of applying this crossJoin stage
      Since:
      15.1 - Southside
    • crossJoin

      Adds a crossJoin stage that joins this PipelineStage to another PipelineStage.
      Parameters:
      pipeline - the PipelineStage to be joined to
      selectables - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the list of columns or expressions to be added to this stage output.
      Returns:
      a PipelineStage as a result of applying this crossJoin stage
      Since:
      15.1 - Southside
    • union

      PipelineStage union(PipelineStage rightPipeline)
      Adds a union stage that combines this PipelineStage with another PipelineStage.
      Parameters:
      rightPipeline - the pipeline to be combined with
      Returns:
      a PipelineStage as a result of applying this union stage
      Since:
      15.2 - Southside
    • unionAll

      PipelineStage unionAll(PipelineStage rightPipeline)
      Adds a unionAll stage that combines this PipelineStage with another PipelineStage.
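The union and unionAll descriptions do not say how duplicates are handled. Assuming they follow the standard SQL UNION and UNION ALL operators, as their names suggest, union removes duplicate rows while unionAll keeps every row of both pipelines. The plain-Groovy sketch below models the difference in memory; the helper names and sample rows are hypothetical, and both inputs are assumed to have the same columns.

```groovy
// Hypothetical in-memory model of SQL UNION ALL vs UNION; rows are maps with the same columns.
// SQL gives no row-order guarantee; this model simply keeps first occurrences in input order.
List<Map> unionAllRows(List<Map> left, List<Map> right) {
    left + right                    // every row of both inputs, duplicates included
}

List<Map> unionRows(List<Map> left, List<Map> right) {
    (left + right).unique(false)    // duplicate rows removed; unique(false) returns a new list
}

def eu = [[sku: 'MB-0001'], [sku: 'MB-0002']]
def us = [[sku: 'MB-0002'], [sku: 'MB-0003']]
```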
      Parameters:
      rightPipeline - the pipeline to be combined with
      Returns:
      a PipelineStage as a result of applying this unionAll stage
      Since:
      15.2 - Southside
    • intersect

      PipelineStage intersect(PipelineStage rightPipeline)
      Adds an intersect stage that combines this PipelineStage with another PipelineStage.
      Parameters:
      rightPipeline - the pipeline to be combined with
      Returns:
      a PipelineStage as a result of applying this intersect stage
      Since:
      15.2 - Southside
    • intersectAll

      PipelineStage intersectAll(PipelineStage rightPipeline)
      Adds an intersectAll stage that combines this PipelineStage with another PipelineStage.
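Assuming standard SQL INTERSECT and INTERSECT ALL semantics, intersect returns the distinct rows present in both pipelines, while intersectAll keeps a row as many times as the smaller of its two occurrence counts. A minimal in-memory sketch in plain Groovy follows; the helper names and sample rows are hypothetical and this is not how QueryApi evaluates the stage.

```groovy
// Hypothetical in-memory model of SQL INTERSECT ALL vs INTERSECT; rows are maps with the same columns.
List<Map> intersectAllRows(List<Map> left, List<Map> right) {
    Map<Map, Integer> available = right.countBy { it }   // occurrences of each row on the right
    left.findAll { Map row ->
        int n = available[row] ?: 0
        if (n == 0) {
            return false                                 // no right-hand occurrence left to match
        }
        available[row] = n - 1                           // consume one right-hand occurrence
        return true
    }
}

List<Map> intersectRows(List<Map> left, List<Map> right) {
    intersectAllRows(left, right).unique(false)          // distinct rows present on both sides
}

def left  = [[sku: 'A'], [sku: 'A'], [sku: 'A'], [sku: 'B']]
def right = [[sku: 'A'], [sku: 'A'], [sku: 'C']]
```

Here A appears three times on the left and twice on the right, so intersectAll keeps it twice and intersect once.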
      Parameters:
      rightPipeline - the pipeline to be combined with
      Returns:
      a PipelineStage as a result of applying this intersectAll stage
      Since:
      15.2 - Southside
    • except

      PipelineStage except(PipelineStage rightPipeline)
      Adds an except stage that combines this PipelineStage with another PipelineStage.
      Parameters:
      rightPipeline - the pipeline to be combined with
      Returns:
      a PipelineStage as a result of applying this except stage
      Since:
      15.2 - Southside
    • exceptAll

      PipelineStage exceptAll(PipelineStage rightPipeline)
      Adds an exceptAll stage that combines this PipelineStage with another PipelineStage.
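Assuming standard SQL EXCEPT and EXCEPT ALL semantics, except returns the distinct rows of this pipeline that do not appear in the other one, while exceptAll subtracts occurrences: each row of the other pipeline cancels one matching row of this one. The plain-Groovy sketch below models both in memory; the helper names and sample rows are hypothetical.

```groovy
// Hypothetical in-memory model of SQL EXCEPT ALL vs EXCEPT; rows are maps with the same columns.
List<Map> exceptAllRows(List<Map> left, List<Map> right) {
    Map<Map, Integer> toCancel = right.countBy { it }    // each right-hand row cancels one left occurrence
    left.findAll { Map row ->
        int n = toCancel[row] ?: 0
        if (n == 0) {
            return true                                  // nothing left to cancel: keep the row
        }
        toCancel[row] = n - 1
        return false
    }
}

List<Map> exceptRows(List<Map> left, List<Map> right) {
    left.unique(false).findAll { Map row -> !(row in right) }   // distinct left rows absent from the right
}

def left  = [[sku: 'A'], [sku: 'A'], [sku: 'A'], [sku: 'B']]
def right = [[sku: 'A'], [sku: 'C']]
```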
      Parameters:
      rightPipeline - the pipeline to be combined with
      Returns:
      a PipelineStage as a result of applying this exceptAll stage
      Since:
      15.2 - Southside
    • aggregateBy

      Adds a stage that aggregates the outputs of this stage by a given set of expressions.

      It is similar to the standard GROUP BY SQL clause.

      As an example, here is a pipeline computing the minimum competition price for each sku:

      
       def qapi = api.queryApi()
       def competition = qapi.tables().productExtensionRows("Competition")
       def exprs = qapi.exprs()
      
       return qapi.source(competition, [competition.sku(), competition.Price])
               .aggregateBy({ prev -> [prev.sku] }, { prev -> [prev.sku, exprs.min(prev.Price).as("MinPrice")] })
               .stream { it.each {
                    row -> // do something with the row
                }}
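Like an SQL GROUP BY, aggregateBy produces one output row per distinct value of the by expressions. The plain-Groovy sketch below models the example above in memory; the minPriceBySku helper and the sample prices are hypothetical, and it ignores null prices as SQL MIN does (assuming the QueryApi min aggregation maps to it).

```groovy
// Hypothetical in-memory model of
// aggregateBy({ prev -> [prev.sku] }, { prev -> [prev.sku, min(prev.Price).as("MinPrice")] })
List<Map> minPriceBySku(List<Map> rows) {
    rows.groupBy { it.sku }                     // one group per distinct sku
        .collect { sku, group ->
            // sku is a "by" column; MinPrice aggregates the group, skipping nulls as SQL MIN does
            [sku: sku, MinPrice: group*.Price.findAll { it != null }.min()]
        }
}

def competition = [
    [sku: 'MB-0001', Price: 12.5],
    [sku: 'MB-0001', Price: 9.9],
    [sku: 'MB-0002', Price: null],
    [sku: 'MB-0002', Price: 20.0],
]
def minPrices = minPriceBySku(competition)
```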
       
      Parameters:
      by - a function getting the reference to this stage output columns and returning the list of expressions to group by
      selectables - a function getting the reference to this stage output columns and returning a list of named expressions. QueryApi accepts either expressions using an aggregation function or non-aggregated expressions that belong to the result of the by parameter
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • aggregate

      PipelineStage aggregate(Function<Tables.Columns,Collection<? extends Selectable>> selectables)
      Adds a stage that aggregates all this stage output rows.

      As an example, here is a pipeline computing the number of rows in the Competition product extension table:

      
       def qapi = api.queryApi()
       def competition = qapi.tables().productExtensionRows("Competition")
      
       return qapi.source(competition)
               .aggregate { prev -> [qapi.exprs().count().as("nbRows")] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions. QueryApi only accepts aggregated expressions
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • filter

      Adds a stage that only keeps the rows of this stage output that satisfy the given expression.

      As an example, here is a pipeline filtering out products with a null cost:

      
       def qapi = api.queryApi()
       def products = qapi.tables().products()
       return qapi.source(products, [products.sku(), products.Cost])
               .filter { prev -> prev.Cost.isNotNull() }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      condition - a function getting the reference to this stage output columns and returning the filtering expression
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • sortBy

      Adds a stage that sorts this stage output rows.

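      As an example, here is a pipeline sorting the ProductHierarchy rows by ProductGroup and then by ProductLine, both in ascending order with nulls last: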

      
       def qapi = api.queryApi()
       def prodHierarchy = qapi.tables().companyParameterRows("ProductHierarchy")
       return qapi.source(prodHierarchy, [prodHierarchy.ProductGroup, prodHierarchy.ProductLine])
               .sortBy { cols -> [qapi.orders().ascNullsLast(cols.ProductGroup), qapi.orders().ascNullsLast(cols.ProductLine)] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       

      Ordering should be expressed as a list of Orders.Order, which can be built via QueryApi.orders(), as with qapi.orders().ascNullsLast(...) in the example above.

      Parameters:
      orders - a function getting the reference to this stage output columns and returning the list of orders that specify the expected row sorting.
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • take

      PipelineStage take(int count)
      Adds a stage that only returns the first count rows of this pipeline stage output.

      If there are fewer than count rows, then the complete set of rows is returned.

      As an example, here is a pipeline getting the 10 least costly products:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p)
               .sortBy { prev -> [qapi.orders().ascNullsLast(prev.Cost)] }
               .take(10)
               .stream { it.each {
                    row -> // do something with the row
                }}
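The example above relies on two details: ascNullsLast, as its name indicates, places rows with a null Cost after all others, so they are only taken when fewer than 10 products have a cost; and take keeps at most count rows. The plain-Groovy sketch below models both in memory. The cheapestRows helper and the sample rows are hypothetical, and this is not how QueryApi evaluates the stages.

```groovy
// Hypothetical in-memory model of .sortBy { prev -> [ascNullsLast(prev.Cost)] }.take(count)
List<Map> cheapestRows(List<Map> rows, int count) {
    rows.sort(false) { Map a, Map b ->           // sort(false, ...) leaves the input list untouched
        if (a.Cost == null || b.Cost == null) {
            // nulls last: a null Cost sorts after any non-null Cost
            return (a.Cost == null ? 1 : 0) - (b.Cost == null ? 1 : 0)
        }
        a.Cost <=> b.Cost                        // ascending order otherwise
    }.take(count)                                // at most count rows; all rows if fewer exist
}

def products = [
    [sku: 'MB-0001', Cost: 5.0],
    [sku: 'MB-0002', Cost: null],
    [sku: 'MB-0003', Cost: 1.0],
    [sku: 'MB-0004', Cost: 3.0],
]
```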
       
      Parameters:
      count - the maximum number of rows to take from the start of the previous stage output row set
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • distinct

      PipelineStage distinct()
      Adds a stage that removes duplicate rows from this stage output row set.
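Assuming the usual SQL DISTINCT semantics, two rows are duplicates only when all of their output columns are equal, so the columns kept before distinct() determine how many rows collapse. A minimal in-memory sketch in plain Groovy, with hypothetical sample rows (not QueryApi code):

```groovy
// Hypothetical in-memory model of distinct(): rows are maps compared on all of their columns.
def rows = [
    [ProductGroup: 'A', sku: 'MB-0001'],
    [ProductGroup: 'A', sku: 'MB-0002'],
    [ProductGroup: 'A', sku: 'MB-0001'],   // exact duplicate of the first row
]

// distinct() on all columns: only the exact duplicate is dropped
def distinctRows = rows.unique(false)

// keeping only ProductGroup first makes all three rows identical, leaving a single row
def distinctGroups = rows.collect { it.subMap(['ProductGroup']) }.unique(false)
```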
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • addColumns

      PipelineStage addColumns(Function<Tables.Columns,Collection<? extends Selectable>> expressions)
      Adds a stage that adds columns to this stage output.

      This operation is commonly used to enrich the data set with expressions combining several columns.

      As an example, here is a pipeline that computes a ProductGroup column from the content of the sku column:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku(), p.label()])
               .addColumns { prev -> [qapi.exprs().caseWhen(prev.sku.like("foo%"), qapi.exprs().string("FooStyle"), qapi.exprs().string("Generic")).as("ProductGroup")] }
               .stream { it.each {
                    row -> // do something with the row
                }}
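In memory, the addColumns example above corresponds roughly to the plain-Groovy sketch below. It assumes that like("foo%") matches values starting with foo (case sensitivity of LIKE depends on the database) and that a null sku falls through to the else branch, as an SQL CASE WHEN does when its condition is not true. The addProductGroup helper and the sample rows are hypothetical.

```groovy
// Hypothetical in-memory model of
// addColumns { prev -> [caseWhen(prev.sku.like("foo%"), "FooStyle", "Generic").as("ProductGroup")] }
List<Map> addProductGroup(List<Map> rows) {
    rows.collect { Map row ->
        // startsWith stands in for LIKE 'foo%'; a null sku yields null, which Groovy treats as false
        String group = row.sku?.startsWith('foo') ? 'FooStyle' : 'Generic'
        row + [ProductGroup: group]          // existing columns are kept, one column is added
    }
}

def products = [[sku: 'foo-01', label: 'Foo'], [sku: 'bar-01', label: 'Bar'], [sku: null, label: 'Unknown']]
def enriched = addProductGroup(products)
```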
       
      Parameters:
      expressions - a function getting the reference to this stage output columns and returning the list of columns to be added to the previous stage output columns
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • retainColumns

      @Deprecated(since="15.1") PipelineStage retainColumns(Function<Tables.Columns,Collection<? extends Selectable>> columns)
      Deprecated.
      Adds a stage that keeps only the given columns from this stage output.

      Here is a very simple example that only keeps the sku and the label from the product table:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p)
               .filter { prev -> prev.Cost.isNotNull() }
               .retainColumns { prev -> [prev.sku, prev.label] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      columns - a function getting the reference to this stage output columns and returning the list of columns to be kept
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • selectColumns

      PipelineStage selectColumns(Function<Tables.Columns,Collection<? extends Selectable>> columns)
      Adds a stage that keeps only the given columns from this stage output and adds any new expressions listed among them.

      Here is a very simple example that keeps only the sku and the label from the product table, and adds a new column concatenating the sku and the label:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p)
               .filter { prev -> prev.Cost.isNotNull() }
               .selectColumns { prev -> [prev.sku, prev.label, prev.sku.concat(prev.label).as("skuLabel")] }
               .stream { it.each {
                    row -> // do something with the row
                }}
       
      Parameters:
      columns - a function getting the reference to this stage output columns and returning the list of columns to be kept and added
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      15.1 - Southside
    • removeColumns

      PipelineStage removeColumns(Function<Tables.Columns,Collection<? extends Selectable>> columns)
      Adds a stage that removes the given columns from this stage output.

      Here is a very simple example that removes a column previously used for filtering out rows:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku(), p.label(), p.Cost])
               .filter { prev -> prev.Cost.isNotNull() }
               .removeColumns { prev -> [prev.Cost] }
               .stream { it.each {
                    row -> // do something with the row
                }}
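removeColumns and selectColumns can reach the same output shape from opposite directions: one lists the columns to drop, the other the columns to keep. A plain-Groovy sketch on a single row modeled as a map (the sample values are hypothetical; this is not QueryApi code):

```groovy
// Hypothetical in-memory model of a row produced by source(p, [p.sku(), p.label(), p.Cost])
Map row = [sku: 'MB-0001', label: 'Mountain bike', Cost: 120.0]

// removeColumns { prev -> [prev.Cost] }: every column except the listed ones
Map removed = row.findAll { column, value -> !(column in ['Cost']) }

// selectColumns { prev -> [prev.sku, prev.label] }: only the listed columns
Map selected = row.subMap(['sku', 'label'])
```

removeColumns is the shorter choice when only a few columns must go; selectColumns is clearer when the final shape is small or includes computed columns.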
       
      Parameters:
      columns - a function getting the reference to this stage output columns and returning the list of columns to be removed
      Returns:
      a PipelineStage as a result of applying this stage
      Since:
      14.0 - Caribou Lou
    • stream

      <T> T stream(Function<PipelineStage.ResultStream,T> function)
      Queries the database, fetches the result as a stream and provides it to the given consumer function.

      The function argument should iterate over the PipelineStage.ResultStream to either do a side effect on each row or compute a result from the stream.
      After the execution of this function, the stream will be automatically closed.

      A connection to the database is kept open during the entire stream iteration, which is why the processing of each row should be as fast as possible. The client code should therefore choose the right tradeoff between:

      • either quickly collecting a large number of records into memory using collect { it } and processing them later → which may lead to an OutOfMemoryError
      • or processing them on the fly → which may lead to a long processing time while the connection stays open.
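To illustrate the on-the-fly option: the function passed to stream can fold the rows into a small result instead of collecting them. The plain-Groovy sketch below models this with an ordinary Iterator standing in for PipelineStage.ResultStream, an assumption made so the code runs outside the platform; only a running total and a counter stay in memory, whatever the number of rows. The helper name and the Cost column are hypothetical.

```groovy
// Hypothetical stand-in for PipelineStage.ResultStream: any Iterator of row maps.
// Processing "on the fly": only a running total and a counter are kept in memory.
Map averageCostOnTheFly(Iterator<Map> stream) {
    BigDecimal total = 0.0
    int count = 0
    stream.each { Map row ->
        if (row.Cost != null) {              // rows without a cost are skipped
            total += row.Cost
            count++
        }
    }
    [count: count, average: count ? total / count : null]
}

def summary = averageCostOnTheFly([[Cost: 10], [Cost: 20], [Cost: null], [Cost: 30]].iterator())
```

Inside a real pipeline, the same loop would sit in the closure given to .stream { ... }, which then returns the small summary map instead of a list of rows.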

      If the result is not too large, the usual way to get it is .stream { it.toList() }. In that case the return type is a List<Map<String, Object>>, each map representing a result row. As an example, this code returns [[sku:"MB-0001"]]:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku()], p.sku().equal("MB-0001"))
               .stream { it.toList() }
       

      Even though PipelineStage.ResultStream exposes an immutable map view for each PipelineStage.ResultStream.ResultRow, this map view can be enriched with other entries using common non-mutating Groovy Map operators such as the + operator:

      
       def qapi = api.queryApi()
       def p = qapi.tables().products()
       return qapi.source(p, [p.sku()])
               .stream { it.collect {
                   it + [size: it.sku.size()] // adds the 'size' entry with the size of sku String.
               }}
       
      Type Parameters:
      T - the return type of the function parameter
      Parameters:
      function - the function in charge of consuming the result stream and returning a value which will be itself the return value of this method.
      Returns:
      the value returned by function
      Since:
      14.0 - Caribou Lou
    • queryString

      String queryString()
      Builds up the query string resulting from all the stages of this pipeline.

      For debugging purposes only.

      Returns:
      the string representation of the query that will be sent to the database when calling stream(Function)
      Since:
      14.0 - Caribou Lou
    • traceQuery

      PipelineStage traceQuery()
      Prints the result of queryString() into the current logic execution trace using PublicGroovyAPI.trace(String, String, Object).

      For debugging purposes only.

      Returns:
      this pipeline stage
      Since:
      14.0 - Caribou Lou