博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
WebFlux基础之响应式编程
阅读量:6026 次
发布时间:2019-06-20

本文共 16329 字,大约阅读时间需要 54 分钟。

  上篇文章,我们简单的了解了WebFlux的一些基础与背景,并通过示例来写了一个demo。我们知道WebFlux是响应式的web框架,其特点之一就是可以通过函数式编程方式配置route。另外究竟什么是响应式编程呢?这篇文章我们就简单探讨一下

一、Java8中的函数式编程

  百科中这样定义函数式编程:

  函数式编程是种编程方式,它将电脑运算视为函数的计算。函数编程语言最重要的基础是λ演算(lambda calculus),而且λ演算的函数可以接受函数当作输入(参数)和输出(返回值)。那么在Java8里怎么样来实现它呢?

示例一

在这里我先自己写一个例子

定义接口:

package com.bdqn.lyrk.basic.java;/** * 函数式接口 * * @author chen.nie * @date 2018/7/18 **/@FunctionalInterfacepublic interface OperateNumberFunctions {    void operate(Integer number);    default void print() {            }}

     

  在定义的接口上添加@FunctionalInterface表明其是函数式接口,这个注解用于检测函数式接口规范,定义函数式接口时该接口内必须有且只有一个抽象的方法。

定义类:

package com.bdqn.lyrk.basic.java;import java.util.Optional;import java.util.function.Predicate;/** * 定义函数式编程类 */public class NumberFunctions {    private Integer number;    private NumberFunctions() {    }    private static NumberFunctions numberFunctions = new NumberFunctions();    public static NumberFunctions of(Integer number) {        numberFunctions.number = number;        return numberFunctions;    }    public NumberFunctions add(Integer number) {        numberFunctions.number += number;        return numberFunctions;    }    public NumberFunctions subtraction(Integer number) {        numberFunctions.number -= number;        return numberFunctions;    }    public Optional
filter(Predicate
predicate) { if (predicate.test(this.number)) return Optional.of(numberFunctions); return Optional.ofNullable(new NumberFunctions()); } public void operate(OperateNumberFunctions functions) { functions.operate(this.number); }}

 

  在这里定义类进行简单的运算与过滤条件。那么在Main方法里可以这么写:

package com.bdqn.lyrk.basic.java;public class Main {    public static void main(String[] args) {        NumberFunctions.of(10).add(30).subtraction(2).filter(number -> number>20).get().operate(System.out::println);    }}

  那么输出结果为38

示例二

  在Java8里有一个类叫Stream。Stream是数据流的意思,这个类略微有点像Reactor中Flux,它提供了类似于操作符的功能,我们来看一个例子:

Main方法

package com.bdqn.lyrk.basic.java;import java.util.stream.Stream;import static java.util.stream.Collectors.toList;public class Main {    public static void main(String[] args) {        /*            在这里先将Stream里的内容做乘2的操作            然后在进行倒序排序            紧接着过滤出是4的倍数的数字            然后转换成集合在打印         */        Stream.of(15, 26, 34, 455, 5, 6).map(number -> number * 2).sorted((num1, num2) -> num2 - num1).filter(integer -> integer % 4 == 0).collect(toList()).forEach(System.out::println);    }}

  运行得到的结果:

685212

关于::操作符

  该操作符是lambda表达式的更特殊写法,使用此操作符可以简化函数式接口的实现,这个方法至少满足以下特定条件:

  1)方法返回值与函数式接口相同

  2)方法参数与函数式接口相同

  举例说明

package java.util.function;/** * Represents a supplier of results. * * 

There is no requirement that a new or distinct result be returned each * time the supplier is invoked. * *

This is a functional interface * whose functional method is {

@link #get()}. * * @param
the type of results supplied by this supplier * * @since 1.8 */@FunctionalInterfacepublic interface Supplier
{ /** * Gets a result. * * @return a result */ T get();}

  java中Runnable接口:

