Interface PipelineStage
A PipelineStage can be seen as a data processing node that:
- gets data from a source: either a Tables.Table or a PipelineStage
- transforms it
- provides the transformed data as output, which can be used either as input for another pipeline stage or fetched directly using stream(Function)
A PipelineStage can be instantiated via QueryApi.source(Tables.Table, List, Expression),
QueryApi.source(Tables.Table, List) or QueryApi.source(Tables.Table) methods.
Once instantiated, another data transformation stage can be applied to its output, such as:
- innerJoin(Tables.Table, Function, Function)
- leftOuterJoin(Tables.Table, Function, Function)
- aggregateBy(Function, Function)
- aggregate(Function)
- filter(Function)
- sortBy(Function)
- take(int)
- distinct()
- addColumns(Function)
- selectColumns(Function)
- removeColumns(Function)
Each of these methods returns a new PipelineStage, so that multiple stages can be chained to form a
data transformation pipeline.
Once all stages are specified, the stream(Function) method can be used to query and fetch data
from the database.
Here is an example of a data pipeline chaining an innerJoin, a filter and finally an aggregation stage.
def date = new Date()
def qapi = api.queryApi()
def products = qapi.tables().products()
def pExtCost = qapi.tables().productExtensionRows("Cost")
return qapi.source(products, [products.sku(), products.ProductGroup])
    // joins the "Cost" product extension
    .innerJoin(
        pExtCost,
        { prev -> [pExtCost.Cost, pExtCost.Currency] },
        { prev -> pExtCost.sku().equal(prev.sku) }
    )
    // retains only valid cost entries
    .filter({ prev ->
        qapi.exprs().and(
            prev.ValidFrom.lessOrEqual(date),
            prev.ValidTo.greaterOrEqual(date)
        )
    })
    // computes the average cost by ProductGroup and Currency
    .aggregateBy(
        { prev -> [prev.ProductGroup, prev.Currency] },
        { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
    )
    // fetches the result and does something with it
    .stream { it.each { row ->
        // do something with the row
    }}
- Since:
- 14.0 - Caribou Lou
-
Nested Class Summary
Nested Classes
Modifier and Type | Interface | Description
static interface | PipelineStage.ResultStream | The result type to be consumed by the function given to stream(Function).
Method Summary
Modifier and Type | Method | Description
PipelineStage | addColumns(Function<Tables.Columns, Collection<? extends Selectable>> expressions) | Adds a stage that adds columns to this stage output.
PipelineStage | aggregate(Function<Tables.Columns, Collection<? extends Selectable>> selectables) | Adds a stage that aggregates all this stage output rows.
PipelineStage | aggregateBy(Function<Tables.Columns, Collection<? extends Expression>> by, Function<Tables.Columns, Collection<? extends Selectable>> selectables) | Adds a stage that aggregates the outputs of this stage by a given set of expressions.
PipelineStage | crossJoin(PipelineStage pipeline, BiFunction<Tables.Columns, Tables.Columns, Collection<? extends Selectable>> selectables) | Adds a crossJoin stage that joins another PipelineStage to this PipelineStage.
PipelineStage | crossJoin(Tables.Table table, Function<Tables.Columns, Collection<? extends Selectable>> selectables) | Adds a crossJoin stage that joins another Tables.Table to this PipelineStage.
PipelineStage | distinct() | Adds a stage that removes duplicate rows from this stage output row set.
PipelineStage | except(PipelineStage rightPipeline) | Adds an except stage that combines this PipelineStage with another PipelineStage.
PipelineStage | exceptAll(PipelineStage rightPipeline) | Adds an exceptAll stage that combines this PipelineStage with another PipelineStage.
PipelineStage | filter(Function<Tables.Columns, Expression> condition) | Adds a stage that keeps only the rows of this stage output that satisfy the given expression.
PipelineStage | innerJoin(PipelineStage pipeline, BiFunction<Tables.Columns, Tables.Columns, Collection<? extends Selectable>> selectables, BiFunction<Tables.Columns, Tables.Columns, Expression> joinCriteria) | Adds an innerJoin stage that joins this PipelineStage to another PipelineStage.
PipelineStage | innerJoin(Tables.Table table, Function<Tables.Columns, Collection<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria) | Adds an innerJoin stage that joins this PipelineStage to another Tables.Table.
PipelineStage | intersect(PipelineStage rightPipeline) | Adds an intersect stage that combines this PipelineStage with another PipelineStage.
PipelineStage | intersectAll(PipelineStage rightPipeline) | Adds an intersectAll stage that combines this PipelineStage with another PipelineStage.
PipelineStage | leftOuterJoin(PipelineStage pipeline, BiFunction<Tables.Columns, Tables.Columns, Collection<? extends Selectable>> selectables, BiFunction<Tables.Columns, Tables.Columns, Expression> joinCriteria) | Adds a leftOuterJoin stage that joins another PipelineStage to this PipelineStage.
PipelineStage | leftOuterJoin(Tables.Table table, Function<Tables.Columns, Collection<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria) | Adds a leftOuterJoin stage that joins another Tables.Table to this PipelineStage.
String | queryString() | Builds up the query string resulting from all the stages of this pipeline.
PipelineStage | removeColumns(Function<Tables.Columns, Collection<? extends Selectable>> columns) | Adds a stage that removes the given columns from this stage output.
PipelineStage | retainColumns(Function<Tables.Columns, Collection<? extends Selectable>> columns) | Deprecated.
PipelineStage | selectColumns(Function<Tables.Columns, Collection<? extends Selectable>> columns) | Adds a stage that keeps and adds the given columns from this stage output.
PipelineStage | sortBy(Function<Tables.Columns, List<Orders.Order>> orders) | Adds a stage that sorts this stage output rows.
<T> T | stream(Function<PipelineStage.ResultStream, T> function) | Queries the database, fetches the result as a stream and provides it to the given consumer function.
PipelineStage | take(int count) | Adds a stage that only returns the first count rows of this pipeline stage output.
PipelineStage | traceQuery() | Prints the result of queryString() into the current logic execution trace using PublicGroovyAPI.trace(String, String, Object).
PipelineStage | union(PipelineStage rightPipeline) | Adds a union stage that combines this PipelineStage with another PipelineStage.
PipelineStage | unionAll(PipelineStage rightPipeline) | Adds a unionAll stage that combines this PipelineStage with another PipelineStage.
-
Method Details
-
innerJoin
PipelineStage innerJoin(Tables.Table table, Function<Tables.Columns, Collection<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria)
Adds an innerJoin stage that joins this PipelineStage to another Tables.Table.
As in an SQL join:
- the list of columns or expressions to be joined to the previous pipeline stage should be provided by the selectables parameter.
- the joining criteria, as a boolean expression, should be provided by the joinCriteria parameter.
⚠ It is strongly recommended that the joining conditions use all fields of the database primary index.
Here is an example of a simple innerJoin:
def date = new Date()
def qapi = api.queryApi()
def products = qapi.tables().products()
def pExtCost = qapi.tables().productExtensionRows("Cost")
return qapi.source(products, [products.sku(), products.ProductGroup])
    // joins the "Cost" product extension
    .innerJoin(
        pExtCost,
        { prev -> [pExtCost.Cost, pExtCost.Currency] },
        { prev -> qapi.exprs().and(
            pExtCost.sku().equal(prev.sku),
            prev.ValidFrom.lessOrEqual(date),
            prev.ValidTo.greaterOrEqual(date)
        )}
    )
    // computes the average cost by ProductGroup and Currency
    .aggregateBy(
        { prev -> [prev.ProductGroup, prev.Currency] },
        { prev -> [qapi.exprs().avg(prev.Cost).as("averageCost")] }
    )
    // fetches the result and does something with it
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- table - the Table to be joined to
- selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
- joinCriteria - a function getting the reference to this stage output columns and returning the joining criteria
- Returns:
- a PipelineStage as a result of applying this innerJoin stage
- Since:
- 14.0 - Caribou Lou
-
innerJoin
PipelineStage innerJoin(PipelineStage pipeline, BiFunction<Tables.Columns, Tables.Columns, Collection<? extends Selectable>> selectables, BiFunction<Tables.Columns, Tables.Columns, Expression> joinCriteria)
Adds an innerJoin stage that joins this PipelineStage to another PipelineStage.
As in an SQL join:
- the list of columns or expressions to be joined to the previous pipeline stage should be provided by the selectables parameter.
- the joining criteria, as a boolean expression, should be provided by the joinCriteria parameter.
⚠ It is strongly recommended that the joining conditions use all fields of the database primary index.
Here is an example of a simple innerJoin between two pipelines:
def date = new Date()
def qapi = api.queryApi()
def products = qapi.tables().products()
def pExtCost = qapi.tables().productExtensionRows("Cost")
def avgCostByCateg = qapi.source(pExtCost, [pExtCost.ProductGroup, pExtCost.cost])
    .aggregateBy(
        { prev -> [prev.ProductGroup] },
        { prev -> [prev.ProductGroup, qapi.exprs().avg(prev.cost).as("avgCost")] }
    )
return qapi.source(products, [products.sku(), products.ProductGroup])
    // joins the average cost pipeline
    .innerJoin(
        avgCostByCateg,
        { left, right -> [right.avgCost] },
        { left, right -> left.ProductGroup.equal(right.ProductGroup) }
    )
    // fetches the result and does something with it
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- pipeline - the PipelineStage to be joined to
- selectables - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the list of columns or expressions to be added to this stage output.
- joinCriteria - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the joining criteria
- Returns:
- a PipelineStage as a result of applying this innerJoin stage
- Since:
- 15.1 - Southside
-
leftOuterJoin
PipelineStage leftOuterJoin(Tables.Table table, Function<Tables.Columns, Collection<? extends Selectable>> selectables, Function<Tables.Columns, Expression> joinCriteria)
Adds a leftOuterJoin stage that joins another Tables.Table to this PipelineStage.
- Parameters:
- table - the Table to be joined to
- selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
- joinCriteria - the joining criteria
- Returns:
- a PipelineStage as a result of applying this leftOuterJoin stage
- Since:
- 14.0 - Caribou Lou
-
leftOuterJoin
PipelineStage leftOuterJoin(PipelineStage pipeline, BiFunction<Tables.Columns, Tables.Columns, Collection<? extends Selectable>> selectables, BiFunction<Tables.Columns, Tables.Columns, Expression> joinCriteria)
Adds a leftOuterJoin stage that joins another PipelineStage to this PipelineStage.
- Parameters:
- pipeline - the PipelineStage to be joined to
- selectables - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the list of columns or expressions to be added to this stage output.
- joinCriteria - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the joining criteria
- Returns:
- a PipelineStage as a result of applying this leftOuterJoin stage
- Since:
- 15.1 - Southside
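The difference between innerJoin and leftOuterJoin can be sketched in plain Java, assuming both stages follow the standard SQL semantics of INNER JOIN and LEFT OUTER JOIN. This is an in-memory illustration only, not QueryApi code: the class JoinSemanticsSketch, its join method and the keepUnmatched flag are hypothetical names, and rows are modelled as simple maps.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiPredicate;

final class JoinSemanticsSketch {

    /**
     * Joins every left row with every right row matching the criteria and
     * copies the requested right columns into the output row.
     * With keepUnmatched = false this behaves like an SQL INNER JOIN,
     * with keepUnmatched = true like an SQL LEFT OUTER JOIN.
     */
    static List<Map<String, Object>> join(
            List<Map<String, Object>> left,
            List<Map<String, Object>> right,
            List<String> rightColumns,
            BiPredicate<Map<String, Object>, Map<String, Object>> criteria,
            boolean keepUnmatched) {
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            boolean matched = false;
            for (Map<String, Object> r : right) {
                if (criteria.test(l, r)) {
                    matched = true;
                    Map<String, Object> row = new LinkedHashMap<>(l);
                    for (String column : rightColumns) {
                        row.put(column, r.get(column));
                    }
                    out.add(row);
                }
            }
            if (!matched && keepUnmatched) {
                // Left outer join: keep the left row, right columns are null.
                Map<String, Object> row = new LinkedHashMap<>(l);
                for (String column : rightColumns) {
                    row.put(column, null);
                }
                out.add(row);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> products = List.of(Map.of("sku", "A"), Map.of("sku", "B"));
        List<Map<String, Object>> costs = List.of(Map.of("sku", "A", "Cost", 10));
        BiPredicate<Map<String, Object>, Map<String, Object>> sameSku =
                (l, r) -> l.get("sku").equals(r.get("sku"));

        // Inner join: only sku A has a cost, so one row is produced.
        System.out.println(join(products, costs, List.of("Cost"), sameSku, false).size());
        // Left outer join: sku B is kept as well, with a null Cost.
        System.out.println(join(products, costs, List.of("Cost"), sameSku, true).size());
    }
}
```

In practice this means that switching a pipeline from innerJoin to leftOuterJoin can introduce null values in the joined columns, which later stages (for instance filter or aggregateBy) have to tolerate.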
-
crossJoin
PipelineStage crossJoin(Tables.Table table, Function<Tables.Columns, Collection<? extends Selectable>> selectables)
Adds a crossJoin stage that joins another Tables.Table to this PipelineStage.
- Parameters:
- table - the Table to be joined to
- selectables - a function getting the reference to this stage output columns and returning the list of columns or expressions to be added to this stage output.
- Returns:
- a PipelineStage as a result of applying this crossJoin stage
- Since:
- 15.1 - Southside
-
crossJoin
PipelineStage crossJoin(PipelineStage pipeline, BiFunction<Tables.Columns, Tables.Columns, Collection<? extends Selectable>> selectables)
Adds a crossJoin stage that joins another PipelineStage to this PipelineStage.
- Parameters:
- pipeline - the PipelineStage to be joined to
- selectables - a function getting as arguments an accessor to the previous stage columns (left) and to the joined pipeline columns (right) and returning the list of columns or expressions to be added to this stage output.
- Returns:
- a PipelineStage as a result of applying this crossJoin stage
- Since:
- 15.1 - Southside
-
union
PipelineStage union(PipelineStage rightPipeline)
Adds a union stage that combines this PipelineStage with another PipelineStage.
- Parameters:
- rightPipeline - the pipeline to be combined with
- Returns:
- a PipelineStage as a result of applying this union stage
- Since:
- 15.2 - Southside
-
unionAll
PipelineStage unionAll(PipelineStage rightPipeline)
Adds a unionAll stage that combines this PipelineStage with another PipelineStage.
- Parameters:
- rightPipeline - the pipeline to be combined with
- Returns:
- a PipelineStage as a result of applying this unionAll stage
- Since:
- 15.2 - Southside
-
intersect
PipelineStage intersect(PipelineStage rightPipeline)
Adds an intersect stage that combines this PipelineStage with another PipelineStage.
- Parameters:
- rightPipeline - the pipeline to be combined with
- Returns:
- a PipelineStage as a result of applying this intersect stage
- Since:
- 15.2 - Southside
-
intersectAll
PipelineStage intersectAll(PipelineStage rightPipeline)
Adds an intersectAll stage that combines this PipelineStage with another PipelineStage.
- Parameters:
- rightPipeline - the pipeline to be combined with
- Returns:
- a PipelineStage as a result of applying this intersectAll stage
- Since:
- 15.2 - Southside
-
except
PipelineStage except(PipelineStage rightPipeline)
Adds an except stage that combines this PipelineStage with another PipelineStage.
- Parameters:
- rightPipeline - the pipeline to be combined with
- Returns:
- a PipelineStage as a result of applying this except stage
- Since:
- 15.2 - Southside
-
exceptAll
PipelineStage exceptAll(PipelineStage rightPipeline)
Adds an exceptAll stage that combines this PipelineStage with another PipelineStage.
- Parameters:
- rightPipeline - the pipeline to be combined with
- Returns:
- a PipelineStage as a result of applying this exceptAll stage
- Since:
- 15.2 - Southside
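The descriptions of union, unionAll, intersect, intersectAll, except and exceptAll do not spell out how duplicate rows are handled. The plain-Java sketch below shows the behaviour one would expect if these stages map to the SQL operators of the same names, which is an assumption rather than something stated here. The class SetOperationsSketch and its methods are hypothetical, rows are compared with equals, and the row order shown is an artifact of the sketch (SQL set operators give no ordering guarantee).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

final class SetOperationsSketch {

    /** UNION ALL: every row of both inputs, duplicates kept. */
    static <R> List<R> unionAll(List<R> left, List<R> right) {
        List<R> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    /** UNION: rows of both inputs, duplicates removed. */
    static <R> List<R> union(List<R> left, List<R> right) {
        return new ArrayList<>(new LinkedHashSet<>(unionAll(left, right)));
    }

    /** INTERSECT ALL: a row appears min(leftCount, rightCount) times. */
    static <R> List<R> intersectAll(List<R> left, List<R> right) {
        List<R> remaining = new ArrayList<>(right);
        List<R> out = new ArrayList<>();
        for (R row : left) {
            if (remaining.remove(row)) { // consumes one matching right row
                out.add(row);
            }
        }
        return out;
    }

    /** INTERSECT: distinct rows present in both inputs. */
    static <R> List<R> intersect(List<R> left, List<R> right) {
        return new ArrayList<>(new LinkedHashSet<>(intersectAll(left, right)));
    }

    /** EXCEPT ALL: a row appears max(leftCount - rightCount, 0) times. */
    static <R> List<R> exceptAll(List<R> left, List<R> right) {
        List<R> remaining = new ArrayList<>(right);
        List<R> out = new ArrayList<>();
        for (R row : left) {
            if (!remaining.remove(row)) { // no right row left to cancel it
                out.add(row);
            }
        }
        return out;
    }

    /** EXCEPT: distinct left rows that do not appear in the right input. */
    static <R> List<R> except(List<R> left, List<R> right) {
        LinkedHashSet<R> out = new LinkedHashSet<>(left);
        out.removeAll(right);
        return new ArrayList<>(out);
    }

    public static void main(String[] args) {
        List<String> left = List.of("A", "A", "B");
        List<String> right = List.of("A", "C");
        System.out.println("unionAll     " + unionAll(left, right));
        System.out.println("union        " + union(left, right));
        System.out.println("intersectAll " + intersectAll(left, right));
        System.out.println("intersect    " + intersect(left, right));
        System.out.println("exceptAll    " + exceptAll(left, right));
        System.out.println("except       " + except(left, right));
    }
}
```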
-
aggregateBy
PipelineStage aggregateBy(Function<Tables.Columns, Collection<? extends Expression>> by, Function<Tables.Columns, Collection<? extends Selectable>> selectables)
Adds a stage that aggregates the outputs of this stage by a given set of expressions.
It is similar to the standard GROUP BY SQL clause.
As an example, here is a pipeline computing the minimum competition price for each sku:
def qapi = api.queryApi()
def competition = qapi.tables().productExtensionRows("Competition")
def exprs = qapi.exprs()
return qapi.source(competition, [competition.sku(), competition.Price])
    .aggregateBy({ prev -> [prev.sku] }, { prev -> [prev.sku, exprs.min(prev.Price).as("MinPrice")] })
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- by - a function getting the reference to this stage output columns and returning the list of expressions to group by
- selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions using the reference to the last stage output. QueryApi will accept either aggregated expressions using an aggregation function or a non-aggregated expression belonging to the result of the by parameter
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
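Since aggregateBy is described as similar to GROUP BY, the minimum-price example can be mirrored with an in-memory grouping in plain Java. This is only a sketch of the grouping logic, not of how QueryApi evaluates the stage: the class AggregateBySketch, the CompetitionRow type and the sample data are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class AggregateBySketch {

    /** Hypothetical in-memory stand-in for a row of the Competition extension. */
    static final class CompetitionRow {
        final String sku;
        final double price;

        CompetitionRow(String sku, double price) {
            this.sku = sku;
            this.price = price;
        }
    }

    /** Equivalent of SELECT sku, MIN(Price) ... GROUP BY sku, keyed by sku. */
    static Map<String, Double> minPriceBySku(List<CompetitionRow> rows) {
        Map<String, Double> minBySku = new TreeMap<>();
        for (CompetitionRow row : rows) {
            // merge keeps the smaller of the stored and the new price per sku
            minBySku.merge(row.sku, row.price, Double::min);
        }
        return minBySku;
    }

    public static void main(String[] args) {
        List<CompetitionRow> rows = List.of(
                new CompetitionRow("MB-0001", 12.0),
                new CompetitionRow("MB-0001", 9.5),
                new CompetitionRow("MB-0002", 20.0));
        // One entry per sku, each holding the lowest price seen for it.
        System.out.println(minPriceBySku(rows));
    }
}
```

The map key plays the role of the by parameter, and the merged value plays the role of the aggregated expression returned by selectables.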
-
aggregate
PipelineStage aggregate(Function<Tables.Columns, Collection<? extends Selectable>> selectables)
Adds a stage that aggregates all this stage output rows.
As an example, here is a pipeline computing the number of rows in the Competition product extension table:
def qapi = api.queryApi()
def competition = qapi.tables().productExtensionRows("Competition")
return qapi.source(competition)
    .aggregate { prev -> [qapi.exprs().count().as("nbRows")] }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- selectables - a function getting the reference to this stage output columns and returning a list of named aggregated expressions. QueryApi will only accept aggregated expressions
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
filter
PipelineStage filter(Function<Tables.Columns, Expression> condition)
Adds a stage that keeps only the rows of this stage output that satisfy the given expression.
As an example, here is a pipeline filtering out products with a null cost:
def qapi = api.queryApi()
def products = qapi.tables().products()
return qapi.source(products, [products.sku(), products.Cost])
    .filter { prev -> prev.Cost.isNotNull() }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- condition - a function getting the reference to this stage output columns and returning the filtering expression
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
sortBy
PipelineStage sortBy(Function<Tables.Columns, List<Orders.Order>> orders)
Adds a stage that sorts this stage output rows.
As an example, here is a pipeline sorting the ProductHierarchy company parameter rows by ProductGroup, then by ProductLine:
def qapi = api.queryApi()
def prodHierarchy = qapi.tables().companyParameterRows("ProductHierarchy")
return qapi.source(prodHierarchy, [prodHierarchy.ProductGroup, prodHierarchy.ProductLine])
    .sortBy { cols -> [qapi.orders().ascNullsLast(cols.ProductGroup), qapi.orders().ascNullsLast(cols.ProductLine)] }
    .stream { it.each { row ->
        // do something with the row
    }}
Ordering should be expressed as a list of Orders.Order, which can be built via QueryApi.orders():
- Orders.ascNullsFirst(Expression) returns an ascending order with null values evaluated as the lowest value.
- Orders.ascNullsLast(Expression) returns an ascending order with null values evaluated as the highest value.
- Orders.descNullsFirst(Expression) returns a descending order with null values evaluated as the highest value.
- Orders.descNullsLast(Expression) returns a descending order with null values evaluated as the lowest value.
- Parameters:
- orders - a function getting the reference to this stage output columns and returning the list of orders that specify the expected row sorting.
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
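The four orderings listed above differ only in sort direction and in where null values end up. The following plain-Java sketch reproduces that placement with java.util.Comparator. It is an in-memory analogy, not QueryApi code: the class NullOrderingSketch and its constants are hypothetical names chosen to echo the Orders methods.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class NullOrderingSketch {

    // Ascending, nulls treated as the lowest value (they come first).
    static final Comparator<String> ASC_NULLS_FIRST =
            Comparator.nullsFirst(Comparator.naturalOrder());
    // Ascending, nulls treated as the highest value (they come last).
    static final Comparator<String> ASC_NULLS_LAST =
            Comparator.nullsLast(Comparator.naturalOrder());
    // Descending, nulls treated as the highest value (they come first).
    static final Comparator<String> DESC_NULLS_FIRST =
            Comparator.nullsFirst(Comparator.reverseOrder());
    // Descending, nulls treated as the lowest value (they come last).
    static final Comparator<String> DESC_NULLS_LAST =
            Comparator.nullsLast(Comparator.reverseOrder());

    /** Returns a sorted copy so the input list is left untouched. */
    static List<String> sorted(List<String> values, Comparator<String> order) {
        List<String> copy = new ArrayList<>(values);
        copy.sort(order);
        return copy;
    }

    public static void main(String[] args) {
        // Arrays.asList is used because List.of rejects null elements.
        List<String> groups = Arrays.asList("B", null, "A");
        System.out.println(sorted(groups, ASC_NULLS_FIRST));  // [null, A, B]
        System.out.println(sorted(groups, ASC_NULLS_LAST));   // [A, B, null]
        System.out.println(sorted(groups, DESC_NULLS_FIRST)); // [null, B, A]
        System.out.println(sorted(groups, DESC_NULLS_LAST));  // [B, A, null]
    }
}
```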
-
take
PipelineStage take(int count)
Adds a stage that only returns the first count rows of this pipeline stage output.
If there are fewer than count rows, then the complete set of rows is returned.
As an example, here is a pipeline getting the 10 least expensive products:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p)
    .sortBy { prev -> [qapi.orders().ascNullsLast(prev.Cost)] }
    .take(10)
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- count - the maximum number of rows to take from the start of the previous stage output row set
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
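The sortBy-then-take combination behaves like a sorted stream followed by a limit. The plain-Java sketch below illustrates the two cases described for take: more rows than requested, and fewer rows than requested. The class TakeSketch and its cheapest method are hypothetical and work on an in-memory list of costs rather than on a pipeline.

```java
import java.util.List;
import java.util.stream.Collectors;

final class TakeSketch {

    /** Sorts the costs in ascending order and keeps at most count of them. */
    static List<Double> cheapest(List<Double> costs, int count) {
        return costs.stream()
                .sorted()
                .limit(count) // never fails when fewer than count rows exist
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // More rows than requested: only the two lowest costs are kept.
        System.out.println(cheapest(List.of(5.0, 1.0, 3.0), 2));
        // Fewer rows than requested: the complete (sorted) set is returned.
        System.out.println(cheapest(List.of(5.0, 1.0), 10));
    }
}
```

As in the example above, take is only meaningful for "top N" queries when it follows a sortBy stage; without an explicit order, which rows come first is not something this page specifies.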
-
distinct
PipelineStage distinct()
Adds a stage that removes duplicate rows from this stage output row set.
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
addColumns
PipelineStage addColumns(Function<Tables.Columns, Collection<? extends Selectable>> expressions)
Adds a stage that adds columns to this stage output.
This operation is commonly used to enrich the data set with expressions combining several columns.
As an example, here is a pipeline that computes a ProductGroup column from the content of the sku column:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku(), p.label()])
    .addColumns { prev ->
        [qapi.exprs().caseWhen(prev.sku.like("foo%"), qapi.exprs().string("FooStyle"), qapi.exprs().string("Generic")).as("ProductGroup")]
    }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- expressions - a function getting the reference to this stage output columns and returning the list of columns to be added to the previous stage output columns
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
retainColumns
@Deprecated(since="15.1")
PipelineStage retainColumns(Function<Tables.Columns, Collection<? extends Selectable>> columns)
Deprecated. Use selectColumns(Function) instead.
Adds a stage that keeps only the given columns from this stage output.
As an example, here is a very simple pipeline that only keeps the sku and the label from the product table:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p)
    .filter { prev -> prev.Cost.isNotNull() }
    .retainColumns { prev -> [prev.sku, prev.label] }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- columns - a function getting the reference to this stage output columns and returning the list of columns to be kept
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
selectColumns
PipelineStage selectColumns(Function<Tables.Columns, Collection<? extends Selectable>> columns)
Adds a stage that keeps and adds the given columns from this stage output.
As an example, here is a very simple pipeline that only keeps the sku and the label from the product table and adds a new column that concatenates them:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p)
    .filter { prev -> prev.Cost.isNotNull() }
    .selectColumns { prev -> [prev.sku, prev.label, prev.sku.concat(prev.label).as("skuLabel")] }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- columns - a function getting the reference to this stage output columns and returning the list of columns to be kept and added
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 15.1 - Southside
-
removeColumns
PipelineStage removeColumns(Function<Tables.Columns, Collection<? extends Selectable>> columns)
Adds a stage that removes the given columns from this stage output.
As an example, here is a very simple pipeline that removes a column previously used for filtering out rows:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku(), p.label(), p.Cost])
    .filter { prev -> prev.Cost.isNotNull() }
    .removeColumns { prev -> [prev.Cost] }
    .stream { it.each { row ->
        // do something with the row
    }}
- Parameters:
- columns - a function getting the reference to this stage output columns and returning the list of columns to be removed
- Returns:
- a PipelineStage as a result of applying this stage
- Since:
- 14.0 - Caribou Lou
-
stream
<T> T stream(Function<PipelineStage.ResultStream, T> function)
Queries the database, fetches the result as a stream and provides it to the given consumer function.
The function argument should iterate over the PipelineStage.ResultStream to either perform a side effect on each row or compute a result from the stream. After the execution of this function, the stream is automatically closed.
A connection to the database is kept open during the entire stream iteration. This is why the processing of each row should be as fast as possible. The client code should therefore choose the right tradeoff between:
- either quickly collecting a large number of records into memory using collect {it} and processing them later → which may lead to an OutOfMemory exception
- or processing them "on the fly" → which may lead to an excessively long processing time.
If the result is not too large, the usual way of getting it is .stream { it.toList() }. In that case the return type is a List<Map<String, Object>>, each map being the representation of a result row. As an example, this code will return [[sku:"MB-0001"]]:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku()], p.sku().equal("MB-0001"))
    .stream { it.toList() }
Even if PipelineStage.ResultStream exposes an immutable map view for each PipelineStage.ResultStream.ResultRow, this map view can be enriched with other entries using common non-mutating Groovy Map operators such as the + operator:
def qapi = api.queryApi()
def p = qapi.tables().products()
return qapi.source(p, [p.sku()])
    .stream { it.collect { row ->
        row + [size: row.sku.size()] // adds the 'size' entry with the length of the sku String
    }}
- Type Parameters:
- T - the return type of the function parameter
- Parameters:
- function - the function in charge of consuming the result stream and returning a value, which is itself the return value of this method.
- Returns:
- the value returned by function
- Since:
- 14.0 - Caribou Lou
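The contract described for stream(Function), where the caller's function consumes the stream, its return value becomes the method's return value, and the stream is closed afterwards, is a common callback-scoped resource pattern. The plain-Java sketch below illustrates it with java.util.stream.Stream. It is not the QueryApi implementation: the class StreamCallbackSketch, the withRows method and the onClose hook (standing in for releasing the database connection) are hypothetical.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class StreamCallbackSketch {

    /**
     * Opens a stream over the rows, hands it to the caller's function and
     * closes it once the function returns, even if it throws.
     * The function's result is returned unchanged.
     */
    static <T> T withRows(List<String> rows, Runnable onClose, Function<Stream<String>, T> function) {
        try (Stream<String> result = rows.stream().onClose(onClose)) {
            return function.apply(result);
        }
    }

    public static void main(String[] args) {
        List<String> skus = List.of("MB-0001", "MB-0002", "XX-0003");
        AtomicBoolean closed = new AtomicBoolean(false);

        // Option 1: collect everything into memory, then process it later.
        List<String> all = withRows(skus, () -> closed.set(true),
                s -> s.collect(Collectors.toList()));
        System.out.println(all.size() + " rows collected, closed=" + closed.get());

        // Option 2: process rows on the fly and return only a small result.
        Long mbCount = withRows(skus, () -> { },
                s -> s.filter(sku -> sku.startsWith("MB")).count());
        System.out.println(mbCount + " rows start with MB");
    }
}
```

The first option holds the resource for the shortest time but keeps every row in memory, while the second keeps memory usage low but holds the resource for as long as the per-row work takes, which is the tradeoff described above.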
-
queryString
String queryString()
Builds up the query string resulting from all the stages of this pipeline.
For debugging purposes only.
- Returns:
- the string representation of the query that will be sent to the database when calling stream(Function)
- Since:
- 14.0 - Caribou Lou
-
traceQuery
PipelineStage traceQuery()
Prints the result of queryString() into the current logic execution trace using PublicGroovyAPI.trace(String, String, Object).
For debugging purposes only.
- Returns:
- this pipeline stage
- Since:
- 14.0 - Caribou Lou