Java消息中间件使用简单示例

涎涎原创约 710 字大约 2 分钟...JavaJava

Java消息中间件使用简单示例

注意

本博文仅供学术研究和交流参考,严禁将其用于商业用途。如因违规使用产生的任何法律问题,使用者需自行负责。

Java 中常见的消息中间件有 Kafka 和 RabbitMQ。以下是它们的简单使用示例:

  1. Kafka 示例:

    首先,确保已经安装和启动了 Kafka。然后,使用 Kafka 的 Java 客户端库进行消息的生产和消费。

    生产者示例:

    import org.apache.kafka.clients.producer.*;
    
    public class KafkaProducerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
            Producer<String, String> producer = new KafkaProducer<>(props);
            String topic = "my-topic";
            String message = "Hello, Kafka!";
    
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, message);
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {
                    if (exception != null) {
                        System.err.println("Failed to send message: " + exception.getMessage());
                    } else {
                        System.out.println("Message sent successfully! Topic: " + metadata.topic() + ", Partition: " + metadata.partition() + ", Offset: " + metadata.offset());
                    }
                }
            });
    
            producer.close();
        }
    }
    

    消费者示例:

    import org.apache.kafka.clients.consumer.*;
    import java.util.Collections;
    
    public class KafkaConsumerExample {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-consumer-group");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            String topic = "my-topic";
    
            consumer.subscribe(Collections.singletonList(topic));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("Received message: " + record.value());
                }
            }
    
            consumer.close();
        }
    }
    

    在这个示例中,生产者使用 Kafka 的 Java 客户端库发送消息到指定的主题(topic),消费者则通过订阅主题并持续地拉取消息进行消费。

  2. RabbitMQ 示例:

    首先,确保已经安装和启动了 RabbitMQ。然后,使用 RabbitMQ 的 Java 客户端库进行消息的生产和消费。

    生产者示例:

    import com.rabbitmq.client.*;
    
    public class RabbitMQProducerExample {
        private final static String QUEUE_NAME = "my-queue";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello, RabbitMQ!";
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("Message sent successfully: " + message);
            }
        }
    }
    

    消费者示例:

    import com.rabbitmq.client.*;
    
    public class RabbitMQConsumerExample {
        private final static String QUEUE_NAME = "my-queue";
    
        public static void main(String[] args) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody());
                    System.out.println("Received message: " + message);
                };
                channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});
                System.out.println("Waiting for messages...");
                Thread.sleep(5000); // 持续等待消息的到达
            }
        }
    }
    

在这个示例中,生产者使用 RabbitMQ 的 Java 客户端库将消息发送到指定的队列(queue), 消费者则通过消费队列中的消息进行消费。

这些示例展示了使用 Kafka 和 RabbitMQ 进行消息发送和消费的简单用法。实际应用中, 根据具体需求和场景,可能需要更多的配置和处理逻辑。


分割线


相关信息

以上就是我关于 Java消息中间件使用简单示例 知识点的整理与总结的全部内容,希望对你有帮助。。。。。。。

上次编辑于:
贡献者: 涎涎
评论
  • 按正序
  • 按倒序
  • 按热度
Powered by Waline v2.15.4