@FunctionalInterfacepublic interface Runnable {    /**     * When an object implementing interface Runnable is used     * to create a thread, starting the thread causes the object's     * run method to be called in that separately executing     * thread.     * 

* The general contract of the method run is that it may * take any action whatsoever. * * @see java.lang.Thread#run() */ public abstract void run();}

  java中的Predicate接口:

package java.util.function;import java.util.Objects;/** * Represents a predicate (boolean-valued function) of one argument. * * 

This is a functional interface * whose functional method is {

@link #test(Object)}. * * @param
the type of the input to the predicate * * @since 1.8 */@FunctionalInterfacepublic interface Predicate
{ /** * Evaluates this predicate on the given argument. * * @param t the input argument * @return {
@code true} if the input argument matches the predicate, * otherwise {
@code false} */ boolean test(T t); /** * Returns a composed predicate that represents a short-circuiting logical * AND of this predicate and another. When evaluating the composed * predicate, if this predicate is {
@code false}, then the {
@code other} * predicate is not evaluated. * *

Any exceptions thrown during evaluation of either predicate are relayed * to the caller; if evaluation of this predicate throws an exception, the * {

@code other} predicate will not be evaluated. * * @param other a predicate that will be logically-ANDed with this * predicate * @return a composed predicate that represents the short-circuiting logical * AND of this predicate and the {
@code other} predicate * @throws NullPointerException if other is null */ default Predicate
and(Predicate
other) { Objects.requireNonNull(other); return (t) -> test(t) && other.test(t); } /** * Returns a predicate that represents the logical negation of this * predicate. * * @return a predicate that represents the logical negation of this * predicate */ default Predicate
negate() { return (t) -> !test(t); } /** * Returns a composed predicate that represents a short-circuiting logical * OR of this predicate and another. When evaluating the composed * predicate, if this predicate is {
@code true}, then the {
@code other} * predicate is not evaluated. * *

Any exceptions thrown during evaluation of either predicate are relayed * to the caller; if evaluation of this predicate throws an exception, the * {

@code other} predicate will not be evaluated. * * @param other a predicate that will be logically-ORed with this * predicate * @return a composed predicate that represents the short-circuiting logical * OR of this predicate and the {
@code other} predicate * @throws NullPointerException if other is null */ default Predicate
or(Predicate
other) { Objects.requireNonNull(other); return (t) -> test(t) || other.test(t); } /** * Returns a predicate that tests if two arguments are equal according * to {
@link Objects#equals(Object, Object)}. * * @param
the type of arguments to the predicate * @param targetRef the object reference with which to compare for equality, * which may be {
@code null} * @return a predicate that tests if two arguments are equal according * to {
@link Objects#equals(Object, Object)} */ static
Predicate
isEqual(Object targetRef) { return (null == targetRef) ? Objects::isNull : object -> targetRef.equals(object); }}

那么上述的接口分别可以使用如下写法,注意实现该接口的方法特点

package com.bdqn.lyrk.basic.java;import java.util.function.Predicate;import java.util.function.Supplier;public class Main {    private static int i;    public static void main(String[] args) {        /*            创建对象的方式         */        Supplier supplier = Object::new;        /*            调用方法的方式(无参数)         */        Runnable runnable = Main::add;        /*            调用方法的方式(有参数)         */        Predicate
predicate = Main::filter; } public static void add() { i++; System.out.println("test" + i); } public static boolean filter(String test) { return test != null; }}

我们可以看到使用函数式编程借助于lambda表达式,使得代码更简洁清爽 

 

二、Java中的响应式编程

  关于响应式编程,百度百科是这么定义的:

  简称RP(Reactive Programming)

  响应式编程是一种面向数据流和变化传播的编程范式。这意味着可以在编程语言中很方便地表达静态或动态的数据流,而相关的计算模型会自动将变化的值通过数据流进行传播。
  在这里有两个关键词:
数据流与变化传播。下面我们来通过代码来演示下响应式编程是怎么回事

 Java8及以前版本

  最典型的示例就是,JDK提供的观察者模式类Observer与Observalbe:

