Java Multithreading - Complete Tutorial
Master Java Multithreading: Learn Thread, Runnable, Executors, ThreadPool, Synchronization, Locks, Concurrent Collections, CompletableFuture, and best practices for high-performance applications.
Thread Creation
Thread & Runnable
Synchronization
Locks & Monitors
Executor Framework
ThreadPool & Futures
Concurrent Collections
Thread-safe Data Structures
1. Introduction to Java Multithreading
Multithreading in Java is a concurrent execution of two or more threads that can run concurrently to maximize CPU utilization. Each thread runs in its own stack but shares the heap memory with other threads.
Why Use Multithreading?
- Better CPU utilization: Multiple cores can work simultaneously
- Responsive applications: UI remains responsive while background tasks run
- Faster execution: Parallel processing of independent tasks
- Asynchronous operations: Non-blocking I/O operations
- Resource sharing: Threads share memory, reducing overhead
- Real-time systems: Critical for time-sensitive applications
Thread vs Process
- Process: Independent execution unit with separate memory
- Thread: Lightweight unit within a process, shares memory
- Creation: Threads are faster to create than processes
- Communication: Threads communicate via shared memory
- Context Switching: Thread switching is faster than process switching
- Resource Usage: Threads use fewer resources than processes
Key Concepts in Multithreading
Concurrency: Multiple tasks making progress simultaneously
Parallelism: Multiple tasks executing at the same time on multiple cores
Synchronization: Coordinating thread access to shared resources
Atomicity: Operations that complete without interruption
Thread Execution Model
Main Thread
Create Threads
Thread Pool
CPU Core 1
Task A
CPU Core 2
Task B
CPU Core 3
Task C
CPU Core 4
Task D
2. Creating Threads - Thread and Runnable
Java provides two main ways to create threads: extending Thread class and implementing Runnable interface. Since Java 8, lambda expressions provide a more concise way to create threads.
public class ThreadCreationExample {
public static void main(String[] args) {
System.out.println("Main thread started: " + Thread.currentThread().getName());
// Method 1: Extending Thread class
System.out.println("\n=== Method 1: Extending Thread Class ===");
MyThread thread1 = new MyThread();
thread1.start(); // Start the thread
// Method 2: Implementing Runnable interface
System.out.println("\n=== Method 2: Implementing Runnable ===");
Thread thread2 = new Thread(new MyRunnable());
thread2.start();
// Method 3: Anonymous class
System.out.println("\n=== Method 3: Anonymous Class ===");
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
for(int i = 1; i <= 3; i++) {
System.out.println("Anonymous Thread: Count " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
});
thread3.start();
// Method 4: Lambda expression (Java 8+)
System.out.println("\n=== Method 4: Lambda Expression ===");
Thread thread4 = new Thread(() -> {
for(int i = 1; i <= 3; i++) {
System.out.println("Lambda Thread: Count " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
thread4.start();
// Method 5: Method reference
System.out.println("\n=== Method 5: Method Reference ===");
Thread thread5 = new Thread(ThreadCreationExample::printNumbers);
thread5.start();
// Method 6: Thread with name and priority
System.out.println("\n=== Method 6: Named Thread with Priority ===");
Thread thread6 = new Thread(() -> {
System.out.println("Thread Name: " + Thread.currentThread().getName());
System.out.println("Thread Priority: " + Thread.currentThread().getPriority());
System.out.println("Thread ID: " + Thread.currentThread().getId());
}, "Worker-Thread");
thread6.setPriority(Thread.MAX_PRIORITY); // Priority 10
thread6.start();
// Method 7: Daemon thread
System.out.println("\n=== Method 7: Daemon Thread ===");
Thread daemonThread = new Thread(() -> {
while(true) {
System.out.println("Daemon thread running...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
break;
}
}
});
daemonThread.setDaemon(true); // Set as daemon thread
daemonThread.start();
// Wait for all threads to complete
try {
thread1.join();
thread2.join();
thread3.join();
thread4.join();
thread5.join();
thread6.join();
// Daemon thread will terminate when main thread ends
Thread.sleep(3000); // Give daemon thread time to run
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("\nMain thread completed.");
}
private static void printNumbers() {
for(int i = 1; i <= 3; i++) {
System.out.println("Method Reference Thread: Count " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// Method 1: Extending Thread class
class MyThread extends Thread {
@Override
public void run() {
for(int i = 1; i <= 3; i++) {
System.out.println("Extended Thread: Count " + i);
try {
Thread.sleep(500); // Pause for 500ms
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
// Method 2: Implementing Runnable interface
class MyRunnable implements Runnable {
@Override
public void run() {
for(int i = 1; i <= 3; i++) {
System.out.println("Runnable Thread: Count " + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
public class ThreadMethodsExample {
public static void main(String[] args) {
System.out.println("=== Thread Methods Demonstration ===\n");
// Thread information
Thread mainThread = Thread.currentThread();
System.out.println("Current Thread: " + mainThread.getName());
System.out.println("Thread ID: " + mainThread.getId());
System.out.println("Priority: " + mainThread.getPriority());
System.out.println("State: " + mainThread.getState());
System.out.println("Is Alive: " + mainThread.isAlive());
System.out.println("Is Daemon: " + mainThread.isDaemon());
System.out.println("Is Interrupted: " + mainThread.isInterrupted());
// Thread sleep example
System.out.println("\n=== Thread.sleep() Example ===");
System.out.print("Countdown: ");
for(int i = 5; i > 0; i--) {
System.out.print(i + " ");
try {
Thread.sleep(1000); // Sleep for 1 second
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("Go!");
// Thread join example
System.out.println("\n=== Thread.join() Example ===");
Thread worker1 = new Thread(() -> {
System.out.println("Worker 1 started");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker 1 completed");
});
Thread worker2 = new Thread(() -> {
System.out.println("Worker 2 started");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Worker 2 completed");
});
worker1.start();
worker2.start();
try {
System.out.println("Main thread waiting for workers...");
worker1.join(); // Wait for worker1 to complete
worker2.join(); // Wait for worker2 to complete
System.out.println("All workers completed!");
} catch (InterruptedException e) {
e.printStackTrace();
}
// Thread interrupt example
System.out.println("\n=== Thread.interrupt() Example ===");
Thread infiniteWorker = new Thread(() -> {
try {
while(!Thread.currentThread().isInterrupted()) {
System.out.println("Working...");
Thread.sleep(500);
}
} catch (InterruptedException e) {
System.out.println("Thread interrupted!");
Thread.currentThread().interrupt(); // Restore interrupt status
}
});
infiniteWorker.start();
try {
Thread.sleep(2000); // Let worker run for 2 seconds
} catch (InterruptedException e) {
e.printStackTrace();
}
infiniteWorker.interrupt(); // Interrupt the thread
// Thread yield example
System.out.println("\n=== Thread.yield() Example ===");
Thread highPriority = new Thread(() -> {
for(int i = 0; i < 5; i++) {
System.out.println("High Priority: " + i);
Thread.yield(); // Hint to scheduler to give CPU to other threads
}
});
Thread lowPriority = new Thread(() -> {
for(int i = 0; i < 5; i++) {
System.out.println("Low Priority: " + i);
}
});
highPriority.setPriority(Thread.MAX_PRIORITY);
lowPriority.setPriority(Thread.MIN_PRIORITY);
highPriority.start();
lowPriority.start();
// Thread states demonstration
System.out.println("\n=== Thread States ===");
Thread stateDemo = new Thread(() -> {
System.out.println("Thread state inside run(): " +
Thread.currentThread().getState());
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
System.out.println("Before start(): " + stateDemo.getState());
stateDemo.start();
System.out.println("After start(): " + stateDemo.getState());
try {
Thread.sleep(100);
System.out.println("During sleep(): " + stateDemo.getState());
stateDemo.join();
System.out.println("After completion: " + stateDemo.getState());
} catch (InterruptedException e) {
e.printStackTrace();
}
// Thread group example
System.out.println("\n=== Thread Group Example ===");
ThreadGroup group = new ThreadGroup("WorkerGroup");
Thread groupThread1 = new Thread(group, () -> {
System.out.println("Thread 1 in group: " +
Thread.currentThread().getThreadGroup().getName());
});
Thread groupThread2 = new Thread(group, () -> {
System.out.println("Thread 2 in group: " +
Thread.currentThread().getThreadGroup().getName());
});
groupThread1.start();
groupThread2.start();
System.out.println("Active threads in group: " + group.activeCount());
// Thread local example
System.out.println("\n=== ThreadLocal Example ===");
ThreadLocal threadLocal = ThreadLocal.withInitial(() -> 0);
Thread t1 = new Thread(() -> {
threadLocal.set(100);
System.out.println("Thread 1 value: " + threadLocal.get());
});
Thread t2 = new Thread(() -> {
threadLocal.set(200);
System.out.println("Thread 2 value: " + threadLocal.get());
});
t1.start();
t2.start();
try {
t1.join();
t2.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Main thread value: " + threadLocal.get());
}
}
| Method | Description | Usage | Thread vs Runnable |
|---|---|---|---|
start() |
Starts thread execution | Begins new call stack | Both |
run() |
Contains thread logic | Called by start() | Both (override/implement) |
sleep(long ms) |
Suspends thread for specified time | Timed waiting | Static method |
join() |
Waits for thread to die | Synchronization | Instance method |
interrupt() |
Interrupts thread | Thread termination | Instance method |
setPriority(int) |
Sets thread priority (1-10) | Thread scheduling hint | Instance method |
setDaemon(boolean) |
Marks as daemon thread | Background tasks | Before start() only |
Extending Thread
- Simple: Direct thread creation
- Full control: Override Thread methods
- Cannot extend: Another class (Java single inheritance)
- Tight coupling: Thread and task logic combined
- Use when: Need to override Thread methods
Implementing Runnable
- Flexible: Can extend another class
- Better design: Separation of concerns
- Reusable: Same Runnable with different Threads
- Thread pool: Required for executor framework
- Recommended: Modern Java development
3. Thread States and Lifecycle
A thread in Java can be in one of six states defined in the Thread.State enumeration. Understanding thread states is crucial for debugging and designing concurrent applications.
Thread Lifecycle Diagram
public class ThreadStatesExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Thread States Demonstration ===\n");
// State 1: NEW
System.out.println("=== 1. NEW State ===");
Thread thread = new Thread(() -> {
System.out.println("Thread execution started");
});
System.out.println("After creation: " + thread.getState());
// State 2: RUNNABLE
System.out.println("\n=== 2. RUNNABLE State ===");
thread.start();
Thread.sleep(10); // Small delay to ensure thread starts
System.out.println("After start(): " + thread.getState());
// State 3: TIMED_WAITING
System.out.println("\n=== 3. TIMED_WAITING State ===");
Thread sleepingThread = new Thread(() -> {
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
sleepingThread.start();
Thread.sleep(100); // Ensure thread enters sleep
System.out.println("During sleep(): " + sleepingThread.getState());
sleepingThread.join();
// State 4: WAITING (with wait())
System.out.println("\n=== 4. WAITING State (wait()) ===");
Object lock = new Object();
Thread waitingThread = new Thread(() -> {
synchronized(lock) {
try {
lock.wait(); // Releases lock and waits
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
waitingThread.start();
Thread.sleep(100);
System.out.println("During wait(): " + waitingThread.getState());
// Wake up waiting thread
synchronized(lock) {
lock.notify();
}
waitingThread.join();
// State 5: BLOCKED
System.out.println("\n=== 5. BLOCKED State ===");
Object resource = new Object();
Thread blocker = new Thread(() -> {
synchronized(resource) {
try {
System.out.println("Blocker acquired lock");
Thread.sleep(2000); // Hold lock for 2 seconds
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
Thread blocked = new Thread(() -> {
System.out.println("Blocked thread trying to acquire lock...");
synchronized(resource) {
System.out.println("Blocked thread acquired lock");
}
});
blocker.start();
Thread.sleep(100); // Ensure blocker gets lock first
blocked.start();
Thread.sleep(100); // Ensure blocked thread tries to acquire lock
System.out.println("Blocked thread state: " + blocked.getState());
blocker.join();
blocked.join();
// State 6: TERMINATED
System.out.println("\n=== 6. TERMINATED State ===");
Thread shortThread = new Thread(() -> {
System.out.println("Short-lived thread running");
});
shortThread.start();
shortThread.join();
System.out.println("After completion: " + shortThread.getState());
// Complete state machine demonstration
System.out.println("\n=== Complete State Transition ===");
Thread demoThread = new Thread(() -> {
System.out.println("1. In run(): " + Thread.currentThread().getState());
try {
synchronized(this) {
System.out.println("2. Before wait(): " + Thread.currentThread().getState());
wait(100); // Timed waiting
System.out.println("3. After wait(): " + Thread.currentThread().getState());
}
} catch (InterruptedException e) {
e.printStackTrace();
}
// Simulate blocking
synchronized(ThreadStatesExample.class) {
System.out.println("4. In synchronized block: " +
Thread.currentThread().getState());
}
System.out.println("5. Before termination: " +
Thread.currentThread().getState());
});
System.out.println("A. Before start(): " + demoThread.getState());
demoThread.start();
Thread.sleep(50);
System.out.println("B. After start(): " + demoThread.getState());
Thread.sleep(150);
System.out.println("C. After timed wait: " + demoThread.getState());
demoThread.join();
System.out.println("D. After termination: " + demoThread.getState());
// Thread state monitoring
System.out.println("\n=== Thread State Monitoring ===");
Thread monitoredThread = new Thread(() -> {
for(int i = 1; i <= 5; i++) {
System.out.println("Working... cycle " + i);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
break;
}
}
});
monitoredThread.start();
// Monitor thread states
while(monitoredThread.isAlive()) {
System.out.println("Thread state: " + monitoredThread.getState());
Thread.sleep(200);
}
System.out.println("Final state: " + monitoredThread.getState());
// Daemon thread states
System.out.println("\n=== Daemon Thread States ===");
Thread daemon = new Thread(() -> {
while(true) {
try {
Thread.sleep(1000);
System.out.println("Daemon alive...");
} catch (InterruptedException e) {
break;
}
}
});
daemon.setDaemon(true);
daemon.start();
System.out.println("Daemon state: " + daemon.getState());
Thread.sleep(2500);
System.out.println("Daemon still running as daemon: " + daemon.getState());
}
}
| State | Description | How to Enter | How to Exit |
|---|---|---|---|
| NEW | Thread created but not started | new Thread() |
start() |
| RUNNABLE | Ready to run, waiting for CPU | start() |
CPU scheduler picks thread |
| RUNNING | Executing on CPU | CPU scheduler selects | yield(), time slice ends, blocks |
| BLOCKED | Waiting for monitor lock | Waiting for synchronized block | Acquires lock |
| WAITING | Waiting indefinitely | wait(), join() without timeout |
notify(), notifyAll() |
| TIMED_WAITING | Waiting with timeout | sleep(), wait(timeout), join(timeout) |
Timeout expires, notification |
| TERMINATED | Thread completed execution | run() method completes |
Cannot exit (final state) |
4. Synchronization and Thread Safety
Synchronization in Java ensures that only one thread can access a shared resource at a time, preventing race conditions and ensuring data consistency.
public class SynchronizationExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Synchronization Examples ===\n");
// Problem: Race Condition without synchronization
System.out.println("=== 1. Race Condition Problem ===");
Counter unsafeCounter = new Counter();
Thread t1 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
unsafeCounter.increment();
}
});
Thread t2 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
unsafeCounter.increment();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("Unsafe counter value: " + unsafeCounter.getCount() +
" (Expected: 2000)");
// Solution 1: Synchronized method
System.out.println("\n=== 2. Synchronized Method ===");
Counter safeCounter = new Counter();
Thread t3 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
safeCounter.safeIncrement();
}
});
Thread t4 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
safeCounter.safeIncrement();
}
});
t3.start();
t4.start();
t3.join();
t4.join();
System.out.println("Safe counter (method): " + safeCounter.getCount() +
" (Expected: 2000)");
// Solution 2: Synchronized block
System.out.println("\n=== 3. Synchronized Block ===");
Counter blockCounter = new Counter();
Thread t5 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
synchronized(blockCounter) {
blockCounter.increment();
}
}
});
Thread t6 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
synchronized(blockCounter) {
blockCounter.increment();
}
}
});
t5.start();
t6.start();
t5.join();
t6.join();
System.out.println("Safe counter (block): " + blockCounter.getCount() +
" (Expected: 2000)");
// Static synchronization
System.out.println("\n=== 4. Static Synchronization ===");
Thread t7 = new Thread(() -> {
for(int i = 0; i < 500; i++) {
Counter.staticIncrement();
}
});
Thread t8 = new Thread(() -> {
for(int i = 0; i < 500; i++) {
Counter.staticIncrement();
}
});
t7.start();
t8.start();
t7.join();
t8.join();
System.out.println("Static counter: " + Counter.getStaticCount() +
" (Expected: 1000)");
// wait() and notify() example
System.out.println("\n=== 5. wait() and notify() ===");
Message message = new Message();
Thread waiter = new Thread(new Waiter(message), "Waiter");
Thread notifier = new Thread(new Notifier(message), "Notifier");
waiter.start();
Thread.sleep(1000); // Ensure waiter starts first
notifier.start();
waiter.join();
notifier.join();
// Producer-Consumer problem
System.out.println("\n=== 6. Producer-Consumer Problem ===");
Buffer buffer = new Buffer(5);
Thread producer = new Thread(new Producer(buffer), "Producer");
Thread consumer = new Thread(new Consumer(buffer), "Consumer");
producer.start();
consumer.start();
Thread.sleep(3000);
producer.interrupt();
consumer.interrupt();
producer.join();
consumer.join();
System.out.println("\nProducer-Consumer simulation completed.");
}
}
class Counter {
private int count = 0;
private static int staticCount = 0;
// Unsafe method (race condition)
public void increment() {
count++; // Not atomic: read → increment → write
}
// Synchronized method
public synchronized void safeIncrement() {
count++;
}
// Static synchronized method
public static synchronized void staticIncrement() {
staticCount++;
}
public int getCount() {
return count;
}
public static int getStaticCount() {
return staticCount;
}
}
// wait() and notify() example classes
class Message {
private String message;
private boolean empty = true;
public synchronized String read() {
while(empty) {
try {
wait(); // Wait until message is available
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
empty = true;
notifyAll(); // Notify waiting threads
return message;
}
public synchronized void write(String message) {
while(!empty) {
try {
wait(); // Wait until message is consumed
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
empty = false;
this.message = message;
notifyAll(); // Notify waiting threads
}
}
class Waiter implements Runnable {
private Message message;
public Waiter(Message message) {
this.message = message;
}
@Override
public void run() {
System.out.println("Waiter waiting for message...");
String msg = message.read();
System.out.println("Waiter received: " + msg);
}
}
class Notifier implements Runnable {
private Message message;
public Notifier(Message message) {
this.message = message;
}
@Override
public void run() {
System.out.println("Notifier preparing message...");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
message.write("Hello from Notifier!");
System.out.println("Notifier sent message");
}
}
// Producer-Consumer classes
class Buffer {
private final int[] buffer;
private int count = 0;
private int index = 0;
private int out = 0;
public Buffer(int size) {
buffer = new int[size];
}
public synchronized void produce(int value) throws InterruptedException {
while(count == buffer.length) {
wait(); // Buffer full, wait
}
buffer[index] = value;
index = (index + 1) % buffer.length;
count++;
System.out.println("Produced: " + value + ", Count: " + count);
notifyAll(); // Notify consumers
}
public synchronized int consume() throws InterruptedException {
while(count == 0) {
wait(); // Buffer empty, wait
}
int value = buffer[out];
out = (out + 1) % buffer.length;
count--;
System.out.println("Consumed: " + value + ", Count: " + count);
notifyAll(); // Notify producers
return value;
}
}
class Producer implements Runnable {
private Buffer buffer;
public Producer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
int value = 0;
while(!Thread.currentThread().isInterrupted()) {
try {
buffer.produce(value++);
Thread.sleep((int)(Math.random() * 1000));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
class Consumer implements Runnable {
private Buffer buffer;
public Consumer(Buffer buffer) {
this.buffer = buffer;
}
@Override
public void run() {
while(!Thread.currentThread().isInterrupted()) {
try {
buffer.consume();
Thread.sleep((int)(Math.random() * 1500));
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
import java.util.concurrent.locks.*;
public class LocksExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Java Lock Framework Examples ===\n");
// 1. ReentrantLock
System.out.println("=== 1. ReentrantLock ===");
ReentrantLockCounter counter = new ReentrantLockCounter();
Thread t1 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
counter.increment();
}
});
Thread t2 = new Thread(() -> {
for(int i = 0; i < 1000; i++) {
counter.increment();
}
});
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("ReentrantLock counter: " + counter.getCount());
// 2. ReadWriteLock
System.out.println("\n=== 2. ReadWriteLock ===");
ReadWriteLockCache cache = new ReadWriteLockCache();
// Multiple readers
Thread[] readers = new Thread[5];
for(int i = 0; i < readers.length; i++) {
readers[i] = new Thread(() -> {
for(int j = 0; j < 3; j++) {
String data = cache.read("key");
System.out.println(Thread.currentThread().getName() +
" read: " + data);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Reader-" + i);
readers[i].start();
}
// Writer thread
Thread writer = new Thread(() -> {
for(int i = 1; i <= 3; i++) {
cache.write("key", "Value-" + i);
System.out.println("Writer wrote: Value-" + i);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Writer");
writer.start();
for(Thread reader : readers) {
reader.join();
}
writer.join();
// 3. StampedLock (Optimistic Locking)
System.out.println("\n=== 3. StampedLock ===");
StampedLockCounter stampedCounter = new StampedLockCounter();
Thread optimisticReader = new Thread(() -> {
for(int i = 0; i < 5; i++) {
stampedCounter.optimisticRead();
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread writer1 = new Thread(() -> {
for(int i = 0; i < 3; i++) {
stampedCounter.increment();
System.out.println("StampedLock writer incremented");
try {
Thread.sleep(400);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
optimisticReader.start();
writer1.start();
optimisticReader.join();
writer1.join();
// 4. Condition variables
System.out.println("\n=== 4. Condition Variables ===");
BoundedBuffer buffer = new BoundedBuffer(5);
Thread producer1 = new Thread(() -> {
for(int i = 1; i <= 10; i++) {
try {
buffer.put(i);
System.out.println("Produced: " + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread consumer1 = new Thread(() -> {
for(int i = 1; i <= 10; i++) {
try {
int value = buffer.take();
System.out.println("Consumed: " + value);
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer1.start();
consumer1.start();
producer1.join();
consumer1.join();
// 5. TryLock with timeout
System.out.println("\n=== 5. TryLock with Timeout ===");
ReentrantLock lock = new ReentrantLock();
Thread tryLockThread = new Thread(() -> {
System.out.println("Thread trying to acquire lock...");
try {
if(lock.tryLock(2, TimeUnit.SECONDS)) {
try {
System.out.println("Lock acquired successfully");
Thread.sleep(1000);
} finally {
lock.unlock();
System.out.println("Lock released");
}
} else {
System.out.println("Could not acquire lock within timeout");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Hold lock in main thread
lock.lock();
try {
System.out.println("Main thread holding lock...");
tryLockThread.start();
Thread.sleep(1500); // Hold lock for 1.5 seconds
} finally {
lock.unlock();
System.out.println("Main thread released lock");
}
tryLockThread.join();
System.out.println("\nAll lock examples completed.");
}
}
class ReentrantLockCounter {
private int count = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock(); // Acquire lock
try {
count++;
} finally {
lock.unlock(); // Always release in finally block
}
}
public int getCount() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
}
class ReadWriteLockCache {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final java.util.Map cache = new java.util.HashMap<>();
public ReadWriteLockCache() {
cache.put("key", "Initial Value");
}
public String read(String key) {
lock.readLock().lock(); // Multiple readers can acquire
try {
return cache.get(key);
} finally {
lock.readLock().unlock();
}
}
public void write(String key, String value) {
lock.writeLock().lock(); // Exclusive lock for writers
try {
cache.put(key, value);
} finally {
lock.writeLock().unlock();
}
}
}
class StampedLockCounter {
private int count = 0;
private final StampedLock lock = new StampedLock();
public void increment() {
long stamp = lock.writeLock(); // Exclusive lock
try {
count++;
} finally {
lock.unlockWrite(stamp);
}
}
public void optimisticRead() {
long stamp = lock.tryOptimisticRead(); // Try optimistic read
int currentCount = count; // Read value
if(!lock.validate(stamp)) { // Check if data changed
stamp = lock.readLock(); // Fallback to read lock
try {
currentCount = count;
} finally {
lock.unlockRead(stamp);
}
}
System.out.println("Optimistic read: " + currentCount);
}
}
class BoundedBuffer {
private final int[] buffer;
private int count = 0;
private int putIndex = 0;
private int takeIndex = 0;
private final ReentrantLock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public BoundedBuffer(int capacity) {
buffer = new int[capacity];
}
public void put(int value) throws InterruptedException {
lock.lock();
try {
while(count == buffer.length) {
notFull.await(); // Wait until not full
}
buffer[putIndex] = value;
putIndex = (putIndex + 1) % buffer.length;
count++;
notEmpty.signal(); // Signal that buffer is not empty
} finally {
lock.unlock();
}
}
public int take() throws InterruptedException {
lock.lock();
try {
while(count == 0) {
notEmpty.await(); // Wait until not empty
}
int value = buffer[takeIndex];
takeIndex = (takeIndex + 1) % buffer.length;
count--;
notFull.signal(); // Signal that buffer is not full
return value;
} finally {
lock.unlock();
}
}
}
- Deadlock: Two or more threads waiting for each other forever
- Race Condition: Different results based on thread timing
- Starvation: Thread never gets CPU time
- Livelock: Threads keep changing state but make no progress
- Memory Consistency Errors: Threads see different values
Deadlock Visualization
Has: Resource 1
Needs: Resource 2
Has: Resource 2
Needs: Resource 1
5. Executor Framework and Thread Pools
The Executor Framework provides a high-level API for asynchronous task execution with thread pool management, eliminating the need to manually create and manage threads.
Executor Framework Architecture
Tasks
ExecutorService
Thread Pool
CachedThreadPool
FixedThreadPool
ScheduledThreadPool
Future Results
import java.util.concurrent.*;
import java.util.*;
public class ExecutorFrameworkExample {
public static void main(String[] args) throws Exception {
System.out.println("=== Executor Framework Examples ===\n");
// 1. SingleThreadExecutor
System.out.println("=== 1. SingleThreadExecutor ===");
ExecutorService singleExecutor = Executors.newSingleThreadExecutor();
for(int i = 1; i <= 5; i++) {
int taskId = i;
singleExecutor.submit(() -> {
System.out.println("Task " + taskId + " executed by " +
Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
singleExecutor.shutdown();
singleExecutor.awaitTermination(5, TimeUnit.SECONDS);
// 2. FixedThreadPool
System.out.println("\n=== 2. FixedThreadPool ===");
ExecutorService fixedExecutor = Executors.newFixedThreadPool(3);
List> futures = new ArrayList<>();
for(int i = 1; i <= 10; i++) {
int taskId = i;
Future future = fixedExecutor.submit(() -> {
System.out.println("Task " + taskId + " started by " +
Thread.currentThread().getName());
Thread.sleep(1000);
return "Result of Task " + taskId;
});
futures.add(future);
}
// Get results from futures
for(Future future : futures) {
try {
System.out.println("Future result: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
fixedExecutor.shutdown();
fixedExecutor.awaitTermination(10, TimeUnit.SECONDS);
// 3. CachedThreadPool
System.out.println("\n=== 3. CachedThreadPool ===");
ExecutorService cachedExecutor = Executors.newCachedThreadPool();
for(int i = 1; i <= 20; i++) {
int taskId = i;
cachedExecutor.execute(() -> {
System.out.println("Quick task " + taskId + " by " +
Thread.currentThread().getName());
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
cachedExecutor.shutdown();
cachedExecutor.awaitTermination(5, TimeUnit.SECONDS);
// 4. ScheduledThreadPool
System.out.println("\n=== 4. ScheduledThreadPool ===");
ScheduledExecutorService scheduledExecutor = Executors.newScheduledThreadPool(2);
// Schedule task to run after delay
System.out.println("Scheduling tasks...");
scheduledExecutor.schedule(() -> {
System.out.println("Task executed after 2 seconds delay");
}, 2, TimeUnit.SECONDS);
// Schedule task to run periodically
ScheduledFuture> periodicTask = scheduledExecutor.scheduleAtFixedRate(() -> {
System.out.println("Periodic task executed at: " + new Date());
}, 1, 2, TimeUnit.SECONDS);
// Schedule task with fixed delay
scheduledExecutor.scheduleWithFixedDelay(() -> {
System.out.println("Fixed delay task executed");
}, 1, 3, TimeUnit.SECONDS);
Thread.sleep(10000); // Let scheduled tasks run
periodicTask.cancel(false); // Cancel periodic task
scheduledExecutor.shutdown();
// 5. Callable and Future
System.out.println("\n=== 5. Callable and Future ===");
ExecutorService callableExecutor = Executors.newFixedThreadPool(3);
List> callableFutures = new ArrayList<>();
for(int i = 1; i <= 5; i++) {
int number = i;
Callable callable = () -> {
System.out.println("Calculating square of " + number);
Thread.sleep(1000);
return number * number;
};
callableFutures.add(callableExecutor.submit(callable));
}
for(Future future : callableFutures) {
try {
System.out.println("Square result: " + future.get());
} catch (Exception e) {
e.printStackTrace();
}
}
callableExecutor.shutdown();
// 6. CompletableFuture (Java 8+)
System.out.println("\n=== 6. CompletableFuture ===");
// Simple CompletableFuture
CompletableFuture future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Async task 1 running...");
try { Thread.sleep(1000); } catch (InterruptedException e) {}
return "Hello";
});
CompletableFuture future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Async task 2 running...");
try { Thread.sleep(1500); } catch (InterruptedException e) {}
return "World";
});
// Combine futures
CompletableFuture combined = future1.thenCombine(future2, (s1, s2) -> {
return s1 + " " + s2;
});
System.out.println("Combined result: " + combined.get());
// Chain futures
CompletableFuture chain = CompletableFuture.supplyAsync(() -> "Number: ")
.thenApply(s -> s + "10")
.thenApply(s -> s + " * 2 = ")
.thenApply(s -> s + (10 * 2))
.thenApply(String::toUpperCase);
System.out.println("Chained result: " + chain.get());
// Handle exceptions
CompletableFuture safeFuture = CompletableFuture.supplyAsync(() -> {
if(Math.random() > 0.5) {
throw new RuntimeException("Random failure!");
}
return 42;
}).exceptionally(ex -> {
System.out.println("Exception handled: " + ex.getMessage());
return 0;
});
System.out.println("Safe result: " + safeFuture.get());
// 7. ForkJoinPool
System.out.println("\n=== 7. ForkJoinPool ===");
ForkJoinPool forkJoinPool = new ForkJoinPool();
int[] numbers = new int[1000];
for(int i = 0; i < numbers.length; i++) {
numbers[i] = i + 1;
}
SumTask task = new SumTask(numbers, 0, numbers.length);
ForkJoinTask result = forkJoinPool.submit(task);
System.out.println("Sum of 1-1000: " + result.get());
forkJoinPool.shutdown();
// 8. ThreadPoolExecutor (Advanced configuration)
System.out.println("\n=== 8. Custom ThreadPoolExecutor ===");
ThreadPoolExecutor customExecutor = new ThreadPoolExecutor(
2, // Core pool size
5, // Maximum pool size
60, TimeUnit.SECONDS, // Keep-alive time
new ArrayBlockingQueue<>(10), // Work queue
Executors.defaultThreadFactory(), // Thread factory
new ThreadPoolExecutor.CallerRunsPolicy() // Rejection policy
);
for(int i = 1; i <= 20; i++) {
int taskId = i;
customExecutor.execute(() -> {
System.out.println("Custom pool task " + taskId +
" by " + Thread.currentThread().getName());
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Monitor thread pool
System.out.println("Pool size: " + customExecutor.getPoolSize());
System.out.println("Active threads: " + customExecutor.getActiveCount());
System.out.println("Queue size: " + customExecutor.getQueue().size());
customExecutor.shutdown();
customExecutor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("\nAll executor examples completed.");
}
}
// ForkJoinTask example
class SumTask extends RecursiveTask {
private static final int THRESHOLD = 100;
private int[] array;
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
if(end - start <= THRESHOLD) {
// Compute directly
int sum = 0;
for(int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// Split task
int middle = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, middle);
SumTask rightTask = new SumTask(array, middle, end);
leftTask.fork(); // Execute asynchronously
int rightResult = rightTask.compute(); // Compute directly
int leftResult = leftTask.join(); // Wait for result
return leftResult + rightResult;
}
}
}
6. Concurrent Collections
Java provides thread-safe collection implementations in the java.util.concurrent package that offer better performance than synchronized wrappers for concurrent access.
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class ConcurrentCollectionsExample {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== Concurrent Collections Examples ===\n");
// 1. ConcurrentHashMap
System.out.println("=== 1. ConcurrentHashMap ===");
ConcurrentHashMap concurrentMap = new ConcurrentHashMap<>();
Thread[] mapWriters = new Thread[5];
Thread[] mapReaders = new Thread[3];
// Writers
for(int i = 0; i < mapWriters.length; i++) {
int threadId = i;
mapWriters[i] = new Thread(() -> {
for(int j = 0; j < 100; j++) {
String key = "Key-" + threadId + "-" + j;
concurrentMap.put(key, j);
}
});
mapWriters[i].start();
}
// Readers
for(int i = 0; i < mapReaders.length; i++) {
mapReaders[i] = new Thread(() -> {
for(int j = 0; j < 50; j++) {
concurrentMap.forEach((k, v) -> {
// Safe iteration
});
}
});
mapReaders[i].start();
}
for(Thread t : mapWriters) t.join();
for(Thread t : mapReaders) t.join();
System.out.println("ConcurrentHashMap size: " + concurrentMap.size());
// Atomic operations on ConcurrentHashMap
concurrentMap.compute("test", (k, v) -> v == null ? 1 : v + 1);
concurrentMap.merge("merge", 1, Integer::sum);
// 2. CopyOnWriteArrayList
System.out.println("\n=== 2. CopyOnWriteArrayList ===");
CopyOnWriteArrayList copyOnWriteList = new CopyOnWriteArrayList<>();
// Add initial elements
copyOnWriteList.addAll(Arrays.asList("A", "B", "C", "D"));
Thread writer = new Thread(() -> {
for(int i = 0; i < 5; i++) {
copyOnWriteList.add("New-" + i);
System.out.println("Writer added element");
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread reader = new Thread(() -> {
for(int i = 0; i < 5; i++) {
// Safe iteration (snapshot iterator)
for(String item : copyOnWriteList) {
System.out.println("Reader saw: " + item);
}
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
writer.start();
reader.start();
writer.join();
reader.join();
// 3. BlockingQueue implementations
System.out.println("\n=== 3. BlockingQueue ===");
// ArrayBlockingQueue
BlockingQueue arrayQueue = new ArrayBlockingQueue<>(10);
Thread producer = new Thread(() -> {
for(int i = 1; i <= 20; i++) {
try {
arrayQueue.put(i); // Blocks if queue is full
System.out.println("Produced: " + i);
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
Thread consumer = new Thread(() -> {
for(int i = 1; i <= 20; i++) {
try {
Integer value = arrayQueue.take(); // Blocks if queue is empty
System.out.println("Consumed: " + value);
Thread.sleep(150);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
producer.start();
consumer.start();
producer.join();
consumer.join();
// LinkedBlockingQueue
BlockingQueue linkedQueue = new LinkedBlockingQueue<>(5);
// PriorityBlockingQueue
BlockingQueue priorityQueue = new PriorityBlockingQueue<>();
priorityQueue.offer(30);
priorityQueue.offer(10);
priorityQueue.offer(20);
System.out.println("PriorityQueue order: " +
priorityQueue.take() + ", " + priorityQueue.take() + ", " + priorityQueue.take());
// SynchronousQueue (direct handoff)
SynchronousQueue syncQueue = new SynchronousQueue<>();
new Thread(() -> {
try {
System.out.println("Waiting to take from sync queue...");
String item = syncQueue.take();
System.out.println("Taken: " + item);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
Thread.sleep(1000);
syncQueue.put("Direct Handoff");
// 4. ConcurrentLinkedQueue (Non-blocking)
System.out.println("\n=== 4. ConcurrentLinkedQueue ===");
ConcurrentLinkedQueue concurrentQueue = new ConcurrentLinkedQueue<>();
Thread[] queueThreads = new Thread[10];
for(int i = 0; i < queueThreads.length; i++) {
int threadId = i;
queueThreads[i] = new Thread(() -> {
for(int j = 0; j < 20; j++) {
concurrentQueue.offer("Thread-" + threadId + "-Item-" + j);
}
});
queueThreads[i].start();
}
for(Thread t : queueThreads) t.join();
System.out.println("Queue size: " + concurrentQueue.size());
// 5. ConcurrentSkipListMap (Sorted concurrent map)
System.out.println("\n=== 5. ConcurrentSkipListMap ===");
ConcurrentSkipListMap skipListMap = new ConcurrentSkipListMap<>();
for(int i = 0; i < 10; i++) {
skipListMap.put(i, "Value-" + i);
}
System.out.println("First entry: " + skipListMap.firstEntry());
System.out.println("Last entry: " + skipListMap.lastEntry());
System.out.println("Head map (<5): " + skipListMap.headMap(5));
// 6. Atomic variables
System.out.println("\n=== 6. Atomic Variables ===");
AtomicInteger atomicInt = new AtomicInteger(0);
AtomicReference atomicRef = new AtomicReference<>("Initial");
AtomicLong atomicLong = new AtomicLong(1000L);
Thread[] atomicThreads = new Thread[5];
for(int i = 0; i < atomicThreads.length; i++) {
atomicThreads[i] = new Thread(() -> {
for(int j = 0; j < 1000; j++) {
atomicInt.incrementAndGet();
}
});
atomicThreads[i].start();
}
for(Thread t : atomicThreads) t.join();
System.out.println("AtomicInteger value: " + atomicInt.get());
// Compare-and-set operation
boolean updated = atomicRef.compareAndSet("Initial", "Updated");
System.out.println("AtomicReference updated: " + updated + ", value: " + atomicRef.get());
// Atomic array
AtomicIntegerArray atomicArray = new AtomicIntegerArray(10);
atomicArray.set(0, 100);
atomicArray.incrementAndGet(0);
System.out.println("AtomicArray[0]: " + atomicArray.get(0));
// 7. CountDownLatch
System.out.println("\n=== 7. CountDownLatch ===");
CountDownLatch latch = new CountDownLatch(3);
for(int i = 1; i <= 3; i++) {
int workerId = i;
new Thread(() -> {
System.out.println("Worker " + workerId + " started");
try {
Thread.sleep(workerId * 1000L);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Worker " + workerId + " completed");
latch.countDown();
}).start();
}
System.out.println("Main thread waiting for workers...");
latch.await(); // Wait for all workers
System.out.println("All workers completed!");
// 8. CyclicBarrier
System.out.println("\n=== 8. CyclicBarrier ===");
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("All threads reached barrier! Proceeding...");
});
for(int i = 1; i <= 3; i++) {
int threadId = i;
new Thread(() -> {
System.out.println("Thread " + threadId + " started");
try {
Thread.sleep(threadId * 1000L);
System.out.println("Thread " + threadId + " reached barrier");
barrier.await(); // Wait for other threads
System.out.println("Thread " + threadId + " continued");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
Thread.sleep(5000); // Let barrier example complete
// 9. Semaphore
System.out.println("\n=== 9. Semaphore ===");
Semaphore semaphore = new Semaphore(3); // Allow 3 concurrent accesses
Thread[] semaphoreThreads = new Thread[10];
for(int i = 0; i < semaphoreThreads.length; i++) {
int threadId = i;
semaphoreThreads[i] = new Thread(() -> {
try {
System.out.println("Thread " + threadId + " waiting for permit");
semaphore.acquire(); // Acquire permit
System.out.println("Thread " + threadId + " got permit, working...");
Thread.sleep(2000);
System.out.println("Thread " + threadId + " releasing permit");
semaphore.release(); // Release permit
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
semaphoreThreads[i].start();
}
for(Thread t : semaphoreThreads) t.join();
// 10. Exchanger
System.out.println("\n=== 10. Exchanger ===");
Exchanger exchanger = new Exchanger<>();
Thread exchanger1 = new Thread(() -> {
try {
String data = "Data from Thread 1";
System.out.println("Thread 1 sending: " + data);
String received = exchanger.exchange(data);
System.out.println("Thread 1 received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
Thread exchanger2 = new Thread(() -> {
try {
String data = "Data from Thread 2";
System.out.println("Thread 2 sending: " + data);
String received = exchanger.exchange(data);
System.out.println("Thread 2 received: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
exchanger1.start();
exchanger2.start();
exchanger1.join();
exchanger2.join();
System.out.println("\nAll concurrent collections examples completed.");
}
}
7. Multithreading Best Practices
- Race Conditions: Shared data accessed without synchronization
- Deadlocks: Circular wait for locks
- Thread Leaks: Threads not properly terminated
- Memory Consistency Errors: volatile/cache issues
- Thread Starvation: Low priority threads never run
- Resource Contention: Too many threads competing
- Context Switching Overhead: Too many threads
Best Practices
- Use Executor Framework instead of manual threads
- Prefer Runnable over extending Thread
- Use Concurrent Collections for thread-safe data structures
- Keep synchronized blocks small and simple
- Use immutable objects when possible
- Document thread safety guarantees
- Use ThreadLocal for per-thread data
Performance Tips
- Right number of threads: CPU cores + 1 for CPU-bound tasks
- Use thread pools for I/O-bound tasks
- Avoid excessive synchronization
- Use ReadWriteLock for read-heavy data
- Consider async/non-blocking I/O
- Use CompletableFuture for complex async workflows
- Profile and measure performance
Thread Safety Levels
Immutable: Safest, no synchronization needed
Thread-safe: Internal synchronization (ConcurrentHashMap)
Conditionally thread-safe: Requires external synchronization
Thread-compatible: Can be made thread-safe with external sync
Thread-hostile: Cannot be made thread-safe
- Immutable Object Pattern: Objects that cannot be modified
- Thread Pool Pattern: Reuse threads for multiple tasks
- Producer-Consumer Pattern: Decouple production and consumption
- Read-Write Lock Pattern: Optimize for read-heavy access
- Worker Thread Pattern: Dedicated threads for specific tasks
- Future Pattern: Represent result of async computation
8. Practical Example: Web Crawler with Thread Pool
Multi-threaded Web Crawler
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
public class WebCrawlerExample {
static class WebPage {
private String url;
private String content;
private int depth;
public WebPage(String url, String content, int depth) {
this.url = url;
this.content = content;
this.depth = depth;
}
public String getUrl() { return url; }
public String getContent() { return content; }
public int getDepth() { return depth; }
@Override
public String toString() {
return "WebPage{url='" + url + "', depth=" + depth + "}";
}
}
static class WebCrawler {
private final ExecutorService executor;
private final Set visitedUrls;
private final ConcurrentLinkedQueue results;
private final AtomicInteger activeCrawlers;
private final CountDownLatch completionLatch;
private final int maxDepth;
private final int maxPages;
public WebCrawler(int threadPoolSize, int maxDepth, int maxPages) {
this.executor = Executors.newFixedThreadPool(threadPoolSize);
this.visitedUrls = ConcurrentHashMap.newKeySet();
this.results = new ConcurrentLinkedQueue<>();
this.activeCrawlers = new AtomicInteger(0);
this.maxDepth = maxDepth;
this.maxPages = maxPages;
this.completionLatch = new CountDownLatch(1);
}
public void startCrawling(String startUrl) {
System.out.println("Starting crawl from: " + startUrl);
submitCrawlTask(startUrl, 0);
// Monitor completion
new Thread(this::monitorCompletion).start();
}
private void submitCrawlTask(String url, int depth) {
if(depth > maxDepth || visitedUrls.size() >= maxPages) {
return;
}
if(visitedUrls.add(url)) {
activeCrawlers.incrementAndGet();
executor.submit(() -> crawlPage(url, depth));
}
}
private void crawlPage(String url, int depth) {
try {
// Simulate HTTP request delay
Thread.sleep(100 + (int)(Math.random() * 400));
// Simulate page content
String content = "Content of " + url;
WebPage page = new WebPage(url, content, depth);
results.offer(page);
System.out.println("Crawled: " + url + " (depth: " + depth +
", active: " + activeCrawlers.get() + ")");
// Simulate finding links (random links for demo)
List links = extractLinks(url, depth);
for(String link : links) {
submitCrawlTask(link, depth + 1);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
if(activeCrawlers.decrementAndGet() == 0) {
checkCompletion();
}
}
}
private List extractLinks(String url, int depth) {
List links = new ArrayList<>();
if(depth < maxDepth) {
// Generate random links for demonstration
int linkCount = 2 + (int)(Math.random() * 4);
for(int i = 0; i < linkCount; i++) {
links.add(url + "/link" + i + "?depth=" + (depth + 1));
}
}
return links;
}
private void monitorCompletion() {
try {
completionLatch.await();
executor.shutdown();
executor.awaitTermination(30, TimeUnit.SECONDS);
printResults();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
private void checkCompletion() {
// Wait a bit to see if new tasks are submitted
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if(activeCrawlers.get() == 0) {
completionLatch.countDown();
}
}
private void printResults() {
System.out.println("\n=== CRAWL RESULTS ===");
System.out.println("Total pages crawled: " + results.size());
System.out.println("Unique URLs visited: " + visitedUrls.size());
System.out.println("\nCrawled pages by depth:");
Map> byDepth = new TreeMap<>();
for(WebPage page : results) {
byDepth.computeIfAbsent(page.getDepth(), k -> new ArrayList<>())
.add(page);
}
for(Map.Entry> entry : byDepth.entrySet()) {
System.out.println("Depth " + entry.getKey() + ": " +
entry.getValue().size() + " pages");
}
System.out.println("\nSample pages:");
results.stream()
.limit(10)
.forEach(System.out::println);
}
public void stop() {
executor.shutdownNow();
completionLatch.countDown();
}
}
static class CrawlerWithCompletableFuture {
private final ExecutorService executor;
private final Set visitedUrls;
private final List> futures;
public CrawlerWithCompletableFuture(int threadPoolSize) {
this.executor = Executors.newFixedThreadPool(threadPoolSize);
this.visitedUrls = ConcurrentHashMap.newKeySet();
this.futures = new ArrayList<>();
}
public CompletableFuture crawlAsync(String url, int maxDepth) {
CompletableFuture future = CompletableFuture.runAsync(() -> {
crawlRecursive(url, 0, maxDepth);
}, executor);
futures.add(future);
return future;
}
private void crawlRecursive(String url, int depth, int maxDepth) {
if(depth > maxDepth || !visitedUrls.add(url)) {
return;
}
System.out.println("Async crawling: " + url + " (depth: " + depth + ")");
// Simulate page processing
try {
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
// Simulate finding and crawling child links
if(depth < maxDepth) {
List childLinks = Arrays.asList(
url + "/child1",
url + "/child2",
url + "/child3"
);
List> childFutures = new ArrayList<>();
for(String childUrl : childLinks) {
CompletableFuture childFuture = CompletableFuture
.runAsync(() -> crawlRecursive(childUrl, depth + 1, maxDepth), executor);
childFutures.add(childFuture);
}
// Wait for all child crawls to complete
CompletableFuture.allOf(childFutures.toArray(new CompletableFuture[0]))
.join();
}
}
public void shutdown() throws InterruptedException {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.join();
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
System.out.println("Async crawl completed. Total URLs: " + visitedUrls.size());
}
}
public static void main(String[] args) throws Exception {
System.out.println("=== MULTI-THREADED WEB CRAWLER DEMO ===\n");
// Example 1: Traditional thread pool crawler
System.out.println("=== Example 1: Thread Pool Crawler ===");
WebCrawler crawler1 = new WebCrawler(4, 3, 50);
crawler1.startCrawling("http://example.com/page1");
// Wait for crawler to complete
Thread.sleep(10000);
crawler1.stop();
// Example 2: CompletableFuture-based crawler
System.out.println("\n\n=== Example 2: CompletableFuture Crawler ===");
CrawlerWithCompletableFuture asyncCrawler = new CrawlerWithCompletableFuture(3);
CompletableFuture mainCrawl = asyncCrawler.crawlAsync("http://async.com/start", 2);
// Add timeout
CompletableFuture timeoutFuture = mainCrawl
.orTimeout(5, TimeUnit.SECONDS)
.exceptionally(ex -> {
System.out.println("Crawl timed out: " + ex.getMessage());
return null;
});
timeoutFuture.join();
asyncCrawler.shutdown();
// Example 3: Rate-limited crawler with Semaphore
System.out.println("\n\n=== Example 3: Rate-Limited Crawler ===");
rateLimitedCrawlDemo();
System.out.println("\nAll crawler examples completed!");
}
private static void rateLimitedCrawlDemo() throws InterruptedException {
Semaphore rateLimiter = new Semaphore(2); // Allow 2 concurrent requests
ExecutorService executor = Executors.newFixedThreadPool(5);
AtomicInteger successfulCrawls = new AtomicInteger(0);
List> futures = new ArrayList<>();
for(int i = 1; i <= 10; i++) {
int pageId = i;
Future> future = executor.submit(() -> {
try {
rateLimiter.acquire(); // Get permit
System.out.println("Crawling page " + pageId +
" (permits left: " + rateLimiter.availablePermits() + ")");
// Simulate crawl
Thread.sleep(500);
successfulCrawls.incrementAndGet();
rateLimiter.release(); // Release permit
System.out.println("Completed page " + pageId);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
futures.add(future);
}
// Wait for all tasks
for(Future> future : futures) {
future.get();
}
executor.shutdown();
executor.awaitTermination(5, TimeUnit.SECONDS);
System.out.println("Rate-limited crawl completed: " +
successfulCrawls.get() + "/10 pages");
}
}
9. Performance Testing and Benchmarking
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
import java.util.*;
public class PerformanceBenchmark {
public static void main(String[] args) throws Exception {
System.out.println("=== MULTITHREADING PERFORMANCE BENCHMARKS ===\n");
int availableProcessors = Runtime.getRuntime().availableProcessors();
System.out.println("Available processors: " + availableProcessors);
// Benchmark 1: Thread Creation Overhead
System.out.println("\n=== Benchmark 1: Thread Creation Overhead ===");
benchmarkThreadCreation();
// Benchmark 2: ThreadPool vs Manual Threads
System.out.println("\n=== Benchmark 2: ThreadPool vs Manual Threads ===");
benchmarkThreadPools();
// Benchmark 3: Synchronization Methods
System.out.println("\n=== Benchmark 3: Synchronization Methods ===");
benchmarkSynchronization();
// Benchmark 4: Concurrent Collections
System.out.println("\n=== Benchmark 4: Concurrent Collections ===");
benchmarkConcurrentCollections();
// Benchmark 5: Optimal Thread Count
System.out.println("\n=== Benchmark 5: Optimal Thread Count ===");
benchmarkOptimalThreadCount();
System.out.println("\nAll benchmarks completed!");
}
private static void benchmarkThreadCreation() throws InterruptedException {
int threadCount = 10000;
// Manual thread creation
long startTime = System.currentTimeMillis();
Thread[] threads = new Thread[threadCount];
for(int i = 0; i < threadCount; i++) {
threads[i] = new Thread(() -> {
// Do nothing
});
threads[i].start();
}
for(Thread thread : threads) {
thread.join();
}
long manualTime = System.currentTimeMillis() - startTime;
// Thread pool creation
startTime = System.currentTimeMillis();
ExecutorService executor = Executors.newFixedThreadPool(availableProcessors());
for(int i = 0; i < threadCount; i++) {
executor.submit(() -> {
// Do nothing
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
long poolTime = System.currentTimeMillis() - startTime;
System.out.println("Manual threads (" + threadCount + "): " + manualTime + "ms");
System.out.println("Thread pool (" + threadCount + "): " + poolTime + "ms");
System.out.println("Improvement: " +
String.format("%.1f", (double)manualTime/poolTime) + "x faster");
}
private static void benchmarkThreadPools() throws Exception {
int taskCount = 1000;
int[] threadCounts = {1, 2, 4, 8, 16, 32};
System.out.println("Tasks: " + taskCount + ", CPU-bound workload");
for(int threads : threadCounts) {
long time = testThreadPool(threads, taskCount, true);
System.out.printf("Threads: %2d, Time: %5dms, Efficiency: %.1f%%\n",
threads, time, (threads == 1 ? 100 : (double)threads/time*1000));
}
System.out.println("\nTasks: " + taskCount + ", IO-bound workload");
for(int threads : threadCounts) {
long time = testThreadPool(threads, taskCount, false);
System.out.printf("Threads: %2d, Time: %5dms\n", threads, time);
}
}
private static long testThreadPool(int threadCount, int taskCount, boolean cpuBound)
throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
long startTime = System.currentTimeMillis();
List> futures = new ArrayList<>();
for(int i = 0; i < taskCount; i++) {
futures.add(executor.submit(() -> {
if(cpuBound) {
// CPU-intensive task
long result = 0;
for(int j = 0; j < 1000000; j++) {
result += Math.sqrt(j);
}
} else {
// IO-intensive task (simulated)
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}));
}
for(Future> future : futures) {
try {
future.get();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return System.currentTimeMillis() - startTime;
}
private static void benchmarkSynchronization() throws InterruptedException {
int iterations = 1000000;
int threadCount = 4;
System.out.println("Iterations: " + iterations + ", Threads: " + threadCount);
// No synchronization
long time = testCounter(new UnsafeCounter(), iterations, threadCount);
System.out.println("No sync: " + time + "ms");
// Synchronized method
time = testCounter(new SynchronizedCounter(), iterations, threadCount);
System.out.println("Synchronized: " + time + "ms");
// ReentrantLock
time = testCounter(new ReentrantLockCounter(), iterations, threadCount);
System.out.println("ReentrantLock:" + time + "ms");
// AtomicInteger
time = testCounter(new AtomicCounter(), iterations, threadCount);
System.out.println("AtomicInteger:" + time + "ms");
}
private static long testCounter(Counter counter, int iterations, int threadCount)
throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
long startTime = System.currentTimeMillis();
List> futures = new ArrayList<>();
for(int i = 0; i < threadCount; i++) {
futures.add(executor.submit(() -> {
for(int j = 0; j < iterations; j++) {
counter.increment();
}
}));
}
for(Future> future : futures) {
future.get();
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return System.currentTimeMillis() - startTime;
}
private static void benchmarkConcurrentCollections() throws InterruptedException {
int operations = 100000;
int threadCount = 8;
System.out.println("Operations: " + operations + ", Threads: " + threadCount);
// HashMap with external synchronization
long time = testMap(new HashMap<>(), operations, threadCount, true);
System.out.println("HashMap + sync: " + time + "ms");
// ConcurrentHashMap
time = testMap(new ConcurrentHashMap<>(), operations, threadCount, false);
System.out.println("ConcurrentHashMap: " + time + "ms");
// Collections.synchronizedMap
time = testMap(Collections.synchronizedMap(new HashMap<>()),
operations, threadCount, false);
System.out.println("synchronizedMap: " + time + "ms");
}
private static long testMap(Map map, int operations,
int threadCount, boolean externalSync)
throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
long startTime = System.currentTimeMillis();
List> futures = new ArrayList<>();
for(int i = 0; i < threadCount; i++) {
int threadId = i;
futures.add(executor.submit(() -> {
for(int j = 0; j < operations; j++) {
String key = "key-" + threadId + "-" + j;
if(externalSync) {
synchronized(map) {
map.put(key, j);
map.get(key);
}
} else {
map.put(key, j);
map.get(key);
}
}
}));
}
for(Future> future : futures) {
future.get();
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return System.currentTimeMillis() - startTime;
}
private static void benchmarkOptimalThreadCount() throws Exception {
int taskCount = 100;
int maxThreads = Runtime.getRuntime().availableProcessors() * 4;
System.out.println("CPU Cores: " + availableProcessors());
System.out.println("Testing thread counts from 1 to " + maxThreads);
System.out.println("\nCPU-bound tasks (matrix multiplication):");
for(int threads = 1; threads <= maxThreads; threads *= 2) {
long time = testMatrixMultiplication(threads, taskCount);
System.out.printf("Threads: %2d, Time: %5dms, Speedup: %.2fx\n",
threads, time, (double)maxThreads/threads);
}
System.out.println("\nIO-bound tasks (simulated web requests):");
for(int threads = 1; threads <= maxThreads; threads *= 2) {
long time = testSimulatedIO(threads, taskCount);
System.out.printf("Threads: %2d, Time: %5dms\n", threads, time);
}
}
private static long testMatrixMultiplication(int threadCount, int taskCount)
throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
long startTime = System.currentTimeMillis();
List> futures = new ArrayList<>();
for(int i = 0; i < taskCount; i++) {
futures.add(executor.submit(() -> {
// Simulate CPU-intensive matrix multiplication
int size = 100;
double[][] a = new double[size][size];
double[][] b = new double[size][size];
double[][] c = new double[size][size];
// Initialize matrices
for(int x = 0; x < size; x++) {
for(int y = 0; y < size; y++) {
a[x][y] = Math.random();
b[x][y] = Math.random();
}
}
// Multiply
for(int x = 0; x < size; x++) {
for(int y = 0; y < size; y++) {
for(int z = 0; z < size; z++) {
c[x][y] += a[x][z] * b[z][y];
}
}
}
}));
}
for(Future> future : futures) {
future.get();
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return System.currentTimeMillis() - startTime;
}
private static long testSimulatedIO(int threadCount, int taskCount)
throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(threadCount);
long startTime = System.currentTimeMillis();
List> futures = new ArrayList<>();
for(int i = 0; i < taskCount; i++) {
futures.add(executor.submit(() -> {
// Simulate IO-bound task (network/database call)
try {
Thread.sleep(100); // Simulate 100ms IO delay
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}));
}
for(Future> future : futures) {
future.get();
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
return System.currentTimeMillis() - startTime;
}
private static int availableProcessors() {
return Runtime.getRuntime().availableProcessors();
}
}
// Counter interfaces for benchmark
interface Counter {
void increment();
long getValue();
}
class UnsafeCounter implements Counter {
private long value = 0;
public void increment() {
value++;
}
public long getValue() {
return value;
}
}
class SynchronizedCounter implements Counter {
private long value = 0;
public synchronized void increment() {
value++;
}
public synchronized long getValue() {
return value;
}
}
class ReentrantLockCounter implements Counter {
private long value = 0;
private final ReentrantLock lock = new ReentrantLock();
public void increment() {
lock.lock();
try {
value++;
} finally {
lock.unlock();
}
}
public long getValue() {
lock.lock();
try {
return value;
} finally {
lock.unlock();
}
}
}
class AtomicCounter implements Counter {
private final AtomicLong value = new AtomicLong(0);
public void increment() {
value.incrementAndGet();
}
public long getValue() {
return value.get();
}
}