Servlet-3 Async Context, como fazer gravações assíncronas?

Descrição do Problema

A API do Servlet-3.0 permite desappend um contexto de solicitação / resposta e responder a ele posteriormente.

No entanto, se eu tentar escrever uma grande quantidade de dados, algo como:

AsyncContext ac = getWaitingContext() ; ServletOutputStream out = ac.getResponse().getOutputStream(); out.print(some_big_data); out.flush() 

Ele pode realmente bloquear – e bloquear em casos de teste triviais – tanto para o Tomcat 7 quanto para o Jetty 8. Os tutoriais recomendam criar um conjunto de encadeamentos que lidaria com essa configuração – que geralmente é contra-positivo para uma arquitetura tradicional de 10K.

No entanto, se eu tiver 10.000 conexões abertas e um conjunto de threads de, digamos, 10 encadeamentos, é suficiente para até 1% de clientes que possuem conexões de baixa velocidade ou apenas bloquearem a conexão e bloquearem completamente a resposta do cometa ou atrasá-la significativamente.

A prática esperada é obter notificação de “gravação pronta” ou notificação de conclusão de E / S e continuar enviando os dados.

Como isso pode ser feito usando a API do Servlet-3.0, ou seja, como obtenho:

  • Notificação de conclusão assíncrona na operação de E / S.
  • Obtenha E / S sem bloqueio com notificação de gravação pronta.

Se isso não for suportado pela API do Servlet-3.0, há APIs específicas do servidor Web (como Jetty Continuation ou Tomcat CometEvent) que permitem manipular esses events de forma verdadeiramente assíncrona, sem falsificar a E / S assíncrona usando o pool de threads.

Alguém sabe?

E se isso não for possível, você pode confirmar isso com uma referência à documentação?

Demonstração do problema em um código de amostra

Eu tinha anexado o código abaixo que emula o stream de events.

Notas:

  • ele usa ServletOutputStream que lança o IOException para detectar clientes desconectados
  • ele envia mensagens de keep-alive para garantir que os clientes ainda estejam lá
  • Eu criei um pool de segmentos para “emular” operações assíncronas.

Nesse exemplo, defini explicitamente o conjunto de encadeamentos do tamanho 1 para mostrar o problema:

  • Inicie um aplicativo
  • Executar a partir de dois terminais curl http://localhost:8080/path/to/app (duas vezes)
  • Agora envie os dados com curd -dm=message http://localhost:8080/path/to/app
  • Ambos os clientes receberam os dados
  • Agora suspenda um dos clientes (Ctrl + Z) e envie a mensagem novamente curd -dm=message http://localhost:8080/path/to/app
  • Observe que outro cliente não suspenso não recebeu nada ou depois que a mensagem foi transferida parou de receber solicitações keep-alive porque outro thread está bloqueado.

Eu quero resolver esse problema sem usar pool de threads, porque com 1000-5000 conexões abertas eu posso esgotar o pool de threads muito rápido.

O código de amostra abaixo.


 import java.io.IOException; import java.util.HashSet; import java.util.Iterator; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.LinkedBlockingQueue; import javax.servlet.AsyncContext; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.annotation.WebServlet; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.ServletOutputStream; @WebServlet(urlPatterns = "", asyncSupported = true) public class HugeStreamWithThreads extends HttpServlet { private long id = 0; private String message = ""; private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue()); // it is explicitly small for demonstration purpose private final Thread timer = new Thread(new Runnable() { public void run() { try { while(true) { Thread.sleep(1000); sendKeepAlive(); } } catch(InterruptedException e) { // exit } } }); class RunJob implements Runnable { volatile long lastUpdate = System.nanoTime(); long id = 0; AsyncContext ac; RunJob(AsyncContext ac) { this.ac = ac; } public void keepAlive() { if(System.nanoTime() - lastUpdate > 1000000000L) pool.submit(this); } String formatMessage(String msg) { StringBuilder sb = new StringBuilder(); sb.append("id"); sb.append(id); for(int i=0;i<100000;i++) { sb.append("data:"); sb.append(msg); sb.append("\n"); } sb.append("\n"); return sb.toString(); } public void run() { String message = null; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) { this.id = HugeStreamWithThreads.this.id; message = HugeStreamWithThreads.this.message; } } if(message == null) message = ":keep-alive\n\n"; else message = formatMessage(message); if(!sendMessage(message)) return; boolean once_again = false; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) once_again = true; } if(once_again) pool.submit(this); } boolean sendMessage(String message) { try { ServletOutputStream out = ac.getResponse().getOutputStream(); out.print(message); out.flush(); lastUpdate = System.nanoTime(); return true; } catch(IOException e) { ac.complete(); removeContext(this); return false; } } }; private HashSet asyncContexts = new HashSet(); @Override public void init(ServletConfig config) throws ServletException { super.init(config); timer.start(); } @Override public void destroy() { for(;;){ try { timer.interrupt(); timer.join(); break; } catch(InterruptedException e) { continue; } } pool.shutdown(); super.destroy(); } protected synchronized void removeContext(RunJob ac) { asyncContexts.remove(ac); } // GET method is used to establish a stream connection @Override protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { // Content-Type header response.setContentType("text/event-stream"); response.setCharacterEncoding("utf-8"); // Access-Control-Allow-Origin header response.setHeader("Access-Control-Allow-Origin", "*"); final AsyncContext ac = request.startAsync(); ac.setTimeout(0); RunJob job = new RunJob(ac); asyncContexts.add(job); if(id!=0) { pool.submit(job); } } private synchronized void sendKeepAlive() { for(RunJob job : asyncContexts) { job.keepAlive(); } } // POST method is used to communicate with the server @Override protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException { request.setCharacterEncoding("utf-8"); id++; message = request.getParameter("m"); for(RunJob job : asyncContexts) { pool.submit(job); } } } 

