I have a text file having 10000 log lines like
2022-12-27T00:00:00+00:00 VM_DEV02 sshd[25690]: pam_unix(sshd:session): session closed for user USER7
Main tasks are:
Filter only those lines having-['unauthorized','error','kernel error','OS error','rejected','warning',"error"] these words .
Split the lines into different parts and store this data in a Dataframe using Apache Beam
I have tried this way by writing and calling Functions but it is not working as expected.
import apache_beam as beam
from apache_beam.io import ReadFromText
from apache_beam.pvalue import AsList
from apache_beam.transforms import Map, Filter
import pandas as pd
def extract_fields(line):
timestamp = line.split(" ")[0]
print(timestamp)
hostname = line.split(" ")[1]
print(hostname)
process_name = line.split(" ")[2].split("[")[0]
print(process_name)
pid = line.split(" ")[2].split("[")[1].split("]")[0]
print(pid)
text = " ".join(line.split(" ")[3:])
print(text)
return [timestamp, hostname, process_name, pid, text]
with beam.Pipeline() as pipeline:
log_lines = pipeline | 'Read log line' >> beam.io.ReadFromText("/Analytics/venv/Jup/CAPE_Apache_Beam/Sample_text_file")
print(log_lines)
fields = log_lines | 'Extract fields' >> Map(extract_fields)
print(fields)
df = (fields | 'Write to dataframe' >> Map(lambda fields: pd.DataFrame(fields, columns=['timestamp', 'hostname', 'process_name', 'pid', 'text'])))
print(df)