Read & write data into Cassandra using the Apache Flink Java API

#1
I intend to use Apache Flink to read from and write to Cassandra. I was hoping to use flink-connector-cassandra, but I can't find good documentation or examples for the connector.

Can you please point me to the right way to read and write data from Cassandra using Apache Flink? I only see sink examples, which are purely for writing. Is Apache Flink meant for reading data from Cassandra too, similar to Apache Spark?





#2
You can write a class that extends `RichFlatMapFunction`:

import com.fasterxml.jackson.databind.JsonNode
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.util.Collector
import org.mongodb.scala.{Document, MongoClient, MongoCollection, ReadPreference}
import org.mongodb.scala.model.Filters

class MongoMapper extends RichFlatMapFunction[JsonNode, JsonNode] {

  var userCollection: MongoCollection[Document] = _

  override def open(parameters: Configuration): Unit = {
    // Do something here like opening the connection
    val client: MongoClient = MongoClient("mongodb://localhost:10000")
    userCollection = client.getDatabase("gp_stage")
      .getCollection("users")
      .withReadPreference(ReadPreference.secondaryPreferred())
    super.open(parameters)
  }

  override def flatMap(event: JsonNode, out: Collector[JsonNode]): Unit = {
    // Do something here per record; this function can use the objects
    // initialized via open(). `somevalue` is a placeholder for whatever
    // key you look up per event.
    userCollection.find(Filters.eq("_id", somevalue)).limit(1).first().subscribe(
      (result: Document) => {
        // println(result)
      },
      (t: Throwable) => {
        println(t)
      },
      () => {
        out.collect(event)
      }
    )
  }
}


Basically, the `open` function executes once per worker and `flatMap` executes once per record. The example is for Mongo, but the same pattern can be used for Cassandra; see the Java sketch below.
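
For Cassandra, a roughly equivalent Java sketch might look like this (my own illustration, not from the original post: it assumes the DataStax Java driver, and the contact point, keyspace, table, and class names are placeholders):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative sketch: look up each incoming id in Cassandra and
// forward it only if a matching row exists.
public class CassandraLookupMapper extends RichFlatMapFunction<String, String> {

    private transient Cluster cluster;
    private transient Session session;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Runs once per parallel task: open the connection here.
        cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(9042).build();
        session = cluster.connect("example_keyspace"); // keyspace is a placeholder
    }

    @Override
    public void flatMap(String id, Collector<String> out) {
        // Runs once per record: reuse the session opened in open().
        ResultSet rs = session.execute("SELECT id FROM users WHERE id = ?", id);
        if (rs.one() != null) {
            out.collect(id);
        }
    }

    @Override
    public void close() throws Exception {
        // Release the connection when the task shuts down.
        if (session != null) session.close();
        if (cluster != null) cluster.close();
        super.close();
    }
}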

#3
In your case, as I understand it, the first step of your pipeline is reading data from Cassandra. So rather than writing a `RichFlatMapFunction`, you should write your own `RichSourceFunction`.

As a reference, you can have a look at the simple implementation of `WikipediaEditsSource` in the Flink sources.
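
For illustration, a minimal bounded source along those lines might look like the following Java sketch (my own sketch, assuming the DataStax driver; the query and column names are placeholders, and there is no checkpointing or fault tolerance):

import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

// Illustrative bounded source: emits each row of a Cassandra table once, then finishes.
public class CassandraRowSource extends RichSourceFunction<Tuple2<String, String>> {

    private transient Cluster cluster;
    private transient Session session;
    private volatile boolean running = true;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        cluster = Cluster.builder().addContactPoint("127.0.0.1").withPort(9042).build();
        session = cluster.connect();
    }

    @Override
    public void run(SourceContext<Tuple2<String, String>> ctx) {
        // Query once and emit every row; an unbounded source would poll in a loop instead.
        for (Row row : session.execute("SELECT col1, col2 FROM example.table1")) {
            if (!running) {
                break;
            }
            ctx.collect(Tuple2.of(row.getString("col1"), row.getString("col2")));
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void close() throws Exception {
        if (session != null) session.close();
        if (cluster != null) cluster.close();
        super.close();
    }
}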



#4
I had the same question, and this is what I was looking for. I don't know if it is oversimplified for what you need, but figured I should show it nonetheless.

// Relevant imports (assuming the flink-connector-cassandra dependency):
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;

ClusterBuilder cb = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("urlToUse.com").withPort(9042).build();
    }
};

CassandraInputFormat<Tuple2<String, String>> cassandraInputFormat =
    new CassandraInputFormat<>("SELECT * FROM example.cassandraconnectorexample", cb);

cassandraInputFormat.configure(null);
cassandraInputFormat.open(null);

// Read a single record into a reusable tuple.
Tuple2<String, String> testOutputTuple = new Tuple2<>();
cassandraInputFormat.nextRecord(testOutputTuple);

System.out.println("column1: " + testOutputTuple.f0);
System.out.println("column2: " + testOutputTuple.f1);

The way I figured this out was by finding the code for the `CassandraInputFormat` class and seeing how it worked. I honestly expected it to just be a format, and not the full class for reading from Cassandra, based on the name, and I have a feeling others might be thinking the same thing.

#5
// Imports (this uses the pre-1.9 Table API, where getTableEnvironment() and sql() still exist):
import com.datastax.driver.core.Cluster;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.batch.connectors.cassandra.CassandraInputFormat;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
import org.apache.flink.table.api.*;
import org.apache.flink.table.api.java.StreamTableEnvironment;

ClusterBuilder cb = new ClusterBuilder() {
    @Override
    public Cluster buildCluster(Cluster.Builder builder) {
        return builder.addContactPoint("localhost").withPort(9042).build();
    }
};

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

CassandraInputFormat<Tuple3<Integer, Integer, Integer>> inputFormat =
    new CassandraInputFormat<>("SELECT * FROM test.example;", cb);

DataStreamSource<Tuple3<Integer, Integer, Integer>> t = env.createInput(
    inputFormat, TupleTypeInfo.of(new TypeHint<Tuple3<Integer, Integer, Integer>>() {}));
tableEnv.registerDataStream("t1", t);
Table t2 = tableEnv.sql("select * from t1"); // renamed sqlQuery() in later versions

t2.printSchema();
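
Note that `printSchema()` only prints the schema. To actually see rows, you still need to convert the table back to a stream and execute the job; with the same pre-1.9 Table API as above, roughly:

// Convert the result table back to a DataStream of Rows and run the pipeline.
tableEnv.toAppendStream(t2, org.apache.flink.types.Row.class).print();
env.execute("read from cassandra");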


