I am stuck here, can someone explain why the consumer thread is running prior producer thread in the below code. How can consumer thread run when the producer has not put any content. Is the program wrong?
Achieve:- run produce consumer thread for each file that is picked up from the given folder.
For instance if the specified folder has 3 then 2 thread (producer/consumer) per file must be initiated, which makes the thread count 6.
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
class sharedInt {
private int syncUponInt;
private boolean available = false;
private File processingFile;
private static File[] listOfFile;
sharedInt(File[] totalList) {
listOfFile = totalList;
}
public int getTotalCount() {
return listOfFile.length;
}
public static File[] getListOfFile() {
return listOfFile;
}
public static void setListOfFile(File[] listOfFile) {
sharedInt.listOfFile = listOfFile;
}
public File getProcessingFile() {
return processingFile;
}
public void setProcessingFile(File processingFile) {
this.processingFile = processingFile;
}
public synchronized int getContents() {
while (available == false) {
try {
wait();
} catch (InterruptedException e) {
}
}
available = false;
notify();
return syncUponInt;
}
public synchronized void setContents(int value) {
while (available == true) {
try {
wait();
} catch (InterruptedException e) {
}
}
syncUponInt = value;
available = true;
notify();
}
}
class Producer1 extends Thread {
private sharedInt cubbyhole;
private int number;
public Producer1(sharedInt c, int number) {
cubbyhole = c;
this.number = number;
}
public void run() {
// for (int i = 0; i < cubbyhole.getTotalCount(); i++) {
cubbyhole.setContents(this.number);
Vector vectorList = new Vector();
System.out.println("Producer <current thread>" + this.currentThread() + "put: " + this.number
+ "processing file is" + cubbyhole.getProcessingFile());
RandomAccessFile raf = null;
try {
raf = new RandomAccessFile(cubbyhole.getProcessingFile(), "r");
StringBuffer sb = new StringBuffer();
String line = null;
while ((line = raf.readLine()) != null) {
sb.append(line);
}
vectorList.add(sb.toString());
System.out.println(sb.toString());
} catch (FileNotFoundException e) {
} catch (IOException e) {
}
// }
}
}
class Consumer1 extends Thread {
private sharedInt cubbyhole;
public Consumer1(sharedInt c) {
cubbyhole = c;
}
public void run() {
int value = 0;
// for (int i = 0; i < cubbyhole.getTotalCount(); i++) {
System.out.println("Consumer <current thread>" + this.currentThread() + "got: " + cubbyhole.getContents()
+ "processing file is" + cubbyhole.getProcessingFile());
}
}
public class FileManagementApp {
public static void main(String[] args) throws InterruptedException {
System.out.println("1. Please enter the path of the <Directory/Folder>...");
// Scanner scn = new Scanner(System.in);
// String folderPath = scn.nextLine();
File folder = new File("C:\\file\\output");
File[] fileList = folder.listFiles();
int countOfFiles = fileList.length;
sharedInt c = new sharedInt(fileList);
Producer1 p1 = null;
List<Producer1> pList = new ArrayList<Producer1>();
Consumer1 c1 = null;
List<Consumer1> cList = new ArrayList<Consumer1>();
for (int i = 0; i < countOfFiles; i++) {
c = new sharedInt(fileList);
c.setProcessingFile(fileList[i]);
p1 = new Producer1(c, i);
p1.setName("Producer--" + i);
pList.add(p1);
c1 = new Consumer1(c);
c1.setName("Consumer--" + i);
cList.add(c1);
pList.get(i).start();
cList.get(i).start();
}
}
}
Output:-
1. Please enter the path of the <Directory/Folder>...
Consumer <current thread>Thread[Consumer--0,5,main]got: 0processing file isC:\file\output\0.A.txt
Producer <current thread>Thread[Producer--0,5,main]put: 0processing file isC:\file\output\0.A.txt
Producer <current thread>Thread[Producer--1,5,main]put: 1processing file isC:\file\output\1.A.txt
Producer <current thread>Thread[Producer--2,5,main]put: 2processing file isC:\file\output\2.A.txt
Consumer <current thread>Thread[Consumer--1,5,main]got: 1processing file isC:\file\output\1.A.txt
fg
abc
Consumer <current thread>Thread[Consumer--2,5,main]got: 2processing file isC:\file\output\2.A.txt
de
EDIT:-
Modified the code to something like this and was able to achieve the concurrency /multi threading to read and write files simultaneously using producer consumer model.
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Vector;
class SharedInteger {
private boolean available = false;
public File processingFile;
public long totalNoOfSplits;
public Vector<Byte> vectorBytes;
private File[] listOfFiles;
SharedInteger(File[] totalList) {
listOfFiles = totalList;
}
public synchronized Vector<Byte> get() {
while (available == false) {
try {
wait();
} catch (InterruptedException e) {
}
}
available = false;
notify();
return vectorBytes;
}
public synchronized void put(Vector<Byte> value) {
while (available == true) {
try {
wait();
} catch (InterruptedException e) {
}
}
vectorBytes = value;
available = true;
notify();
}
}
class Producer extends Thread {
private SharedInteger sharedInteger;
public Producer(SharedInteger c) {
sharedInteger = c;
}
public void run() {
FileInputStream fis = null;
Vector<Byte> vectorBytes = new Vector<Byte>();
try {
fis = new FileInputStream(sharedInteger.processingFile);
while (fis.available() != 0) {
vectorBytes.add((byte) fis.read());
}
sharedInteger.put(vectorBytes);
} catch (Exception e) {
}
}
}
class Consumer extends Thread {
private SharedInteger sharedInteger;
private FileOutputStream fos;
public Consumer(SharedInteger c) {
sharedInteger = c;
}
public void run() {
File newFile = sharedInteger.processingFile;
try {
fos = new FileOutputStream(newFile.getParentFile()+"1\\"+newFile.getName());
} catch (FileNotFoundException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
Vector<Byte> v = sharedInteger.get();
try {
if (null != v) {
writeToAFile(v);
}
} catch (IOException e) { // TODO Auto-generated catch block
e.printStackTrace();
}
}
public void writeToAFile(Vector<Byte> contents) throws IOException {
for (int i = 0; i < contents.size(); i++) {
System.out.println(Thread.currentThread());
fos.write(contents.get(i));
fos.flush();
}
}
}
public class ProducerConsumerTest {
public static void main(String[] args) throws FileNotFoundException {
File folder = new File("C:\\file\\output");
File[] fileList = folder.listFiles();
int countOfFiles = fileList.length;
SharedInteger c = new SharedInteger(fileList);
List<Producer> pList = new ArrayList<Producer>();
List<Consumer> cList = new ArrayList<Consumer>();
Producer p1 = null;
Consumer c1 = null;
for (int i = 0; i < countOfFiles; i++) {
c = new SharedInteger(fileList);
c.processingFile = fileList[i];
p1 = new Producer(c);
p1.setName("Producer--" + i);
pList.add(p1);
pList.get(i).start();
c1 = new Consumer(c);
c1.setName("Consumer--" + i);
cList.add(c1);
cList.get(i).start();
}
}
}