Compare commits

...

2 commits

Author SHA1 Message Date
violette
14c72c3f7a Q4. TCP/IP done 2024-03-26 22:17:00 -04:00
violette
dd474b686a added trips instead of stop list 2024-03-26 11:48:47 -04:00
7 changed files with 423 additions and 170 deletions

5
README.md Normal file
View file

@ -0,0 +1,5 @@
# PROJET IFT630 STS 2
Ce projet utilise maven.
Un test est utilisé pour voir le projet fonctionner.
Pour l'executer, vous pouvez faire `mvn test` dans la racine.
Les questions sont répondues avec les commits correspondant.

View file

@ -1,125 +1,46 @@
package usherbrooke.ift630; package usherbrooke.ift630;
import java.lang.Long;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
public class App { public class App {
static private int numBusses = 5; static private int numLines = 3;
static private int numBusPerLine = 2;
static private int numBusses = numLines * numBusPerLine;
static private int numStopPerBus = 8; static private int numStopPerBus = 8;
static private int numStops = numBusses * numStopPerBus; static private int numStops = numBusses * numStopPerBus;
static private int numPassengersPerBus = 10; static private int numPassengersPerBus = 5;
static private int numPassengersPerStop = 3; static private int numPassengersPerStop = 5;
static private int numPassengers = numPassengersPerStop * numStops; //static private int numPassengers = numPassengersPerStop * numStops;
static private int numThreadsBus = 5; static private int numThreadsBus = 5;
static private int numThreadsStop = 2; static private int numThreadsStop = 2;
static private int timeBetweenStops = 5; static private int timeBetweenStops = 5;
static private int timeEmbark = 2; static private int timeEmbark = 2;
static private int socketPort = 8181;
public static void main(String[] args) { public static void main(String[] args) {
ExecutorService threadsBus = Executors.newFixedThreadPool(numThreadsBus); try {
ExecutorService threadsStop = Executors.newFixedThreadPool(numThreadsStop);
ExecutorService threadsPassenger = Executors.newFixedThreadPool(1); // doesnt need more! future balance
// themselfes out.
ArrayList<Stop> stops = new ArrayList<Stop>();
ArrayList<Bus> busses = new ArrayList<Bus>();
ArrayList<Passenger> passengers = new ArrayList<Passenger>();
BlockingQueue<BusInformationMessage> blockingQueue = new LinkedBlockingQueue<BusInformationMessage>();
// make stop" + name + " reached stop " + nextStop.getStopName() + "!"s
for (int k = 0; k < numStops; k++) {
Stop s = new Stop(k, numPassengersPerStop);
s.setBlockingQueue(blockingQueue);
stops.add(s);
s.setThreadPool(threadsStop);
threadsStop.submit(s);
}
// make busses
for (int k = 0; k < numBusses; k++) {
if (numStopPerBus > numStops) { if (numStopPerBus > numStops) {
System.out.println("More stops per bus than stops."); System.out.println("More stops per bus than stops.");
return; return;
} }
ArrayList<Stop> s_list = new ArrayList<Stop>(); Central central = new Central(numThreadsBus, numThreadsStop, socketPort);
Stop s; // create everything
// add stop. Make a bus doesnt stop multiple time at the same stop central.createStops(numStops, numPassengersPerStop);
while (s_list.size() < numStopPerBus) { central.createLines(numLines, numStopPerBus, numStops);
do { central.createBusses(numBusses, timeBetweenStops, timeEmbark, numPassengersPerBus);
s = stops.get((int) (Math.random() * numStops)); central.createPassengers(numPassengersPerStop, numStopPerBus);
} while (s_list.contains(s));
s_list.add(s);
}
Bus b = new Bus(s_list, k, timeBetweenStops, timeEmbark, numPassengersPerBus);
b.setBlockingQueue(blockingQueue);
busses.add(b);
}
// make passenger
int idPassenger = 0;
for (Bus b : busses) {
for (Stop start : b.getStops()) {
// make sure passenger can leave.
if (start == b.getTerminus())
continue;
int idx = b.getStops().indexOf(start);
if (idx == -1 || start == b.getTerminus())
continue;
for (int k = 0; k < (int) (Math.random() * numPassengersPerStop); k++) {
Stop dest = null;
dest = b.getStops().get(idx + 1 + (int) Math.round(Math.random() * (numStopPerBus - 2 - idx)));
Passenger p = new Passenger(idPassenger, start, dest);
p.setThreadPool(threadsPassenger);
passengers.add(p);
start.addPassenger(p);
idPassenger++;
threadsPassenger.submit(p);
}
}
}
// pretty print!
for (Bus b : busses) {
b.printDetails();
}
// print start details
System.out.println("-----".repeat(5) + " START " + "-----".repeat(5));
central.printDetails();
System.out.println("-----".repeat(5) + " START " + "-----".repeat(5)); System.out.println("-----".repeat(5) + " START " + "-----".repeat(5));
// start bus thread // start bus thread
// (here so init logs are clean) central.startBusThreads();
for (Bus b : busses) {
threadsBus.submit(b);
}
// block bus pool // shutdown and wait!
threadsBus.shutdown(); central.shutdown();
try { } catch (Exception e) {
threadsBus.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); System.out.println("[APP] " + e.toString());
} catch (InterruptedException e) {} }
// block stop and passenger pool once bus have finished
threadsStop.shutdown();
threadsPassenger.shutdown();
try {
threadsStop.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {}
try {
threadsPassenger.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {}
} }
} }

View file

@ -1,12 +1,16 @@
package usherbrooke.ift630; package usherbrooke.ift630;
import java.net.Socket;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.List;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
public class Bus extends Thread { public class Bus extends Thread {
private ArrayList<Passenger> passengers; private ArrayList<Passenger> passengers;
private ArrayList<Stop> stops; private Line line;
private String name; private String name;
private int id; private int id;
private int maxCapacity; private int maxCapacity;
@ -14,8 +18,12 @@ public class Bus extends Thread {
private int timeBetweenStops; private int timeBetweenStops;
private int timeToNextStop; private int timeToNextStop;
private int timeEmbark; private int timeEmbark;
private int nextStopIndex = 0;
private int socketPort;
private Stop nextStop; private Stop nextStop;
private BlockingQueue<BusInformationMessage> blockingQueue; private BlockingQueue<BusInformationMessage> blockingQueue;
private Socket socket;
private PrintWriter socketTx;
// returns time for a single passenger to embark. // returns time for a single passenger to embark.
private int timeEmbark() { private int timeEmbark() {
@ -24,7 +32,7 @@ public class Bus extends Thread {
// set time between stop. // set time between stop.
private int calculateTimeBetweenStops() { private int calculateTimeBetweenStops() {
timeToNextStop = (int) ((timeBetweenStops / 3) + (Math.random() * 2 * timeBetweenStops / 3)); timeToNextStop = (int) ((timeBetweenStops / 3f) + (Math.random() * 2f * timeBetweenStops / 3f));
return timeToNextStop; return timeToNextStop;
} }
@ -34,7 +42,6 @@ public class Bus extends Thread {
for (Passenger p : passengers) { for (Passenger p : passengers) {
if (p.getDest() == nextStop) { if (p.getDest() == nextStop) {
res = true; res = true;
Logger.getInstance().print(id, "[BUS] " + name + " stop asked!");
break; break;
} }
} }
@ -43,11 +50,12 @@ public class Bus extends Thread {
} }
// travel to next stop. Waits the travel time. // travel to next stop. Waits the travel time.
private Stop goToNextStop() throws InterruptedException, IndexOutOfBoundsException { private Stop goToNextStop() throws InterruptedException, IndexOutOfBoundsException, IOException {
// if no reason to stop, skip current stop // if no reason to stop, skip current stop
boolean nextStopEmpty, overMaxCapacity; boolean nextStopEmpty, overMaxCapacity;
do { do {
nextStop = stops.remove(0); nextStopIndex += 1;
nextStop = line.get(nextStopIndex);
sendNextStopInfo(); sendNextStopInfo();
@ -56,8 +64,12 @@ public class Bus extends Thread {
overMaxCapacity = currentCapacity >= maxCapacity; overMaxCapacity = currentCapacity >= maxCapacity;
nextStopEmpty = getNextStopPassengers().isEmpty(); nextStopEmpty = getNextStopPassengers().isEmpty();
if (overMaxCapacity) if (stopAsked())
Logger.getInstance().print(id, "[BUS] " + name + " stop asked!");
if (overMaxCapacity) {
Logger.getInstance().print(id, "[BUS] " + name + " im full, sorry!"); Logger.getInstance().print(id, "[BUS] " + name + " im full, sorry!");
sendFulLMessage();
}
if (nextStopEmpty && !stopAsked()) if (nextStopEmpty && !stopAsked())
Logger.getInstance().print(id, "[BUS] " + name + " next " + nextStop.getStopName() + " is empty!"); Logger.getInstance().print(id, "[BUS] " + name + " next " + nextStop.getStopName() + " is empty!");
} while (!stopAsked() && (nextStopEmpty || overMaxCapacity)); } while (!stopAsked() && (nextStopEmpty || overMaxCapacity));
@ -79,28 +91,29 @@ public class Bus extends Thread {
ArrayList<Passenger> res = new ArrayList<Passenger>(); ArrayList<Passenger> res = new ArrayList<Passenger>();
if (nextStop != null) if (nextStop != null)
for (Passenger p : passengers) { res = nextStop.getPassengerByDest(line, nextStopIndex);
if (p.getDest() == nextStop) {
res.add(p);
}
}
return res; return res;
} }
// lock until passenger got in // lock until passenger got in
private void waitEmbark() throws InterruptedException { private void waitEmbark() throws InterruptedException {
synchronized (this) {
sleep(timeEmbark()); sleep(timeEmbark());
} }
}
// lock until passenger got out // lock until passenger got out
private void waitStop() throws InterruptedException { private void waitStop() throws InterruptedException {
synchronized (this) {
sleep(timeToNextStop); sleep(timeToNextStop);
} }
private void connectSocket() throws IOException {
socket = new Socket("localhost", socketPort);
socketTx = new PrintWriter(socket.getOutputStream(), true);
}
private void sendFulLMessage() throws IOException {
connectSocket();
socketTx.println("FULL:" + id);
socket.close();
} }
@Override @Override
@ -109,8 +122,8 @@ public class Bus extends Thread {
// run until terminus // run until terminus
while (nextStop != null) { while (nextStop != null) {
// new stop ! // new stop !
Logger.getInstance().print(id, "[BUS] " + name + " reached stop " + nextStop.getStopName() + "!");
Logger.getInstance().print(id, "[BUS] " + name + " reached stop " + nextStop.getStopName() + "!");
disembarkPassengers(); disembarkPassengers();
embarkPassengers(); embarkPassengers();
calculateTimeBetweenStops(); calculateTimeBetweenStops();
@ -127,14 +140,14 @@ public class Bus extends Thread {
Logger.getInstance().print(id, "[BUS] " + name + " exiting!"); Logger.getInstance().print(id, "[BUS] " + name + " exiting!");
} }
Bus(ArrayList<Stop> s, int id, int timeStop, int timeEmbark, int maxCapacity) { Bus(Line line, int id, int timeStop, int timeEmbark, int maxCapacity) {
this.timeBetweenStops = timeStop * 1000 + 1; this.timeBetweenStops = timeStop * 1000 + 1;
this.timeEmbark = timeEmbark * 1000 + 1; this.timeEmbark = timeEmbark * 1000 + 1;
this.name = "Bus n°" + id; this.name = "Bus n°" + id;
this.id = id; this.id = id;
this.passengers = new ArrayList<Passenger>(); this.passengers = new ArrayList<Passenger>();
this.stops = s; this.line = line;
this.nextStop = stops.get(0); this.nextStop = line.get(0);
this.currentCapacity = 0; this.currentCapacity = 0;
this.maxCapacity = maxCapacity; this.maxCapacity = maxCapacity;
} }
@ -144,10 +157,9 @@ public class Bus extends Thread {
// lock on next stop // lock on next stop
synchronized (nextStop) { synchronized (nextStop) {
for (Passenger p : passengers) { for (Passenger p : passengers) {
waitEmbark();
if (p.getDest() == nextStop) { if (p.getDest() == nextStop) {
synchronized (p) { synchronized (p) {
waitEmbark();
p.notify(); p.notify();
} }
// already synced on this, func is synced // already synced on this, func is synced
@ -159,14 +171,11 @@ public class Bus extends Thread {
// embark passenger at a stop. Synchronizes, locks on that stop, and waits. // embark passenger at a stop. Synchronizes, locks on that stop, and waits.
// check for overflow, and if passenger should embark in the first place // check for overflow, and if passenger should embark in the first place
public synchronized void embarkPassengers() throws InterruptedException { public synchronized void embarkPassengers() throws InterruptedException, IOException {
synchronized (nextStop) { synchronized (nextStop) {
try { try {
ArrayList<Passenger> list = nextStop.getPassengerByDest(stops); ArrayList<Passenger> list = nextStop.getPassengerByDest(line, nextStopIndex);
for (Passenger p : list) { for (Passenger p : list) {
synchronized (p) {
p.notify();
}
waitEmbark(); waitEmbark();
if (currentCapacity >= maxCapacity) if (currentCapacity >= maxCapacity)
continue; continue;
@ -189,9 +198,9 @@ public class Bus extends Thread {
Logger.getInstance().print(id, "\t".repeat(indent) + "current stop: " + nextStop.getStopName()); Logger.getInstance().print(id, "\t".repeat(indent) + "current stop: " + nextStop.getStopName());
Logger.getInstance().print(id, "\t".repeat(indent) + "stops: "); Logger.getInstance().print(id, "\t".repeat(indent) + "stops: ");
if (nextStop != stops.get(0)) if (nextStop != line.get(0))
nextStop.printDetails(id, indent + 1); nextStop.printDetails(id, indent + 1);
for (Stop s : stops) { for (Stop s : line.getStops()) {
s.printDetails(id, indent + 1); s.printDetails(id, indent + 1);
} }
} }
@ -201,25 +210,29 @@ public class Bus extends Thread {
printDetails(0); printDetails(0);
} }
// return stop list
public ArrayList<Stop> getStops() {
ArrayList<Stop> res = new ArrayList<Stop>(stops);
if (nextStop != stops.get(0))
res.add(0, nextStop);
return res;
}
// return last stop // return last stop
public Stop getTerminus() { public Stop getTerminus() {
return stops.get(stops.size() - 1); return line.getTerminus();
} }
public String getNameBus() { public String getNameBus() {
return name; return name;
} }
// still there since i first didnt understand what a trip should have been. See git logs.
public List<Stop> getStops() {
return line.getStops();
}
public void setBlockingQueue(BlockingQueue<BusInformationMessage> q) { public void setBlockingQueue(BlockingQueue<BusInformationMessage> q) {
blockingQueue = q; blockingQueue = q;
} }
public void setSocketPort(int port) {
socketPort = port;
}
public Line getLine() {
return line;
}
} }

View file

@ -0,0 +1,242 @@
package usherbrooke.ift630;
import java.io.IOException;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.Socket;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.Executors;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
class Central {
private ExecutorService threadsBus;
private ExecutorService threadsSpareBusses;
private ExecutorService threadsStop;
private ExecutorService threadsPassenger;
private ExecutorService threadsSocket;
private ArrayList<Stop> stops;
private ArrayList<Bus> busses;
private ArrayList<Line> lines;
private ArrayList<Passenger> passengers;
private BlockingQueue<BusInformationMessage> blockingQueue;
private ServerSocket socket;
private Future<Void> socketFuture;
private int socketPort;
private boolean socketStop = false;
private int timeEmbark;
private int timeBetweenStops;
private int numPassengersPerBus;
// idk what to do with those
private ArrayList<Integer> spacePerStop;
private ArrayList<Integer> spacePerLine;
private void sendBus(String msg) {
Bus reqB = busses.get(Integer.parseInt(msg));
Line l = reqB.getLine();
// if >= 2 bus are full on that line, reset counter and add a bus
if (spacePerLine.get(l.getId()) >= 2) {
Bus b = new Bus(l, busses.size(), timeBetweenStops, timeEmbark, numPassengersPerBus);
b.setSocketPort(socketPort);
b.setBlockingQueue(blockingQueue);
busses.add(b);
threadsSpareBusses.submit(b);
Logger.getInstance().print(0, "\t[CENTRAL] Sent a new bus on line " + l.getName());
}
}
// should be used to see what lines need more bus and act accordingly.
// This is hard to do w/ our implementation, since we cannot see what passenger
// will embark on what line trivialy.
// Therefore, when a bus is full, a new one will go get remaining passengers.
private void refreshStops(String stop, String number) {
int s = Integer.parseInt(stop);
int n = Integer.parseInt(number);
int l = 0;
for (int k = 0 ; k < lines.size() ; k ++) {
if (lines.get(k).getStops().contains(stops.get(s))) {
l = k;
}
}
spacePerStop.set(s, n);
spacePerLine.set(l, spacePerLine.get(l) + 1);
Logger.getInstance().print(0, "\t[CENTRAL] refreshed stop value for " + stops.get(s).getStopName());
}
// run TCP socket until an exception occurs
private void runSocket() {
if (socketFuture == null)
// use a thread, so we dont block here
threadsSocket.submit(() -> {
try {
while (!socketStop) {
// blocks until client connects
String txt;
Socket s = socket.accept();
BufferedReader rx = new BufferedReader(new InputStreamReader(s.getInputStream()));
txt = rx.readLine();
// split on ':'
// 0 => who
// 1 => id
// 2 => arg if necessary
String[] msg = txt.split(":");
if (msg[0].startsWith("FULL"))
sendBus(msg[1]);
else if (msg[0].startsWith("STOP")) {
refreshStops(msg[1], msg[2]);
}
// loop back to accept
s.close();
}
} catch (Exception e) {Logger.getInstance().print(1, "[CENTRAL] exc " + e.getMessage()); e.printStackTrace();}
});
}
Central (int numThreadsBus, int numThreadsStop, int socketPort) throws IOException{
threadsBus = Executors.newFixedThreadPool(numThreadsBus);
threadsSpareBusses = Executors.newFixedThreadPool(numThreadsBus);
threadsStop = Executors.newFixedThreadPool(numThreadsStop);
threadsPassenger = Executors.newSingleThreadExecutor(); // doesnt need more! future balances themselves.
threadsSocket = Executors.newSingleThreadExecutor();
stops = new ArrayList<Stop>();
busses = new ArrayList<Bus>();
lines = new ArrayList<Line>();
passengers = new ArrayList<Passenger>();
spacePerStop = new ArrayList<Integer>();
spacePerLine = new ArrayList<Integer>();
blockingQueue = new LinkedBlockingQueue<BusInformationMessage>();
this.socketPort = socketPort;
socket = new ServerSocket(socketPort);
runSocket();
}
// init stops
public void createStops(int numStops, int numPassengersPerStop) {
for (int k = 0; k < numStops; k++) {
Stop s = new Stop(k, numPassengersPerStop);
s.setBlockingQueue(blockingQueue);
s.setSocketPort(socketPort);
stops.add(s);
s.setThreadPool(threadsStop);
threadsStop.submit(s);
}
for (int k = 0 ; k < stops.size() ; k++)
spacePerStop.add(k, 0);
}
// init lines (need stops to be init before)
public void createLines(int numLines, int numStopPerBus, int numStops) throws Exception {
for (int k = 0 ; k < numLines ; k++) {
Stop s;
if (numStopPerBus > stops.size())
throw new Exception("not enough stops");
ArrayList<Stop> s_list = new ArrayList<Stop>();
while (s_list.size() < numStopPerBus) {
do {
s = stops.get((int) (Math.random() * numStops));
} while (s_list.contains(s));
s_list.add(s);
}
Line l = new Line(k, s_list);
lines.add(l);
}
for (int k = 0 ; k < lines.size() ; k++)
spacePerLine.add(k, 0);
}
// init busses
public void createBusses(int numBusses, int timeBetweenStops, int timeEmbark, int numPassengersPerBus) throws IOException{
this.numPassengersPerBus = numPassengersPerBus;
this.timeBetweenStops = timeBetweenStops;
for (int k = 0; k < numBusses; k++) {
Line l = lines.get(k % lines.size());
Bus b = new Bus(l, k, timeBetweenStops, timeEmbark, numPassengersPerBus);
b.setSocketPort(socketPort);
b.setBlockingQueue(blockingQueue);
busses.add(b);
}
}
// init pasengers
public void createPassengers(int numPassengersPerStop, int numStopPerBus) {
// make passenger
int idPassenger = 0;
for (Bus b : busses) {
for (Stop start : b.getStops()) {
// if not found or is terminus, dont use
int idx = b.getStops().indexOf(start);
if (idx == -1 || start == b.getTerminus())
continue;
// rand number of passenger per stop
for (int k = 0; k < (int) (Math.random() * numPassengersPerStop); k++) {
Stop dest = null;
// dest is random from list of stops of this line
dest = b.getStops().get(idx + 1 + (int) Math.round(Math.random() * (numStopPerBus - 2 - idx)));
// create them!
Passenger p = new Passenger(idPassenger, start, dest);
passengers.add(p);
start.addPassenger(p);
idPassenger++;
threadsPassenger.submit(p);
}
}
}
}
// pretty print!
public void printDetails() {
for (Bus b : busses) {
b.printDetails();
}
}
// start bus thread
public void startBusThreads() {
for (Bus b : busses)
threadsBus.submit(b);
}
// shutdowns all threand & return.
public void shutdown() {
// block stop and passenger pool once bus have finished
try {
threadsBus.shutdown();
threadsBus.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
threadsSpareBusses.shutdown();
threadsSpareBusses.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
threadsStop.shutdown();
threadsStop.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
threadsPassenger.shutdown();
threadsPassenger.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
socketStop = true;
//threadsSocket.shutdown();
//threadsSocket.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);
} catch (InterruptedException e) {}
}
}

View file

@ -0,0 +1,52 @@
package usherbrooke.ift630;
import java.util.List;
import java.util.ArrayList;
class Line {
private List<Stop> stops;
private String name;
private int id;
Line(int id, ArrayList<Stop> stops) {
this.id = id;
this.name = "Line " + this.id;
this.stops = stops;
}
// get stops from index
public List<Stop> getStops(int index) {
return stops.subList(index, stops.size() - 1);
}
public List<Stop> getStops() {
return stops;
}
public String getName() {
return name;
}
public void printDetails() {
printDetails(0);
}
public void printDetails(int indent) {
Logger.getInstance().print(id, "\t.".repeat(indent) + "[TRIP] " + name);
for (Stop s : stops) {
s.printDetails(id, indent + 1);
}
}
public Stop get(int idx) {
return stops.get(idx);
}
public int getId() {
return id;
}
public Stop getTerminus() {
return stops.get(stops.size() - 1);
}
}

View file

@ -10,16 +10,14 @@ public class Passenger extends Thread {
private int id; private int id;
private Stop dest; private Stop dest;
private Stop start; private Stop start;
private ExecutorService threads;
private ExecutorService ex; private ExecutorService ex;
private Future<Void> future;
Passenger(int id, Stop start, Stop dest) { Passenger(int id, Stop start, Stop dest) {
this.id = id; this.id = id;
this.name = "Passenger " + id; this.name = "Passenger " + id;
this.dest = dest; this.dest = dest;
this.start = start; this.start = start;
this.ex = Executors.newFixedThreadPool(2); // one for each future. this.ex = Executors.newFixedThreadPool(1); // one for each future.
} }
public void run() { public void run() {
@ -32,23 +30,23 @@ public class Passenger extends Thread {
} }
// embark people once thread notified // embark people once thread notified
private synchronized Future<Void> embark() { private Future<Void> embark() {
return ex.submit(() -> { return ex.submit(() -> {
synchronized (this) { synchronized (this) {
wait(); wait();
} }
Logger.getInstance().print(id, "\t[PASSENGER] I'm embarking at " + start.getStopName()); Logger.getInstance().print(id, "\t[PASSENGER] " + name + " embarking at " + start.getStopName());
return null; return null;
}); });
} }
// disembark people once thread notified another time. // disembark people once thread notified another time.
private synchronized Future<Void> disembark() { private Future<Void> disembark() {
return ex.submit(() -> { return ex.submit(() -> {
synchronized (this) { synchronized (this) {
wait(); wait();
} }
Logger.getInstance().print(id, "\t[PASSENGER] I'm leaving at " + dest.getStopName()); Logger.getInstance().print(id, "\t[PASSENGER] " + name + " leaving at " + dest.getStopName());
return null; return null;
}); });
} }
@ -84,8 +82,4 @@ public class Passenger extends Thread {
public void printDetails() { public void printDetails() {
printDetails(0); printDetails(0);
} }
public void setThreadPool(ExecutorService threads) {
this.threads = threads;
}
} }

View file

@ -1,6 +1,9 @@
package usherbrooke.ift630; package usherbrooke.ift630;
import java.util.concurrent.Executors; import java.net.Socket;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.ArrayList; import java.util.ArrayList;
@ -10,9 +13,27 @@ public class Stop extends Thread {
private int id; private int id;
private int maxCapacity; private int maxCapacity;
private int indent = 0; private int indent = 0;
private int socketPort;
private ArrayList<Passenger> passengers; private ArrayList<Passenger> passengers;
private ExecutorService threads; private ExecutorService threads;
private BlockingQueue<BusInformationMessage> blockingQueue; private BlockingQueue<BusInformationMessage> blockingQueue;
private Socket socket;
private PrintWriter socketTx;
private void connectSocket() throws IOException {
socket = new Socket("localhost", socketPort);
socketTx = new PrintWriter(socket.getOutputStream(), true);
}
//
private void sendCurrentCapacity() throws IOException {
connectSocket();
// prone to over estimation. Some passenger will take other lines.
// One again, i did not understand lines before last question. see git logs.
socketTx.println("STOP:" + id + ":" + passengers.size());
socket.close();
}
// print bus info times. // print bus info times.
private void displayInfos(int indent) throws InterruptedException, ExitException { private void displayInfos(int indent) throws InterruptedException, ExitException {
@ -30,8 +51,7 @@ public class Stop extends Thread {
Logger.getInstance().print(id, Logger.getInstance().print(id,
"\t".repeat(indent) + "[STOP] " + info.getBus(this).getNameBus() + " arrives in " + time + "s"); "\t".repeat(indent) + "[STOP] " + info.getBus(this).getNameBus() + " arrives in " + time + "s");
// else, catch // else, catch
} catch (NullPointerException e) { } catch (NullPointerException e) {}
}
} catch (UnauthorizedException e) { } catch (UnauthorizedException e) {
// if unauth, requeue message. // if unauth, requeue message.
@ -56,10 +76,11 @@ public class Stop extends Thread {
@Override @Override
public void run() { public void run() {
try { try {
for (;;) { for (;;)
displayInfos(indent); displayInfos(indent);
}
} catch (InterruptedException | ExitException e) { } catch (InterruptedException | ExitException e) {
if (e.getClass() != ExitException.class)
Logger.getInstance().print(id, e.toString());
} }
} }
@ -75,10 +96,10 @@ public class Stop extends Thread {
} }
// return all passenger that stops at a stop in the list // return all passenger that stops at a stop in the list
public ArrayList<Passenger> getPassengerByDest(ArrayList<Stop> list) { public ArrayList<Passenger> getPassengerByDest(Line l, int indexStart) {
ArrayList<Passenger> res = new ArrayList<Passenger>(); ArrayList<Passenger> res = new ArrayList<Passenger>();
// for all given stop // for all given stop
for (Stop s : list) { for (Stop s : l.getStops(indexStart)) {
// for all passenger at this stop // for all passenger at this stop
for (Passenger p : passengers) { for (Passenger p : passengers) {
// if they stop at this stop, add to res // if they stop at this stop, add to res
@ -125,13 +146,14 @@ public class Stop extends Thread {
printDetails(0); printDetails(0);
} }
public synchronized void removePassenger(Passenger p) { public void removePassenger(Passenger p) throws IOException {
try { if(passengers.remove(p)) {
passengers.remove(p); synchronized (p) {
} catch (Exception e) { p.notify();
System.out.println("exception" + e.getMessage());
} }
} }
sendCurrentCapacity();
}
public void setIndent(int indent) { public void setIndent(int indent) {
this.indent = indent; this.indent = indent;
@ -144,4 +166,8 @@ public class Stop extends Thread {
public void setThreadPool(ExecutorService threads) { public void setThreadPool(ExecutorService threads) {
this.threads = threads; this.threads = threads;
} }
public void setSocketPort(int port) {
socketPort = port;
}
} }