I/O Module

Stateful Functions’ I/O modules allow functions to receive and send messages to external systems. Based on the concept of Ingress (input) and Egress (output) points, and built on top of the Apache Flink® connector ecosystem, I/O modules enable functions to interact with the outside world through the style of message passing.

Ingress

An Ingress is an input point where data is consumed from an external system and forwarded to zero or more functions. It is defined via an IngressIdentifier and an IngressSpec.

An ingress identifier, similar to a function type, uniquely identifies an ingress by specifying its input type, a namespace, and a name.

The spec defines the details of how to connect to the external system, which is specific to each individual I/O module. Each identifier-spec pair is bound to the system inside an stateful function module.

package org.apache.flink.statefun.docs.io.ingress;

import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;

public final class Identifiers {

    public static final IngressIdentifier<User> INGRESS =
        new IngressIdentifier<>(User.class, "example", "user-ingress");
}
package org.apache.flink.statefun.docs.io.ingress;

import java.util.Map;
import org.apache.flink.statefun.docs.io.MissingImplementationException;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithIngress implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        IngressSpec<User> spec = createIngress(Identifiers.INGRESS);
        binder.bindIngress(spec);
    }

    private IngressSpec<User> createIngress(IngressIdentifier<User> identifier) {
        throw new MissingImplementationException("Replace with your specific ingress");
    }
}
version: "1.0"

module:
     meta:
         type: remote
     spec:
         ingresses:
           - ingress:
               meta:
                 id: example/user-ingress
                 type: # ingress type
               spec: # ingress specific configurations

Router

A router is a stateless operator that takes each record from an ingress and routes it to zero or more functions. Routers are bound to the system via a stateful function module, and unlike other components, an ingress may have any number of routers.

package org.apache.flink.statefun.docs.io.ingress;

import org.apache.flink.statefun.docs.FnUser;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.Router;

public class UserRouter implements Router<User> {

    @Override
    public void route(User message, Downstream<User> downstream) {
        downstream.forward(FnUser.TYPE, message.getUserId(), message);
    }
}
package org.apache.flink.statefun.docs.io.ingress;

import java.util.Map;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithRouter implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        IngressSpec<User> spec = createIngressSpec(Identifiers.INGRESS);
        Router<User> router = new UserRouter();

        binder.bindIngress(spec);
        binder.bindIngressRouter(Identifiers.INGRESS, router);
    }

    private IngressSpec<User> createIngressSpec(IngressIdentifier<User> identifier) {
        throw new MissingImplementationException("Replace with your specific ingress");
    }
}

When defined in yaml, routers are defined by a list of function types. The id component of the address is pulled from the key associated with each record in its underlying source implementation.

targets:
    - example-namespace/my-function-1
    - example-namespace/my-function-2

Egress

Egress is the opposite of ingress; it is a point that takes messages and writes them to external systems. Each egress is defined using two components, an EgressIdentifier and an EgressSpec.

An egress identifier uniquely identifies an egress based on a namespace, name, and producing type. An egress spec defines the details of how to connect to the external system, the details are specific to each individual I/O module. Each identifier-spec pair are bound to the system inside a stateful function module.

package org.apache.flink.statefun.docs.io.egress;

import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;

public final class Identifiers {

    public static final EgressIdentifier<User> EGRESS =
        new EgressIdentifier<>("example", "egress", User.class);
}
package org.apache.flink.statefun.docs.io.egress;

import java.util.Map;
import org.apache.flink.statefun.docs.io.MissingImplementationException;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithEgress implements StatefulFunctionModule {

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        EgressSpec<User> spec = createEgress(Identifiers.EGRESS);
        binder.bindEgress(spec);
    }

    public EgressSpec<User> createEgress(EgressIdentifier<User> identifier) {
        throw new MissingImplementationException("Replace with your specific egress");
    }
}
version: "1.0"

module:
    meta:
        type: remote
    spec:
        egresses:
          - egress:
              meta:
                id: example/user-egress
                type: # egress type
              spec: # egress specific configurations

Stateful functions may then message an egress the same way they message another function, passing the egress identifier as function type.

package org.apache.flink.statefun.docs.io.egress;

import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.Context;
import org.apache.flink.statefun.sdk.StatefulFunction;

/** A simple function that outputs messages to an egress. */
public class FnOutputting implements StatefulFunction {

    @Override
    public void invoke(Context context, Object input) {
        context.send(Identifiers.EGRESS, new User());
    }
}