Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 35 additions & 0 deletions datafusion/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ use datafusion_expr::dml::InsertOp;
use datafusion_expr::{
CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType,
};

// Re-export DmlCapabilities for convenience
pub use datafusion_expr::dml::DmlCapabilities;
use datafusion_physical_plan::ExecutionPlan;

/// A table which can be queried and modified.
Expand Down Expand Up @@ -328,6 +331,38 @@ pub trait TableProvider: Debug + Sync + Send {
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("Insert into not implemented for this table")
}

/// Returns the DML capabilities supported by this table.
///
/// Defaults to [`DmlCapabilities::NONE`]. Override to enable DELETE/UPDATE.
fn dml_capabilities(&self) -> DmlCapabilities {
DmlCapabilities::NONE
}

/// Delete rows matching the filter predicates.
///
/// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
/// Empty `filters` deletes all rows.
async fn delete_from(
&self,
_state: &dyn Session,
_filters: Vec<Expr>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("DELETE not supported for {} table", self.table_type())
}

/// Update rows matching the filter predicates.
///
/// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64).
/// Empty `filters` updates all rows.
async fn update(
&self,
_state: &dyn Session,
_assignments: Vec<(String, Expr)>,
_filters: Vec<Expr>,
) -> Result<Arc<dyn ExecutionPlan>> {
not_impl_err!("UPDATE not supported for {} table", self.table_type())
}
}

