I have spent two days trying to get this working and have not found anything directly relevant. I am pasting all the files below; I really need help with this.

What I need is to send a message from service-two (a .NET project) and receive it in service-one (also a .NET project). Both projects, Kafka, and Zookeeper run in Docker via docker-compose. I am able to ping kafka:9092 from the .NET project containers. If I create a Kafka console producer and consumer through docker exec (so that I stay within the same network), they can communicate with each other. I suspect something is wrong with the .NET client.

Docker Compose file

version: "3.9"

networks:
  app-tier:
    driver: bridge

services:
  database:
    networks:
    - app-tier

    image: postgres:14.1-alpine3.15
    restart: always
    ports:
      - '5432:5432'
    environment:
      POSTGRES_PASSWORD: Db_Password123*
      POSTGRES_DB: mydb
    volumes:
      - data:/custom/mount:/var/lib/postgresql/data

  service-one:
    build: ./ServiceOne
    networks:
    - app-tier
    depends_on:
      - database
      - zookeeper
    restart: always
    ports:
      - '5000:5000'
      - '5001:5001'

  service-two:
    networks:
    - app-tier
    depends_on:
      - database
      - zookeeper
    build: ./ServiceTwo
    restart: always
    ports:
      - '4000:4000'
      - '4001:4001'

  zookeeper:
    image: 'bitnami/zookeeper:latest'
    networks:
    - app-tier
    environment:
      ALLOW_ANONYMOUS_LOGIN: yes
  
  kafka:
    image: 'bitnami/kafka:latest'
    networks:
    - app-tier
    environment:
      KAFKA_CREATE_TOPICS: "one:1:1,two:1:1"
      KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
      ALLOW_PLAINTEXT_LISTENER: yes
      KAFKA_CFG_LISTENERS: PLAINTEXT://:9092
      KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: true
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true
    depends_on:
      - zookeeper
    ports:
      - '9092:9092'

volumes:
  data:

service-one Dockerfile

FROM mcr.microsoft.com/dotnet/sdk:6.0
WORKDIR /app
EXPOSE 5001
EXPOSE 5000

RUN apt-get update && apt-get install -y iputils-ping

COPY . .

WORKDIR /app/ServiceOne

RUN dotnet dev-certs https -ep .\aspnetapp.pfx -p INTELLECTUAL
RUN dotnet dev-certs https --trust

CMD ["dotnet", "watch", "run"]

service-two Dockerfile

FROM mcr.microsoft.com/dotnet/sdk:6.0
WORKDIR /app
EXPOSE 4001
EXPOSE 4000

RUN apt-get update && apt-get install -y iputils-ping

COPY . .

WORKDIR /app/ServiceTwo

RUN dotnet dev-certs https -ep .\aspnetapp.pfx -p INTELLECTUAL
RUN dotnet dev-certs https --trust

CMD ["dotnet", "watch", "run"]

service-one Hosted Consumer Service

using System.Text;
using Kafka.Public;
using Kafka.Public.Loggers;

namespace ServiceOne.Services;

public class KafkaStringConsumerService : IHostedService
{
    private readonly IClusterClient _cluster;
    private readonly ConsoleLogger _logger;

    public KafkaStringConsumerService()
    {
        _logger = new ConsoleLogger();

        _cluster = new ClusterClient(new Configuration
        {
            Seeds = "kafka:9092"
        },
        _logger);
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _cluster.MessageReceived += record =>
        {
            _logger.LogInformation(Encoding.UTF8.GetString((record.Value as byte[])!));
        };

        _cluster.ConsumeFromLatest("two", 1);

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _cluster?.Dispose();
        return Task.CompletedTask;
    }
}

service-one Program.cs

using ServiceOne.Services;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddHostedService<KafkaStringConsumerService>();

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.UseCors(config =>
{
    config.AllowAnyHeader();
    config.AllowAnyMethod();
    config.AllowAnyOrigin();
});

app.MapControllers();

app.Run();

service-one launchSettings.json

{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "profiles": {
    "ServiceOne": {
      "commandName": "Project",
      "dotnetRunMessages": true,
      "launchBrowser": true,
      "launchUrl": "swagger",
      "applicationUrl": "https://0.0.0.0:5001;http://0.0.0.0:5000",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
}

service-two HomeController.cs

using Kafka.Public;
using Microsoft.AspNetCore.Mvc;

namespace ServiceTwo.Controllers
{
    [Route("[controller]/[action]")]
    public class HomeController : Controller
    {
        private readonly IClusterClient _cluster;

        public HomeController(IClusterClient cluster)
        {
            _cluster = cluster;
        }
        
        [HttpPost]
        public IActionResult SendMessage([FromBody] string message)
        {
            var result = _cluster.Produce("two", null, message, 1);
            return Ok($"Whether message was sent: {result}");
        }
    }
}

service-two Program.cs

using System.Net;
using Kafka.Public;
using Kafka.Public.Loggers;

var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();
// Learn more about configuring Swagger/OpenAPI at https://aka.ms/aspnetcore/swashbuckle
builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();
builder.Services.AddSingleton<IClusterClient>(
    new ClusterClient(new Configuration
    {
        Seeds = "kafka:9092",
        ClientId = Dns.GetHostName()
    },
    new ConsoleLogger()));

var app = builder.Build();

// Configure the HTTP request pipeline.
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

app.UseHttpsRedirection();

app.UseAuthorization();

app.UseCors(config =>
{
    config.AllowAnyHeader();
    config.AllowAnyMethod();
    config.AllowAnyOrigin();
});

app.MapControllers();

app.Run();

service-two launchSettings.json

{
  "$schema": "https://json.schemastore.org/launchsettings.json",
  "profiles": {
    "ServiceTwo": {
      "commandName": "Project",
      "dotnetRunMessages": true,
      "launchBrowser": true,
      "launchUrl": "swagger",
      "applicationUrl": "https://0.0.0.0:4001;http://0.0.0.0:4000",
      "environmentVariables": {
        "ASPNETCORE_ENVIRONMENT": "Development"
      }
    }
  }
}

I cannot paste the complete log because it is over 88k characters, which exceeds StackOverflow's limit of 30,000 characters. Logs after calling the Produce method in .NET:

service-one_1  | [2021-12-20 15:10:09] INFO Fetching metadata from [kafka:9092]...
service-one_1  | [2021-12-20 15:10:09] INFO [Metadata][Brokers] (Id:1001 Host:localhost Port:9092)
service-one_1  | [2021-12-20 15:10:09] INFO Removing broker '[kafka:9092]' from the cluster in response to new topology
service-one_1  | [2021-12-20 15:10:09] INFO Connected to [Unknown]
service-two_1  | [2021-12-20 15:10:11] INFO Fetching metadata from [localhost:9092]...
service-two_1  | [2021-12-20 15:10:11] ERROR Could not get routing table! The Kafka cluster is probably having problems answering requests. Exception was: System.AggregateException: One or more errors occurred. (Kafka transport error)
service-two_1  |  ---> Kafka.Network.TransportException: Kafka transport error
service-two_1  |  ---> System.Net.Sockets.SocketException (111): Connection refused
service-two_1  |    at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.ThrowException(SocketError error, CancellationToken cancellationToken)
service-two_1  |    at System.Net.Sockets.Socket.AwaitableSocketAsyncEventArgs.System.Threading.Tasks.Sources.IValueTaskSource.GetResult(Int16 token)
service-two_1  |    at System.Threading.Tasks.ValueTask.ValueTaskSourceAsTask.<>c.<.cctor>b__4_0(Object state)
service-two_1  | --- End of stack trace from previous location ---
service-two_1  |    at Kafka.Network.Connection.ConnectAsync()
service-two_1  |    --- End of inner exception stack trace ---
service-two_1  |    at Kafka.Network.Connection.ConnectAsync()
service-two_1  |    at Kafka.Cluster.Node.InitConnection()
service-two_1  |    at Kafka.Cluster.Node.ProcessRequest(Ping dummy)
service-two_1  |    at Kafka.Cluster.Cluster.ProcessFullMetadata(TaskCompletionSource`1 promise)
service-two_1  |    --- End of inner exception stack trace ---
service-two_1  |    at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
service-two_1  |    at System.Threading.Tasks.Task`1.GetResultCore(Boolean waitCompletionNotification)
service-two_1  |    at System.Threading.Tasks.Task`1.get_Result()
service-two_1  |    at Kafka.Routing.ProduceRouter.EnsureHasRoutingTable()
service-two_1  | [2021-12-20 15:10:11] ERROR [Producer] No node available for [topic: two / partition: 1], postponing messages.
service-two_1  | [2021-12-20 15:10:11] INFO Fetching metadata from [localhost:9092]...
service-two_1  | [2021-12-20 15:10:12] WARNING Failed to connect to (Id:1001 Host:localhost Port:9092), retrying.
service-two_1  | [2021-12-20 15:10:12] INFO Fetching metadata from [localhost:9092]...
service-two_1  | [2021-12-20 15:10:14] WARNING Failed to connect to (Id:1001 Host:localhost Port:9092), retrying.
service-two_1  | [2021-12-20 15:10:14] INFO Fetching metadata from [localhost:9092]...
service-two_1  | [2021-12-20 15:10:14] WARNING Failed to connect to (Id:1001 Host:localhost Port:9092), retrying.
service-two_1  | [2021-12-20 15:10:14] WARNING Kafka node (Id:1001 Host:localhost Port:9092) is dead, refreshing metadata.
service-two_1  | [2021-12-20 15:10:14] ERROR All nodes are dead, retrying from bootstrap seeds.
service-two_1  | [2021-12-20 15:10:14] INFO Fetching metadata from [kafka:9092]...
service-two_1  | [2021-12-20 15:10:14] INFO [Metadata][Brokers] (Id:1001 Host:localhost Port:9092)
service-two_1  | [2021-12-20 15:10:14] INFO Removing broker '[kafka:9092]' from the cluster in response to new topology
service-two_1  | [2021-12-20 15:10:14] INFO Connected to [Unknown]
service-one_1  | [2021-12-20 15:10:19] DEBUG ProcessFullMetadata: no need to refresh the routing table
service-one_1  | [2021-12-20 15:10:29] DEBUG ProcessFullMetadata: no need to refresh the routing table
service-one_1  | [2021-12-20 15:10:39] DEBUG ProcessFullMetadata: no need to refresh the routing table
service-one_1  | [2021-12-20 15:10:49] DEBUG ProcessFullMetadata: no need to refresh the routing table

UPDATE

I am also unable to reach the database from the .NET project, even though the database runs in a Docker container within the same docker-compose file.

Thank you

Imran Faruqi
  • What errors are you getting? Does the producer work on its own? As in, if you use Kafka cli consumer, does it get any messages? Try including `KAFKA_CFG_LISTENERS: CLIENT://0.0.0.0:9092` so other containers can connect – OneCricketeer Dec 20 '21 at 01:30
  • The Kafka consumer and producer can communicate from the command line if I start both there. I will post the .NET log. It's not throwing exceptions, but it is unable to find the broker or topic, although the topic is there. – Imran Faruqi Dec 20 '21 at 08:02
  • If it's "unable to find" the Kafka container, you should be getting errors in the logs saying so. You should additionally add your own logging to your code so that you know what's happening where – OneCricketeer Dec 20 '21 at 14:59
  • It's not "unable to find", let me paste the logs. There are no errors until I send the produce request from the .NET code. When I send the request to the controller and it produces the message, it displays some notes but no exceptions etc. – Imran Faruqi Dec 20 '21 at 15:05
  • There is an exception in the logs, please check. @OneCricketeer – Imran Faruqi Dec 20 '21 at 15:12
  • I am using Kafka-Sharp. Could that be a problem? should I directly connect to Kafka socket? – Imran Faruqi Dec 20 '21 at 15:13
  • Why did you modify your compose file? It was correct before, with the exception of the variable I mentioned – OneCricketeer Dec 20 '21 at 15:36
  • More specifically, see `Fetching metadata from [localhost:9092]`... This is not your Kafka container. This is not the address you gave in the code? Where's it come from? Well, look at `KAFKA_CFG_ADVERTISED_LISTENERS`... This needs to "advertise" `kafka:9092`, like it did before – OneCricketeer Dec 20 '21 at 15:39
  • When I set KAFKA_CFG_ADVERTISED_LISTENERS: kafka:9092, the Kafka producer and consumer cannot communicate using the console. So I used localhost and it worked. Okay, I will change it back to what it was and let you know. – Imran Faruqi Dec 20 '21 at 18:32
  • That value is correct for your C# services. You need two separate listeners if you are trying to use CLI tools **outside** of the containers, as written in the Bitnami readme and here https://stackoverflow.com/questions/51630260/connect-to-kafka-running-in-docker – OneCricketeer Dec 20 '21 at 22:12
  • Understood. Also, I figured out that I was unable to communicate even with the database from the code. So I guess I will have to update my question for future reference. – Imran Faruqi Dec 21 '21 at 13:12
  • Hmm. You could try removing all the networks tags since compose creates a default network on its own for all services – OneCricketeer Dec 21 '21 at 15:28
  • I did that. Could that be a problem with Docker on Windows? :o I have literally stopped working on that task for now, but I will take it up again and see how it goes :/ – Imran Faruqi Dec 22 '21 at 19:21
  • Between containers? No. Only if Docker is running via WSL2, and your C# code is not running in WSL2, then yes. You would need to port-forward the hypervisor to the windows host network interface. For example - (use port 9092 instead of 3000) https://github.com/microsoft/WSL/issues/4150#issuecomment-504051131 ... If you use the correct advertised listeners, your logs would not say `INFO [Metadata][Brokers] (Id:1001 Host:localhost Port:9092)`, or `INFO Fetching metadata from [localhost:9092]` and would rather have `kafka:9092` for those lines. Beyond that, I don't know what "using console" means – OneCricketeer Dec 22 '21 at 20:24
  • The C# code is also within the same docker-compose file, so I guess the network would be the same. And I can ping using `docker exec` from one container to another (project to database or kafka). What I mean by console is: use docker exec to control the kafka container and launch one instance of a producer and one instance of a consumer, and they can talk to each other. But they are in the same container in this case. – Imran Faruqi Dec 22 '21 at 21:25
  • `ping` doesn't verify port connectivity, only hostnames/IPs, though, and disregards the Kafka listener protocols – OneCricketeer Dec 23 '21 at 00:14
  • I see. I will have to figure out if I can somehow ping the port too. – Imran Faruqi Dec 23 '21 at 00:15
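
The comments above point to the advertised listener as the likely cause: the broker advertises `localhost:9092`, so after the first metadata fetch each C# client tries to reach Kafka inside its own container. Below is a minimal sketch of the two-listener setup the comments describe, assuming the Zookeeper-based Bitnami image used in the question. The listener names `CLIENT` and `EXTERNAL` and the host port `9093` are assumptions borrowed from the convention in the Bitnami readme, not values from the question, and this has not been tested against the asker's setup. If no access from outside Docker is needed, changing only `KAFKA_CFG_ADVERTISED_LISTENERS` to `PLAINTEXT://kafka:9092` in the original file may be enough.

```yaml
# Sketch only: replaces the kafka service in the compose file from the question.
kafka:
  image: 'bitnami/kafka:latest'
  networks:
    - app-tier
  environment:
    KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
    # Quoted so YAML keeps the value as the string "yes" rather than a boolean.
    ALLOW_PLAINTEXT_LISTENER: "yes"
    # Two plaintext listeners: CLIENT for other containers, EXTERNAL for the host.
    # Both names are assumed labels, mapped to a protocol here.
    KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
    KAFKA_CFG_LISTENERS: CLIENT://:9092,EXTERNAL://:9093
    # Containers on app-tier are told to use kafka:9092;
    # tools running on the host are told to use localhost:9093.
    KAFKA_CFG_ADVERTISED_LISTENERS: CLIENT://kafka:9092,EXTERNAL://localhost:9093
    KAFKA_CFG_INTER_BROKER_LISTENER_NAME: CLIENT
  depends_on:
    - zookeeper
  ports:
    # Only the external listener is published to the host (9093 is an assumed port).
    - '9093:9093'
```

With this layout the C# services keep `Seeds = "kafka:9092"`, and the metadata lines in the log should then show `Host:kafka` instead of `Host:localhost`, as described in the comment of Dec 22 '21 at 20:24.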

0 Answers