Search…
Creating a Kafka Streaming Application
Apache Kafka is a distributed streaming platform. You can use Kafka to stream data directly from an application into OmniSciDB.
This example is a bare-bones click-through application that captures user activity.
This example assumes you have already installed and configured Apache Kafka. See the Kafka website. The FlavorPicker example also has dependencies on Swing/AWT classes. See the Oracle Java SE website.

Creating a Kafka Producer

FlavorPicker.java sends the choice of Chocolate, Strawberry, or Vanilla to the Kafka broker. This example uses only one column of information, but the mechanism is the same for records of any size.
1
package flavors;
2
3
// Swing/AWT Interface classes
4
import java.awt.event.ActionEvent;
5
import java.awt.event.ActionListener;
6
import java.awt.EventQueue;
7
import javax.swing.JButton;
8
import javax.swing.JFrame;
9
import javax.swing.JLabel;
10
11
// Generic Java properties object
12
import java.util.Properties;
13
14
// Kafka Producer-specific classes
15
import org.apache.kafka.clients.producer.KafkaProducer;
16
import org.apache.kafka.clients.producer.Producer;
17
import org.apache.kafka.clients.producer.ProducerRecord;
18
19
public class FlavorPicker{
20
21
private JFrame frmFlavors;
22
private Producer producer;
23
/**
24
* Launch the application.
25
*/
26
public static void main(String[] args) {
27
EventQueue.invokeLater(new Runnable() {
28
public void run() {
29
try {
30
FlavorPicker window = new FlavorPicker(args);
31
window.frmFlavors.setVisible(true);
32
} catch (Exception e) {
33
e.printStackTrace();
34
}
35
}
36
});
37
}
38
39
/**
40
* Create the application.
41
*/
42
public FlavorPicker(String[] args) {
43
initialize(args);
44
}
45
46
/**
47
* Initialize the contents of the frame.
48
*/
49
private void initialize(String[] args) {
50
frmFlavors = new JFrame();
51
frmFlavors.setTitle("Flavors");
52
frmFlavors.setBounds(100, 100, 408, 177);
53
frmFlavors.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
54
frmFlavors.getContentPane().setLayout(null);
55
56
final JLabel lbl_yourPick = new JLabel("You picked nothing.");
57
lbl_yourPick.setBounds(130, 85, 171, 15);
58
frmFlavors.getContentPane().add(lbl_yourPick);
59
60
JButton button = new JButton("Strawberry");
61
button.addActionListener(new ActionListener() {
62
public void actionPerformed(ActionEvent arg0) {
63
lbl_yourPick.setText("You picked strawberry.");
64
pick(args,1);
65
}
66
});
67
button.setBounds(141, 12, 114, 25);
68
frmFlavors.getContentPane().add(button);
69
70
JButton btnVanilla = new JButton("Vanilla");
71
btnVanilla.addActionListener(new ActionListener() {
72
public void actionPerformed(ActionEvent e) {
73
lbl_yourPick.setText("You picked vanilla.");
74
pick(args,2);
75
}
76
});
77
btnVanilla.setBounds(278, 12, 82, 25);
78
frmFlavors.getContentPane().add(btnVanilla);
79
80
81
JButton btnChocolate = new JButton("Chocolate");
82
btnChocolate.addActionListener(new ActionListener() {
83
public void actionPerformed(ActionEvent e) {
84
lbl_yourPick.setText("You picked chocolate.");
85
pick(args, 0);
86
}
87
});
88
89
btnChocolate.setBounds(12, 12, 105, 25);
90
frmFlavors.getContentPane().add(btnChocolate);
91
}
92
public void pick(String[] args,int x) {
93
String topicName = args[0].toString();
94
String[] value = {"chocolate","strawberry","vanilla"};
95
96
// Set the producer configuration properties.
97
Properties props = new Properties();
98
props.put("bootstrap.servers", "localhost:9097");
99
props.put("acks", "all");
100
props.put("retries", 0);
101
props.put("batch.size", 100);
102
props.put("linger.ms", 1);
103
props.put("buffer.memory", 33554432);
104
props.put("key.serializer",
105
"org.apache.kafka.common.serialization.StringSerializer");
106
props.put("value.serializer",
107
"org.apache.kafka.common.serialization.StringSerializer");
108
109
// Instantiate a producerSampleJDBC
110
producer = new KafkaProducer(props);
111
112
// Send a 1000 record stream to the Kafka Broker
113
for (int y=0; y<1000; y++){
114
producer.send(new ProducerRecord(topicName, value[x]));
115
}
116
}
117
}//End FlavorPicker.java
Copied!

Creating a Kafka Consumer