O exemplo acima usa encadeamentos para evitar o bloqueio … No entanto, se o número de clientes de bloqueio for maior que o tamanho do conjunto de encadeamentos, ele será bloqueado.

Como poderia ser implementado sem bloquear?

Eu encontrei o Servlet 3.0 Asynchronous API complicado para implementar documentação correta e útil para ser escassa. Depois de muita tentativa e erro e tentando muitas abordagens diferentes, consegui encontrar uma solução robusta com a qual fiquei muito feliz. Quando olho para o meu código e o comparo ao seu, percebo uma grande diferença que pode ajudá-lo com seu problema específico. Eu uso um ServletResponse para gravar os dados e não um ServletOutputStream .

Aqui, minha class de servlet assíncrona se adaptou um pouco para o seu caso some_big_data :

 import java.io.IOException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; import javax.servlet.ServletConfig; import javax.servlet.ServletException; import javax.servlet.ServletResponse; import javax.servlet.annotation.WebInitParam; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpSession; import org.apache.log4j.Logger; @javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") }) public class AsyncServlet extends HttpServlet { private static final Logger logger = Logger.getLogger(AsyncServlet.class); public static final int CALLBACK_TIMEOUT = 10000; // ms /** executor service */ private ExecutorService exec; @Override public void init(ServletConfig config) throws ServletException { super.init(config); int size = Integer.parseInt(getInitParameter("threadpoolsize")); exec = Executors.newFixedThreadPool(size); } @Override public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException { final AsyncContext ctx = req.startAsync(); final HttpSession session = req.getSession(); // set the timeout ctx.setTimeout(CALLBACK_TIMEOUT); // attach listener to respond to lifecycle events of this AsyncContext ctx.addListener(new AsyncListener() { @Override public void onComplete(AsyncEvent event) throws IOException { logger.info("onComplete called"); } @Override public void onTimeout(AsyncEvent event) throws IOException { logger.info("onTimeout called"); } @Override public void onError(AsyncEvent event) throws IOException { logger.info("onError called: " + event.toString()); } @Override public void onStartAsync(AsyncEvent event) throws IOException { logger.info("onStartAsync called"); } }); enqueLongRunningTask(ctx, session); } /** * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact) * 

