Explore Java Concurrency: Concurrent Container Family Portrait

14 concurrent containers, how many have you used?

When multi-threaded concurrency is not involved, we generally use thread-unsafe container classes such as ArrayList and HashMap, because they are more efficient. In concurrent scenarios, thread-safe containers such as ConcurrentHashMap and ArrayBlockingQueue are used instead; they sacrifice some efficiency in exchange for safety.

The thread-safe containers mentioned above all live under the java.util.concurrent package, which contains many more concurrent containers. Today we line them all up for a family portrait.

Here is just a brief introduction to each; we will explore them in depth later.

Introduction

  • ConcurrentHashMap: Concurrent HashMap
  • CopyOnWriteArrayList: Concurrent version of ArrayList
  • CopyOnWriteArraySet: Concurrent Set
  • ConcurrentLinkedQueue: Concurrent queue (based on linked list)
  • ConcurrentLinkedDeque: Concurrent queue (based on doubly linked list)
  • ConcurrentSkipListMap: Concurrent Map based on a skip list
  • ConcurrentSkipListSet: Concurrent Set based on a skip list
  • ArrayBlockingQueue: Blocking queue (based on array)
  • LinkedBlockingQueue: Blocking queue (based on linked list)
  • LinkedBlockingDeque: Blocking queue (based on doubly linked list)
  • PriorityBlockingQueue: thread-safe priority queue
  • SynchronousQueue: Queue in which reads and writes are paired
  • LinkedTransferQueue: Data exchange queue based on linked list
  • DelayQueue: Delay queue

ConcurrentHashMap Concurrent HashMap

One of the most commonly used concurrent containers, often used as a cache in concurrent scenarios. The underlying structure is still a hash table, but it changed significantly in Java 8. Since both Java 7 and Java 8 are widely used, the implementations of these two versions are frequently compared (in interviews, for example).

One big difference is that Java 7 uses segmented locks to reduce lock contention, while Java 8 drops segmented locks and relies on CAS (a form of optimistic locking). In addition, to keep lookups from degrading when hash collisions occur (on a collision, objects with the same hash are chained into a linked list at that bucket), Java 8 converts a bucket's linked list into a red-black tree once its length reaches a threshold (8); compared with a linked list, a tree has more stable query performance.
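
For example, a common way to use it as a simple local cache is computeIfAbsent, which applies the loading function at most once per absent key even under concurrency. A minimal sketch; the class name and the loader below are only illustrative:

import java.util.concurrent.ConcurrentHashMap;

public class CacheDemo {
    // a simple cache keyed by user id; the loader stands in for a slow lookup
    private static final ConcurrentHashMap<Long, String> CACHE = new ConcurrentHashMap<>();

    static String userName(long id) {
        // computeIfAbsent loads the value at most once per key, even with concurrent callers
        return CACHE.computeIfAbsent(id, CacheDemo::loadFromDatabase);
    }

    // hypothetical loader, just for the example
    private static String loadFromDatabase(long id) {
        return "user-" + id;
    }

    public static void main(String[] args) {
        System.out.println(userName(42L)); // user-42, loaded once
        System.out.println(userName(42L)); // user-42, served from the map
    }
}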

CopyOnWriteArrayList concurrent version of ArrayList

The concurrent version of ArrayList; the underlying structure is also an array. The difference from ArrayList is that when an element is added or removed, a new array is created, the element is added to or excluded from that new array, and the new array then replaces the original one.

Applicable scenarios: since read operations take no lock while write operations (add, delete, modify) do, it is suitable for read-heavy, write-light scenarios.

Limitations: since reads take no lock (read efficiency is high, just like a plain ArrayList) and read the current copy of the array, stale data may be observed. If that matters to you, this container is not recommended.

Take a look at the source code to get a feel for it:

public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {
    final transient ReentrantLock lock = new ReentrantLock();
    private transient volatile Object[] array;

   //add element, lock
    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();//Lock when modifying to ensure concurrency safety
        try {
            Object[] elements = getArray();//current array
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);//Create a new array, one space larger than the old one
            newElements[len] = e;//put the elements to be added into the new array
            setArray(newElements);//replace the original array with the new array
            return true;
        } finally {
            lock.unlock();//unlock
        }
    }

   //read elements without locking, so old data may be read
    public E get(int index) {
        return get(getArray(), index);
    }
}

CopyOnWriteArraySet concurrent Set

Implemented on top of CopyOnWriteArrayList (it contains a CopyOnWriteArrayList member variable), which means the underlying structure is also an array. As a result, every add has to traverse the entire collection to check whether the element already exists, and only when it does not exist is it inserted (under the lock).

Applicable scenarios: the same as CopyOnWriteArrayList, plus one more condition: the set should not be too large (so that traversing it all on every add is affordable).
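
A quick sketch of what that behavior means in practice (the values are only for illustration):

import java.util.concurrent.CopyOnWriteArraySet;

public class SetDemo {
    public static void main(String[] args) {
        CopyOnWriteArraySet<String> tags = new CopyOnWriteArraySet<>();
        System.out.println(tags.add("java"));  // true: not found after traversal, so a new copy is made and the element inserted
        System.out.println(tags.add("java"));  // false: already present, nothing is copied
        System.out.println(tags);              // [java]
    }
}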

ConcurrentLinkedQueue Concurrent Queue (based on linked list)

A concurrent queue implemented with a linked list; optimistic locking (CAS) is used to ensure thread safety. Because the data structure is a linked list, there is in theory no size limit, so adding an element always succeeds.
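
A minimal sketch of typical non-blocking use (standard Queue API calls; the values are illustrative):

import java.util.concurrent.ConcurrentLinkedQueue;

public class QueueDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> tasks = new ConcurrentLinkedQueue<>();
        tasks.offer("t1");                // offer never blocks and always succeeds
        tasks.offer("t2");
        System.out.println(tasks.poll()); // t1 -- FIFO order
        System.out.println(tasks.poll()); // t2
        System.out.println(tasks.poll()); // null -- poll returns null instead of blocking when empty
    }
}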

ConcurrentLinkedDeque concurrent queue (based on doubly linked list)

A concurrent queue based on a doubly linked list; the head and the tail can each be operated on, so besides first-in-first-out (FIFO) it also supports first-in-last-out (FILO). Of course, if used first-in-last-out, it should really be called a stack.
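
A small sketch showing both styles of access (the values are illustrative):

import java.util.concurrent.ConcurrentLinkedDeque;

public class DequeDemo {
    public static void main(String[] args) {
        ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
        deque.offerLast(1);
        deque.offerLast(2);
        deque.offerLast(3);
        System.out.println(deque.pollFirst()); // 1 -- take from the head: FIFO, a queue
        System.out.println(deque.pollLast());  // 3 -- take from the tail: FILO, a stack
    }
}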

ConcurrentSkipListMap Concurrent Map based on skip list

SkipList means skip list. A skip list is a data structure that trades space for time: by maintaining redundant, layered indexes over a linked list, it achieves an effect similar to binary search.
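
Because the keys are kept sorted, it also supports navigation-style lookups that ConcurrentHashMap cannot offer. A small sketch (the values are illustrative):

import java.util.concurrent.ConcurrentSkipListMap;

public class SkipListDemo {
    public static void main(String[] args) {
        ConcurrentSkipListMap<Integer, String> map = new ConcurrentSkipListMap<>();
        map.put(30, "c");
        map.put(10, "a");
        map.put(20, "b");
        System.out.println(map.firstKey());     // 10 -- keys are kept sorted
        System.out.println(map.ceilingKey(15)); // 20 -- smallest key >= 15
        System.out.println(map.headMap(30));    // {10=a, 20=b} -- all entries with key < 30
    }
}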

ConcurrentSkipListSet Concurrent Set based on skip list

Similar to the relationship between HashSet and HashMap, ConcurrentSkipListSet contains a ConcurrentSkipListMap, so I won’t go into details.

ArrayBlockingQueue blocking queue (based on array)

A blocking queue backed by an array; the array size must be specified at construction time. If the array is full when you put an element in, the operation blocks until there is room (immediate return and timed waiting are also supported). A single ReentrantLock is used to ensure thread safety.

Take the take and offer operations as examples:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
   /**
     * One lock is shared by reads and writes; threads communicate through the two Conditions below
     * The two Conditions are created from the lock (via lock.newCondition())
     * and work much like Object's wait/notify
     */
    final ReentrantLock lock;

   /** Signals "queue is not empty"; threads that take data wait on this */
    private final Condition notEmpty;

   /** Signals "queue is not full"; threads that put data wait on this */
    private final Condition notFull;

   //keep blocking until something can be taken out
    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

   //Insert an element at the end, wait for the specified time when the queue is full, and return if it still cannot be inserted
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {
        checkNotNull(e);
        long nanos = unit.toNanos(timeout);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();//locked
        try {
           //wait in a loop until the queue has free space
            while (count == items.length) {
                if (nanos <= 0)
                    return false;//Waiting for timeout, return
               //Temporarily release the lock and wait for a while (the thread may be woken up early and re-acquire the lock, so the condition must be re-checked in the loop)
               //During this time other threads may take elements out, so there may be room to insert
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(e);//Insert an element
            return true;
        } finally {
            lock.unlock();//Unlock
        }
    }
}

At first glance this may seem confusing: reads and writes use the same lock, so if the queue happens to be empty when a reading thread arrives, won't it block forever while holding the lock?

The answer lies in notEmpty and notFull. These two Conditions, created from the lock, give it behavior similar to synchronized + wait + notify. Portal → finally figured out sleep/wait/notify/notifyAll
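
The pattern looks roughly like this. This is a deliberately tiny sketch of the lock-plus-two-Conditions idea, not the real JDK source (and for brevity it behaves like a small stack rather than a FIFO queue):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// A tiny bounded buffer showing the lock + two-Condition pattern used by ArrayBlockingQueue
public class TinyBuffer<E> {
    private final Object[] items = new Object[2];
    private int count;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (count == items.length)
                notFull.await();   // releases the lock while waiting, so takers can get in
            items[count++] = e;
            notEmpty.signal();     // wake up a waiting taker
        } finally {
            lock.unlock();
        }
    }

    @SuppressWarnings("unchecked")
    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
                notEmpty.await();  // releases the lock while waiting, so putters can get in
            E e = (E) items[--count];
            notFull.signal();      // wake up a waiting putter
            return e;
        } finally {
            lock.unlock();
        }
    }
}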

LinkedBlockingQueue blocking queue (based on linked list)

A blocking queue implemented with a linked list. Compared with the non-blocking ConcurrentLinkedQueue, it adds a capacity limit; if none is set, it defaults to Integer.MAX_VALUE.
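
In practice it is usually worth passing a capacity explicitly. A small sketch (the capacity of 1000 is arbitrary):

import java.util.concurrent.LinkedBlockingQueue;

public class BoundedDemo {
    public static void main(String[] args) throws InterruptedException {
        // effectively unbounded: capacity defaults to Integer.MAX_VALUE
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
        // bounded: put() blocks once 1000 elements are queued
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<>(1000);
        bounded.put("job-1");
        System.out.println(bounded.remainingCapacity());   // 999
        System.out.println(unbounded.remainingCapacity()); // 2147483647
    }
}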

LinkedBlockingDeque blocking queue (based on doubly linked list)

Similar to LinkedBlockingQueue, but also provides the operations unique to a doubly linked list, i.e. working on both the head and the tail.

PriorityBlockingQueue thread-safe priority queue

A comparator can be passed in at construction time; elements put in are kept in priority order and are consumed in that order when read. Note that low-priority elements may not be consumed for a long time if higher-priority elements keep arriving.
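
A small sketch of passing a comparator at construction time (treating "bigger number means higher priority" is just a choice made for this example):

import java.util.Comparator;
import java.util.concurrent.PriorityBlockingQueue;

public class PriorityDemo {
    public static void main(String[] args) throws InterruptedException {
        // 11 is the default initial capacity; the comparator makes larger numbers come out first
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.reverseOrder());
        queue.put(1);
        queue.put(5);
        queue.put(3);
        System.out.println(queue.take()); // 5 -- highest priority element first
        System.out.println(queue.take()); // 3
        System.out.println(queue.take()); // 1
    }
}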

SynchronousQueue Queue for synchronous data exchange

A "fake" queue, because it has no real storage space for elements: every insert operation must be matched by a corresponding take, and another element cannot be put in until the previous one has been taken.

A simple example to get a feel for it:

import java.util.concurrent.*;

public class Main {

    public static void main(String[] args) {
        SynchronousQueue<Integer> queue = new SynchronousQueue<>();
        new Thread(() -> {
            try {
               //No rest, crazy writing
                for (int i = 0;; i++) {
                    System.out.println("Put in: "+ i);
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
        new Thread(() -> {
            try {
               //Take data lazily, pausing between reads
                while (true) {
                    System.out.println("Take out: "+ queue.take());
                    Thread.sleep((long) (Math.random() * 2000));
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }).start();
    }

}

/* Output:

Put in: 0
Take out: 0
Put in: 1
Take out: 1
Put in: 2
Take out: 2
Put in: 3
Take out: 3

*/

As you can see, the writing thread never sleeps and tries to put elements into the queue as fast as it can, while the reading thread is rather lazy and sleeps a while after each read. Yet the output shows that puts and takes always appear in pairs.

One usage scenario in the JDK is Executors.newCachedThreadPool(), which creates a cached thread pool.

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(
            0,//Core pool size is 0: idle threads are not kept around
            Integer.MAX_VALUE,//Maximum pool size is effectively unlimited; machine resources would run out before this value is reached
            60L, TimeUnit.SECONDS,//The idle thread is destroyed after 60 seconds
            new SynchronousQueue<Runnable>());//If there is no free thread to take out the task during the offer, it will fail and the thread pool will create a new thread
    }

LinkedTransferQueue Data exchange queue based on linked list

Implements the TransferQueue interface. When an element is put in via the transfer method, if a thread is already blocked waiting to take an element, the element is handed directly to that waiting thread. If no one is waiting to consume, the element is placed at the tail of the queue and the method blocks until someone takes it. It is somewhat similar to SynchronousQueue, but more powerful.
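
A minimal sketch of transfer blocking until a consumer takes the element (the 1-second sleep is only there to make the hand-off visible):

import java.util.concurrent.LinkedTransferQueue;

public class TransferDemo {
    public static void main(String[] args) throws InterruptedException {
        LinkedTransferQueue<String> queue = new LinkedTransferQueue<>();

        new Thread(() -> {
            try {
                Thread.sleep(1000);                         // the consumer shows up a bit later
                System.out.println("Got: " + queue.take());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }).start();

        System.out.println("Transferring...");
        queue.transfer("hello");            // blocks until the consumer above takes the element
        System.out.println("Transferred");  // printed only after the consumer has received it
    }
}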

DelayQueue Delay Queue

Elements put into this queue can only be taken out by a consumer after the specified delay has expired; the elements must implement the Delayed interface.
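
A minimal sketch of an element implementing Delayed (the Task class and its timings are made up for illustration):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class DelayDemo {
    // Delayed requires getDelay (time remaining) and compareTo (ordering by expiry)
    static class Task implements Delayed {
        final String name;
        final long expireAt; // absolute expiry time in milliseconds

        Task(String name, long delayMillis) {
            this.name = name;
            this.expireAt = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(expireAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("later", 2000));
        queue.put(new Task("sooner", 500));
        // take() blocks until the head element's delay has expired
        System.out.println(queue.take().name); // sooner (after ~0.5s)
        System.out.println(queue.take().name); // later  (after ~2s)
    }
}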

Summary

The above is a brief introduction to the container classes under the Java concurrent package. Knowing they exist means that when you run into a suitable scenario, you will remember there is a ready-made tool you can use. As for the why and the how, we will explore further in follow-up articles.