/// Arguments for scanning a table with [`TableProvider::scan_with_args`].
Expand Down
168 changes: 165 additions & 3 deletions datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ use arrow_schema::Field;
use datafusion_catalog::ScanArgs;
use datafusion_common::display::ToStringifiedPlan;
use datafusion_common::format::ExplainAnalyzeLevel;
use datafusion_common::tree_node::{TreeNode, TreeNodeRecursion, TreeNodeVisitor};
use datafusion_common::tree_node::{
Transformed, TreeNode, TreeNodeRecursion, TreeNodeVisitor,
};
use datafusion_common::Column;
use datafusion_common::{
assert_eq_or_internal_err, assert_or_internal_err, TableReference,
};
Expand Down Expand Up @@ -590,6 +593,66 @@ impl DefaultPhysicalPlanner {
);
}
}
LogicalPlan::Dml(DmlStatement {
table_name,
target,
op: WriteOp::Delete,
input,
..
}) => {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
let capabilities = provider.table_provider.dml_capabilities();
if !capabilities.delete {
return plan_err!(
"Table '{}' does not support DELETE operations",
table_name
);
}
let filters = extract_dml_filters(input)?;
provider
.table_provider
.delete_from(session_state, filters)
.await?
} else {
return exec_err!(
"Table source can't be downcasted to DefaultTableSource"
);
}
}
LogicalPlan::Dml(DmlStatement {
table_name,
target,
op: WriteOp::Update,
input,
..
}) => {
if let Some(provider) =
target.as_any().downcast_ref::<DefaultTableSource>()
{
let capabilities = provider.table_provider.dml_capabilities();
if !capabilities.update {
return plan_err!(
"Table '{}' does not support UPDATE operations",
table_name
);
}
// For UPDATE, the assignments are encoded in the projection of input
// We pass the filters and let the provider handle the projection
let filters = extract_dml_filters(input)?;
// Extract assignments from the projection in input plan
let assignments = extract_update_assignments(input)?;
provider
.table_provider
.update(session_state, assignments, filters)
.await?
} else {
return exec_err!(
"Table source can't be downcasted to DefaultTableSource"
);
}
}
LogicalPlan::Window(Window { window_expr, .. }) => {
assert_or_internal_err!(
!window_expr.is_empty(),
Expand Down Expand Up @@ -1829,6 +1892,105 @@ fn get_physical_expr_pair(
Ok((physical_expr, physical_name))
}

/// Extract filter predicates from a DML input plan (DELETE/UPDATE).
/// Walks the logical plan tree and collects Filter predicates,
/// splitting AND conjunctions into individual expressions.
/// Column qualifiers are stripped so expressions can be evaluated against
/// the TableProvider's schema.
fn extract_dml_filters(input: &Arc<LogicalPlan>) -> Result<Vec<Expr>> {
let mut filters = Vec::new();

input.apply(|node| {
if let LogicalPlan::Filter(filter) = node {
// Split AND predicates into individual expressions
filters.extend(split_conjunction(&filter.predicate).into_iter().cloned());
}
Ok(TreeNodeRecursion::Continue)
})?;

// Strip table qualifiers from column references
filters.into_iter().map(strip_column_qualifiers).collect()
}

/// Strip table qualifiers from column references in an expression.
/// This is needed because DML filter expressions contain qualified column names
/// (e.g., "table.column") but the TableProvider's schema only has simple names.
fn strip_column_qualifiers(expr: Expr) -> Result<Expr> {
expr.transform(|e| {
if let Expr::Column(col) = &e {
if col.relation.is_some() {
// Create unqualified column
return Ok(Transformed::yes(Expr::Column(Column::new_unqualified(
col.name.clone(),
))));
}
}
Ok(Transformed::no(e))
})
.map(|t| t.data)
}

/// Extract column assignments from an UPDATE input plan.
/// For UPDATE statements, the SQL planner encodes assignments as a projection
/// over the source table. This function extracts column name and expression pairs
/// from the projection. Column qualifiers are stripped from the expressions.
fn extract_update_assignments(input: &Arc<LogicalPlan>) -> Result<Vec<(String, Expr)>> {
// The UPDATE input plan structure is:
// Projection(updated columns as expressions with aliases)
// Filter(optional WHERE clause)
// TableScan
//
// Each projected expression has an alias matching the column name
let mut assignments = Vec::new();

// Find the top-level projection
if let LogicalPlan::Projection(projection) = input.as_ref() {
for expr in &projection.expr {
if let Expr::Alias(alias) = expr {
// The alias name is the column name being updated
// The inner expression is the new value
let column_name = alias.name.clone();
// Only include if it's not just a column reference to itself
// (those are columns that aren't being updated)
if !is_identity_assignment(&alias.expr, &column_name) {
// Strip qualifiers from the assignment expression
let stripped_expr = strip_column_qualifiers((*alias.expr).clone())?;
assignments.push((column_name, stripped_expr));
}
}
}
} else {
// Try to find projection deeper in the plan
input.apply(|node| {
if let LogicalPlan::Projection(projection) = node {
for expr in &projection.expr {
if let Expr::Alias(alias) = expr {
let column_name = alias.name.clone();
if !is_identity_assignment(&alias.expr, &column_name) {
let stripped_expr =
strip_column_qualifiers((*alias.expr).clone())?;
assignments.push((column_name, stripped_expr));
}
}
}
return Ok(TreeNodeRecursion::Stop);
}
Ok(TreeNodeRecursion::Continue)
})?;
}

Ok(assignments)
}

/// Check if an assignment is an identity assignment (column = column)
/// These are columns that are not being modified in the UPDATE
fn is_identity_assignment(expr: &Expr, column_name: &str) -> bool {
match expr {
Expr::Column(col) => col.name == column_name,
_ => false,
}
}

/// Check if window bounds are valid after schema information is available, and
/// window_frame bounds are casted to the corresponding column type.
/// queries like:
Expand Down Expand Up @@ -3919,8 +4081,8 @@ digraph {
let right = LogicalPlanBuilder::scan("right", source, None)?.build()?;

let join_keys = (
vec![datafusion_common::Column::new(Some("left"), "a")],
vec![datafusion_common::Column::new(Some("right"), "a")],
vec![Column::new(Some("left"), "a")],
vec![Column::new(Some("right"), "a")],
);

let join = left.join(right, JoinType::Full, join_keys, None)?.build()?;
Expand Down
4 changes: 4 additions & 0 deletions datafusion/expr/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ pub mod interval_arithmetic {
pub use datafusion_expr_common::interval_arithmetic::*;
}
pub mod logical_plan;
pub mod dml {
//! DML (Data Manipulation Language) types for DELETE, UPDATE operations.
pub use crate::logical_plan::dml::*;
}
pub mod planner;
pub mod registry;
pub mod simplify;
Expand Down
104 changes: 100 additions & 4 deletions datafusion/expr/src/logical_plan/dml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,11 @@ impl CopyTo {
/// * `INSERT` - Appends new rows to the existing table. Calls
/// [`TableProvider::insert_into`]
///
/// * `DELETE` - Removes rows from the table. Currently NOT supported by the
/// [`TableProvider`] trait or builtin sources.
/// * `DELETE` - Removes rows from the table. Calls [`TableProvider::delete_from`]
/// if the provider returns [`DmlCapabilities`] with `delete = true`.
///
/// * `UPDATE` - Modifies existing rows in the table. Currently NOT supported by
/// the [`TableProvider`] trait or builtin sources.
/// * `UPDATE` - Modifies existing rows in the table. Calls [`TableProvider::update`]
/// if the provider returns [`DmlCapabilities`] with `update = true`.
///
/// * `CREATE TABLE AS SELECT` - Creates a new table and populates it with data
/// from a query. This is similar to the `INSERT` operation, but it creates a new
Expand All @@ -136,6 +136,9 @@ impl CopyTo {
///
/// [`TableProvider`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html
/// [`TableProvider::insert_into`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.insert_into
/// [`TableProvider::delete_from`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.delete_from
/// [`TableProvider::update`]: https://docs.rs/datafusion/latest/datafusion/datasource/trait.TableProvider.html#method.update
/// [`DmlCapabilities`]: crate::dml::DmlCapabilities
#[derive(Clone)]
pub struct DmlStatement {
/// The table name
Expand Down Expand Up @@ -288,10 +291,103 @@ impl Display for InsertOp {
}
}

/// DML operations supported by a table.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Hash)]
pub struct DmlCapabilities {
pub delete: bool,
pub update: bool,
}

impl DmlCapabilities {
pub const NONE: Self = Self {
delete: false,
update: false,
};

pub const ALL: Self = Self {
delete: true,
update: true,
};

pub const DELETE_ONLY: Self = Self {
delete: true,
update: false,
};

pub const UPDATE_ONLY: Self = Self {
delete: false,
update: true,
};

pub fn supports_any(&self) -> bool {
self.delete || self.update
}
}

impl Display for DmlCapabilities {
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
let caps: Vec<&str> = [
self.delete.then_some("DELETE"),
self.update.then_some("UPDATE"),
]
.into_iter()
.flatten()
.collect();

if caps.is_empty() {
write!(f, "NONE")
} else {
write!(f, "{}", caps.join(", "))
}
}
}

fn make_count_schema() -> DFSchemaRef {
Arc::new(
Schema::new(vec![Field::new("count", DataType::UInt64, false)])
.try_into()
.unwrap(),
)
}

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_dml_capabilities_constants() {
assert!(!DmlCapabilities::NONE.delete);
assert!(!DmlCapabilities::NONE.update);

assert!(DmlCapabilities::ALL.delete);
assert!(DmlCapabilities::ALL.update);

assert!(DmlCapabilities::DELETE_ONLY.delete);
assert!(!DmlCapabilities::DELETE_ONLY.update);

assert!(!DmlCapabilities::UPDATE_ONLY.delete);
assert!(DmlCapabilities::UPDATE_ONLY.update);
}

#[test]
fn test_dml_capabilities_supports_any() {
assert!(!DmlCapabilities::NONE.supports_any());
assert!(DmlCapabilities::ALL.supports_any());
assert!(DmlCapabilities::DELETE_ONLY.supports_any());
assert!(DmlCapabilities::UPDATE_ONLY.supports_any());
}

#[test]
fn test_dml_capabilities_display() {
assert_eq!(format!("{}", DmlCapabilities::NONE), "NONE");
assert_eq!(format!("{}", DmlCapabilities::ALL), "DELETE, UPDATE");
assert_eq!(format!("{}", DmlCapabilities::DELETE_ONLY), "DELETE");
assert_eq!(format!("{}", DmlCapabilities::UPDATE_ONLY), "UPDATE");
}

#[test]
fn test_dml_capabilities_default() {
let caps = DmlCapabilities::default();
assert_eq!(caps, DmlCapabilities::NONE);
}
}
2 changes: 1 addition & 1 deletion datafusion/expr/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use ddl::{
CreateFunctionBody, CreateIndex, CreateMemoryTable, CreateView, DdlStatement,
DropCatalogSchema, DropFunction, DropTable, DropView, OperateFunctionArg,
};
pub use dml::{DmlStatement, WriteOp};
pub use dml::{DmlCapabilities, DmlStatement, WriteOp};
pub use plan::{
Aggregate, Analyze, ColumnUnnestList, DescribeTable, Distinct, DistinctOn,
EmptyRelation, Explain, ExplainOption, Extension, FetchType, Filter, Join,
Expand Down
Loading