Fila de Conjuntos Simultâneos

Talvez esta seja uma pergunta boba, mas não consigo encontrar uma resposta óbvia.

Eu preciso de uma fila FIFO simultânea que contenha apenas valores exclusivos. A tentativa de adicionar um valor que já existe na fila simplesmente ignora esse valor. Que, se não fosse pela segurança do thread, seria trivial. Existe uma estrutura de dados em Java ou talvez um snipit de código nas interwebs que exibem esse comportamento?

Se você quiser uma melhor simultaneidade do que a synchronization completa, existe uma maneira que eu conheço para fazer isso, usando um ConcurrentHashMap como o mapa de apoio. O seguinte é apenas um esboço.

 public final class ConcurrentHashSet extends ForwardingSet implements Set, Queue { private enum Dummy { VALUE } private final ConcurrentMap map; ConcurrentHashSet(ConcurrentMap map) { super(map.keySet()); this.map = Preconditions.checkNotNull(map); } @Override public boolean add(E element) { return map.put(element, Dummy.VALUE) == null; } @Override public boolean addAll(Collection newElements) { // just the standard implementation boolean modified = false; for (E element : newElements) { modified |= add(element); } return modified; } @Override public boolean offer(E element) { return add(element); } @Override public E remove() { E polled = poll(); if (polled == null) { throw new NoSuchElementException(); } return polled; } @Override public E poll() { for (E element : this) { // Not convinced that removing via iterator is viable (check this?) if (map.remove(element) != null) { return element; } } return null; } @Override public E element() { return iterator().next(); } @Override public E peek() { Iterator iterator = iterator(); return iterator.hasNext() ? iterator.next() : null; } } 

Nem tudo é luz do sol com essa abordagem. Nós não temos nenhuma maneira decente de selecionar um elemento head diferente de usar entrySet().iterator().next() , o resultado é que o mapa fica cada vez mais desequilibrado com o passar do tempo. Esse desequilíbrio é um problema devido a maiores colisões de buckets e maior contenção de segmento.

Nota: este código usa o Guava em alguns lugares.

Não há uma coleção interna que faz isso. Existem algumas implementações simultâneas do Set que podem ser usadas junto com uma Queue simultânea.

Por exemplo, um item é adicionado à fila somente depois que foi adicionado ao conjunto e cada item removido da fila é removido do conjunto. Nesse caso, o conteúdo da fila, logicamente, é realmente o que está no conjunto, e a fila é usada apenas para rastrear o pedido e fornecer operações take() e poll() eficientes encontradas apenas em um BlockingQueue .

Um java.util.concurrent.ConcurrentLinkedQueue leva você para a maior parte do caminho.

Embrulhe o ConcurrentLinkedQueue com sua própria class que verifica a exclusividade de um add. Seu código precisa ser thread-safe.

Eu usaria um LinkedHashSet sincronizado até que houvesse justificativa suficiente para considerar as alternativas. O principal benefício que uma solução mais concorrente poderia oferecer é a divisão de bloqueio.

A abordagem concorrente mais simples seria aa ConcurrentHashMap (atuando como um conjunto) e ConcurrentLinkedQueue. A ordenação das operações forneceria a restrição desejada. Uma oferta () deve primeiro executar um CHM # putIfAbsent () e, se for bem sucedida, inserir no CLQ. Uma pesquisa () levaria do CLQ e, em seguida, removê-lo do CHM. Isso significa que consideramos uma input em nossa fila, se estiver no mapa, e o CLQ fornece a ordenação. O desempenho pode ser ajustado aumentando o nível de simultaneidade do mapa. Se você é tolerante a uma ousadia adicional, então um CHM # get () barato pode agir como uma pré-condição razoável (mas pode ser prejudicado por ser uma visão um pouco obsoleta).

O que você quer dizer com uma fila concorrente com a semântica Set? Se você quer dizer uma estrutura verdadeiramente concorrente (em oposição a uma estrutura segura para thread), então eu diria que você está pedindo um pônei.

O que acontece, por exemplo, se você chamar put(element) e detectar que algo já está lá que foi imediatamente removido? Por exemplo, o que significa no seu caso se offer(element) || queue.contains(element) offer(element) || queue.contains(element) retorna false ?

Esses tipos de coisas muitas vezes precisam ser pensados ​​de maneira ligeiramente diferente em um mundo concorrente, já que muitas vezes nada é como parece, a menos que você pare o mundo (bloqueie-o). Caso contrário, você geralmente está olhando para algo no passado. Então, o que você está realmente tentando fazer?

Talvez estenda o ArrayBlockingQueue . Para obter access ao bloqueio (package-access), tive que colocar minha subclass dentro do mesmo pacote. Advertência: Eu não testei isso.

 package java.util.concurrent; import java.util.Collection; import java.util.concurrent.locks.ReentrantLock; public class DeDupingBlockingQueue extends ArrayBlockingQueue { public DeDupingBlockingQueue(int capacity) { super(capacity); } public DeDupingBlockingQueue(int capacity, boolean fair) { super(capacity, fair); } public DeDupingBlockingQueue(int capacity, boolean fair, Collection c) { super(capacity, fair, c); } @Override public boolean add(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (contains(e)) return false; return super.add(e); } finally { lock.unlock(); } } @Override public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { if (contains(e)) return true; return super.offer(e); } finally { lock.unlock(); } } @Override public void put(E e) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //Should this be lock.lock() instead? try { if (contains(e)) return; super.put(e); //if it blocks, it does so without holding the lock. } finally { lock.unlock(); } } @Override public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { final ReentrantLock lock = this.lock; lock.lock(); try { if (contains(e)) return true; return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock. } finally { lock.unlock(); } } } 

Uma resposta simples para uma fila de objects únicos pode ser a seguinte:

 import java.util.concurrent.ConcurrentLinkedQueue; public class FinalQueue { class Bin { private int a; private int b; public Bin(int a, int b) { this.a = a; this.b = b; } @Override public int hashCode() { return a * b; } public String toString() { return a + ":" + b; } @Override public boolean equals(Object obj) { if (this == obj) return true; if (obj == null) return false; if (getClass() != obj.getClass()) return false; Bin other = (Bin) obj; if ((a != other.a) || (b != other.b)) return false; return true; } } private ConcurrentLinkedQueue queue; public FinalQueue() { queue = new ConcurrentLinkedQueue(); } public synchronized void enqueue(Bin ipAddress) { if (!queue.contains(ipAddress)) queue.add(ipAddress); } public Bin dequeue() { return queue.poll(); } public String toString() { return "" + queue; } /** * @param args */ public static void main(String[] args) { FinalQueue queue = new FinalQueue(); Bin a = queue.new Bin(2,6); queue.enqueue(a); queue.enqueue(queue.new Bin(13, 3)); queue.enqueue(queue.new Bin(13, 3)); queue.enqueue(queue.new Bin(14, 3)); queue.enqueue(queue.new Bin(13, 9)); queue.enqueue(queue.new Bin(18, 3)); queue.enqueue(queue.new Bin(14, 7)); Bin x= queue.dequeue(); System.out.println(xa); System.out.println(queue.toString()); System.out.println("Dequeue..." + queue.dequeue()); System.out.println("Dequeue..." + queue.dequeue()); System.out.println(queue.toString()); } }