I have this Python code that executes three insert transactions against a Delta table:

import pandas as pd
from deltalake.writer import write_deltalake
from deltalake import DeltaTable

if __name__ == '__main__':
    # First transaction
    id_list = []
    name_list = []
    for i in range(1, 11):
        id_list.append(i)
        name_list.append(f'item {i}')
    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

    # Second transaction
    id_list = []
    name_list = []
    for i in range(11, 16):
        id_list.append(i)
        name_list.append(f'item {i}')
    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

    # Third transaction
    id_list = []
    name_list = []
    for i in range(16, 31):
        id_list.append(i)
        name_list.append(f'item {i}')
    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

This does what is intended. When I run this code, I can confirm by checking the _delta_log
directory that three transactions were indeed executed.
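
For reference, the check can be sketched roughly as follows. It assumes the standard Delta log layout, where each commit is a JSON file named with a zero-padded 20-digit version number. The helper name `list_commit_versions` is made up for this illustration and is not part of the deltalake package.

```python
import re
from pathlib import Path

# Commit files look like 00000000000000000002.json; checkpoints and
# _last_checkpoint live in the same directory and must be skipped.
COMMIT_FILE = re.compile(r"^(\d{20})\.json$")


def list_commit_versions(table_path):
    """Return the sorted commit versions found in <table_path>/_delta_log.

    Hypothetical helper for illustration only.
    """
    log_dir = Path(table_path) / "_delta_log"
    if not log_dir.is_dir():
        # No log directory means no table (or no commits) at this path.
        return []
    versions = []
    for entry in log_dir.iterdir():
        match = COMMIT_FILE.match(entry.name)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)


if __name__ == "__main__":
    versions = list_commit_versions("./data/table2")
    print(f"{len(versions)} commits: {versions}")
```

On a fresh table, three successful appends should show up as three commit files.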
Now I am trying to do the same thing in Rust, with the following code:
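
// Imports were not shown in the original snippet; these paths are assumed
// for the `deltalake` crate with its re-exported `arrow` and may differ by version.
use std::sync::Arc;

use deltalake::arrow::array::{Int32Array, StringArray};
use deltalake::arrow::datatypes::{DataType, Field, Schema as ArrowSchema};
use deltalake::arrow::record_batch::RecordBatch;
use deltalake::{DeltaOps, DeltaTableBuilder};

// Build a two-column batch (id, name) covering ids start..=end.
fn get_data(start: u8, end: u8) -> RecordBatch {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let ids: Vec<i32> = (start..=end).map(i32::from).collect();
    let names: Vec<String> = ids.iter().map(|x| format!("item {x}")).collect();

    let id_values = Int32Array::from(ids);
    let name_values = StringArray::from(names);

    RecordBatch::try_new(schema, vec![Arc::new(id_values), Arc::new(name_values)]).unwrap()
}

// The async runtime attribute was not shown in the original; tokio is assumed.
#[tokio::main]
async fn main() {
    // First transaction
    let data = get_data(1, 10);
    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();
    let ops = DeltaOps::from(table);
    ops.write(vec![data]).await.unwrap();

    // Second transaction
    let data = get_data(11, 15);
    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();
    let ops = DeltaOps::from(table);
    ops.write(vec![data]).await.unwrap();

    // Third transaction
    let data = get_data(16, 30);
    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();
    let ops = DeltaOps::from(table);
    ops.write(vec![data]).await.unwrap();
}
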
But when I run this, it fails with the following error:
thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Transaction { source: CommitConflict(ProtocolChanged) }', src/main.rs:87:33
stack backtrace:
0: rust_begin_unwind
I noticed that the first transaction succeeds and that this error happens on the second write to the Delta table.
Does anyone know why I am getting this error? Why does it occur with the Rust version but not with the Python code, which, as far as I can tell, should be doing the same thing?
And more importantly, what do I need to do to fix it?