I have a DSL (shown below) that ends on "log", so the json produced from the jdbc source should be logged and it's not.
The Supplier reads a database queue and produce a json array for the rows.
If I turn on logging, the SybaseSupplierConfiguration.this.logger.debug("Json: {}", json);
is outputted.
Why is it not flowing to "log" ?
So far I have tried:
- Downgrade spring boot to 2.2.9 (using 2.3.2)
- Fixed the return result of jsonSupplier (to a json string)
- Disabled prometheus / grafana
- Explicitly configured poll
spring.cloud.stream.poller.fixed-delay=10
- Used rabbitmq binder and docker image
- Offered some booz to the spring cloud dataflow god.
None worked.
the docker:
export DATAFLOW_VERSION=2.6.0
export SKIPPER_VERSION=2.5.0
docker-compose -f ./docker-compose.yml -f ./docker-compose-prometheus.yml up -d
the pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>company-cloud-dataflow-apps</artifactId>
<groupId>br.com.company.cloud.dataflow.apps</groupId>
<version>1.0.0-SNAPSHOT</version>
<relativePath>../pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>jdbc-sybase-supplier</artifactId>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-test-support</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud.stream.app</groupId>
<artifactId>app-starters-micrometer-common</artifactId>
<version>${app-starters-micrometer-common.version}</version>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
</dependency>
<dependency>
<groupId>io.micrometer.prometheus</groupId>
<artifactId>prometheus-rsocket-spring</artifactId>
<version>${prometheus-rsocket.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
<groupId>net.sourceforge.jtds</groupId>
<artifactId>jtds</artifactId>
<version>1.3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jdbc</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jdk8</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-parameter-names</artifactId>
<version>${jackson.version}</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-jaxb-annotations</artifactId>
<version>${jackson.version}</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
The configuration:
....
spring.cloud.stream.function.bindings.jsonSupplier-out-0=output
spring.cloud.function.definition=jsonSupplier
The implementation:
@SpringBootApplication
@EnableConfigurationProperties(SybaseSupplierProperties.class)
public class SybaseSupplierConfiguration {
private final DataSource dataSource;
private final SybaseSupplierProperties properties;
private final ObjectMapper objectMapper;
private final JdbcTemplate jdbcTemplate;
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
public SybaseSupplierConfiguration(DataSource dataSource,
JdbcTemplate jdbcTemplate,
SybaseSupplierProperties properties) {
this.dataSource = dataSource;
this.jdbcTemplate = jdbcTemplate;
this.properties = properties;
objectMapper = new ObjectMapper().registerModule(new ParameterNamesModule())
.registerModule(new Jdk8Module())
.registerModule(new JavaTimeModule())
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}
public static void main(String[] args) {
SpringApplication.run(SybaseSupplierConfiguration.class, args);
}
@Value
static class IntControle {
long cdIntControle;
String deTabela;
}
@Bean
public MessageSource<Object> jdbcMessageSource() {
String query = "select cd_int_controle, de_tabela from int_controle rowlock readpast " +
"where id_status = 0 order by cd_int_controle";
JdbcPollingChannelAdapter adapter =
new JdbcPollingChannelAdapter(dataSource, query) {
@Override
protected Object doReceive() {
Object object = super.doReceive();
if (object == null) {
return null;
}
@SuppressWarnings("unchecked")
List<IntControle> ints = (List<IntControle>) object;
try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
try (JsonGenerator jen = objectMapper.createGenerator(out)) {
jen.writeStartArray();
for (IntControle itm : ints) {
String qry = String.format("select * from vw_integ_%s where cd_int_controle = %d",
itm.getDeTabela(), itm.getCdIntControle());
List<Map<String, Object>> maps = jdbcTemplate.queryForList(qry);
for (Map<String, Object> l : maps) {
jen.writeStartObject();
for (Map.Entry<String, Object> entry : l.entrySet()) {
String k = entry.getKey();
Object v = entry.getValue();
jen.writeFieldName(k);
if (v == null) {
jen.writeNull();
} else {
//caso necessário um item específico, ver em:
// https://stackoverflow.com/questions/6514876/most-efficient-conversion-of-resultset-to-json
jen.writeObject(v);
}
}
jen.writeEndObject();
}
}
jen.writeEndArray();
}
String json = out.toString();
SybaseSupplierConfiguration.this.logger.debug("Json: {}", json);
return json;
} catch (IOException e) {
throw new IllegalArgumentException("Erro ao converter json", e);
}
}
};
adapter.setMaxRows(properties.getPollSize());
adapter.setUpdatePerRow(true);
adapter.setRowMapper((RowMapper<IntControle>) (rs, i) -> new IntControle(rs.getLong(1), rs.getString(2)));
adapter.setUpdateSql("update int_controle set id_status = 1 where cd_int_controle = :cdIntControle");
return adapter;
}
@Bean
public Supplier<Message<?>> jsonSupplier() {
return jdbcMessageSource()::receive;
}
}
the shell setup:
app register --name jdbc-postgresql-sink --type sink --uri maven://br.com.company.cloud.dataflow.apps:jdbc-postgresql-sink:1.0.0-SNAPSHOT --force
app register --name jdbc-sybase-supplier --type source --uri maven://br.com.company.cloud.dataflow.apps:jdbc-sybase-supplier:1.0.0-SNAPSHOT --force
stream create --name sybase_to_pgsql --definition "jdbc-sybase-supplier | log "
stream deploy --name sybase_to_pgsql
the log:
....
2020-08-02 00:40:18.644 INFO 81 --- [ main] o.s.b.a.e.web.EndpointLinksResolver : Exposing 0 endpoint(s) beneath base path '/actuator'
2020-08-02 00:40:18.793 INFO 81 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {logging-channel-adapter:_org.springframework.integration.errorLogger} as a subscriber to the 'errorChannel' channel
2020-08-02 00:40:18.793 INFO 81 --- [ main] o.s.i.channel.PublishSubscribeChannel : Channel 'application.errorChannel' has 1 subscriber(s).
2020-08-02 00:40:18.793 INFO 81 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean '_org.springframework.integration.errorLogger'
2020-08-02 00:40:18.793 INFO 81 --- [ main] o.s.i.endpoint.EventDrivenConsumer : Adding {router} as a subscriber to the 'jsonSupplier_integrationflow.channel#0' channel
2020-08-02 00:40:18.793 INFO 81 --- [ main] o.s.integration.channel.DirectChannel : Channel 'application.jsonSupplier_integrationflow.channel#0' has 1 subscriber(s).
2020-08-02 00:40:18.794 INFO 81 --- [ main] o.s.i.endpoint.EventDrivenConsumer : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'
2020-08-02 00:40:18.795 INFO 81 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Creating binder: kafka
2020-08-02 00:40:19.235 INFO 81 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: kafka
2020-08-02 00:40:19.235 INFO 81 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: kafka
2020-08-02 00:40:19.362 INFO 81 --- [ main] o.s.c.s.b.k.p.KafkaTopicProvisioner : Using kafka topic for outbound: sybase_to_pgsql.jdbc-sybase-supplier
2020-08-02 00:40:19.364 INFO 81 --- [ main] o.a.k.clients.admin.AdminClientConfig : AdminClientConfig values:
bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
client.dns.lookup = default
client.id =
connections.max.idle.ms = 300000
default.api.timeout.ms = 60000
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 2147483647
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
2020-08-02 00:40:19.572 INFO 81 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-08-02 00:40:19.572 INFO 81 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:19.572 INFO 81 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1596328819571
2020-08-02 00:40:20.403 INFO 81 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-1
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-08-02 00:40:20.477 INFO 81 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-08-02 00:40:20.477 INFO 81 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:20.477 INFO 81 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1596328820477
2020-08-02 00:40:20.573 INFO 81 --- [ad | producer-1] org.apache.kafka.clients.Metadata : [Producer clientId=producer-1] Cluster ID: um9lJtXTQUmURh9cwOkqxA
2020-08-02 00:40:20.574 INFO 81 --- [ main] o.a.k.clients.producer.KafkaProducer : [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 30000 ms.
2020-08-02 00:40:20.622 INFO 81 --- [ main] o.s.c.s.m.DirectWithAttributesChannel : Channel 'application.output' has 1 subscriber(s).
2020-08-02 00:40:20.625 INFO 81 --- [ main] o.s.i.e.SourcePollingChannelAdapter : started bean 'jsonSupplier_integrationflow.org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean#0'
2020-08-02 00:40:20.654 INFO 81 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 20031 (http) with context path ''
2020-08-02 00:40:20.674 INFO 81 --- [ main] b.c.c.d.a.s.SybaseSupplierConfiguration : Started SybaseSupplierConfiguration in 12.982 seconds (JVM running for 14.55)
2020-08-02 00:40:21.160 INFO 81 --- [ask-scheduler-1] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
batch.size = 16384
bootstrap.servers = [PLAINTEXT://kafka-broker:9092]
buffer.memory = 33554432
client.dns.lookup = default
client.id = producer-2
compression.type = none
connections.max.idle.ms = 540000
delivery.timeout.ms = 120000
enable.idempotence = false
interceptor.classes = []
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
linger.ms = 0
max.block.ms = 60000
max.in.flight.requests.per.connection = 5
max.request.size = 1048576
metadata.max.age.ms = 300000
metadata.max.idle.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
receive.buffer.bytes = 32768
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retries = 0
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
security.providers = null
send.buffer.bytes = 131072
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLSv1.2
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
transaction.timeout.ms = 60000
transactional.id = null
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
2020-08-02 00:40:21.189 INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.5.0
2020-08-02 00:40:21.189 INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 66563e712b0b9f84
2020-08-02 00:40:21.189 INFO 81 --- [ask-scheduler-1] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1596328821189
2020-08-02 00:40:21.271 INFO 81 --- [ad | producer-2] org.apache.kafka.clients.Metadata : [Producer clientId=producer-2] Cluster ID: um9lJtXTQUmURh9cwOkqxA