I have spent two days trying to make this work and have not found anything that matches my setup. I am pasting all the relevant files below.
What I need is to send a message from service-two (a .NET project) and receive it in service-one (also a .NET project). Both projects, Kafka, and Zookeeper run in Docker via docker-compose. I can ping the kafka host from inside the .NET project containers. If I create a Kafka console producer and consumer through docker exec, so that I stay within the same network, they can communicate with each other.
So I suspect something is wrong with the .NET client.
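A console check of this kind can be sketched as below. This is only a sketch under assumptions: the container name `kafka` is hypothetical (the real name shows up in `docker ps`), the helper names `produce_message` and `consume_messages` are made up for illustration, and the script names assume the Bitnami image has `kafka-console-producer.sh` and `kafka-console-consumer.sh` on its PATH.

```shell
# Hypothetical container name; check the real one with `docker ps`.
KAFKA_CONTAINER="${KAFKA_CONTAINER:-kafka}"

# Send one message to topic "two" from inside the broker container.
produce_message() {
  printf '%s\n' "$1" | docker exec -i "$KAFKA_CONTAINER" \
    kafka-console-producer.sh --bootstrap-server localhost:9092 --topic two
}

# Read topic "two" from the beginning; give up after 10 s without messages.
consume_messages() {
  docker exec "$KAFKA_CONTAINER" \
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic two \
    --from-beginning --timeout-ms 10000
}
```

Calling `produce_message "hello"` and then `consume_messages` runs both tools inside the broker container, where localhost:9092 resolves to the broker itself.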
Docker Compose file
version: "3.9"
networks:
  app-tier:
    driver: bridge
services:
  database:
    networks:
      - app-tier
    image: postgres:14.1-alpine3.15
    restart: always
    ports:
      - '5432:5432'
    environment:
      POSTGRES_PASSWORD: Db_Password123*
      POSTGRES_DB: mydb
    volumes:
      - data:/custom/mount:/var/lib/postgresql/data
  service-one:
    build: ./ServiceOne
    networks:
      - app-tier
    depends_on:
      - database
      - zookeeper
    restart: always
    ports:
      - '5000:5000'
      - '5001:5001'
  service-two:
    networks:
      - app-tier
    depends_on:
      - database
      - zookeeper
    build: ./ServiceTwo
    restart: always
    ports:
      - '4000:4000'
      - '4001:4001'
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
      - app-tier
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
  kafka:
    image: 'bitnami/kafka:latest'
    networks:
      - app-tier
    environment:
      KAFKA_CREATE_TOPICS: "one:1:1,two:1:1"
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'
volumes:
  data:
service-one Dockerfile
FROM mcr.microsoft.com/dotnet/sdk:6.0
WORKDIR /app
EXPOSE 5001
EXPOSE 5000
RUN apt-get update && apt-get install -y iputils-ping
COPY . .
WORKDIR /app/ServiceOne
RUN dotnet dev-certs https -ep ./aspnetapp.pfx -p INTELLECTUAL
RUN dotnet dev-certs https --trust
CMD ["dotnet", "watch", "run"]
service-two Dockerfile
FROM mcr.microsoft.com/dotnet/sdk:6.0
WORKDIR /app
EXPOSE 4001
EXPOSE 4000
RUN apt-get update && apt-get install -y iputils-ping
COPY . .
WORKDIR /app/ServiceTwo
RUN dotnet dev-certs https -ep ./aspnetapp.pfx -p INTELLECTUAL
RUN dotnet dev-certs https --trust
CMD ["dotnet", "watch", "run"]
service-one Hosted Consumer Service
using System.Text;
using Kafka.Public;
using Kafka.Public.Loggers;

namespace ServiceOne.Services;

public class KafkaStringConsumerService : IHostedService
{
    private readonly IClusterClient _cluster;
    private readonly ConsoleLogger _logger;

    public KafkaStringConsumerService()
    {
        _logger = new ConsoleLogger();
        _cluster = new ClusterClient(new Configuration
        {
            Seeds = "kafka:9092"
        },
        _logger);
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _cluster.MessageReceived += record =>
        {
            _logger.LogInformation(Encoding.UTF8.GetString((record.Value as byte[])!));
        };
        _cluster.ConsumeFromLatest("two", 1);
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cluster?.Dispose();
        return Task.CompletedTask;
    }
}
service-one Program.cs
using ServiceOne.Services;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddHostedService<KafkaStringConsumerService>();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.UseCors(config =>
{
    config.AllowAnyHeader();
    config.AllowAnyMethod();
    config.AllowAnyOrigin();
});
app.MapControllers();
app.Run();
service-one launchSettings.json
{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "profiles": {
    "ServiceOne": {
      "commandName": "Project",
      "dotnetRunMessages": true,
      "launchBrowser": true,
      "launchUrl": "swagger",
      "applicationUrl": "https://0.0.0.0:5001;http://0.0.0.0:5000",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
}
service-two HomeController.cs
using Kafka.Public;
using Microsoft.AspNetCore.Mvc;

namespace ServiceTwo.Controllers
{
    [Route("[controller]/[action]")]
    public class HomeController : Controller
    {
        private readonly IClusterClient _cluster;

        public HomeController(IClusterClient cluster)
        {
            _cluster = cluster;
        }

        [HttpPost]
        public IActionResult SendMessage([FromBody] string message)
        {
            var result = _cluster.Produce("two", null, message, 1);
            return Ok($"Whether message was sent: {result}");
        }
    }
}
service-two Program.cs
using System.Net;
using Kafka.Public;
using Kafka.Public.Loggers;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.
builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddSingleton<IClusterClient>(
    new ClusterClient(new Configuration
    {
        Seeds = "kafka:9092",
        ClientId = Dns.GetHostName()
    },
    new ConsoleLogger()));

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();
app.UseAuthorization();
app.UseCors(config =>
{
    config.AllowAnyHeader();
    config.AllowAnyMethod();
    config.AllowAnyOrigin();
});
app.MapControllers();
app.Run();
service-two launchSettings.json
{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "profiles": {
    "ServiceTwo": {
      "commandName": "Project",
      "dotnetRunMessages": true,
      "launchBrowser": true,
      "launchUrl": "swagger",
      "applicationUrl": "https://0.0.0.0:4001;http://0.0.0.0:4000",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
}
I cannot paste the complete log because it is over 88k characters, which exceeds StackOverflow's limit of 30000 characters. These are the logs after calling the Produce method in .NET:
service-one_1 | [2021-12-20 15:10:09] INFO Fetching metadata from [kafka:9092]...
service-one_1 | [2021-12-20 15:10:09] INFO [Metadata][Brokers] (Id:1001 Host:localhost Port:9092)
service-one_1 | [2021-12-20 15:10:09] INFO Removing broker '[kafka:9092]' from the cluster in response to new topology
service-one_1 | [2021-12-20 15:10:09] INFO Connected to [Unknown]
service-two_1 | [2021-12-20 15:10:11] INFO Fetching metadata from [localhost:9092]...
service-two_1 | [2021-12-20 15:10:11] ERROR Could not get routing table! The Kafka cluster is probably having problems answering requests. Exception was: System.AggregateException: One or more errors occurred. (Kafka transport error)
service-two_1 | ---> Kafka.Network.TransportException: Kafka transport error
service-two_1 | ---> System.Net.Sockets.SocketException (111): Connection refused
service-two_1 | at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
service-two_1 | at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
service-two_1 | at System.Threading.Tasks.ValueTask.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
service-two_1 | --- End of stack trace from previous location ---
service-two_1 | at Kafka.Network.Connection.ConnectAsync()
service-two_1 | --- End of inner exception stack trace ---
service-two_1 | at Kafka.Network.Connection.ConnectAsync()
service-two_1 | at Kafka.Cluster.Node.InitConnection()
service-two_1 | at Kafka.Cluster.Node.ProcessRequest(Ping dummy)
service-two_1 | at Kafka.Cluster.Cluster.ProcessFullMetadata(TaskCompletionSource`1 promise)
service-two_1 | --- End of inner exception stack trace ---
service-two_1 | at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
service-two_1 | at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
service-two_1 | at System.Threading.Tasks.Task`1.get_Result()
service-two_1 | at Kafka.Routing.ProduceRouter.EnsureHasRoutingTable()
service-two_1 | [2021-12-20 15:10:11] ERROR [Producer] No node available for [topic: two / partition: 1], postponing messages.
service-two_1 | [2021-12-20 15:10:11] INFO Fetching metadata from [localhost:9092]...
service-two_1 | [2021-12-20 15:10:12] WARNING Failed to connect to (Id:1001 Host:localhost Port:9092), retrying.
service-two_1 | [2021-12-20 15:10:12] INFO Fetching metadata from [localhost:9092]...
service-two_1 | [2021-12-20 15:10:14] WARNING Failed to connect to (Id:1001 Host:localhost Port:9092), retrying.
service-two_1 | [2021-12-20 15:10:14] INFO Fetching metadata from [localhost:9092]...
service-two_1 | [2021-12-20 15:10:14] WARNING Failed to connect to (Id:1001 Host:localhost Port:9092), retrying.
service-two_1 | [2021-12-20 15:10:14] WARNING Kafka node (Id:1001 Host:localhost Port:9092) is dead, refreshing metadata.
service-two_1 | [2021-12-20 15:10:14] ERROR All nodes are dead, retrying from bootstrap seeds.
service-two_1 | [2021-12-20 15:10:14] INFO Fetching metadata from [kafka:9092]...
service-two_1 | [2021-12-20 15:10:14] INFO [Metadata][Brokers] (Id:1001 Host:localhost Port:9092)
service-two_1 | [2021-12-20 15:10:14] INFO Removing broker '[kafka:9092]' from the cluster in response to new topology
service-two_1 | [2021-12-20 15:10:14] INFO Connected to [Unknown]
service-one_1 | [2021-12-20 15:10:19] DEBUG ProcessFullMetadata: no need to refresh the routing table
service-one_1 | [2021-12-20 15:10:29] DEBUG ProcessFullMetadata: no need to refresh the routing table
service-one_1 | [2021-12-20 15:10:39] DEBUG ProcessFullMetadata: no need to refresh the routing table
service-one_1 | [2021-12-20 15:10:49] DEBUG ProcessFullMetadata: no need to refresh the routing table
UPDATE
I am also unable to reach the database from the .NET projects, even though the database runs in a Docker container defined in the same docker-compose file.
Thank you