package com.hzgj.lyrk.demo;import java.util.Observable;public class ObserverDemo extends Observable {    public static void main(String[] args) {        ObserverDemo observable = new ObserverDemo();        observable.addObserver((o, arg) -> {            System.out.println("发生变化");        });        observable.addObserver((o, arg) -> {            System.out.println("收到被观察者通知,准备改变");        });        observable.setChanged();        observable.notifyObservers();    }}

 

  在上述代码示例中观察者并没有及时执行,而是在接受到被观察者发送信号的时候才有了“响应”。其中setChanged()与notifyObservers方法就对应响应式编程中定义的关键词--变化与传播。还有一个典型的示例就是Swing中的事件机制,有兴趣的朋友可以下去查阅相关资料,在这里就不再进行阐述。

 Java9及其后版本

  从java9开始,Observer与Observable已经被标记为过时的类了,取而代之的是Flow类。Flow才是真正意义上的响应式编程类,因为观察者Observer与Observable虽然能够响应,但是在数据流的体现并不是特别突出。Flow这个类,我们可以先看一下:

  

public final class Flow {    private Flow() {} // uninstantiable    /**     * A producer of items (and related control messages) received by     * Subscribers.  Each current {
@link Subscriber} receives the same * items (via method {
@code onNext}) in the same order, unless * drops or errors are encountered. If a Publisher encounters an * error that does not allow items to be issued to a Subscriber, * that Subscriber receives {
@code onError}, and then receives no * further messages. Otherwise, when it is known that no further * messages will be issued to it, a subscriber receives {
@code * onComplete}. Publishers ensure that Subscriber method * invocations for each subscription are strictly ordered in happens-before * order. * *

Publishers may vary in policy about whether drops (failures * to issue an item because of resource limitations) are treated * as unrecoverable errors. Publishers may also vary about * whether Subscribers receive items that were produced or * available before they subscribed. * * @param

the published item type */ @FunctionalInterface public static interface Publisher
{ /** * Adds the given Subscriber if possible. If already * subscribed, or the attempt to subscribe fails due to policy * violations or errors, the Subscriber's {
@code onError} * method is invoked with an {
@link IllegalStateException}. * Otherwise, the Subscriber's {
@code onSubscribe} method is * invoked with a new {
@link Subscription}. Subscribers may * enable receiving items by invoking the {
@code request} * method of this Subscription, and may unsubscribe by * invoking its {
@code cancel} method. * * @param subscriber the subscriber * @throws NullPointerException if subscriber is null */ public void subscribe(Subscriber
subscriber); } /** * A receiver of messages. The methods in this interface are * invoked in strict sequential order for each {
@link * Subscription}. * * @param
the subscribed item type */ public static interface Subscriber
{ /** * Method invoked prior to invoking any other Subscriber * methods for the given Subscription. If this method throws * an exception, resulting behavior is not guaranteed, but may * cause the Subscription not to be established or to be cancelled. * *

Typically, implementations of this method invoke {

@code * subscription.request} to enable receiving items. * * @param subscription a new subscription */ public void onSubscribe(Subscription subscription); /** * Method invoked with a Subscription's next item. If this * method throws an exception, resulting behavior is not * guaranteed, but may cause the Subscription to be cancelled. * * @param item the item */ public void onNext(T item); /** * Method invoked upon an unrecoverable error encountered by a * Publisher or Subscription, after which no other Subscriber * methods are invoked by the Subscription. If this method * itself throws an exception, resulting behavior is * undefined. * * @param throwable the exception */ public void onError(Throwable throwable); /** * Method invoked when it is known that no additional * Subscriber method invocations will occur for a Subscription * that is not already terminated by error, after which no * other Subscriber methods are invoked by the Subscription. * If this method throws an exception, resulting behavior is * undefined. */ public void onComplete(); } /** * Message control linking a {
@link Publisher} and {
@link * Subscriber}. Subscribers receive items only when requested, * and may cancel at any time. The methods in this interface are * intended to be invoked only by their Subscribers; usages in * other contexts have undefined effects. */ public static interface Subscription { /** * Adds the given number {
@code n} of items to the current * unfulfilled demand for this subscription. If {
@code n} is * less than or equal to zero, the Subscriber will receive an * {
@code onError} signal with an {
@link * IllegalArgumentException} argument. Otherwise, the * Subscriber will receive up to {
@code n} additional {
@code * onNext} invocations (or fewer if terminated). * * @param n the increment of demand; a value of {
@code * Long.MAX_VALUE} may be considered as effectively unbounded */ public void request(long n); /** * Causes the Subscriber to (eventually) stop receiving * messages. Implementation is best-effort -- additional * messages may be received after invoking this method. * A cancelled subscription need not ever receive an * {
@code onComplete} or {
@code onError} signal. */ public void cancel(); } /** * A component that acts as both a Subscriber and Publisher. * * @param
the subscribed item type * @param
the published item type */ public static interface Processor
extends Subscriber
, Publisher
{ } static final int DEFAULT_BUFFER_SIZE = 256; /** * Returns a default value for Publisher or Subscriber buffering, * that may be used in the absence of other constraints. * * @implNote * The current value returned is 256. * * @return the buffer size value */ public static int defaultBufferSize() { return DEFAULT_BUFFER_SIZE; }}

  Flow这个类里定义最基本的Publisher与Subscribe,该模式就是发布订阅模式。我们来看一下代码示例:

