
I'm trying to build a fluentd pipeline that pulls data from an API and writes it into PostgreSQL:

<source>
  @type http_pull

  tag energydata
  url http://10.0.0.30:8080/data
  interval 10s
  format json
</source>

# Record from source looks like this (I've sent it to stdout to verify): 
# {
# "url":"http://10.0.0.30:8080/data",
# "status":200,
# "message":
#   {
#   "timestamp":"2022-12-01T09:28:43Z",
#   "currentPowerConsumption":0.429
#   }
# }

<match energydata>
  @type sql
  host 10.0.0.10
  port 5432
  database energy
  adapter postgresql
  username fluent
  password somepasswd

  <table>
    table energymeterdata
    column_mapping '$.message.timestamp:timestamp,$.message.currentPowerConsumption:currentPowerConsumption'
  </table>
</match>

The resulting SQL row contains only NULL values. What is the right syntax for the record_accessor in the column_mapping?

I've tried different syntaxes and quote styles for the record_accessor in the column_mapping, but I can't find the right format.

Before learning that it should be possible with the record accessor (as claimed by one of the maintainers in a GitHub issue), I tried flattening the JSON structure, but I could not get that to work either. I would still prefer the approach described in this post, because it is a cleaner solution. This seems like a fairly basic scenario, so I may be overlooking something.

Niels
  • Do you see any errors in the logs? It didn't work after flattening the JSON with `record_transformer`, right? If yes, please add that config to your question as well. – Azeem Dec 01 '22 at 11:42
  • The only error I keep getting is that Postgres (rightfully) does not accept (null,null) as a valid row since there is a not null constraint on the timestamp. – Niels Dec 01 '22 at 11:49
  • @Azeem I was able to get the flattening to work with the two transformer steps you detailed in another post – thanks. I would still like to have the answer for doing it with the record_accessor, since that seems like a cleaner solution. – Niels Dec 01 '22 at 12:03
  • From the [parsing of the `column_mapping` field](https://github.com/fluent/fluent-plugin-sql/blob/a05003501dc65e121ed56aec59f5ea83eba9e665/lib/fluent/plugin/out_sql.rb#L140-L149) and the rest of the source, it doesn't look like it supports `record_accessor` syntax. It's a one-to-one mapping of keys (source fields of the internal msgpack) and values (destination columns of the table), so the transformation suggested in that issue should work. – Azeem Dec 01 '22 at 12:03
  • Good to hear that! You're welcome! You might want to self-answer this question with the exact configuration that solved your issue for others. Thanks! The maintainers might add `record_accessor` support in the future. I see your reaction to another similar [issue](https://github.com/fluent/fluent-plugin-sql/issues/126#issue-1161116130). – Azeem Dec 01 '22 at 12:08
  • Oh, I just saw this [comment](https://github.com/fluent/fluent-plugin-sql/issues/74#issuecomment-501963189) that the `record_accessor` is supported since v1. But, I'm not seeing anything related to that in the source. I'll go through the codebase thoroughly later. Your syntax looks correct though it's not working. – Azeem Dec 01 '22 at 12:14

1 Answer


It turns out there is nothing beyond the comment from user 'repeatedly' in the GitHub issue to suggest that record_accessor syntax is supported in the sql plugin's column_mapping parameter. This means we have to rely on filters to modify the JSON structure before it reaches the output. For me, the following configuration works:

<filter energydata>
  @type record_transformer
  renew_record true
  keep_keys message
  enable_ruby true
  <record>
    message ${record["message"].to_json.to_s}
  </record>
</filter>

<filter energydata>
  @type parser
  key_name message
  reserve_data true
  remove_key_name_field true
  <parse>
    @type json
  </parse>
</filter>
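With the message fields promoted to the top level by these two filters, the column_mapping should be able to refer to them by their plain key names (the plugin treats it as a one-to-one key-to-column mapping, as noted in the comments). A sketch of how the match block might then look — untested, with the table and column names taken from the question:

```
<match energydata>
  @type sql
  host 10.0.0.10
  port 5432
  database energy
  adapter postgresql
  username fluent
  password somepasswd

  <table>
    table energymeterdata
    column_mapping 'timestamp:timestamp,currentPowerConsumption:currentPowerConsumption'
  </table>
</match>
```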

Source: here
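To sanity-check what the two filters do to the record, the round trip can be simulated in plain Python (illustration only; the record values are the ones from the question):

```python
import json

# Simulated record as emitted by the http_pull source
record = {
    "url": "http://10.0.0.30:8080/data",
    "status": 200,
    "message": {
        "timestamp": "2022-12-01T09:28:43Z",
        "currentPowerConsumption": 0.429,
    },
}

# Step 1 -- record_transformer with renew_record: drop every key, then
# re-add "message" serialised as a JSON string (the Ruby expression
# ${record["message"].to_json.to_s})
step1 = {"message": json.dumps(record["message"])}

# Step 2 -- parser filter with remove_key_name_field: parse the string in
# "message" and promote its fields to the top level of the record
step2 = json.loads(step1["message"])

print(step2)
# {'timestamp': '2022-12-01T09:28:43Z', 'currentPowerConsumption': 0.429}
```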
