5

Attempting to build a pipeline to read from a 3rd party REST API endpoint data source.

I am using the HTTP (version 1.2.0) plugin found in the Hub.

The response request URL is: https://api.example.io/v2/somedata?return_count=false

A sample of response body:

{
  "paging": {
    "token": "12456789",
    "next": "https://api.example.io/v2/somedata?return_count=false&__paging_token=123456789"
  },
  "data": [
    {
      "cID": "aerrfaerrf",
      "first": true,
      "_id": "aerfaerrfaerrf",
      "action": "aerrfaerrf",
      "time": "1970-10-09T14:48:29+0000",
      "email": "example@aol.com"
    },
    {...}
  ]
}

The main error in the logs is:

java.lang.NullPointerException: null
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.getNextPage(BaseHttpPaginationIterator.java:118) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.ensurePageIterable(BaseHttpPaginationIterator.java:161) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.common.pagination.BaseHttpPaginationIterator.hasNext(BaseHttpPaginationIterator.java:203) ~[1580429892615-0/:na]
    at io.cdap.plugin.http.source.batch.HttpRecordReader.nextKeyValue(HttpRecordReader.java:60) ~[1580429892615-0/:na]
    at io.cdap.cdap.etl.batch.preview.LimitingRecordReader.nextKeyValue(LimitingRecordReader.java:51) ~[cdap-etl-core-6.1.1.jar:na]
    at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:214) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439) ~[scala-library-2.11.8.jar:na]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:128) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$4.apply(SparkHadoopWriter.scala:127) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1415) ~[spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$.org$apache$spark$internal$io$SparkHadoopWriter$$executeTask(SparkHadoopWriter.scala:139) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:83) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.internal.io.SparkHadoopWriter$$anonfun$3.apply(SparkHadoopWriter.scala:78) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.scheduler.Task.run(Task.scala:109) [spark-core_2.11-2.3.3.jar:2.3.3]
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [spark-core_2.11-2.3.3.jar:2.3.3]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_232]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_232]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_232]

Possible issues

After trying to troubleshoot this for awhile, I'm thinking the issue might be with

Pagination

  • Data Fusion HTTP plugin has a lot of methods to deal with pagination
    • Based on the response body above, it seems like the best option for Pagination Type is Link in Response Body
    • For the required Next Page JSON/XML Field Path parameter, I've tried $.paging.next and paging/next. Neither work.
    • I have verified that the link in /paging/next works when opening in Chrome

Authentication

  • When simply trying to view the response URL in Chrome, a prompt will pop up asking for username and password
    • Only need to input API key for username to get past this prompt in Chrome
    • To do this in the Data Fusion HTTP plugin, the API Key is used for Username in the Basic Authentication section

Anyone have any success in creating a pipeline in Google Cloud Data Fusion where the data source is a REST API?

Korean_Of_the_Mountain
  • 1,428
  • 3
  • 16
  • 40

2 Answers2

1

In answer to

Anyone have any success in creating a pipeline in Google Cloud Data Fusion where the data source is a REST API?

This is not the optimal way to achieve this the best way would be to ingest data Service APIs Overview to pub/sub your would then use pub/sub as the source for your pipeline this would provide a simple and reliable staging location for your data on its for processing, storage, and analysis, see the documentation for the pub/sub API . In order to use this in conjunction with Dataflow, the steps to follow are in the official documentation here Using Pub/Sub with Dataflow

Paddy Popeye
  • 1,634
  • 1
  • 16
  • 29
  • Thanks for response. Could you elaborate on why Data Fusion is not optimal for this application? Is it only because Pub/Sub provides a reliable staging location? It seems like Data Fusion could simply use Cloud Storage as a reliable staging location. – Korean_Of_the_Mountain Feb 04 '20 at 16:24
  • it is not that Data Fusion is the issue but from your description I understood that you wish to ingest directly from a REST endpoint... you can of course use Data fusion with pub/sub ... if you review this tutorial it will give you a better idea https://codelabs.developers.google.com/codelabs/real-time-csv-cdf-bq/index.html?index=..%2F..index#0 – Paddy Popeye Feb 05 '20 at 10:34
  • Yes, that is correct: the real issue here is how to ingest data from a REST endpoint. I'm still confused, however, on how Pub/Sub would help me send a GET request to my data source API and stage the response data before moving on to Data Fusion or Dataflow. The Service APIs link you provided in OP is about the Pub/Sub API but I don't see where it touches on ingesting data from API endpoint other than Google's own APIs – Korean_Of_the_Mountain Feb 10 '20 at 00:52
  • @Korean_Of_the_Mountain You should check out Apache NiFi. – DarkLeafyGreen Mar 19 '20 at 10:38
0

I think your problem is in the data format that you receive. The exception:

java.lang.NullPointerException: null

occurs when you do not specify a correct output schema (no schema in this case I believe)

Solution 1

To solve it, try configuring the HTTP Data Fusion plugin to:

  • Receive format: Text.
  • Output Schema: name: user Type: String

This should work to obtain the response from the API in string format. Once that is done, use a JSONParser to convert the string into a table like object.

Solution 2

Configure the HTTP Data Fusion plugin to:

  • Receive format: json
  • JSON/XML Result Path : data
  • JSON/XML Fields Mapping : Include the fields you presented (see attached foto).

HTTP plugin configuration

FVCC
  • 262
  • 2
  • 16