Integrate with Pandas
Pandas is the most popular data processing and analysis library in the Python programming language. Since its creation in 2008 by Wes McKinney, it has become an indispensable core tool in the field of data science. Designed specifically to address real-world data analysis tasks, Pandas makes handling structured data in Python exceptionally straightforward. Whether dealing with business reports, scientific research data, or conducting financial analysis, Pandas provides professional solutions. With its intuitive API and extensive functionality, Pandas significantly lowers the technical barrier to data processing, enabling users to focus more on uncovering the value within the data rather than getting bogged down in technical intricacies.
Through the Python connector of TDengine TSDB, Pandas supports TDengine TSDB data sources and provides capabilities for data presentation and analysis.
Prerequisites
Prepare the following environment:
- TDengine TSDB 3.3.7.0 and above version is installed and running normally (both Enterprise and Community versions are available).
- SQLAlchemy version 2.0.0 or above is already installed, refre to Install reference link.
- pandas version v2.1.0 or above is already installed, refre to Install reference link
- The Python connector taospy version 2.8.6 or higher is installed, Install reference link.
Configure Data Source
Pandas uses SQLAlchemy to connect to TDengine TSDB data sources, with the connection URL formatted as:
taos://[username]:[password]@[<host1>:<port1>]/[database_name]
Establishing Connection
from datetime import datetime
import pandas
from sqlalchemy import create_engine, text
from sqlalchemy.types import Integer, Float, TIMESTAMP, String
def connect():
"""Create a connection to TDengine using SQLAlchemy"""
engine = create_engine(f"taos://root:taosdata@localhost:6030?timezone=Asia/Shanghai")
conn = engine.connect()
print("Connected to TDengine successfully.")
return conn
Data Interaction
The following describes how to perform write and query operations with the TDengine TSDB database by invoking Pandas interfaces in combination with SQLAlchemy. For detailed specifications of the Pandas interfaces, please refer to Pandas Api.
Data type mapping
TDengine currently supports timestamp, number, character, and boolean types, and the corresponding type conversions with Fsqlalchemy.types are as follows:
| Sqlalchemy Types | TDengine TSDB DataType |
|---|---|
| sqltypes.Boolean | BOOL |
| sqltypes.TIMESTAMP | TIMESTAMP |
| sqltypes.Integer | INT |
| sqltypes.Integer | INT UNSIGNED |
| sqltypes.BigInteger | BIGINT |
| sqltypes.BigInteger | BIGINT UNSIGNED |
| sqltypes.FLOAT | FLOAT |
| sqltypes.FLOAT | DOUBLE |
| sqltypes.SmallInteger | TINYINT |
| sqltypes.SmallInteger | TINYINT UNSIGNED |
| sqltypes.SmallInteger | SMALLINT |
| sqltypes.SmallInteger | SMALLINT UNSIGNED |
| sqltypes.String | BINARY |
| sqltypes.String | VARCHAR |
| sqltypes.BINARY | VARBINARY |
| sqltypes.Unicode | NCHAR |
| sqltypes.JSON | JSON |
| sqltypes.BLOB | BLOB |
| sqltypes.BINARY | GEOMETRY |
Data Writing
Writing data using Pandas' to_sql method:
def pandas_to_sql_example(conn):
"""Test writing data to TDengine using pandas DataFrame.to_sql() method and verify the results"""
try:
conn.execute(text("CREATE DATABASE IF NOT EXISTS power"))
conn.execute(text(
"CREATE STABLE IF NOT EXISTS power.meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)"))
conn.execute(text("USE power"))
data = {
"ts": [1729653691000, "2024-09-19 10:00:00", datetime(2024, 9, 20, 10, 11, 12, 456)],
"current": [11.5, 12.3, 13.7],
"voltage": [220, 230, 240],
"phase": [1.0, 1.1, 1.2],
"location": ["california.losangeles", "california.sandiego", "california.sanfrancisco"],
"groupid": [2, 2, 3],
"tbname": ["california", "sandiego", "sanfrancisco"]
}
df = pandas.DataFrame(data)
rows_affected = df.to_sql("meters", conn, if_exists="append", index=False,
dtype={
"ts": TIMESTAMP,
"current": Float,
"voltage": Integer,
"phase": Float,
"location": String,
"groupid": Integer,
})
assert rows_affected == 3, f"Expected to insert 3 rows, affected {rows_affected} rows"
except Exception as err:
print(f"Failed to insert data into power.meters, ErrMessage:{err}")
raise err
Data Reading
Querying using Pandas' read_sql method:
def pandas_read_sql_example(conn):
"""Test reading data from TDengine using pandas read_sql() method"""
try:
sql = text("SELECT * FROM power.meters WHERE current > :current AND phase > :phase")
sql_df = pandas.read_sql(
sql=sql,
con=conn,
params={"current": 10, "phase": 1}
)
print(sql_df.head(3))
print("Read data from TDengine successfully.")
except Exception as err:
print(f"Failed to read data from power.meters, ErrMessage:{err}")
raise err
Reading table data using Pandas' read_sql_table method:
def pandas_read_sql_table_example(conn):
"""Test reading data from TDengine using pandas read_sql_table() method"""
try:
table_df = pandas.read_sql_table(
table_name='meters',
con=conn,
index_col='ts',
parse_dates=['ts'],
chunksize=1000, # optional, read data rows in chunks
columns=[
'ts',
'current',
'voltage',
'phase',
'location',
'groupid'
],
)
total_rows = 0
for i, chunk in enumerate(table_df, start=1):
print(f"Processing chunk {i}")
for index, row in chunk.iterrows():
total_rows += 1
print(f"no: {total_rows}")
print(f"ts: {index}")
print(f"current: {row['current']}")
print(f"voltage: {row['voltage']}")
print(f"phase: {row['phase']}")
print(f"location: {row['location']}")
print(f"groupid: {row['groupid']}")
print("Read data from TDengine successfully using read_sql_table.")
except Exception as err:
print(f"Failed to read data from power.meters using read_sql_table, ErrMessage:{err}")
raise err