* if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked). */ private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) { exec.execute(new Runnable() { @Override public void run() { String some_big_data = getSomeBigData(); try { ServletResponse response = ctx.getResponse(); if (response != null) { response.getWriter().write(some_big_data); ctx.complete(); } else { throw new IllegalStateException(); // this is caught below } } catch (IllegalStateException ex) { logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called. } catch (Exception e) { logger.error("ERROR IN AsyncServlet", e); } } }); } /** destroy the executor */ @Override public void destroy() { exec.shutdown(); } }

Durante minha pesquisa sobre esse tópico, esse segmento continuou aparecendo, então imaginei mencionar aqui:

Servlet 3.1 introduziu operações assíncronas em ServletInputStream e ServletOutputStream . Veja ServletOutputStream.setWriteListener .

Um exemplo pode ser encontrado em http://docs.oracle.com/javaee/7/tutorial/servlets013.htm

Não podemos fazer com que as gravações sejam assíncronas. Nós, realisticamente, temos que viver com a limitação de que, quando escrevemos algo para um cliente, esperamos poder fazê-lo prontamente e podemos tratá-lo como um erro, se não o fizermos. Ou seja, se nosso objective é transmitir dados para o cliente o mais rápido possível e usar o status de bloqueio / não bloqueio do canal como forma de controlar o stream, ficamos sem sorte. Mas, se estamos enviando dados a uma taxa baixa que um cliente deve ser capaz de manipular, podemos ao menos desconectar prontamente clientes que não lêem com rapidez suficiente.

Por exemplo, em seu aplicativo, enviamos as keepalives a uma taxa lenta (a cada poucos segundos) e esperamos que os clientes sejam capazes de acompanhar todos os events que estão sendo enviados. Nós fazemos o esbanjamento dos dados para o cliente, e se ele não puder acompanhar, podemos desconectá-lo imediatamente e de forma limpa. Isso é um pouco mais limitado do que a verdadeira E / S assíncrona, mas deve atender sua necessidade (e, incidentalmente, minha).

O truque é que todos os methods para escrever saída que apenas lançam IOExceptions na verdade fazem um pouco mais que isso: na implementação, todas as chamadas para coisas que podem ser interrompidas () serão envolvidas com algo assim (tirado de Molhe 9):

 catch (InterruptedException x) throw (IOException)new InterruptedIOException().initCause(x); 

(Eu também noto que isso não acontece no Jetty 8, onde uma InterruptedException é registrada e o loop de bloqueio é imediatamente repetido. Presumivelmente, você faz para garantir que seu contêiner de servlet seja bem-comportado para usar esse truque.)

Ou seja, quando um cliente lento faz com que um encadeamento de gravação bloqueie, simplesmente forçamos a gravação a ser ativada como uma IOException chamando interrupt () no encadeamento. Pense nisso: o código sem bloqueio consumiria uma unidade de tempo em um de nossos encadeamentos de processamento para ser executado de qualquer maneira, portanto, usar gravações de bloqueio que são apenas abortadas (depois de um milissegundo) é realmente idêntico em princípio. Ainda estamos apenas mastigando um curto período de tempo no segmento, apenas marginalmente com menos eficiência.

Eu modifiquei o seu código para que o thread do timer principal execute um job para vincular o tempo em cada gravação antes de iniciarmos a gravação, e o job será cancelado se a gravação for concluída rapidamente, o que deve acontecer.

Uma nota final: em um contêiner de servlet bem implementado, fazer com que a E / S seja descartada deve ser segura. Seria bom se pudéssemos pegar o InterruptedIOException e tentar escrever novamente mais tarde. Talvez gostaríamos de dar aos clientes lentos um subconjunto dos events, caso eles não consigam acompanhar o stream completo. Tanto quanto eu posso dizer, no Jetty isso não é totalmente seguro. Se uma gravação for emitida, o estado interno do object HttpResponse poderá não ser consistente o suficiente para lidar com a reintrodução da gravação com segurança posteriormente. Espero que não seja prudente tentar enviar um contêiner de servlet dessa maneira, a menos que haja documentos específicos que eu perdi oferecendo essa garantia. Eu acho que a idéia é que uma conexão é projetada para ser desligada se ocorrer uma IOException.

Aqui está o código, com uma versão modificada de RunJob :: run () usando uma simples ilustração grotty (na verdade, nós gostaríamos de usar o thread do timer principal aqui em vez de girar um por gravação que é bobagem).

 public void run() { String message = null; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) { this.id = HugeStreamWithThreads.this.id; message = HugeStreamWithThreads.this.message; } } if(message == null) message = ":keep-alive\n\n"; else message = formatMessage(message); final Thread curr = Thread.currentThread(); Thread canceller = new Thread(new Runnable() { public void run() { try { Thread.sleep(2000); curr.interrupt(); } catch(InterruptedException e) { // exit } } }); canceller.start(); try { if(!sendMessage(message)) return; } finally { canceller.interrupt(); while (true) { try { canceller.join(); break; } catch (InterruptedException e) { } } } boolean once_again = false; synchronized(HugeStreamWithThreads.this) { if(this.id != HugeStreamWithThreads.this.id) once_again = true; } if(once_again) pool.submit(this); } 

spring é uma opção para você? O Spring-MVC 3.2 tem uma class chamada DeferredResult , que manipulará com elegância o cenário “10.000 conexões abertas / 10 threads de pool de servidores”.

Exemplo: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

Eu dei uma olhada rápida em sua listview, então talvez eu tenha perdido alguns pontos. A vantagem de um thread de pool é compartilhar resources de thread entre várias tarefas ao longo do tempo. Talvez você possa resolver seu problema espaçando respostas keepAlive de diferentes conexões http, em vez de agrupar todas elas ao mesmo tempo.

    Intereting Posts