package com.hzgj.lyrk.demo;import java.util.concurrent.Flow;public class Main {    public static void main(String[] args) {        Flow.Publisher
publisher = subscriber -> { subscriber.onNext("1"); // 1 subscriber.onNext("2"); subscriber.onError(new RuntimeException("出错")); // 2 // subscriber.onComplete(); }; publisher.subscribe(new Flow.Subscriber<>() { @Override public void onSubscribe(Flow.Subscription subscription) { subscription.cancel(); } @Override public void onNext(String item) { System.out.println(item); } @Override public void onError(Throwable throwable) { System.out.println("出错了"); } @Override public void onComplete() { System.out.println("publish complete"); } }); }}

   代码1 是一种数据流的体现,在Publisher中每次调用onNext的时候,在中都会在Subscribe的onNext方法进行消费

   代码2 同样是发送错误信号,等待订阅者进行消费

   运行结果:

12出错了

  在上述代码中我们可以发现:Publisher在没有被订阅的时候,是不会触发任何行为的。每次调用Publisher的onNext方法的时候都像是在发信号,订阅者收到信号时执行相关内容,这就是典型的响应式编程的案例。不过java9提供的这个功能对异步的支持不太好,也不够强大。因此才会出现Reactor与RxJava等响应式框架

转载于:https://www.cnblogs.com/niechen/p/9329191.html

你可能感兴趣的文章
linux上架设l2tp+ipsec ***服务器
查看>>
curl指令的使用
查看>>
LNAMP第二版(nginx 1.2.0+apache 2.4.2+php 5.4)
查看>>
基于用户投票的排名算法(二):Reddit
查看>>
css3中变形与动画(一)
查看>>
[XMove-自主设计的体感解决方案] 系统综述
查看>>
变更 Linux、Ubuntu 时区、时间
查看>>
[共通]手机端网页开发问题及解决方法整理
查看>>
思科分发列表过滤路由(RIP)动态路由协议篇
查看>>
可登录的用户数量是1.6万个,软件的性能得到充分的考验
查看>>
[实战]MVC5+EF6+MySql企业网盘实战(23)——文档列表
查看>>
[译] ES2018(ES9)的新特性
查看>>
Java生成-zipf分布的数据集(自定义倾斜度,用作spark data skew测试)
查看>>
正则与sed,grep,awk三剑客
查看>>
诊断一句SQL不走索引的原因
查看>>
Linux pipe函数
查看>>
图片标注工具LabelImg使用教程
查看>>
(原創) 如何設計一個數位相框? (SOC) (Quartus II) (SOPC Builder) (Nios II) (TRDB-LTM) (DE2-70)...
查看>>
/etc/profile文件内容
查看>>
量词 匹配优先与忽略优先
查看>>