0

I have this Python code that executes 3 insert transactions into a delta table

import pandas as pd
from deltalake.writer import write_deltalake
from deltalake import DeltaTable

if __name__ == '__main__':
    # First transaction
    id_list = []
    name_list = []

    for i in range(1, 11):
        id_list.append(i)
        name_list.append(f'item {i}')

    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

    # Second transaction
    id_list = []
    name_list = []

    for i in range(11, 16):
        id_list.append(i)
        name_list.append(f'item {i}')

    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

    # Third transaction
    id_list = []
    name_list = []

    for i in range(16, 31):
        id_list.append(i)
        name_list.append(f'item {i}')

    data_dict = {'id': id_list, 'name': name_list}
    write_deltalake('./data/table2', pd.DataFrame(data_dict), mode='append')

This does what is intended. When I run this code, I can confirm by checking the _delta_log that indeed 3 transactions was executed.

Now I am trying to do similar using Rust, with the following code

fn get_data(start: u8, end: u8) -> RecordBatch {
    let schema = Arc::new(ArrowSchema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    let ids: Vec<i32> = (start..=end).map(i32::from).collect();
    let names: Vec<String> = ids.iter().map(|x| format!("item {x}")).collect();

    let id_values = Int32Array::from(ids);
    let name_values = StringArray::from(names);

    RecordBatch::try_new(schema, vec![Arc::new(id_values), Arc::new(name_values)]).unwrap()
}

async fn main() {
    // First transaction
    let data = get_data(1, 10);

    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();

    let ops = DeltaOps::from(table);

    ops.write(vec![data]).await.unwrap();

    // Second transaction
    let data = get_data(11, 15);

    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();

    let ops = DeltaOps::from(table);

    ops.write(vec![data]).await.unwrap();

    // Third transaction
    let data = get_data(16, 30);

    let table = DeltaTableBuilder::from_uri("./data/table3")
        .build()
        .unwrap();

    let ops = DeltaOps::from(table);

    ops.write(vec![data]).await.unwrap();
}

But when I run this, it fails with the following error

thread 'main' panicked at 'called `Result::unwrap()` on an `Err` value: Transaction { source: CommitConflict(ProtocolChanged) }', src/main.rs:87:33
stack backtrace:
   0: rust_begin_unwind

I noticed the first transaction is executed and this error happens on the second write to the delta table.

Does anyone know why I am getting this error? Why do I get this error with the Rust version but not in the Python code which as far as I can tell, should be doing the same thing.

And more importantly what I need to do to fix it?

Jmb
  • 18,893
  • 2
  • 28
  • 55
Finlay Weber
  • 2,989
  • 3
  • 17
  • 37
  • 1
    You should include both the entire error message, and a minimal reproducible example. Your error references line 87, but your snippet is 49 lines long. – PatientPenguin Jul 27 '23 at 17:47

0 Answers0