Twitter Connector #
The Twitter Streaming API provides access to the stream of tweets made available by Twitter.
Flink Streaming comes with a built-in
TwitterSource class for establishing a connection to this stream.
To use this connector, add the following dependency to your project:
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-twitter_2.11</artifactId>
  <version>1.13.0</version>
</dependency>
Note that the streaming connectors are currently not part of the binary distribution. See the documentation on linking with them for cluster execution.
To connect to the Twitter stream, you have to register your program and acquire the information needed for authentication. The process is described below.
Acquiring the authentication information #
First of all, a Twitter account is needed. Sign up for free at twitter.com/signup
or sign in at Twitter’s Application Management and register the application by
clicking on the “Create New App” button. Fill out a form about your program and accept the Terms and Conditions.
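After selecting the application, the API key and API secret (set through the
TwitterSource.CONSUMER_KEY and TwitterSource.CONSUMER_SECRET properties, respectively) are located on the “API Keys” tab.
The necessary OAuth Access Token data (set through the
TwitterSource.TOKEN and TwitterSource.TOKEN_SECRET properties) can be generated and acquired on the “Keys and Access Tokens” tab.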
Remember to keep these pieces of information secret and do not push them to public repositories.
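One way to keep the credentials out of the source tree is to read them from environment variables at startup. The following is a minimal sketch under stated assumptions, not part of the connector: the class name TwitterCredentials and the environment variable names are hypothetical, and the property key strings are assumed to match the values of the TwitterSource.CONSUMER_KEY, TwitterSource.CONSUMER_SECRET, TwitterSource.TOKEN and TwitterSource.TOKEN_SECRET constants. In a real job, reference those constants directly instead of the string literals.

```java
import java.util.Map;
import java.util.Properties;

/** Sketch: builds the connector properties from environment variables instead of literals. */
final class TwitterCredentials {

    // Each row maps a hypothetical environment variable name to a property key.
    // The keys are ASSUMED to equal the TwitterSource.CONSUMER_KEY, CONSUMER_SECRET,
    // TOKEN and TOKEN_SECRET constants; prefer the constants in real code.
    private static final String[][] ENV_TO_PROPERTY = {
        {"TWITTER_CONSUMER_KEY", "twitter-source.consumerKey"},
        {"TWITTER_CONSUMER_SECRET", "twitter-source.consumerSecret"},
        {"TWITTER_TOKEN", "twitter-source.token"},
        {"TWITTER_TOKEN_SECRET", "twitter-source.tokenSecret"},
    };

    private TwitterCredentials() {}

    /** Copies the four credentials from env into Properties, failing fast if one is absent. */
    static Properties fromEnvironment(Map<String, String> env) {
        Properties props = new Properties();
        for (String[] pair : ENV_TO_PROPERTY) {
            String value = env.get(pair[0]);
            if (value == null || value.isEmpty()) {
                throw new IllegalStateException(
                        "Missing or empty environment variable: " + pair[0]);
            }
            props.setProperty(pair[1], value);
        }
        return props;
    }

    public static void main(String[] args) {
        try {
            Properties props = fromEnvironment(System.getenv());
            // Print only the property names, never the secret values.
            System.out.println("Loaded properties: " + props.stringPropertyNames());
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The resulting Properties object can then be passed to the TwitterSource constructor in place of hard-coded values. Taking the environment as a Map parameter, rather than calling System.getenv() inside the helper, keeps it easy to test.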
In contrast to other connectors, the
TwitterSource depends on no additional services. For example, the following code should run gracefully:
Java

Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "");
props.setProperty(TwitterSource.CONSUMER_SECRET, "");
props.setProperty(TwitterSource.TOKEN, "");
props.setProperty(TwitterSource.TOKEN_SECRET, "");
DataStream<String> streamSource = env.addSource(new TwitterSource(props));

Scala

val props = new Properties()
props.setProperty(TwitterSource.CONSUMER_KEY, "")
props.setProperty(TwitterSource.CONSUMER_SECRET, "")
props.setProperty(TwitterSource.TOKEN, "")
props.setProperty(TwitterSource.TOKEN_SECRET, "")
val streamSource = env.addSource(new TwitterSource(props))
The TwitterSource emits strings, each containing a JSON object that represents a Tweet.
The TwitterExample class in the
flink-examples-streaming package shows a full example of how to use the TwitterSource.
By default, the
TwitterSource uses the
StatusesSampleEndpoint. This endpoint returns a random sample of Tweets.
There is a
TwitterSource.EndpointInitializer interface allowing users to provide a custom endpoint.