
I'm trying to import a table from a SQL Server DB into BigQuery. The table has a Datetime column that I've mapped to the BigQuery DATETIME type. I'm using Dataflow to ingest the table with the default JDBC to BigQuery template.
This is the connection string:
jdbc:sqlserver://<hostIP>:1433;instanceName=SQLSERVER;databaseName=<dbName>;encrypt=true;trustServerCertificate=true;
I've also added user and pwd (my credentials) in the job's parameters.
But Dataflow gives me the following error:

Error message from worker: java.lang.ClassCastException: class java.sql.Timestamp cannot be cast to class java.time.temporal.TemporalAccessor (java.sql.Timestamp is in module java.sql of loader 'platform'; java.time.temporal.TemporalAccessor is in module java.base of loader 'bootstrap')
com.google.cloud.teleport.templates.common.JdbcConverters$ResultSetToTableRow.mapRow(JdbcConverters.java:157)
com.google.cloud.teleport.templates.common.JdbcConverters$ResultSetToTableRow.mapRow(JdbcConverters.java:115)
com.google.cloud.teleport.io.DynamicJdbcIO$DynamicReadFn.processElement(DynamicJdbcIO.java:388)  

I've also tried mapping the datetime column to the BigQuery TIMESTAMP datatype, but I get the same error.
It's not a connection problem: if I read the other columns (excluding the datetime column), the Dataflow job works.

What do I have to do? What's the problem?

I'm using com.microsoft.sqlserver.jdbc.SQLServerDriver as the JDBC driver class name, and I've downloaded the mssql-jdbc-11.2.1.jre8.jar driver from Maven.

  • Hi @alex-mont, for your requirement, you can check this [StackOverflow question](https://stackoverflow.com/questions/25892417/caused-by-java-lang-classcastexception-java-sql-timestamp-cannot-be-cast-to-ja) . Let me know if that helps. – Shipra Sarkar Nov 04 '22 at 13:36
  • Hi @ShipraSarkar, it doesn't help. My problem is not with the DATE type, but with DATETIME columns. Moreover, I don't write any line of code: I'm using Dataflow's templates. I've also tried to set -Doracle.jdbc.V8Compatible=true, and it still doesn't work. – alex-mont Nov 04 '22 at 16:30

2 Answers


I solved it this way:

  1. Parse the datetime columns to strings with the SQL CONVERT function when reading the table.
  2. Create the tables in BigQuery beforehand and declare DATETIME as the datatype of the columns parsed in step 1.
  3. BigQuery will cast those columns automatically.

Consider this example. You have the SQL Server table

CREATE TABLE User(id varchar(N), date_insert datetime);

Step n.1:

SELECT id, CONVERT(varchar, date_insert, 20) as date_insert FROM User; 

Style 20 is the ODBC canonical format yyyy-mm-dd hh:mi:ss (see the SQL Server date & time conversion styles), which BigQuery can load into a DATETIME column.

Step n.2: Create the User table in BigQuery with id as STRING(N) and date_insert as DATETIME.
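As an illustration of step n.2, here is a minimal sketch using the BigQuery Python client (the project, dataset and table names are placeholders, not values from the question):

from google.cloud import bigquery

client = bigquery.Client()  # uses application-default credentials and project

# Schema mirrors the converted SQL Server columns:
# id stays a string, date_insert becomes a BigQuery DATETIME.
schema = [
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("date_insert", "DATETIME"),
]

table = bigquery.Table("myproject.mydataset.User", schema=schema)  # placeholder table ID
client.create_table(table)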

Step n.3: Ingest your data with the Dataflow job, using the query from step n.1 as the source query.
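As a quick sanity check before launching the Dataflow job (an illustrative sketch, not part of the original answer; the connection details are placeholders), you can run the step n.1 query yourself and confirm the string format that BigQuery will receive:

import pyodbc

# Placeholder connection details; mirror the ones configured for the Dataflow job.
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};SERVER=hostIP,1433;"
    "DATABASE=dbName;UID=user;PWD=pwd;TrustServerCertificate=YES;"
)

# CONVERT(..., 20) should produce strings like '2022-11-04 16:30:00',
# which BigQuery loads into a DATETIME column without a cast error.
row = conn.execute(
    "SELECT TOP 1 id, CONVERT(varchar, date_insert, 20) AS date_insert FROM [User];"
).fetchone()
print(row.date_insert)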


I could not use Dataflow due to some specific challenges, so I ingested the data into BigQuery from SQL Server with Python instead, using the ODBC driver, SQLAlchemy, and the BigQuery client library.

In my case, the issue was caused by a hierarchyid column; it is handled through a converter function and returned as a string column. The code is given below for reference.

import pyodbc 
import sqlalchemy
from sqlalchemy.engine import URL
import datetime
from google.cloud import bigquery
from google.oauth2 import service_account

print("Sync job started at :", datetime.datetime.now())

credentials = service_account.Credentials.from_service_account_file('sa-key.json')
projectid = "myproject"
tableid = "projectid.dataset.table"
client = bigquery.Client(credentials= credentials,project=projectid)

server = 'cloudsql-proxyip' 
database = 'db' 
username = 'user' 
password = 'pwd' 
driver = "{ODBC Driver 18 for SQL Server}"
port = 1433

# Direct pyodbc connection (not actually used below; the SQLAlchemy engine is what runs the query).
cnxn = pyodbc.connect('DRIVER={ODBC Driver 18 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password+';TrustServerCertificate=YES;')
cursor = cnxn.cursor()

# SQL Server UDT columns such as hierarchyid (ODBC type code -151) can't be
# serialized directly, so return them as plain strings.
def HandleHierarchyId(v):
    return str(v)

connection_string = f"DRIVER={driver};SERVER={server};DATABASE={database};UID={username};PWD={password};TrustServerCertificate=YES;"
connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})
sql_engine = sqlalchemy.create_engine(connection_url)

select_stmt = ("SELECT * FROM [db].[schema].[view]")

print("Creating connetion")
db = sql_engine.connect().connection
db.add_output_converter(-151, HandleHierarchyId)
    
print("Fetching data")
data = db.execute(select_stmt).fetchall()
md = sqlalchemy.MetaData()
# Reflect the view so its column names can be used to build the BigQuery rows.
table = sqlalchemy.Table('view', md, autoload=True, autoload_with=sql_engine)
columns = table.c

column_list = []
for column in columns:
    column_list.append(column.name)


def split(a, n):
    # Split list a into n chunks of (nearly) equal size.
    k, m = divmod(len(a), n)
    return (a[i*k+min(i, m):(i+1)*k+min(i+1, m)] for i in range(n))


def to_json(row, columns):
    # Turn a result row into a dict keyed by column name, ready for insert_rows_json.
    newlist = list(row)
    newdicts = dict(zip(columns, newlist))
    return newdicts

def ingestdata(tableid, data):
    table = bigquery.Table.from_string(tableid)
    print("Data ingestion started")
    errors = client.insert_rows_json(table, data)  # Make an API request.
    if errors == []:
        print("New rows have been added.")
    else:
        print("Encountered errors while inserting rows: {}".format(errors))

insertdata = []
for row in data:
    newdict = to_json(row, column_list)
    insertdata.append(newdict)

print("Total record count :", len(insertdata))

# BigQuery POST API limitations - split the list into multiple smaller chunks
records = split(insertdata, 10)


for r in records:
    print("Batch record count :", len(r))
    ingestdata(tableid=tableid, data=r)


print("Total records inserted : ", len(insertdata))
print("Sync job ended at :", datetime.datetime.now())