Apache Flink offers a DataStream API for building robust, stateful streaming applications.
It provides fine-grained control over state and time, which allows for the implementation of advanced event-driven systems.
In this step-by-step guide you’ll learn how to build a stateful streaming application with Flink’s DataStream API.
Credit card fraud is a growing concern in the digital age.
Criminals steal credit card numbers by running scams or hacking into insecure systems.
Stolen numbers are tested by making one or more small purchases, often for a dollar or less.
If that works, they then make more significant purchases to get items they can sell or keep for themselves.
In this tutorial, you will build a fraud detection system for alerting on suspicious credit card transactions.
Using a simple set of rules, you will see how Flink allows us to implement advanced business logic and act in real-time.
This walkthrough assumes that you have some familiarity with Java or Scala, but you should be able to follow along even if you are coming from a different programming language.
If you want to follow along, you will require a computer with:
A provided Flink Maven Archetype will create a skeleton project with all the necessary dependencies quickly, so you only need to focus on filling out the business logic.
These dependencies include flink-streaming-java, which is the core dependency for all Flink streaming applications, and flink-walkthrough-common, which has data generators and other classes specific to this walkthrough.
Note: Each code block within this walkthrough may not contain the full surrounding class for brevity. The full code is available at the bottom of the page.
Note: For Maven 3.0 or higher, it is no longer possible to specify the repository (-DarchetypeCatalog) via the command line. If you wish to use the snapshot repository, you need to add a repository entry to your settings.xml. For details about this change, please refer to the official Maven documentation.
You can edit the groupId, artifactId and package if you like. With the above parameters,
Maven will create a folder named frauddetection that contains a project with all the dependencies to complete this tutorial.
After importing the project into your editor, you can find a file FraudDetectionJob.java (or FraudDetectionJob.scala) with the following code which you can run directly inside your IDE.
Try setting breakpoints throughout the data stream and run the code in DEBUG mode to get a feeling for how everything works.
Breaking Down the Code
Let’s walk step-by-step through the code of these two files. The FraudDetectionJob class defines the data flow of the application and the FraudDetector class defines the business logic of the function that detects fraudulent transactions.
We start by describing how the Job is assembled in the main method of the FraudDetectionJob class.
The Execution Environment
The first line sets up your StreamExecutionEnvironment.
The execution environment is how you set properties for your Job, create your sources, and finally trigger the execution of the Job.
Creating a Source
Sources ingest data from external systems, such as Apache Kafka, RabbitMQ, or Apache Pulsar, into Flink Jobs.
This walkthrough uses a source that generates an infinite stream of credit card transactions for you to process.
Each transaction contains an account ID (accountId), timestamp (timestamp) of when the transaction occurred, and US$ amount (amount).
The name attached to the source is just for debugging purposes, so if something goes wrong, we will know where the error originated.
Partitioning Events & Detecting Fraud
The transactions stream contains many transactions from a large number of users, so it needs to be processed in parallel by multiple fraud detection tasks. Since fraud occurs on a per-account basis, you must ensure that all transactions for the same account are processed by the same parallel task of the fraud detector operator.
To ensure that the same physical task processes all records for a particular key, you can partition a stream using DataStream#keyBy.
The process() call adds an operator that applies a function to each partitioned element in the stream.
It is common to say the operator immediately after a keyBy, in this case FraudDetector, is executed within a keyed context.
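The idea behind key-based partitioning can be illustrated with a small plain-Java sketch. It is a simplified model, not Flink's actual assignment algorithm: the class name, the `taskFor` helper, and the parallelism of 4 are assumptions made for illustration. What it shows is the property that matters here: the task index is a pure function of the key, so every transaction for a given account lands on the same task.

```java
// Simplified model of key-based partitioning; Flink's real assignment logic differs.
class KeyPartitioningSketch {

    // Maps an account ID to one of `parallelism` task indexes (always in [0, parallelism)).
    static int taskFor(long accountId, int parallelism) {
        return Math.floorMod(Long.hashCode(accountId), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // hypothetical number of parallel fraud detector tasks
        long[] accountIds = {1L, 2L, 3L, 1L, 3L, 5L}; // hypothetical account IDs
        for (long id : accountIds) {
            // Repeated IDs (1 and 3) are always routed to the same task index.
            System.out.println("account " + id + " -> task " + taskFor(id, parallelism));
        }
    }
}
```

Because the mapping depends only on the key, any per-account bookkeeping can live with a single task and never needs to be coordinated across tasks.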
A sink writes a DataStream to an external system, such as Apache Kafka, Cassandra, or AWS Kinesis.
The AlertSink logs each Alert record with log level INFO, instead of writing it to persistent storage, so you can easily see your results.
Executing the Job
Flink applications are built lazily and shipped to the cluster for execution only once fully formed.
Call StreamExecutionEnvironment#execute to begin the execution of our Job and give it a name.
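To make the notion of lazy construction concrete, here is a toy model in plain Java. It is not Flink code: `LazyPipelineSketch`, its `map` method, and the `invocations` counter are hypothetical names used only for this illustration. Declaring a step merely records it, and nothing runs until `execute` is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.UnaryOperator;

// Toy model of lazy job construction; class and method names are hypothetical.
class LazyPipelineSketch {
    private final List<UnaryOperator<Integer>> steps = new ArrayList<>();
    int invocations = 0; // counts how many times any step has actually run

    // Records a step without running it, and returns this object for chaining.
    LazyPipelineSketch map(UnaryOperator<Integer> step) {
        steps.add(value -> {
            invocations++;
            return step.apply(value);
        });
        return this;
    }

    // Runs every recorded step, in declaration order, over each input element.
    List<Integer> execute(List<Integer> input) {
        List<Integer> output = new ArrayList<>();
        for (Integer element : input) {
            Integer current = element;
            for (UnaryOperator<Integer> step : steps) {
                current = step.apply(current);
            }
            output.add(current);
        }
        return output;
    }

    public static void main(String[] args) {
        LazyPipelineSketch pipeline = new LazyPipelineSketch()
            .map(x -> x + 1)
            .map(x -> x * 2);
        // No step has run yet: the pipeline has only been described.
        System.out.println("steps run before execute: " + pipeline.invocations);
        System.out.println(pipeline.execute(Arrays.asList(1, 2, 3)));
    }
}
```

Separating description from execution is what lets a system inspect and ship the whole job as one unit before any data flows through it.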
The Fraud Detector
The fraud detector is implemented as a KeyedProcessFunction.
Its method KeyedProcessFunction#processElement is called for every transaction event.
This first version produces an alert on every transaction, which some may say is overly conservative.
The next steps of this tutorial will guide you to expand the fraud detector with more meaningful business logic.
Writing a Real Application (v1)
For the first version, the fraud detector should output an alert for any account that makes a small transaction immediately followed by a large one, where small is anything less than $1.00 and large is anything more than $500.
Imagine your fraud detector processes the following stream of transactions for a particular account.
Transactions 3 and 4 should be marked as fraudulent because they form a small transaction, $0.09, followed by a large one, $510.
By contrast, transactions 7, 8, and 9 are not fraud because the small amount of $0.02 is not immediately followed by the large one; instead, there is an intermediate transaction that breaks the pattern.
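The rule itself can be written down in a few lines of plain Java, applied to the ordered amounts of a single account. This is only a sketch of the rule, not of the streaming implementation. The class and method names are hypothetical, and apart from $0.09, $510, and $0.02, the amounts in `main` are made-up values.

```java
import java.util.ArrayList;
import java.util.List;

// Batch-style sketch of the "small then large" rule for a single account.
class SmallThenLargeRule {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    // Returns the 0-based positions of large transactions that directly follow a small one.
    static List<Integer> flaggedPositions(double[] amounts) {
        List<Integer> flagged = new ArrayList<>();
        for (int i = 1; i < amounts.length; i++) {
            boolean previousWasSmall = amounts[i - 1] < SMALL_AMOUNT;
            boolean currentIsLarge = amounts[i] > LARGE_AMOUNT;
            if (previousWasSmall && currentIsLarge) {
                flagged.add(i);
            }
        }
        return flagged;
    }

    public static void main(String[] args) {
        // Small amount directly followed by a large one: the large one is flagged.
        System.out.println(flaggedPositions(new double[] {13.01, 25.00, 0.09, 510.00}));
        // An intermediate transaction breaks the pattern: nothing is flagged.
        System.out.println(flaggedPositions(new double[] {0.02, 20.00, 850.00}));
    }
}
```

This version can look back at `amounts[i - 1]` because it holds the whole array in memory. A streaming operator sees one event at a time, so it has to carry that one bit of history forward itself.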
To do this, the fraud detector must remember information across events; a large transaction is only fraudulent if the previous one was small.
Remembering information across events requires state, and that is why we decided to use a KeyedProcessFunction.
It provides fine-grained control over both state and time, which will allow us to evolve our algorithm with more complex requirements throughout this walkthrough.
The most straightforward implementation is a boolean flag that is set whenever a small transaction is processed.
When a large transaction comes through, you can simply check if the flag is set for that account.
However, merely implementing the flag as a member variable in the FraudDetector class will not work.
Flink processes the transactions of multiple accounts with the same object instance of FraudDetector, which means if accounts A and B are routed through the same instance of FraudDetector, a transaction for account A could set the flag to true and then a transaction for account B could set off a false alert.
We could of course use a data structure like a Map to keep track of the flags for individual keys. However, a simple member variable would not be fault-tolerant, and all its information would be lost in case of a failure.
Hence, the fraud detector would possibly miss alerts if the application ever had to restart to recover from a failure.
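The false alert described above is easy to reproduce outside of Flink. The following plain-Java sketch deliberately contains the bug: one boolean member is shared by every account that passes through the same object. The class name, account IDs, and amounts are hypothetical.

```java
// Deliberately buggy sketch: a single flag is shared by all accounts.
class SharedFlagPitfall {
    private boolean flag = false; // shared across accounts, which is the bug

    // Returns true if this transaction would raise an alert.
    // accountId is intentionally ignored, mirroring a plain member variable.
    boolean process(long accountId, double amount) {
        boolean alert = flag && amount > 500.00;
        flag = amount < 1.00;
        return alert;
    }

    public static void main(String[] args) {
        SharedFlagPitfall detector = new SharedFlagPitfall();
        long accountA = 1L;
        long accountB = 2L;
        // Account A makes a small purchase and sets the shared flag.
        System.out.println(detector.process(accountA, 0.50));
        // Account B makes an unrelated large purchase and triggers a false alert.
        System.out.println(detector.process(accountB, 900.00));
    }
}
```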
To address these challenges, Flink provides primitives for fault-tolerant state that are almost as easy to use as regular member variables.
The most basic type of state in Flink is ValueState, a data type that adds fault tolerance to any variable it wraps.
ValueState is a form of keyed state, meaning it is only available in operators that are applied in a keyed context, that is, in any operator immediately following DataStream#keyBy.
A keyed state of an operator is automatically scoped to the key of the record that is currently processed.
In this example, the key is the account id for the current transaction (as declared by keyBy()), and FraudDetector maintains an independent state for each account.
ValueState is created using a ValueStateDescriptor which contains metadata about how Flink should manage the variable. The state should be registered before the function starts processing data.
The right hook for this is the open() method.
ValueState is a wrapper class, similar to AtomicReference or AtomicLong in the Java standard library.
It provides three methods for interacting with its contents: update sets the state, value gets the current value, and clear deletes its contents.
If the state for a particular key is empty, such as at the beginning of an application or after calling ValueState#clear, then ValueState#value will return null.
Modifications to the object returned by ValueState#value are not guaranteed to be recognized by the system, and so all changes must be performed with ValueState#update.
Otherwise, fault tolerance is managed automatically by Flink under the hood, and so you can interact with it like with any standard variable.
Below, you can see an example of how you can use a flag state to track potential fraudulent transactions.
For every transaction, the fraud detector checks the state of the flag for that account.
Remember, ValueState is always scoped to the current key, i.e., account.
If the flag is non-null, then the last transaction seen for that account was small, and so if the amount for this transaction is large, then the detector outputs a fraud alert.
After that check, the flag state is unconditionally cleared.
Either the current transaction caused a fraud alert, and the pattern is over, or the current transaction did not cause an alert, and the pattern is broken and needs to be restarted.
Finally, the transaction amount is checked to see if it is small.
If so, then the flag is set so that it can be checked by the next event.
Notice that ValueState<Boolean> actually has three states, unset (null), true, and false, because every ValueState is nullable.
This job only makes use of unset (null) and true to check whether the flag is set or not.
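The check, clear, and set sequence described above can be modeled in plain Java. In this sketch a `Map<Long, Boolean>` stands in for keyed state, where an absent entry plays the role of the unset (null) flag. That stand-in is an assumption made so the example runs without Flink. Unlike real keyed state it is not fault-tolerant, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the v1 logic; a Map stands in for Flink's keyed state.
class KeyedFlagDetectorSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;

    // Absent entry == unset (null) flag for that account.
    private final Map<Long, Boolean> flagByAccount = new HashMap<>();

    // Returns true if this transaction should raise an alert.
    boolean process(long accountId, double amount) {
        // 1. Check the flag for the current account only.
        Boolean lastTransactionWasSmall = flagByAccount.get(accountId);
        boolean alert = lastTransactionWasSmall != null && amount > LARGE_AMOUNT;

        // 2. Clear unconditionally: the pattern either completed or was broken.
        flagByAccount.remove(accountId);

        // 3. Set the flag if this transaction is small, for the next event to check.
        if (amount < SMALL_AMOUNT) {
            flagByAccount.put(accountId, true);
        }
        return alert;
    }

    public static void main(String[] args) {
        KeyedFlagDetectorSketch detector = new KeyedFlagDetectorSketch();
        System.out.println(detector.process(1L, 0.50));   // small purchase on account 1
        System.out.println(detector.process(2L, 900.00)); // unrelated large purchase on account 2
        System.out.println(detector.process(1L, 900.00)); // large purchase on account 1
    }
}
```

Only the last call returns true: the large purchase on account 2 never sees account 1's flag, which is the isolation that keyed state provides automatically.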
Fraud Detector v2: State + Time = ❤️
Scammers don’t wait long to make their large purchase, since waiting increases the chance that their test transaction is noticed.
For example, suppose you wanted to set a 1-minute timeout on your fraud detector; that is, in the previous example, transactions 3 and 4 would only be considered fraud if they occurred within 1 minute of each other.
Flink’s KeyedProcessFunction allows you to set timers which invoke a callback method at some point in time in the future.
Let’s see how we can modify our Job to comply with our new requirements:
Whenever the flag is set to true, also set a timer for 1 minute in the future.
When the timer fires, reset the flag by clearing its state.
If the flag is ever cleared, the timer should be canceled.
To cancel a timer, you have to remember what time it is set for, and remembering implies state, so you will begin by creating a timer state along with your flag state.
KeyedProcessFunction#processElement is called with a Context that contains a timer service.
The timer service can be used to query the current time, register timers, and delete timers.
With this, you can set a timer for 1 minute in the future every time the flag is set and store the timestamp in timerState.
Processing time is wall clock time, and is determined by the system clock of the machine running the operator.
When a timer fires, it calls KeyedProcessFunction#onTimer.
Overriding this method is how you can implement your callback to reset the flag.
Finally, to cancel the timer, you need to delete the registered timer and delete the timer state.
You can wrap this in a helper method and call this method instead of flagState.clear().
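The interplay of flag, timer, and cleanup can be modeled in plain Java with a caller-supplied clock. This is a simplified sketch under stated assumptions, not Flink code. Two maps stand in for the flag state and timer state, the `nowMillis` parameter replaces the timer service's notion of current time, and a due timer is checked lazily when the next event for that account arrives, whereas a real timer fires on its own. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java model of the v2 logic: a flag plus a one-minute timer per account.
class TimedFlagDetectorSketch {
    static final double SMALL_AMOUNT = 1.00;
    static final double LARGE_AMOUNT = 500.00;
    static final long ONE_MINUTE = 60 * 1000;

    private final Map<Long, Boolean> flagByAccount = new HashMap<>();
    private final Map<Long, Long> timerByAccount = new HashMap<>();

    // Returns true if this transaction should raise an alert.
    boolean process(long accountId, double amount, long nowMillis) {
        fireDueTimer(accountId, nowMillis);

        boolean alert = flagByAccount.get(accountId) != null && amount > LARGE_AMOUNT;

        // Clearing the flag also cancels its timer.
        cleanUp(accountId);

        if (amount < SMALL_AMOUNT) {
            // Set the flag and remember when its timer is due.
            flagByAccount.put(accountId, true);
            timerByAccount.put(accountId, nowMillis + ONE_MINUTE);
        }
        return alert;
    }

    // Models the timer callback: once the timer is due, the flag is reset.
    private void fireDueTimer(long accountId, long nowMillis) {
        Long timer = timerByAccount.get(accountId);
        if (timer != null && nowMillis >= timer) {
            cleanUp(accountId);
        }
    }

    // Helper that removes both the timer and the flag for an account.
    private void cleanUp(long accountId) {
        timerByAccount.remove(accountId);
        flagByAccount.remove(accountId);
    }

    public static void main(String[] args) {
        TimedFlagDetectorSketch detector = new TimedFlagDetectorSketch();
        detector.process(1L, 0.50, 0L);
        // Large purchase 30 seconds later: still inside the window.
        System.out.println(detector.process(1L, 600.00, 30_000L));
        detector.process(2L, 0.50, 0L);
        // Large purchase 61 seconds later: the timer has already reset the flag.
        System.out.println(detector.process(2L, 600.00, 61_000L));
    }
}
```

Passing the clock in as a parameter keeps the sketch deterministic and easy to test, since no real time has to elapse.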
And that’s it, a fully functional, stateful, distributed streaming application!
Running this code with the provided TransactionSource will emit fraud alerts for account 3.
You should see the following output in your task manager logs: