Apache Spark has an Extension Points API that lets third-party users customize Spark in their deployments. In my previous blog post, I talked about the extension points and how to add an optimizer rule using this API. The Extension Points API lets you extend Spark in several ways, for example by adding optimizations or by adding hooks that enable application-specific functions. In this post, I describe some enhancements that we developed for the Extension Points API.
When we use the SparkSessionExtensions API to add new optimizer rules, the rules are added at a predetermined place in the Optimizer. For most cases, this works great! However, the current API does not allow fine-grained control over when the injected rules are exercised, and in some cases we want to add a rule at a specific place in the optimizer. This is what motivated us to enhance the API.
We added two enhancements to the SparkSessionExtensions API.
Inject a rule after or before an existing rule in a given existing batch in the Optimizer.
/**
 * Inject an optimizer `Rule` builder into the [[SparkSession]] in a particular batch after
 * or before a specific existing rule in the batch.
 * If the batchName does not exist, or if the existing rule does not exist in the given
 * batch, then an error will be thrown. (fail fast)
 */
def injectOptimizerRuleInOrder(
    builder: RuleBuilder,
    batchName: String,
    ruleOrder: Order.Order,
    existingRule: String): Unit
This method injects a rule into the existing batch named by ‘batchName’, and lets you specify whether the new rule is added before or after a named existing rule in that batch.
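The ordering and fail-fast behavior described above can be sketched with a small toy model. This is not Spark code and not the implementation in the PR: `RuleOrderSketch`, `Batch`, and `injectRuleInOrder` are hypothetical names, and batches are reduced to plain lists of rule names so the example runs without Spark on the classpath.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object RuleOrderSketch {
  // Mirrors the before/after choice described in the text.
  object Order extends Enumeration {
    val Before, After = Value
  }

  // A batch is reduced to a name plus an ordered list of rule names.
  final case class Batch(name: String, rules: Vector[String])

  /** Returns a copy of `batches` with `newRule` placed next to `existingRule` in `batchName`. */
  def injectRuleInOrder(
      batches: Vector[Batch],
      newRule: String,
      batchName: String,
      order: Order.Value,
      existingRule: String): Vector[Batch] = {
    val batchIdx = batches.indexWhere(_.name == batchName)
    // Fail fast when the batch is unknown.
    require(batchIdx >= 0, s"Batch '$batchName' does not exist")

    val batch = batches(batchIdx)
    val ruleIdx = batch.rules.indexOf(existingRule)
    // Fail fast when the reference rule is not in that batch.
    require(ruleIdx >= 0, s"Rule '$existingRule' does not exist in batch '$batchName'")

    // Before: take the reference rule's slot; After: the slot right behind it.
    val insertAt = if (order == Order.Before) ruleIdx else ruleIdx + 1
    val (front, back) = batch.rules.splitAt(insertAt)
    batches.updated(batchIdx, batch.copy(rules = (front :+ newRule) ++ back))
  }
}
```

In this model, a batch holding the rules R1 and R2 becomes R1, New, R2 when New is injected with `Order.After` relative to R1, and an unknown batch or rule name raises an `IllegalArgumentException` instead of being silently ignored.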
Inject an optimizer batch after or before an existing batch. The SparkSessionExtensions API currently has no way to add a batch to the optimizer, so this method adds support for injecting a whole batch of rules.
/**
 * Inject a batch of optimizer rules
 * @param batchName - Batch Name to inject
 * @param maxIterations - Iterations
 * @param existingBatchName - Existing batch name in reference to which this batch is injected
 * @param order - Specify the order, if before the existing batch or after the existing batch
 * @param rules - New rules in the batch that will be added
 */
def injectOptimizerBatch(
    batchName: String,
    maxIterations: Int,
    existingBatchName: String,
    order: Order.Value,
    rules: Seq[RuleBuilder]): Unit
This method adds a batch of rules at a position relative to an existing batch: you name the existing batch and specify whether the new batch goes before or after it.
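As a rough illustration of the batch placement, here is a toy model that treats the optimizer as an ordered list of named batches. It is a sketch under stated assumptions rather than the code in the PR: `BatchOrderSketch`, `Batch`, and `injectBatch` are hypothetical names, rules are plain strings, and failing on an unknown existing batch is my assumption, carried over from the rule-injection method.

```scala
// Toy model only: these types are hypothetical stand-ins, not Spark classes.
object BatchOrderSketch {
  // Mirrors the before/after choice described in the text.
  object Order extends Enumeration {
    val Before, After = Value
  }

  // A batch carries its name, an iteration limit, and an ordered list of rule names.
  final case class Batch(name: String, maxIterations: Int, rules: Vector[String])

  /** Returns a copy of `batches` with `newBatch` placed next to `existingBatchName`. */
  def injectBatch(
      batches: Vector[Batch],
      newBatch: Batch,
      existingBatchName: String,
      order: Order.Value): Vector[Batch] = {
    val idx = batches.indexWhere(_.name == existingBatchName)
    // Assumption: an unknown reference batch is an error rather than a no-op.
    require(idx >= 0, s"Batch '$existingBatchName' does not exist")

    // Before: take the reference batch's slot; After: the slot right behind it.
    val insertAt = if (order == Order.Before) idx else idx + 1
    val (front, back) = batches.splitAt(insertAt)
    (front :+ newBatch) ++ back
  }
}
```

With existing batches B1 and B2, injecting a batch named Custom with `Order.Before` relative to B2 gives the order B1, Custom, B2 in this model.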
I have opened SPARK-26249 for this functionality. The JIRA contains the design document and details on the expected behavior of the API. Please check out the code changes in this PR if you are interested in trying it out for your own use case. I would appreciate any feedback on SPARK-26249, including whether it is useful for your scenarios.
I gave a presentation at the Apache Spark and AI Summit 2019 in San Francisco on how to extend Spark with customized optimizations. The talk covers the Extension Points API in detail and also discusses the proposed enhancements that I cover in this post. Please feel free to check it out if you are interested.