FlavorConsumer.java polls the Kafka broker periodically, pulls any new records added to the topic since the last poll, and loads them into OmniSciDB. Ideally, each batch should be fairly substantial in size, minimally 1,000 rows or more, so as not to overburden the server.
1
package flavors;
2
3
import java.util.Properties;
4
import java.util.Arrays;
5
import org.apache.kafka.clients.consumer.KafkaConsumer;
6
import org.apache.kafka.clients.consumer.ConsumerRecords;
7
import org.apache.kafka.clients.consumer.ConsumerRecord;
8
9
//JDBC
10
import java.sql.Connection;
11
import java.sql.DriverManager;
12
import java.sql.PreparedStatement;
13
import java.sql.SQLException;
14
15
// Usage: FlavorConsumer <topic_name> <password>
16
17
public class FlavorConsumer {
18
public static void main(String[] args) throws Exception {
19
if (args.length < 2) {
20
System.out.println("Usage:\n\nFlavorConsumer ");
21
return;
22
}
23
// Configure the Kafka Consumer
24
String topicName = args[0].toString();
25
Properties props = new Properties();
26
27
props.put("bootstrap.servers", "localhost:9097"); // Use 9097 so as not
28
// to collide with
29
// OmniSci Immerse
30
props.put("group.id", "test");
31
props.put("enable.auto.commit", "true");
32
props.put("auto.commit.interval.ms", "1000");
33
props.put("session.timeout.ms", "30000");
34
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
35
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
36
KafkaConsumer consumer = new KafkaConsumer(props);
37
38
// Subscribe the Kafka Consumer to the topic.
39
consumer.subscribe(Arrays.asList(topicName));
40
41
// print the topic name
42
System.out.println("Subscribed to topic " + topicName);
43
44
String flavorValue = "";
45
46
while (true) {
47
ConsumerRecords records = consumer.poll(1000);
48
49
// Create connection and prepared statement objects
50
Connection conn = null;
51
PreparedStatement pstmt = null;
52
53
try {
54
// JDBC driver name and database URL
55
final String JDBC_DRIVER = "com.mapd.jdbc.MapDDriver";
56
final String DB_URL = "jdbc:omnisci:localhost:6274:omnisci";
57
58
// Database credentials
59
final String USER = "omnisci";
60
final String PASS = args[1].toString();
61
62
// STEP 1: Register JDBC driver
63
Class.forName(JDBC_DRIVER);
64
65
// STEP 2: Open a connection
66
conn = DriverManager.getConnection(DB_URL, USER, PASS);
67
68
// STEP 3: Prepare a statement template
69
pstmt = conn.prepareStatement("INSERT INTO flavors VALUES (?)");
70
71
// STEP 4: Populate the prepared statement batch
72
for (ConsumerRecord record : records) {
73
flavorValue = record.value();
74
pstmt.setString(1, flavorValue);
75
pstmt.addBatch();
76
}
77
78
// STEP 5: Execute the batch statement (send records to OmniSciDB)
79
pstmt.executeBatch();
80
81
// Commit and close the connection.
82
conn.commit();
83
conn.close();
84
85
} catch (SQLException se) {
86
// Handle errors for JDBC
87
se.printStackTrace();
88
89
} catch (Exception e) {
90
// Handle errors for Class.forName
91
e.printStackTrace();
92
} finally {
93
94
try {
95
if (pstmt != null) {
96
pstmt.close();
97
}
98
} catch (SQLException se2) {
99
} // nothing we can do
100
101
try {
102
if (conn != null) {
103
conn.close();
104
}
105
} catch (SQLException se) {
106
se.printStackTrace();
107
} // end finally try
108
109
} // end try
110
} // end main
111
}
112
}// end FlavorConsumer}
Copied!

Running the Kafka Click-through Application

To run the application, you need to perform the following tasks:
    Compile FlavorConsumer.java and FlavorPicker.java.
    Create a table in OmniSciDB
    Start the Zookeeper server
    Start the Kafka server
    Start the Kafka consumer
    Start the Kafka producer
    View the results using omnisql and OmniSci Immerse
    Compile FlavorConsumer.java and FlavorPicker.java, storing the resulting class files in $OMNISCI_PATH/SampleCode/kafka-clickthrough/bin.
    Using omnisql, create the table flavors with one column, flavor, in OmniSciDB. See omnisql for more information.
    1
    omnisql> CREATE TABLE flavors (flavor TEXT ENCODING DICT);
    Copied!
    Open a new terminal window.
    Go to your kafka directory.
    Start the Zookeeper server with the following command.
    1
    ./bin/zookeeper-server-start.sh config/zookeeper.properties
    Copied!
    Open a new terminal window.
    Go to the kafka directory.
    Start the Kafka server with the following command.
    1
    ./bin/kafka-server-start.sh config/server.properties
    Copied!
    Open a new terminal window.
    Go to the kafka directory.
    Create a new Kafka topic with the following command. This creates a basic topic with a replication factor of one and a single partition. See the Kafka documentation for more information.
    1
    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 \
    2
    --partitions 1 --topic myflavors
    Copied!
    Open a new terminal window.
    Launch FlavorConsumer with the following command, substituting the actual path to the Kafka directory and your OmniSci Database password.
    1
    java -cp .:<kafka-directory-path>/libs/*:$OMNISCI_PATH/bin/*:$OMNISCI_PATH/SampleCode/kafka-clickthrough/bin flavors.FlavorConsumer myflavors <myPassword>
    Copied!
    Launch FlavorPicker with the following command.
    1
    java -cp .:<kafka-directory-path>/libs/*:$OMNISCI_PATH/bin/*:$OMNISCI_PATH/SampleCode/kafka-clickthrough/bin flavors.FlavorPicker myflavors
    Copied!
    Click to create several records for Chocolate, Strawberry, and Vanilla. Each click generates 1,000 records.
    1.
    Use omnisql to see that the results have arrived in OmniSciDB.
    1.
    Use OmniSci Immerse to visualize the results.
Last modified 1mo ago