When importing from tabular data sources, Flux can aggregate rows together to produce hierarchical JSON or XML documents that often better resemble the logical entities they represent. Flux supports this capability for the following commands:
- import-avro-files
- import-delimited-files
- import-jdbc
- import-orc-files
- import-parquet-files
As an example, consider a scenario where you are importing customer data from a relational database via JDBC. Each customer record can have one or more related address records, which are stored in a separate table. You could simply import all customer rows and all address rows as separate documents in MarkLogic. But for a variety of use cases - including searching and security - you may want each customer document to contain all of its related addresses. Flux allows you to achieve that goal by aggregating related data such that hierarchical documents are written to MarkLogic containing one or more arrays of related records.
Usage
To aggregate rows together when using import-avro-files, import-delimited-files, import-jdbc, import-orc-files, or import-parquet-files, use the following options:
- --group-by specifies a column name to group rows by; this is typically the column used in a join.
- --aggregate specifies a string of the form new_column_name=column1,column2,column3. The new_column_name column will contain an array of objects, with each object having columns of column1, column2, and column3.
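To make the --aggregate value format concrete, here is a minimal Python sketch of how such a string breaks down into a new column name and its source columns. The function name parse_aggregate is hypothetical and is not part of Flux; it only illustrates the format described above.

```python
def parse_aggregate(option):
    """Split 'new_column_name=column1,column2' into (name, [columns]).

    Hypothetical helper for illustration only; not Flux's own parser.
    """
    name, sep, columns = option.partition("=")
    names = [c.strip() for c in columns.split(",") if c.strip()]
    if not sep or not name.strip() or not names:
        raise ValueError(
            "expected new_column_name=column1,column2,... but got %r" % option
        )
    return name.strip(), names


print(parse_aggregate("payments=payment_id,amount,payment_date"))
```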
For example, consider the Postgres tutorial database that features 15 tables with multiple relationships. One such relationship is between customers and payments. The following options would be used to write customer documents with each customer document containing an array of its related payments:
- Unix:

    ./bin/flux import-jdbc \
        --query "select c.*, p.payment_id, p.amount, p.payment_date from customer c inner join payment p on c.customer_id = p.customer_id" \
        --group-by customer_id \
        --aggregate "payments=payment_id,amount,payment_date" \
        --connection-string "flux-example-user:password@localhost:8004" \
        --permissions flux-example-role,read,flux-example-role,update

- Windows:

    bin\flux import-jdbc ^
        --query "select c.*, p.payment_id, p.amount, p.payment_date from customer c inner join payment p on c.customer_id = p.customer_id" ^
        --group-by customer_id ^
        --aggregate "payments=payment_id,amount,payment_date" ^
        --connection-string "flux-example-user:password@localhost:8004" ^
        --permissions flux-example-role,read,flux-example-role,update
The options above result in the following aggregation being performed:
- Rows retrieved from the Postgres database are grouped together based on values in the customer_id column.
- For each payment for a given customer, the values in a payment row (payment_id, amount, and payment_date) are added to a struct that is then added to an array in a new column named payments.
- The payment_id, amount, and payment_date columns are removed.
As a result, each customer JSON document has a top-level payments array containing one object for each related payment, as shown in the example below:
{
  "customer_id": 1,
  "first_name": "Mary",
  "payments": [
    {
      "payment_id": 18495,
      "amount": 5.99,
      "payment_date": "2007-02-15T04:22:38.996Z"
    },
    {
      "payment_id": 18496,
      "amount": 0.99,
      "payment_date": "2007-02-15T21:31:19.996Z"
    }
  ]
}
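The grouping and aggregation steps described above can be sketched in plain Python. This is an illustration of the semantics only, not how Flux implements them; the function name aggregate_rows and the in-memory row dictionaries are assumptions made for the example. Unlike Flux, this sketch happens to preserve input order within each array.

```python
import json


def aggregate_rows(rows, group_by, aggregates):
    """Group rows by one column and fold selected columns into arrays.

    rows:       list of dicts, one per joined row
    group_by:   name of the column to group on
    aggregates: dict mapping new column name -> list of source columns
    Hypothetical helper for illustration only.
    """
    aggregated_columns = {c for cols in aggregates.values() for c in cols}
    documents = {}
    for row in rows:
        key = row[group_by]
        if key not in documents:
            # Keep the non-aggregated columns once per group.
            doc = {k: v for k, v in row.items() if k not in aggregated_columns}
            for name in aggregates:
                doc[name] = []
            documents[key] = doc
        # Add one object per row to each aggregated array.
        for name, cols in aggregates.items():
            documents[key][name].append({c: row[c] for c in cols})
    return list(documents.values())


rows = [
    {"customer_id": 1, "first_name": "Mary", "payment_id": 18495,
     "amount": 5.99, "payment_date": "2007-02-15T04:22:38.996Z"},
    {"customer_id": 1, "first_name": "Mary", "payment_id": 18496,
     "amount": 0.99, "payment_date": "2007-02-15T21:31:19.996Z"},
]
docs = aggregate_rows(
    rows, "customer_id", {"payments": ["payment_id", "amount", "payment_date"]}
)
print(json.dumps(docs, indent=2))
```

With the two sample rows, the sketch yields a single customer document whose payments array holds two objects, matching the shape of the JSON example above.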
The approach can be used for many joins as well, thus producing multiple top-level array fields containing related objects. You are restricted to a single --group-by, but you can include many --aggregate options, one for each join that you wish to aggregate rows from.
Ordering aggregates
When aggregating rows, the order of elements in the resulting arrays is not guaranteed, even if your SQL query includes an ORDER BY clause. To ensure a specific order for elements in an aggregated array, use the --aggregate-order-by option:
- --aggregate-order-by specifies which aggregated array to sort and which column to sort by, in the form aggregationName=columnName:asc or aggregationName=columnName:desc. The column name must be one of the columns included in the corresponding aggregation. If the direction (:asc or :desc) is omitted, ascending order is used by default. This option can be specified multiple times to order multiple aggregated arrays.
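The value format for --aggregate-order-by can be illustrated with a short Python sketch. The function name parse_aggregate_order_by is hypothetical, and the sketch assumes the direction is written in lowercase; it is not Flux's own parser. Splitting on the first colon also shows why a column name that itself contains a colon would be ambiguous.

```python
def parse_aggregate_order_by(option):
    """Split 'aggregationName=columnName[:asc|:desc]' into its three parts.

    Hypothetical helper for illustration only; assumes lowercase directions.
    """
    name, sep, rest = option.partition("=")
    if not sep or not name or not rest:
        raise ValueError("expected aggregationName=columnName[:asc|:desc]")
    # Everything after the first colon is treated as the direction.
    column, _, direction = rest.partition(":")
    direction = direction or "asc"  # ascending when the direction is omitted
    if direction not in ("asc", "desc"):
        raise ValueError("direction must be asc or desc, got %r" % direction)
    return name, column, direction


print(parse_aggregate_order_by("payments=amount:desc"))
print(parse_aggregate_order_by("rentals=rental_id"))
```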
For example, to ensure that the payments array in each customer document is ordered by payment amount in ascending order:
- Unix:

    ./bin/flux import-jdbc \
        --query "select c.*, p.payment_id, p.amount, p.payment_date from customer c inner join payment p on c.customer_id = p.customer_id" \
        --group-by customer_id \
        --aggregate "payments=payment_id,amount,payment_date" \
        --aggregate-order-by "payments=amount:asc" \
        --connection-string "flux-example-user:password@localhost:8004" \
        --permissions flux-example-role,read,flux-example-role,update

- Windows:

    bin\flux import-jdbc ^
        --query "select c.*, p.payment_id, p.amount, p.payment_date from customer c inner join payment p on c.customer_id = p.customer_id" ^
        --group-by customer_id ^
        --aggregate "payments=payment_id,amount,payment_date" ^
        --aggregate-order-by "payments=amount:asc" ^
        --connection-string "flux-example-user:password@localhost:8004" ^
        --permissions flux-example-role,read,flux-example-role,update
To sort in descending order (highest amount first), use :desc:
- Unix:

    ./bin/flux import-jdbc \
        --query "select c.*, p.payment_id, p.amount, p.payment_date from customer c inner join payment p on c.customer_id = p.customer_id" \
        --group-by customer_id \
        --aggregate "payments=payment_id,amount,payment_date" \
        --aggregate-order-by "payments=amount:desc" \
        --connection-string "flux-example-user:password@localhost:8004" \
        --permissions flux-example-role,read,flux-example-role,update

- Windows:

    bin\flux import-jdbc ^
        --query "select c.*, p.payment_id, p.amount, p.payment_date from customer c inner join payment p on c.customer_id = p.customer_id" ^
        --group-by customer_id ^
        --aggregate "payments=payment_id,amount,payment_date" ^
        --aggregate-order-by "payments=amount:desc" ^
        --connection-string "flux-example-user:password@localhost:8004" ^
        --permissions flux-example-role,read,flux-example-role,update
You can also order multiple aggregated arrays by specifying --aggregate-order-by multiple times. For example, to order payments by amount descending and rentals by rental_id ascending:
- Unix:

    ./bin/flux import-jdbc \
        --query "select c.*, p.payment_id, p.amount, r.rental_id from customer c inner join payment p on c.customer_id = p.customer_id inner join rental r on c.customer_id = r.customer_id" \
        --group-by customer_id \
        --aggregate "payments=payment_id,amount" \
        --aggregate "rentals=rental_id" \
        --aggregate-order-by "payments=amount:desc" \
        --aggregate-order-by "rentals=rental_id:asc" \
        --connection-string "flux-example-user:password@localhost:8004" \
        --permissions flux-example-role,read,flux-example-role,update

- Windows:

    bin\flux import-jdbc ^
        --query "select c.*, p.payment_id, p.amount, r.rental_id from customer c inner join payment p on c.customer_id = p.customer_id inner join rental r on c.customer_id = r.customer_id" ^
        --group-by customer_id ^
        --aggregate "payments=payment_id,amount" ^
        --aggregate "rentals=rental_id" ^
        --aggregate-order-by "payments=amount:desc" ^
        --aggregate-order-by "rentals=rental_id:asc" ^
        --connection-string "flux-example-user:password@localhost:8004" ^
        --permissions flux-example-role,read,flux-example-role,update
Important notes:
- Each --aggregate-order-by option orders a single aggregated array independently.
- The ordering is applied after the aggregation is complete, ensuring consistent results across all documents.
- For single-column aggregations (arrays of simple values rather than objects), the array will be sorted by those values directly.
- If a column name contains a colon character, you can use the SQL AS keyword to alias it to a name without a colon.
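The ordering behavior in the notes above can be sketched in Python as follows. The function name order_aggregate and the sample rental_id values are hypothetical, and this is a sketch of the described semantics rather than Flux's implementation: each call sorts one array, arrays of objects are sorted by the named column, and arrays of simple values are sorted by the values themselves.

```python
def order_aggregate(document, name, column, direction="asc"):
    """Sort one aggregated array of an already-aggregated document in place.

    Hypothetical helper for illustration only.
    """
    def sort_key(element):
        # Objects are sorted by the named column; simple values by themselves.
        return element[column] if isinstance(element, dict) else element

    document[name] = sorted(
        document[name], key=sort_key, reverse=(direction == "desc")
    )
    return document


customer = {
    "customer_id": 1,
    "payments": [
        {"payment_id": 18496, "amount": 0.99},
        {"payment_id": 18495, "amount": 5.99},
    ],
    "rentals": [76, 12],  # single-column aggregation; sample values are made up
}

# Each array is ordered independently of the other.
order_aggregate(customer, "payments", "amount", "desc")
order_aggregate(customer, "rentals", "rental_id")
print(customer["payments"])
print(customer["rentals"])
```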