Java实现Redis发布订阅消息通信模式

Java实现Redis发布订阅消息通信模式

环境

阿里云ECS
CentOS Linux release 8.1.1911 (Core)
Redis server v=6.0.9
java version "12.0.1" 2019-04-16
IntelliJ IDEA 2020.2.3 (Ultimate Edition)

实现代码

Main

src/com/demo/redis/Main.java

其中,<hostname>为Redis数据库主机名(IP地址),本机为127.0.0.1<password>为Redis登录密码,Redis数据库默认端口为6379

package com.demo.redis;import redis.clients.jedis.Jedis;public class Main {private static final String HOST_NAME = "<hostname>";private static final String PASSWORD = "<password>";private static final int PORT = 6379;// default portpublic static void main(String[] args) {Jedis subscriberJedis = new Jedis(HOST_NAME, PORT);subscriberJedis.auth(PASSWORD);Jedis publisherJedis = new Jedis(HOST_NAME, PORT);publisherJedis.auth(PASSWORD);String channel = "channel";SubscriberThread subscriberThread = new SubscriberThread(subscriberJedis, channel);PublisherThread publisherThread = new PublisherThread(publisherJedis, channel);subscriberThread.start();publisherThread.start();}
}

Subscriber

src/com/demo/redis/Subscriber.java

package com.demo.redis;import redis.clients.jedis.JedisPubSub;import java.text.SimpleDateFormat;
import java.util.Date;public class Subscriber extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.printf("%s: from channel \"%s\", message: %s\n", simpleDateFormat.format(new Date()), channel, message);}@Overridepublic void onSubscribe(String channel, int subscribedChannels) {System.out.printf("Subscribe channel %s succeed, subscribed channels %d\n", channel, subscribedChannels);}@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {System.out.printf("Unsubscribe %s succeed, subscribedChannels %d\n", channel, subscribedChannels);}
}

PublisherThread

src/com/demo/redis/PublisherThread.java

package com.demo.redis;import redis.clients.jedis.Jedis;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class PublisherThread extends Thread {private Jedis jedis;private String channel;PublisherThread(Jedis jedis, String channel) {this.jedis = jedis;this.channel = channel;}@Overridepublic void run() {try {while (true) {BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));jedis.publish(channel, bufferedReader.readLine());}} catch (IOException e) {e.printStackTrace();}}
}

SubscriberThread

src/com/demo/redis/SubscriberThread.java

package com.demo.redis;import redis.clients.jedis.Jedis;public class SubscriberThread extends Thread {private Jedis jedis;private String channel;public SubscriberThread(Jedis jedis, String channel) {this.jedis = jedis;this.channel = channel;}@Overridepublic void run() {Subscriber subscriber = new Subscriber();jedis.subscribe(subscriber, channel);}
}

测试结果

最后

  • 由于博主水平有限,不免有疏漏之处,欢迎读者随时批评指正,以免造成不必要的误解!

Java实现Redis发布订阅消息通信模式

Java实现Redis发布订阅消息通信模式

环境

阿里云ECS
CentOS Linux release 8.1.1911 (Core)
Redis server v=6.0.9
java version "12.0.1" 2019-04-16
IntelliJ IDEA 2020.2.3 (Ultimate Edition)

实现代码

Main

src/com/demo/redis/Main.java

其中,<hostname>为Redis数据库主机名(IP地址),本机为127.0.0.1<password>为Redis登录密码,Redis数据库默认端口为6379

package com.demo.redis;import redis.clients.jedis.Jedis;public class Main {private static final String HOST_NAME = "<hostname>";private static final String PASSWORD = "<password>";private static final int PORT = 6379;// default portpublic static void main(String[] args) {Jedis subscriberJedis = new Jedis(HOST_NAME, PORT);subscriberJedis.auth(PASSWORD);Jedis publisherJedis = new Jedis(HOST_NAME, PORT);publisherJedis.auth(PASSWORD);String channel = "channel";SubscriberThread subscriberThread = new SubscriberThread(subscriberJedis, channel);PublisherThread publisherThread = new PublisherThread(publisherJedis, channel);subscriberThread.start();publisherThread.start();}
}

Subscriber

src/com/demo/redis/Subscriber.java

package com.demo.redis;import redis.clients.jedis.JedisPubSub;import java.text.SimpleDateFormat;
import java.util.Date;public class Subscriber extends JedisPubSub {@Overridepublic void onMessage(String channel, String message) {SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");System.out.printf("%s: from channel \"%s\", message: %s\n", simpleDateFormat.format(new Date()), channel, message);}@Overridepublic void onSubscribe(String channel, int subscribedChannels) {System.out.printf("Subscribe channel %s succeed, subscribed channels %d\n", channel, subscribedChannels);}@Overridepublic void onUnsubscribe(String channel, int subscribedChannels) {System.out.printf("Unsubscribe %s succeed, subscribedChannels %d\n", channel, subscribedChannels);}
}

PublisherThread

src/com/demo/redis/PublisherThread.java

package com.demo.redis;import redis.clients.jedis.Jedis;import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;public class PublisherThread extends Thread {private Jedis jedis;private String channel;PublisherThread(Jedis jedis, String channel) {this.jedis = jedis;this.channel = channel;}@Overridepublic void run() {try {while (true) {BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(System.in));jedis.publish(channel, bufferedReader.readLine());}} catch (IOException e) {e.printStackTrace();}}
}

SubscriberThread

src/com/demo/redis/SubscriberThread.java

package com.demo.redis;import redis.clients.jedis.Jedis;public class SubscriberThread extends Thread {private Jedis jedis;private String channel;public SubscriberThread(Jedis jedis, String channel) {this.jedis = jedis;this.channel = channel;}@Overridepublic void run() {Subscriber subscriber = new Subscriber();jedis.subscribe(subscriber, channel);}
}

测试结果

最后

  • 由于博主水平有限,不免有疏漏之处,欢迎读者随时批评指正,以免造成不必要的误解!