I have created a Dataflow job that reads .tsv files from a GCS bucket and inserts them into BigQuery tables. Before forming a table row, I have to read a lookup file that contains the column names. This lookup file sits inside a .tar.gz archive along with other files, but the only entry I need is the one named column_headers.tsv.
I have used FileIO.match().filepattern() to match the .tar.gz files in the bucket, but when the job tries to open the archive it fails with a FileNotFoundException.
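For reference, below is a stripped-down sketch of the match / readMatches pattern I am following (the bucket and paths are placeholders, and the imports are the same as in the full pipeline further down). This sketch reads the matched archive through Beam's FileIO.ReadableFile channel; my actual code below instead rebuilds the path as a string and opens it with java.io.FileInputStream.

// Minimal sketch, placeholder bucket/paths. With readMatches().withCompression(Compression.AUTO)
// the channel returned by ReadableFile.open() is already gunzipped, so it can be wrapped
// directly in a TarArchiveInputStream.
PCollectionView<String[]> headerColumns = p
        .apply("Match archives", FileIO.match()
                .filepattern("gs://<my-bucket>/initial_load/*.tar.gz"))
        .apply("Read matches", FileIO.readMatches().withCompression(Compression.AUTO))
        .apply("Extract column_headers.tsv", ParDo.of(new DoFn<FileIO.ReadableFile, String[]>() {
            @ProcessElement
            public void processElement(ProcessContext c) throws IOException {
                FileIO.ReadableFile file = c.element();
                try (TarArchiveInputStream tar =
                        new TarArchiveInputStream(Channels.newInputStream(file.open()))) {
                    TarArchiveEntry entry;
                    while ((entry = tar.getNextTarEntry()) != null) {
                        if (entry.getName().endsWith("column_headers.tsv")) {
                            BufferedReader br = new BufferedReader(new InputStreamReader(tar));
                            String headerLine = br.readLine(); // the lookup file is a single header line
                            if (headerLine != null) {
                                c.output(headerLine.split("\t"));
                            }
                        }
                    }
                }
            }
        }))
        .apply(View.asSingleton());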
Below is the stack trace,
severity: "ERROR"
textPayload: "Error message from worker: java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: fs:/xxx_xxxx_xxxxx/initial_load/xxxxxxxx_2022-10-11-lookup_data.tar.gz (No such file or directory)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:187)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner$1.outputWindowedValue(GroupAlsoByWindowFnRunner.java:108)
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:56)
org.apache.beam.runners.dataflow.worker.util.BatchGroupAlsoByWindowReshuffleFn.processElement(BatchGroupAlsoByWindowReshuffleFn.java:39)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.invokeProcessElement(GroupAlsoByWindowFnRunner.java:121)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowFnRunner.processElement(GroupAlsoByWindowFnRunner.java:73)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn.processElement(GroupAlsoByWindowsParDoFn.java:117)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:218)
org.apache.beam.runners.dataflow.worker.util.common.worker.ReadOperation.start(ReadOperation.java:169)
org.apache.beam.runners.dataflow.worker.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:83)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.executeWork(BatchDataflowWorker.java:420)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.doWork(BatchDataflowWorker.java:389)
org.apache.beam.runners.dataflow.worker.BatchDataflowWorker.getAndPerformWork(BatchDataflowWorker.java:314)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:140)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:120)
org.apache.beam.runners.dataflow.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:107)
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.beam.sdk.util.UserCodeException: java.io.FileNotFoundException: gs:/crs_user_events/initial_load/napaonlineglobal_2022-10-11-lookup_data.tar.gz (No such file or directory)
org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
gradle_inital_load.ReadTarDir$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
org.apache.beam.sdk.io.FileIO$ReadMatches$ToReadableFileFn.process(FileIO.java:874)
org.apache.beam.sdk.io.FileIO$ReadMatches$ToReadableFileFn$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:76)
org.apache.beam.sdk.transforms.MapElements$1.processElement(MapElements.java:142)
org.apache.beam.sdk.transforms.MapElements$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:285)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:275)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:85)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:423)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:411)
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1.processElement(ReshuffleOverrideFactory.java:86)
org.apache.beam.runners.dataflow.ReshuffleOverrideFactory$ReshuffleWithOnlyTrigger$1$DoFnInvoker.invokeProcessElement(Unknown Source)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.invokeProcessElement(SimpleDoFnRunner.java:211)
org.apache.beam.runners.dataflow.worker.repackaged.org.apache.beam.runners.core.SimpleDoFnRunner.processElement(SimpleDoFnRunner.java:188)
org.apache.beam.runners.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:340)
org.apache.beam.runners.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:44)
org.apache.beam.runners.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:49)
org.apache.beam.runners.dataflow.worker.GroupAlsoByWindowsParDoFn$1.output(GroupAlsoByWindowsParDoFn.java:185)
... 21 more
Caused by: java.io.FileNotFoundException: fs:/xxx_xxxx_xxxx/initial_load/xxxxxx_2022-10-11-lookup_data.tar.gz (No such file or directory)
java.base/java.io.FileInputStream.open0(Native Method)
java.base/java.io.FileInputStream.open(FileInputStream.java:216)
xxxx_inital_load.ReadFilexxxx.processElement(ReadFilexxxx.java:97)
"
timestamp: "2022-12-13T17:31:06.310507157Z"
}
My pipeline code is as follows:
package xxxx_inital_load;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileIO.ReadableFile;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptions.DirectRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.sdk.values.ValueInSingleWindow;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.common.io.Files;
import org.apache.beam.sdk.annotations.*;
import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.DataflowRunner;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
public class ReadFilexxxx {

    private static final Logger LOG = LoggerFactory.getLogger(ReadFilexxxx.class);
    static String outputTableProject = "xxx-x-xxxx";
    static String outputTableDataset = "xxxx_events";

    public static void main(String[] args) {
        // TODO Auto-generated method stub
        DataflowPipelineOptions dfOptions = PipelineOptionsFactory.as(MyOptions.class);
        dfOptions.setProject("xxx-x-xxxx");
        dfOptions.setStagingLocation("fs://xxxx_events/xxxx");
        dfOptions.setRegion("us-eastx");
        dfOptions.setTempLocation("fs://xxxx_events/xxxx");
        dfOptions.setServiceAccount("xxxx-xx@xxx-x-xxxx.xxx.xxxxxxx.com");
        dfOptions.setSubnetwork(
                "https://www.googleapis.com/compute/v1/projects/xxx-xxx-x-xxxx-xxx/regions/us-eastx/subnetworks/x-xxxxxx-xxxx-xxxx");
        dfOptions.setUsePublicIps(false);
        dfOptions.setRunner(DataflowRunner.class);
        DataflowRunner.fromOptions(dfOptions);

        Pipeline p = Pipeline.create(dfOptions);

        // PCollection<String[]> fileContents =
        PCollectionView<String[]> filecontents = (PCollectionView<String[]>) p
                .apply("MatchFile(s)", FileIO.match().filepattern(
                        "fs://xxxx_events/initial_load/*.tar.gz"))
                .apply("GCS File Match", FileIO.readMatches().withCompression(Compression.AUTO))
                .apply("Untar and read files", ParDo.of(new DoFn<FileIO.ReadableFile, String[]>() {
                    // .apply("Untar and read files", ParDo.of(new DoFn<MatchResult.Metadata, String[]>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws FileNotFoundException, IOException {
                        String line = null;
                        String tmp = null;
                        String[] Data = null; // = new ArrayList<>();
                        String filename = c.element().getMetadata().resourceId().getFilename();
                        filename = "fs://xxxxx_events/initial_load/".concat(filename);
                        LOG.info("columns file path:", filename);
                        TarArchiveInputStream tarInput = new TarArchiveInputStream(
                                new GzipCompressorInputStream(new FileInputStream(filename)));
                        TarArchiveEntry currentEntry = tarInput.getNextTarEntry();
                        BufferedReader br = null;
                        while (currentEntry != null) {
                            if (currentEntry.getName().contains("column_headers.tsv")) {
                                br = new BufferedReader(new InputStreamReader(tarInput)); // Read directly from tarInput
                                System.out.println("For File = " + currentEntry.getName());
                                while ((tmp = br.readLine()) != null) {
                                    line = tmp;
                                    Data = line.split("\t");
                                    System.out.println("line=" + line);
                                }
                                // br.close();
                            }
                            currentEntry = tarInput.getNextTarEntry(); // You forgot to iterate to the next file
                        }
                        br.close();
                        tarInput.close();
                        c.output(Data);
                    }
                })).apply(View.asSingleton());
        PCollection<String> lines = p.apply("Read Files",
                TextIO.read().from("fs://xxxx_events/initial_load/*.tsv.gz")
                        .withCompression(Compression.GZIP));

        p.getCoderRegistry().registerCoderForClass(ReadTarDir.class, TableAndRowCoder.of());

        PCollection<TableAndRow> tablerows = lines
                .apply("Transform File lines into TableAndRow", ParDo.of(new DoFn<String, TableAndRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        int tabnam_idx, indx;
                        TableAndRow tbObj;
                        String tabName = null;
                        TableRow row = new TableRow();
                        String[] columns = c.sideInput(filecontents);
                        String[] arr = c.element().split("\t");
                        if (arr.length > 0) {
                            tabnam_idx = getIndex(columns, "channel");
                            indx = getIndex(columns, "page_event");

                            // ProductDetails
                            if ((arr[tabnam_idx].toString()).contains("productdetails")) {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("detail_page_view_events_idl");
                                // tabName = String.format("%s:%s.%s", outputTableProject,
                                // outputTableDataset,"Detail_Page_View_Events");
                                row.set("eventType", "detail-page-view");
                                int index = getIndex(columns, "evar6");
                                if (arr[getIndex(columns, "evar6")] != "") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("eventTime", arr[getIndex(columns, "date_time")]);
                                row.set("experimentIds", arr[getIndex(columns, "evar104")]);
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                                row.set("userInfo.userId", "1");
                                row.set("userInfo.ipAddress", arr[getIndex(columns, "ip")]);
                                row.set("userInfo.userAgent", arr[getIndex(columns, "user_agent")]);
                                row.set("userInfo.directUserRequest", "1");
                                row.set("uri", arr[getIndex(columns, "page_url")]);
                                if (arr[getIndex(columns, "visit_referrer")] == "") {
                                    row.set("referrerUri", "1");
                                } else {
                                    row.set("referrerUri", arr[getIndex(columns, "visit_referrer")]);
                                }
                            }

                            // Homepage
                            if ((arr[tabnam_idx].toString()).contains("homepage1")) {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("home_page_view_events_idl");
                                // tabName = String.format("%s:%s.%s", outputTableProject,
                                // outputTableDataset,"Home_Page_View_Events");
                                row.set("eventType", "home-page-view");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                            }

                            // Search
                            indx = getIndex(columns, "page_event");
                            if ((arr[tabnam_idx].toString()).contains("search") && arr[indx] == "0") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("search_events_idl");
                                // tabName = String.format("%s:%s.%s", outputTableProject,
                                // outputTableDataset,"Pass Table Name here");
                                /* create row here */
                                row.set("eventType", "search");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("searchQuery", arr[getIndex(columns, "evar1")]);
                                } else {
                                    row.set("searchQuery", arr[getIndex(columns, "evar2")]);
                                }
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                            }

                            // Browse
                            indx = getIndex(columns, "page_event");
                            if ((arr[tabnam_idx].toString()).contains("category-landing") && arr[indx] == "0") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("category_page_view_events_idl");
                                /* create row here */
                                row.set("eventType", "category-page-view");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("pageCategories", arr[getIndex(columns, "evar104")]);
                            }

                            // add-to-cart
                            if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "12") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("add_to_cart_events_idl");
                                /* create row here */
                                row.set("eventType", "add-to-cart");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                            }

                            // purchase complete
                            indx = getIndex(columns, "page_event");
                            if (arr[getIndex(columns, "product_list")] != null && arr[indx] == "1") {
                                tabName = "outputTableProject".concat(":").concat(outputTableDataset).concat(".")
                                        .concat("purchase_complete_events_idl");
                                /* create row here */
                                row.set("eventType", "home-page-view");
                                if (arr[getIndex(columns, "evar6")] != " ") {
                                    row.set("visitorId", arr[getIndex(columns, "evar6")]);
                                } else {
                                    row.set("visitorId", arr[getIndex(columns, "mcvisid")]);
                                }
                                row.set("productDetails.product.id", arr[getIndex(columns, "product_list")]);
                                row.set("productDetails.product.quantity", arr[getIndex(columns, "product_list")]);
                                row.set("purchaseTransaction.revenue", arr[getIndex(columns, "product_list")]);
                                row.set("purchaseTransaction.currencyCode", arr[getIndex(columns, "product_list")]);
                            }
                        }
                        LOG.info("Row:" + row.toString());
                        tbObj = new TableAndRow(row, tabName);
                        c.output(tbObj);
                    }
                }).withSideInputs(filecontents)).setCoder(TableAndRowCoder.of());

        tablerows.apply("Write to BigQuery",
                BigQueryIO.<TableAndRow>write().to(line -> getTableName(line))
                        .withFormatFunction((TableAndRow line) -> convertToTableRow(line))
                        .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                        .withWriteDisposition(WriteDisposition.WRITE_APPEND));

        p.run().waitUntilFinish();
        System.out.println("Pipeline Executed");
    }
    private static TableRow convertToTableRow(TableAndRow line) {
        // TODO Auto-generated method stub
        TableRow row = line.getRow();
        return row;
    }

    public static int getIndex(String[] Data, String str) {
        int index = -1;
        for (int j = 0; j < Data.length; j++) {
            if (Data[j].contains(str)) {
                index = j;
                break;
            }
        }
        return index;
    }

    public static TableDestination getTableName(ValueInSingleWindow<TableAndRow> line) {
        TableDestination destination;
        TableAndRow row = line.getValue();
        destination = new TableDestination(row.getTab_name(), null);
        return destination;
    }
}
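Note: TableAndRow and TableAndRowCoder are my own helper classes and are not shown above. Roughly, TableAndRow is just a holder for the destination table name plus the TableRow, along these lines (simplified sketch inferred from the calls above, not the exact class; TableAndRowCoder is a custom Coder for it):

// Rough shape of the TableAndRow holder used above (the real class is not shown here;
// the constructor and getters are inferred from how it is used in the pipeline).
public class TableAndRow {
    private final TableRow row;
    private final String tab_name; // fully qualified "project:dataset.table"

    public TableAndRow(TableRow row, String tab_name) {
        this.row = row;
        this.tab_name = tab_name;
    }

    public TableRow getRow() {
        return row;
    }

    public String getTab_name() {
        return tab_name;
    }
}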