
I'm new to Apache Kafka. I'm trying to run the demo application from the Streams API section of the documentation, and I get the following error in cmd:

The syntax of the command is incorrect.

Here are the variants I've tried:

Variant 1:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true

Variant 2:

bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --consumer.config config\console-consumer.properties

console-consumer.properties contains the same properties as in the first variant.

I'm using Windows 10. Running the batch file from either PowerShell or the classic cmd results in the same error.

Here's the working example (it uses Linux):
https://kafka.apache.org/27/documentation/streams/quickstart#quickstart_streams_start

Without the formatter and configuration, the messages are not displayed properly formatted (as expected).

UPDATE 1:

The debugged batch file output:

C:\Users\MyUserName\kafka_2.13-2.7.0>bin\windows\kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --consumer.config config\console-consumer.properties

SetLocal
set KAFKA_HEAP_OPTS=-Xmx512M
"C:\Users\MyUserName\kafka_2.13-2.7.0\bin\windows\kafka-run-class.bat" kafka.tools.ConsoleConsumer --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning --formatter kafka.tools.DefaultMessageFormatter --consumer.config config\console-consumer.properties
The syntax of the command is incorrect.

UPDATE 2:

The second debugged batch file output, as requested by Mofi (only the last page):

set DEFAULT_DEBUG_SUSPEND_FLAG=n
rem Set Debug options if enabled
IF [""] NEQ [""] (
IF [""] EQU [""] (set JAVA_DEBUG_PORT=5005 )
 IF [""] EQU [""] (set DEBUG_SUSPEND_FLAG=n )
 set DEFAULT_JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=!DEBUG_SUSPEND_FLAG!,address=!JAVA_DEBUG_PORT!
 IF [""] EQU [""] (set JAVA_DEBUG_OPTS=!DEFAULT_JAVA_DEBUG_OPTS! )
 echo Enabling Java debug options: !JAVA_DEBUG_OPTS!
 set KAFKA_OPTS=!JAVA_DEBUG_OPTS! !KAFKA_OPTS!
)
rem Which java to use
IF ["C:\Program Files\Java\jdk-14"] EQU [""] (set JAVA=java )  ELSE (set JAVA="C:\Program Files\Java\jdk-14/bin/java" )
rem Memory options
IF ["-Xmx512M"] EQU [""] (set KAFKA_HEAP_OPTS=-Xmx256M )
rem JVM performance options
IF [""] EQU [""] (set KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true )
IF not defined CLASSPATH (
echo Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'
 EXIT /B 2
)
The syntax of the command is incorrect.
OneCricketeer

1 Answer

I suggest trying this code for the batch file kafka-run-class.bat:

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

setlocal EnableExtensions DisableDelayedExpansion

IF "%~1" == "" (
    echo USAGE: %0 classname [opts]
    EXIT /B 1
)

rem Determine absolute path of base directory.
for %%i in ("%~dp0..\..") do set "BASE_DIR=%%~fi"

IF "%SCALA_VERSION%" == "" set "SCALA_VERSION=2.13.3"

IF "%SCALA_BINARY_VERSION%" == "" (
  for /f "tokens=1,2 delims=." %%i in ("%SCALA_VERSION%") do (
    if "%%j" == "" (
      set "SCALA_BINARY_VERSION=%%i"
    ) else (
      set "SCALA_BINARY_VERSION=%%i.%%j"
    )
  )
)

IF not defined CLASSPATH goto AddClassPaths
rem Remove all double quotes within CLASSPATH. No path should contain a
rem semicolon as otherwise the further processing would not be correct.
set "CLASSPATH=%CLASSPATH:"=%"
IF not defined CLASSPATH goto AddClassPaths
rem Remove last character if this character is a semicolon.
IF "%CLASSPATH:~-1%" == ";" set "CLASSPATH=%CLASSPATH:~0,-1%"

:AddClassPaths
rem Classpath addition for kafka-core dependencies
for %%i in ("%BASE_DIR%\core\build\dependant-libs-%SCALA_VERSION%\*.jar") do call :concat "%%i"

rem Classpath addition for kafka-examples
for %%i in ("%BASE_DIR%\examples\build\libs\kafka-examples*.jar") do call :concat "%%i"

rem Classpath addition for kafka-clients
for %%i in ("%BASE_DIR%\clients\build\libs\kafka-clients*.jar") do call :concat "%%i"

rem Classpath addition for kafka-streams
for %%i in ("%BASE_DIR%\streams\build\libs\kafka-streams*.jar") do call :concat "%%i"

rem Classpath addition for kafka-streams-examples
for %%i in ("%BASE_DIR%\streams\examples\build\libs\kafka-streams-examples*.jar") do call :concat "%%i"

for %%i in ("%BASE_DIR%\streams\build\dependant-libs-%SCALA_VERSION%\rocksdb*.jar") do call :concat "%%i"

rem Classpath addition for kafka tools
for %%i in ("%BASE_DIR%\tools\build\libs\kafka-tools*.jar") do call :concat "%%i"

for %%i in ("%BASE_DIR%\tools\build\dependant-libs-%SCALA_VERSION%\*.jar") do call :concat "%%i"

for %%j in (api runtime file json tools) do (
    for %%i in ("%BASE_DIR%\connect\%%j\build\libs\connect-%%j*.jar") do call :concat "%%i"
    if exist "%BASE_DIR%\connect\%%j\build\dependant-libs\*" call :concat "%BASE_DIR%\connect\%%j\build\dependant-libs\*"
)

rem Classpath addition for release
for %%i in ("%BASE_DIR%\libs\*") do call :concat "%%i"

rem Classpath addition for core
for %%i in ("%BASE_DIR%\core\build\libs\kafka_%SCALA_BINARY_VERSION%*.jar") do call :concat "%%i"

rem JMX settings
IF "%KAFKA_JMX_OPTS%" == "" set "KAFKA_JMX_OPTS=-Dcom.sun.management.jmxremote -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false"

rem JMX port to use
IF not "%JMX_PORT%" == "" set "KAFKA_JMX_OPTS=%KAFKA_JMX_OPTS% -Dcom.sun.management.jmxremote.port=%JMX_PORT%"

rem Log directory to use
IF "%LOG_DIR%" == "" set "LOG_DIR=%BASE_DIR%\logs"

rem Log4j settings
IF "%KAFKA_LOG4J_OPTS%" == "" (
    set "KAFKA_LOG4J_OPTS=-Dlog4j.configuration=file:%BASE_DIR%\config\tools-log4j.properties"
) ELSE (
  rem create logs directory
  IF not exist "%LOG_DIR%\" mkdir "%LOG_DIR%"
)

set "KAFKA_LOG4J_OPTS=-Dkafka.logs.dir="%LOG_DIR%" "%KAFKA_LOG4J_OPTS%""

rem Generic jvm settings you want to add
IF "%KAFKA_OPTS%" == "" set "KAFKA_OPTS="

set "DEFAULT_JAVA_DEBUG_PORT=5005"
set "DEFAULT_DEBUG_SUSPEND_FLAG=n"
rem Set Debug options if enabled
IF "%KAFKA_DEBUG%" == "" goto SetJavaHome

IF "%JAVA_DEBUG_PORT%" == "" set "JAVA_DEBUG_PORT=%DEFAULT_JAVA_DEBUG_PORT%"
IF "%DEBUG_SUSPEND_FLAG%" == "" set "DEBUG_SUSPEND_FLAG=%DEFAULT_DEBUG_SUSPEND_FLAG%"
set "DEFAULT_JAVA_DEBUG_OPTS=-agentlib:jdwp=transport=dt_socket,server=y,suspend=%DEBUG_SUSPEND_FLAG%,address=%JAVA_DEBUG_PORT%"
IF "%JAVA_DEBUG_OPTS%" == "" set "JAVA_DEBUG_OPTS=%DEFAULT_JAVA_DEBUG_OPTS%"

setlocal EnableDelayedExpansion
echo Enabling Java debug options: !JAVA_DEBUG_OPTS!
endlocal
set "KAFKA_OPTS=%JAVA_DEBUG_OPTS% %KAFKA_OPTS%"

:SetJavaHome
rem Which java to use
IF "%JAVA_HOME%" == "" (set "JAVA=java") ELSE set "JAVA=%JAVA_HOME%\bin\java"

rem Memory options
IF "%KAFKA_HEAP_OPTS%" == "" set "KAFKA_HEAP_OPTS=-Xmx256M"

rem JVM performance options
IF "%KAFKA_JVM_PERFORMANCE_OPTS%" == "" set "KAFKA_JVM_PERFORMANCE_OPTS=-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"

IF not defined CLASSPATH (
    echo Classpath is empty. Please build the project first e.g. by running 'gradlew jarAll'
    EXIT /B 2
)

set "COMMAND="%JAVA%" %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*"
rem echo/
rem echo %COMMAND%
rem echo/
%COMMAND%

rem Results in implicit execution of endlocal
goto :EOF
:concat
IF not defined CLASSPATH (set "CLASSPATH=%~1") ELSE set "CLASSPATH=%CLASSPATH%;%~1"

And for batch file kafka-console-consumer.bat use this code:

@echo off
rem Licensed to the Apache Software Foundation (ASF) under one or more
rem contributor license agreements.  See the NOTICE file distributed with
rem this work for additional information regarding copyright ownership.
rem The ASF licenses this file to You under the Apache License, Version 2.0
rem (the "License"); you may not use this file except in compliance with
rem the License.  You may obtain a copy of the License at
rem
rem     http://www.apache.org/licenses/LICENSE-2.0
rem
rem Unless required by applicable law or agreed to in writing, software
rem distributed under the License is distributed on an "AS IS" BASIS,
rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
rem See the License for the specific language governing permissions and
rem limitations under the License.

SetLocal EnableExtensions DisableDelayedExpansion
set "KAFKA_HEAP_OPTS=-Xmx512M"
call "%~dp0kafka-run-class.bat" kafka.tools.ConsoleConsumer %*
EndLocal

The modifications made to the two batch files are mainly those described in the issue chapters of this answer.
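
As a rough illustration of the style used in the two files above, here is a minimal sketch of three conventions they follow: quoted string comparisons with IF, quoted SET assignments, and GOTO in place of a parenthesized IF block. It does not reproduce the error from the question. All EXAMPLE_* names, the :SkipDebug label and the directory value are hypothetical and chosen only for demonstration.

```batch
@echo off
setlocal EnableExtensions DisableDelayedExpansion

rem Hypothetical value with a space and parentheses, used only to show
rem why the quoting matters. It is not taken from the question.
set "EXAMPLE_DIR=C:\Program Files (x86)\Example"

rem Quoted comparison: the whole value is compared as one string, so the
rem space and the parentheses inside it are not parsed as command syntax.
if "%EXAMPLE_DIR%" == "" (echo EXAMPLE_DIR is empty) else echo EXAMPLE_DIR is set

rem Quoted assignment: the value ends at the closing quote, so trailing
rem spaces on the line do not become part of the variable.
set "EXAMPLE_JAVA=%EXAMPLE_DIR%\bin\java"

rem GOTO instead of a parenthesized block: the lines that follow are parsed
rem one at a time, so %VAR% references work without delayed expansion.
if "%EXAMPLE_DEBUG%" == "" goto SkipDebug
set "EXAMPLE_OPTS=-Dexample.debug=true"
echo Debug options: %EXAMPLE_OPTS%

:SkipDebug
echo Java path: "%EXAMPLE_JAVA%"
endlocal
```

Saving this under any name with the .bat extension and running it from cmd should show whether a path of that shape passes through the comparisons and assignments without a syntax error.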

Mofi