RabbitMQ文档翻译二(JAVA).工作队列

    技术2022-07-10  129

    在第一个教程中,我们编写了从命名队列发送和接收消息的程序。在本例中,我们将创建一个工作队列,用于在多个工作人员之间分配耗时的任务。

    工作队列(又名:任务队列)的主要思想是避免立即执行资源密集型任务,并且必须等待任务完成。相反,我们把任务安排在以后完成。我们将任务封装为消息并将其发送到队列。在后台运行的工作进程将从队列弹出(pop操作)任务并最终执行该作业。当您运行多个worker时,任务将在他们之间共享。

    这个概念在web应用程序中特别有用,因为在短的HTTP请求窗口期间无法处理复杂的任务。

    准备

    在本教程的前一部分中,我们发送了一条包含“Hello World!”的消息。现在我们将发送代表复杂任务的字符串。我们没有实际的任务,比如要调整大小的图像或要呈现的pdf文件,所以让我们假装很忙-通过使用Thread.sleep() 功能。我们将以字符串中的点的数量作为其复杂性;每个点将占“工作”的1秒。例如,一个由“Hello… ”描述的假任务需要三秒钟。

    我们将轻微修改上一个示例中的代码Send.java,使其允许从命令行发送任意消息。这个程序会将任务调度到我们的工作队列中,所以我们来命名它NewTask.java:

    String message = String.join(" ", argv); channel.basicPublish("", "hello", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'");

    我们的老朋友Recv.java程序也需要一些改变:它需要为消息体中的每个点伪造一秒钟的工作。它将处理传递的消息并执行任务,因此我们将其命名为Worker.java:

    DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); } }; boolean autoAck = true; // acknowledgment is covered below channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

    我们模拟执行时间的假任务:

    private static void doWork(String task) throws InterruptedException { for (char ch: task.toCharArray()) { if (ch == '.') Thread.sleep(1000); } }

    按照教程一(见上一篇)中的方法编译它们(使用工作目录中的jar文件和环境变量CP):

    javac -cp $CP NewTask.java Worker.java

    轮询分发

    使用任务队列的优点之一是能够轻松地并行处理工作。如果我们正在积累积压的工作,我们可以增加更多的工作线程,这样就可以很容易地扩大规模。

    首先,让我们尝试同时运行两个worker实例。它们都将从队列中获取消息,但具体是如何获得的呢?让我们看看。

    你需要打开三个控制台。两个将运行worker程序。这些控制台将是我们的两个消费者-C1和C2。

    # shell 1 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # shell 2 java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C

    在第三个控制台我们将发布新的任务。启动消费者后,您可以发布一些消息:

    # shell 3 java -cp $CP NewTask First message. # => [x] Sent 'First message.' java -cp $CP NewTask Second message.. # => [x] Sent 'Second message..' java -cp $CP NewTask Third message... # => [x] Sent 'Third message...' java -cp $CP NewTask Fourth message.... # => [x] Sent 'Fourth message....' java -cp $CP NewTask Fifth message..... # => [x] Sent 'Fifth message.....'

    让我们看看投递到worker的是什么:

    java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'First message.' # => [x] Received 'Third message...' # => [x] Received 'Fifth message.....' java -cp $CP Worker # => [*] Waiting for messages. To exit press CTRL+C # => [x] Received 'Second message..' # => [x] Received 'Fourth message....'

    默认情况下,RabbitMQ将按顺序将每条消息发送到下一个消费者。平均每个消费者将收到相同数量的消息。这种分发消息的方式称为轮询。可以用三个或更多的消费者试试这个。

    消息确认

    完成任务可能需要几秒钟。你可能会想知道,如果一个消费者开始一项长期任务却只完成了一部分,会发生什么。在我们当前的代码中,一旦RabbitMQ向消费者传递了一条消息,它就会立即将其标记为删除。在这种情况下,如果您杀掉一个消费者,我们将丢失它正在处理的消息。我们还将丢失已发送给这个线程但尚未处理(接受)的所有消息。

    但我们不想失去任何任务(消息)。如果一个消费者死了,我们希望把任务交给另一个消费者。

    为了确保消息不会丢失,RabbitMQ支持消息确认。使用者会发回一个确认,告诉RabbitMQ已经收到、处理了特定的消息,并且RabbitMQ可以随意删除它。

    如果一个使用者在没有发送ack的情况下死亡(其通道关闭、连接关闭或TCP连接丢失),RabbitMQ将认为消息未被完全处理,并将其重新放入队列。如果有其他消费者同时在线,它会很快将其重新发送给另一个消费者。这样你就可以确保没有信息丢失,即使消费者偶尔死亡。

    不会有任何消息超时;RabbitMQ将在消费者死亡时重新传递消息。即使处理一条消息需要很长时间也没关系。

    默认情况下,手动消息确认处于启用状态。在前面的例子中,我们通过autoAck=true标志显式地关闭了它们。一旦我们完成了一个任务,现在是时候将这个标志设置为false并从消费者那里发送一个正确的确认。

    channel.basicQos(1); // accept only one unack-ed message at a time (see below) DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; boolean autoAck = false; channel.basicConsume(TASK_QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });

    使用这段代码,我们可以确保即使您在处理消息时使用CTRL+C杀死一个worker(消费者),也不会丢失任何内容。在worker死后不久,所有未确认的消息都将被重新发送。

    确认必须在接收传递的同一通道上发送。尝试使用不同的通道进行确认将导致通道级协议异常。有关更多信息,请参阅文档确认指南。

    忘记确认 忘记basicAck是一个常见的错误。这是一个简单的错误,但后果是严重的。当您的消费者客户端退出时,消息将被重新传递(这看起来像随机重新传递),但是RabbitMQ将消耗越来越多的内存,因为它无法释放任何未确认的消息。 为了调试这种错误,可以使用rabbitmqctl打印messages_unacknowledged字段:

    sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged 在windows上,删除sudorabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

    消息持久化

    我们已经学会了如何确保即使消费者死亡,任务(消息)也不会丢失。但是如果RabbitMQ服务器停止运行,我们的任务仍然会丢失。

    当RabbitMQ退出或崩溃时,它将忘记队列和消息,除非您告诉它不要这样做。要确保消息不会丢失,需要做两件事:我们需要将队列和消息都标记为持久的。

    首先,我们需要确保队列能够在RabbitMQ节点重新启动后继续运行。为此,我们需要声明它是持久的:

    boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);

    虽然这个命令本身是正确的,但它在我们当前的设置中不起作用。这是因为我们已经定义了一个名为hello的队列,它不是持久的。RabbitMQ不允许您使用不同的参数重新定义现有队列,并将向任何尝试重新定义的程序返回错误。但是有一个快速的解决方法-让我们声明一个具有不同名称的队列,例如task_queue:

    boolean durable = true; channel.queueDeclare("task_queue", durable, false, false, null);

    此queueDeclare更改需要同时应用于生产者代码和使用者代码。 这样我们确保即使RabbitMQ重新启动,任务队列队列也不会丢失。现在我们需要将消息标记为persistent持久化的,方法是将MessageProperties(实现BasicProperties)设置为PERSISTENT_TEXT_PLAIN值。

    import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());

    关于消息持久性的说明 将消息标记为持久性并不能完全保证消息不会丢失。尽管它告诉RabbitMQ将消息保存到磁盘上,但是RabbitMQ接受了一条消息并且还没有保存它时,仍然有一个很短的时间窗口。而且,RabbitMQ并不是对每个消息都执行fsync(2)——它可能只是保存到缓存中,而不是真正写入磁盘。对于我们的任务来说,持久化的保证是不够的。如果您需要更强有力的担保,那么您可以使用publisher confirms。

    公平分发

    你可能已经注意到调度仍然不能完全按照我们的要求工作。例如,在一个有两个消费者的情况下,当所有的奇数消息都是重消息而偶数消息都是轻消息时,一个消费者将持续忙碌,而另一个消费者几乎不做任何工作。嗯,RabbitMQ对此一无所知,仍然会均匀地发送消息。

    这是因为RabbitMQ只是在消息进入队列时发送消息。它不考虑消费者未确认消息的数量。它只是盲目地将第n条消息发送给第n个消费者。 为了避免这种情况,我们可以使用basicQos方法,并设置prefetchCount=1。这告诉RabbitMQ不要一次向一个消费者发出多个消息。或者,换句话说,在处理并确认前一条消息之前,不要向消费者发送新消息。相反,它将把它发送给下一个不忙的消费者。

    int prefetchCount = 1; channel.basicQos(prefetchCount);

    关于队列大小的说明 如果所有的消费者都很忙,你的队列可能会满的。你会想继续关注这一点,也许会增加更多的消费者,或者有一些其他的策略。

    把它们放在一起

    我们的最终代码NewTask.java类:

    import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = String.join(" ", argv); channel.basicPublish("", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }

    消费者

    import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); final Connection connection = factory.newConnection(); final Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { doWork(message); } finally { System.out.println(" [x] Done"); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } private static void doWork(String task) { for (char ch : task.toCharArray()) { if (ch == '.') { try { Thread.sleep(1000); } catch (InterruptedException _ignored) { Thread.currentThread().interrupt(); } } } } }

    可以使用消息确认和预取数设置工作队列。即使RabbitMQ重新启动,持久性选项也可以让任务继续存在。

    有关通道方法和消息属性的更多信息,您可以在线浏览JavaDocs。

    现在我们可以继续学习教程3,学习如何向许多消费者传递同样的信息。

    Processed: 0.035, SQL: 9