
I have spent 2 days researching this and now I need your help. Thank you in advance.

I have the following flow: 1) ConsumeKafka (messages are in JSON format) 2) EvaluateJsonPath 3) UpdateAttribute 4) AttributesToJSON

Everything above works, but the rest of the flow does not: 5) PutCassandraRecord (I need help configuring this processor. I know my Cassandra server, port, keyspace, and table name, and the record reader is JsonPathReader. What else is needed?) 6) I added a JsonPathReader controller service (I need help with how this record reader must be configured). 7) I am getting the exception shown in the attached screenshot. Where and how do I get or configure a Schema Registry?

I checked this question and answer: Apache Nifi/Cassandra - how to load CSV into Cassandra table

If my flow is wrong, please correct me. Thanks.

regards, Yeshwant

Yeshwant KAKAD
  • Does your flow file have an attribute named valor.vaengine ? – Bryan Bende Dec 12 '18 at 14:25
  • Hi Bryan, Valor indicates keyspace name of Cassandra. And 'vaengine' is name of the table. In my JSON file there is no attribute related to this keyspace and table name. – Yeshwant KAKAD Dec 13 '18 at 02:29
  • When you use ${something} that is an expression language statement referencing a flow file attribute named 'something'. Flow file attributes are different than the content of the flow file, so something would need to create the attribute valor.vaengine as Shu mentioned in his answer – Bryan Bende Dec 13 '18 at 14:17
  • Thank you Bryan for clearing my concepts. – Yeshwant KAKAD Dec 14 '18 at 01:22

1 Answer


There are multiple ways to configure a Record Reader/Writer controller service.

I will try to explain the following two Schema Access Strategies:

  • Use 'Schema Name' Property
  • Use 'Schema Text' Property

Use 'Schema Text' Property:

With this access strategy, the controller service looks for the avro.schema attribute in the Variable Registry or the flowfile attributes; alternatively, we can put the schema directly in the property value.

Example:

I have set the Schema Text property value to my Avro schema.
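As a rough sketch of what such a Schema Text value could look like, the Python snippet below builds an Avro record schema and prints it as JSON. The names `valor` and `vaengine` come from the keyspace and table in the question; the fields `id` and `name` and their types are assumptions, since the real columns are not shown.

```python
import json

# Hypothetical Avro schema for the incoming records. The field names and
# types are assumed; replace them with the actual columns of the table.
schema = {
    "type": "record",
    "name": "vaengine",
    "namespace": "valor",
    "fields": [
        {"name": "id", "type": "string"},
        # A nullable field is written as a union with "null" listed first.
        {"name": "name", "type": ["null", "string"], "default": None},
    ],
}

# The printed JSON is the kind of text that would go into Schema Text.
schema_text = json.dumps(schema, indent=2)
print(schema_text)
```

Generating the schema from a dict like this is only a convenience for checking that the JSON is well formed before pasting it into the property.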

Use 'Schema Name' Property:


With this strategy, the controller service evaluates the Schema Name property value ${valor.vaengine}. This is an Expression Language reference to an attribute named valor.vaengine, so the flowfile needs to carry a value for that attribute.

The controller service then uses the value of ${valor.vaengine} to pick the schema with that name from the AvroSchemaRegistry that it is configured to use.

In your case the flowfile does not have the valor.vaengine attribute. To add it, use an UpdateAttribute processor and add a new property:

Property name: valor.vaengine

Value: <schema_name_in_avroschemaregistry>
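The lookup described above can be modelled in a few lines of Python. This is only a simplified model of the behaviour, not NiFi code: the function `resolve_schema`, the registry entry name `vaengine_schema`, and the error handling are all hypothetical and serve just to show why a missing attribute makes the lookup fail.

```python
from typing import Dict


def resolve_schema(schema_name_property: str,
                   attributes: Dict[str, str],
                   registry: Dict[str, str]) -> str:
    """Model of the 'Schema Name' lookup: expression -> attribute -> registry."""
    if schema_name_property.startswith("${") and schema_name_property.endswith("}"):
        # Strip the ${...} wrapper to get the referenced attribute name.
        attr_name = schema_name_property[2:-1]
        # In this model a missing attribute evaluates to an empty string.
        schema_name = attributes.get(attr_name, "")
    else:
        # A literal value is used as the schema name directly.
        schema_name = schema_name_property
    if schema_name not in registry:
        raise LookupError(f"no schema named {schema_name!r} in the registry")
    return registry[schema_name]


# Hypothetical registry with one entry (the schema text is shortened here).
registry = {"vaengine_schema": '{"type": "record", "name": "vaengine", "fields": []}'}

# Attribute as it would be set by UpdateAttribute.
attributes = {"valor.vaengine": "vaengine_schema"}
print(resolve_schema("${valor.vaengine}", attributes, registry))
```

Calling `resolve_schema("${valor.vaengine}", {}, registry)` raises `LookupError` in this model, which mirrors the situation in the question where the flowfile carries no such attribute.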

Use this template for more details on configuring and using Record Reader/Writer controller services.


You are using the JsonPathReader controller service. For this controller service we need to add at least one user-defined property before it can be enabled, for example a property named id with the value $.id.
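To illustrate what such a user-defined property expresses, here is a minimal Python sketch in which each property name becomes a record field and each value is a JSONPath evaluated against the message. It handles only simple `$.a.b` style paths and is not how NiFi implements the reader; the function `read_record`, the sample message, and the second property `name` are assumptions made for the example.

```python
import json
from typing import Any, Dict


def read_record(content: str, field_paths: Dict[str, str]) -> Dict[str, Any]:
    """Evaluate simple '$.a.b' style paths against a JSON document."""
    doc = json.loads(content)
    record: Dict[str, Any] = {}
    for field, path in field_paths.items():
        if not path.startswith("$."):
            raise ValueError(f"unsupported path: {path}")
        value: Any = doc
        for key in path[2:].split("."):
            # A missing key yields None for that field in this sketch.
            value = value.get(key) if isinstance(value, dict) else None
        record[field] = value
    return record


# Hypothetical Kafka message and user-defined properties (field -> JSONPath).
message = '{"id": "42", "device": {"name": "pump-1"}}'
paths = {"id": "$.id", "name": "$.device.name"}
record = read_record(message, paths)
print(record)
```

Each field produced this way would also need a matching entry in the schema that the reader is configured with.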

notNull