Write from Kafka
About Kafka
Apache Kafka is an open-source distributed event streaming platform, used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. For the key concepts of kafka, please refer to kafka documentation.
kafka topic
Messages in Kafka are organized by topics. A topic may have one or more partitions. We can manage kafka topics through kafka-topics
.
create a topic named kafka-events
:
bin/kafka-topics.sh --create --topic kafka-events --bootstrap-server localhost:9092
Alter kafka-events
topic to set partitions to 3:
bin/kafka-topics.sh --alter --topic kafka-events --partitions 3 --bootstrap-server=localhost:9092
Show all topics and partitions in Kafka:
bin/kafka-topics.sh --bootstrap-server=localhost:9092 --describe
Insert into TDengine
We can write data into TDengine via SQL or Schemaless. For more information, please refer to Insert Using SQL or High Performance Writing or Schemaless Writing.
Examples
- Python
python Kafka client
For python kafka client, please refer to kafka client. In this document, we use kafka-python.
consume from Kafka
The simple way to consume messages from Kafka is to read messages one by one. The demo is as follows:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
for msg in consumer:
print (msg)
For higher performance, we can consume message from kafka in batch. The demo is as follows:
from kafka import KafkaConsumer
consumer = KafkaConsumer('my_favorite_topic')
while True:
msgs = consumer.poll(timeout_ms=500, max_records=1000)
if msgs:
print (msgs)
multi-threading
For more higher performance we can process data from kafka in multi-thread. We can use python's ThreadPoolExecutor to achieve multithreading. The demo is as follows:
from concurrent.futures import ThreadPoolExecutor, Future
pool = ThreadPoolExecutor(max_workers=10)
pool.submit(...)
multi-process
For more higher performance, sometimes we use multiprocessing. In this case, the number of Kafka Consumers should not be greater than the number of Kafka Topic Partitions. The demo is as follows:
from multiprocessing import Process
ps = []
for i in range(5):
p = Process(target=Consumer().consume())
p.start()
ps.append(p)
for p in ps:
p.join()
In addition to python's built-in multithreading and multiprocessing library, we can also use the third-party library gunicorn.
examples
kafka_example_perform
kafka_example_perform
is the entry point of the examples.
#! encoding=utf-8
import argparse
import logging
import multiprocessing
import time
from multiprocessing import pool
import kafka_example_common as common
import kafka_example_consumer as consumer
import kafka_example_producer as producer
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-kafka-broker', type=str, default='localhost:9092',
help='kafka borker host. default is `localhost:9200`')
parser.add_argument('-kafka-topic', type=str, default='tdengine-kafka-practices',
help='kafka topic. default is `tdengine-kafka-practices`')
parser.add_argument('-kafka-group', type=str, default='kafka_practices',
help='kafka consumer group. default is `kafka_practices`')
parser.add_argument('-taos-host', type=str, default='localhost',
help='TDengine host. default is `localhost`')
parser.add_argument('-taos-port', type=int, default=6030, help='TDengine port. default is 6030')
parser.add_argument('-taos-user', type=str, default='root', help='TDengine username, default is `root`')
parser.add_argument('-taos-password', type=str, default='taosdata', help='TDengine password, default is `taosdata`')
parser.add_argument('-taos-db', type=str, default='tdengine_kafka_practices',
help='TDengine db name, default is `tdengine_kafka_practices`')
parser.add_argument('-table-count', type=int, default=100, help='TDengine sub-table count, default is 100')
parser.add_argument('-table-items', type=int, default=1000, help='items in per sub-tables, default is 1000')
parser.add_argument('-message-type', type=str, default='line',
help='kafka message type. `line` or `json`. default is `line`')
parser.add_argument('-max-poll', type=int, default=1000, help='max poll for kafka consumer')
parser.add_argument('-threads', type=int, default=10, help='thread count for deal message')
parser.add_argument('-processes', type=int, default=1, help='process count')
args = parser.parse_args()
total = args.table_count * args.table_items
logging.warning("## start to prepare testing data...")
prepare_data_start = time.time()
producer.produce_total(100, args.kafka_broker, args.kafka_topic, args.message_type, total, args.table_count)
prepare_data_end = time.time()
logging.warning("## prepare testing data finished! spend-[%s]", prepare_data_end - prepare_data_start)
logging.warning("## start to create database and tables ...")
create_db_start = time.time()
# create database and table
common.create_database_and_tables(host=args.taos_host, port=args.taos_port, user=args.taos_user,
password=args.taos_password, db=args.taos_db, table_count=args.table_count)
create_db_end = time.time()
logging.warning("## create database and tables finished! spend [%s]", create_db_end - create_db_start)
processes = args.processes
logging.warning("## start to consume data and insert into TDengine...")
consume_start = time.time()
if processes > 1: # multiprocess
multiprocessing.set_start_method("spawn")
pool = pool.Pool(processes)
consume_start = time.time()
for _ in range(processes):
pool.apply_async(func=consumer.consume, args=(
args.kafka_broker, args.kafka_topic, args.kafka_group, args.taos_host, args.taos_port, args.taos_user,
args.taos_password, args.taos_db, args.message_type, args.max_poll, args.threads))
pool.close()
pool.join()
else:
consume_start = time.time()
consumer.consume(kafka_brokers=args.kafka_broker, kafka_topic=args.kafka_topic, kafka_group_id=args.kafka_group,
taos_host=args.taos_host, taos_port=args.taos_port, taos_user=args.taos_user,
taos_password=args.taos_password, taos_database=args.taos_db, message_type=args.message_type,
max_poll=args.max_poll, workers=args.threads)
consume_end = time.time()
logging.warning("## consume data and insert into TDengine over! spend-[%s]", consume_end - consume_start)
# print report
logging.warning(
"\n#######################\n"
" Prepare data \n"
"#######################\n"
"# data_type # %s \n"
"# total # %s \n"
"# spend # %s s\n"
"#######################\n"
" Create database \n"
"#######################\n"
"# stable # 1 \n"
"# sub-table # 100 \n"
"# spend # %s s \n"
"#######################\n"
" Consume \n"
"#######################\n"
"# data_type # %s \n"
"# threads # %s \n"
"# processes # %s \n"
"# total_count # %s \n"
"# spend # %s s\n"
"# per_second # %s \n"
"#######################\n",
args.message_type, total, prepare_data_end - prepare_data_start, create_db_end - create_db_start,
args.message_type, args.threads, processes, total, consume_end - consume_start,
total / (consume_end - consume_start))
kafka_example_common
kafka_example_common
is the common code of the examples.
#! encoding = utf-8
import taos
LOCATIONS = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
'California.SantaClara', 'California.Cupertino']
CREATE_DATABASE_SQL = 'create database if not exists {} keep 36500 duration 10 buffer 16 wal_level 1 wal_retention_period 3600'
USE_DATABASE_SQL = 'use {}'
DROP_TABLE_SQL = 'drop table if exists meters'
DROP_DATABASE_SQL = 'drop database if exists {}'
CREATE_STABLE_SQL = 'create stable meters (ts timestamp, current float, voltage int, phase float) tags ' \
'(location binary(64), groupId int)'
CREATE_TABLE_SQL = 'create table if not exists {} using meters tags (\'{}\', {})'
def create_database_and_tables(host, port, user, password, db, table_count):
tags_tables = _init_tags_table_names(table_count=table_count)
conn = taos.connect(host=host, port=port, user=user, password=password)
conn.execute(DROP_DATABASE_SQL.format(db))
conn.execute(CREATE_DATABASE_SQL.format(db))
conn.execute(USE_DATABASE_SQL.format(db))
conn.execute(DROP_TABLE_SQL)
conn.execute(CREATE_STABLE_SQL)
for tags in tags_tables:
location, group_id = _get_location_and_group(tags)
tables = tags_tables[tags]
for table_name in tables:
conn.execute(CREATE_TABLE_SQL.format(table_name, location, group_id))
conn.close()
def clean(host, port, user, password, db):
conn = taos.connect(host=host, port=port, user=user, password=password)
conn.execute(DROP_DATABASE_SQL.format(db))
conn.close()
def _init_tags_table_names(table_count):
tags_table_names = {}
group_id = 0
for i in range(table_count):
table_name = 'd{}'.format(i)
location_idx = i % len(LOCATIONS)
location = LOCATIONS[location_idx]
if location_idx == 0:
group_id += 1
if group_id > 10:
group_id -= 10
key = _tag_table_mapping_key(location=location, group_id=group_id)
if key not in tags_table_names:
tags_table_names[key] = []
tags_table_names[key].append(table_name)
return tags_table_names
def _tag_table_mapping_key(location, group_id):
return '{}_{}'.format(location, group_id)
def _get_location_and_group(key):
fields = key.split('_')
return fields[0], fields[1]
kafka_example_producer
kafka_example_producer
is producer
, which is responsible for generating test data and sending it to kafka.
#! encoding = utf-8
import json
import random
import threading
from concurrent.futures import ThreadPoolExecutor, Future
from datetime import datetime
from kafka import KafkaProducer
locations = ['California.SanFrancisco', 'California.LosAngles', 'California.SanDiego', 'California.SanJose',
'California.PaloAlto', 'California.Campbell', 'California.MountainView', 'California.Sunnyvale',
'California.SantaClara', 'California.Cupertino']
producers: list[KafkaProducer] = []
lock = threading.Lock()
start = 1640966400
def produce_total(workers, broker, topic, message_type, total, table_count):
if len(producers) == 0:
lock.acquire()
if len(producers) == 0:
_init_kafka_producers(broker=broker, count=10)
lock.release()
pool = ThreadPoolExecutor(max_workers=workers)
futures = []
for _ in range(0, workers):
futures.append(pool.submit(_produce_total, topic, message_type, int(total / workers), table_count))
pool.shutdown()
for f in futures:
f.result()
_close_kafka_producers()
def _produce_total(topic, message_type, total, table_count):
producer = _get_kafka_producer()
for _ in range(total):
message = _get_fake_date(message_type=message_type, table_count=table_count)
producer.send(topic=topic, value=message.encode(encoding='utf-8'))
def _init_kafka_producers(broker, count):
for _ in range(count):
p = KafkaProducer(bootstrap_servers=broker, batch_size=64 * 1024, linger_ms=300, acks=0)
producers.append(p)
def _close_kafka_producers():
for p in producers:
p.close()
def _get_kafka_producer():
return producers[random.randint(0, len(producers) - 1)]
def _get_fake_date(table_count, message_type='json'):
if message_type == 'json':
return _get_json_message(table_count=table_count)
if message_type == 'line':
return _get_line_message(table_count=table_count)
return ''
def _get_json_message(table_count):
return json.dumps({
'ts': _get_timestamp(),
'current': random.randint(0, 1000) / 100,
'voltage': random.randint(105, 115),
'phase': random.randint(0, 32000) / 100000,
'location': random.choice(locations),
'groupId': random.randint(1, 10),
'table_name': _random_table_name(table_count)
})
def _get_line_message(table_count):
return "{} values('{}', {}, {}, {})".format(
_random_table_name(table_count), # table
_get_timestamp(), # ts
random.randint(0, 1000) / 100, # current
random.randint(105, 115), # voltage
random.randint(0, 32000) / 100000, # phase
)
def _random_table_name(table_count):
return 'd{}'.format(random.randint(0, table_count - 1))
def _get_timestamp():
global start
lock.acquire(blocking=True)
start += 0.001
lock.release()
return datetime.fromtimestamp(start).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]
kafka_example_consumer
kafka_example_consumer
is consumer
, which is responsible for consuming data from kafka and writing it to TDengine.
#! encoding = utf-8
import json
import logging
import time
from concurrent.futures import ThreadPoolExecutor, Future
from json import JSONDecodeError
from typing import Callable
import taos
from kafka import KafkaConsumer
from kafka.consumer.fetcher import ConsumerRecord
import kafka_example_common as common
class Consumer(object):
DEFAULT_CONFIGS = {
'kafka_brokers': 'localhost:9092', # kafka broker
'kafka_topic': 'tdengine_kafka_practices',
'kafka_group_id': 'taos',
'taos_host': 'localhost', # TDengine host
'taos_port': 6030, # TDengine port
'taos_user': 'root', # TDengine user name
'taos_password': 'taosdata', # TDengine password
'taos_database': 'power', # TDengine database
'message_type': 'json', # message format, 'json' or 'line'
'clean_after_testing': False, # if drop database after testing
'max_poll': 1000, # poll size for batch mode
'workers': 10, # thread count for multi-threading
'testing': False
}
INSERT_SQL_HEADER = "insert into "
INSERT_PART_SQL = '{} values (\'{}\', {}, {}, {})'
def __init__(self, **configs):
self.config = self.DEFAULT_CONFIGS
self.config.update(configs)
self.consumer = None
if not self.config.get('testing'):
self.consumer = KafkaConsumer(
self.config.get('kafka_topic'),
bootstrap_servers=self.config.get('kafka_brokers'),
group_id=self.config.get('kafka_group_id'),
)
self.conns = taos.connect(
host=self.config.get('taos_host'),
port=self.config.get('taos_port'),
user=self.config.get('taos_user'),
password=self.config.get('taos_password'),
db=self.config.get('taos_database'),
)
if self.config.get('workers') > 1:
self.pool = ThreadPoolExecutor(max_workers=self.config.get('workers'))
self.tasks = []
# tags and table mapping # key: {location}_{groupId} value:
def consume(self):
"""
consume data from kafka and deal. Base on `message_type`, `bath_consume`, `insert_by_table`,
there are several deal function.
:return:
"""
self.conns.execute(common.USE_DATABASE_SQL.format(self.config.get('taos_database')))
try:
if self.config.get('message_type') == 'line': # line
self._run(self._line_to_taos)
if self.config.get('message_type') == 'json': # json
self._run(self._json_to_taos)
except KeyboardInterrupt:
logging.warning("## caught keyboard interrupt, stopping")
finally:
self.stop()
def stop(self):
"""
stop consuming
:return:
"""
# close consumer
if self.consumer is not None:
self.consumer.commit()
self.consumer.close()
# multi thread
if self.config.get('workers') > 1:
if self.pool is not None:
self.pool.shutdown()
for task in self.tasks:
while not task.done():
time.sleep(0.01)
# clean data
if self.config.get('clean_after_testing'):
self.conns.execute(common.DROP_TABLE_SQL)
self.conns.execute(common.DROP_DATABASE_SQL.format(self.config.get('taos_database')))
# close taos
if self.conns is not None:
self.conns.close()
def _run(self, f):
"""
run in batch consuming mode
:param f:
:return:
"""
i = 0 # just for test.
while True:
messages = self.consumer.poll(timeout_ms=100, max_records=self.config.get('max_poll'))
if messages:
if self.config.get('workers') > 1:
self.pool.submit(f, messages.values())
else:
f(list(messages.values()))
if not messages:
i += 1 # just for test.
time.sleep(0.1)
if i > 3: # just for test.
logging.warning('## test over.') # just for test.
return # just for test.
def _json_to_taos(self, messages):
"""
convert a batch of json data to sql, and insert into TDengine
:param messages:
:return:
"""
sql = self._build_sql_from_json(messages=messages)
self.conns.execute(sql=sql)
def _line_to_taos(self, messages):
"""
convert a batch of lines data to sql, and insert into TDengine
:param messages:
:return:
"""
lines = []
for partition_messages in messages:
for message in partition_messages:
lines.append(message.value.decode())
sql = self.INSERT_SQL_HEADER + ' '.join(lines)
self.conns.execute(sql=sql)
def _build_single_sql_from_json(self, msg_value):
try:
data = json.loads(msg_value)
except JSONDecodeError as e:
logging.error('## decode message [%s] error ', msg_value, e)
return ''
# location = data.get('location')
# group_id = data.get('groupId')
ts = data.get('ts')
current = data.get('current')
voltage = data.get('voltage')
phase = data.get('phase')
table_name = data.get('table_name')
return self.INSERT_PART_SQL.format(table_name, ts, current, voltage, phase)
def _build_sql_from_json(self, messages):
sql_list = []
for partition_messages in messages:
for message in partition_messages:
sql_list.append(self._build_single_sql_from_json(message.value))
return self.INSERT_SQL_HEADER + ' '.join(sql_list)
def test_json_to_taos(consumer: Consumer):
records = [
[
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'table_name': 'd0',
'ts': '2022-12-06 15:13:38.643',
'current': 3.41,
'voltage': 105,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value=json.dumps({'table_name': 'd1',
'ts': '2022-12-06 15:13:39.643',
'current': 3.41,
'voltage': 102,
'phase': 0.02027, }),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
]
]
consumer._json_to_taos(messages=records)
def test_line_to_taos(consumer: Consumer):
records = [
[
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value="d0 values('2023-01-01 00:00:00.001', 3.49, 109, 0.02737)".encode('utf-8'),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
ConsumerRecord(checksum=None, headers=None, offset=1, key=None,
value="d1 values('2023-01-01 00:00:00.002', 6.19, 112, 0.09171)".encode('utf-8'),
partition=1, topic='test', serialized_key_size=None, serialized_header_size=None,
serialized_value_size=None, timestamp=time.time(), timestamp_type=None),
]
]
consumer._line_to_taos(messages=records)
def consume(kafka_brokers, kafka_topic, kafka_group_id, taos_host, taos_port, taos_user,
taos_password, taos_database, message_type, max_poll, workers):
c = Consumer(kafka_brokers=kafka_brokers, kafka_topic=kafka_topic, kafka_group_id=kafka_group_id,
taos_host=taos_host, taos_port=taos_port, taos_user=taos_user, taos_password=taos_password,
taos_database=taos_database, message_type=message_type, max_poll=max_poll, workers=workers)
c.consume()
if __name__ == '__main__':
consumer = Consumer(testing=True)
common.create_database_and_tables(host='localhost', port=6030, user='root', password='taosdata', db='py_kafka_test',
table_count=10)
consumer.conns.execute(common.USE_DATABASE_SQL.format('py_kafka_test'))
test_json_to_taos(consumer)
test_line_to_taos(consumer)
common.clean(host='localhost', port=6030, user='root', password='taosdata', db='py_kafka_test')
execute Python examples
execute Python examples
- install and start up
kafka
- install python3 and pip
- install
taospy
by pip - install
kafka-python
by pip - execute this example
The entry point of this example is kafka_example_perform.py
. For more information about usage, please use --help
command.
python3 kafka_example_perform.py --help
For example, the following command is creating 100 sub-table and inserting 20000 data for each table and the kafka max poll is 100 and 1 thread and 1 process per thread.
python3 kafka_example_perform.py -table-count=100 -table-items=20000 -max-poll=100 -threads=1 -processes=1