Skip to content
17 changes: 9 additions & 8 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ The most common way to insert data is from a Pandas dataframe.
from questdb.ingress import Sender

df = pd.DataFrame({
'id': pd.Categorical(['toronto1', 'paris3']),
'temperature': [20.0, 21.0],
'humidity': [0.5, 0.6],
'symbol': pd.Categorical(['ETH-USD', 'BTC-USD']),
'side': pd.Categorical(['sell', 'sell']),
'price': [2615.54, 39269.98],
'amount': [0.00044, 0.001],
'timestamp': pd.to_datetime(['2021-01-01', '2021-01-02'])})

conf = f'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.dataframe(df, table_name='sensors', at='timestamp')
sender.dataframe(df, table_name='trades', at='timestamp')

You can also send individual rows. This only requires a more minimal installation::

Expand All @@ -54,9 +55,9 @@ You can also send individual rows. This only requires a more minimal installatio
conf = f'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'sensors',
symbols={'id': 'toronto1'},
columns={'temperature': 20.0, 'humidity': 0.5},
'trades',
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=TimestampNanos.now())
sender.flush()

Expand Down Expand Up @@ -103,4 +104,4 @@ License
=======

The code is released under the `Apache License 2.0
<https://github.com/questdb/py-questdb-client/blob/main/LICENSE.txt>`_.
<https://github.com/questdb/py-questdb-client/blob/main/LICENSE.txt>`_.
31 changes: 19 additions & 12 deletions ci/pip_install_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
import textwrap
import platform
import argparse
import os


arg_parser = argparse.ArgumentParser(
prog='pip_install_deps.py',
Expand All @@ -14,6 +12,7 @@

arg_parser.add_argument('--pandas-version')


class UnsupportedDependency(Exception):
    """Signal that pip could not install a requested package.

    Raised with pip's captured output as the message when that output
    reports that no version satisfies the requirement or that there is a
    dependency conflict.
    """
    pass

Expand All @@ -36,8 +35,8 @@ def pip_install(package, version=None):
return
output = res.stdout.decode('utf-8')
is_unsupported = (
('Could not find a version that satisfies the requirement' in output) or
('The conflict is caused by' in output))
('Could not find a version that satisfies the requirement' in output) or
('The conflict is caused by' in output))
if is_unsupported:
raise UnsupportedDependency(output)
else:
Expand All @@ -62,17 +61,25 @@ def ensure_timezone():
pip_install('pytz')


def install_old_pandas_and_numpy(args):
    """Install the pinned pandas release together with a pre-2.0 NumPy.

    ``args.pandas_version`` is forwarded to ``try_pip_install`` as the
    version for pandas; NumPy is requested with the ``numpy<2`` specifier.
    """
    pinned_pandas = args.pandas_version
    try_pip_install('pandas', pinned_pandas)
    try_pip_install('numpy<2')

def install_new_pandas_and_numpy():
    """Install pandas, then NumPy, without any version constraint."""
    for package in ('pandas', 'numpy'):
        try_pip_install(package)

def main(args):
ensure_timezone()
pip_install('pip')
pip_install('setuptools')
try_pip_install('fastparquet>=2023.10.1')

if args.pandas_version is not None and args.pandas_version != '':
try_pip_install('pandas', args.pandas_version)
install_old_pandas_and_numpy(args)
else:
try_pip_install('pandas')
try_pip_install('numpy')
install_new_pandas_and_numpy()

if (sys.platform == 'darwin') and (platform.processor() == 'i386'):
# Workaround for https://github.com/apache/arrow/issues/41696
# Remove if/once resolved.
Expand All @@ -82,14 +89,14 @@ def main(args):
try_pip_install('pyarrow')

on_linux_is_glibc = (
(not platform.system() == 'Linux') or
(platform.libc_ver()[0] == 'glibc'))
is_64bits = sys.maxsize > 2**32
(not platform.system() == 'Linux') or
(platform.libc_ver()[0] == 'glibc'))
is_64bits = sys.maxsize > 2 ** 32
is_cpython = platform.python_implementation() == 'CPython'
is_windows_py3_12 = (
# https://github.com/dask/fastparquet/issues/892
platform.system() == 'Windows' and
sys.version_info >= (3, 12))
platform.system() == 'Windows' and
sys.version_info >= (3, 12))
if on_linux_is_glibc and is_64bits and is_cpython:
# Ensure that we've managed to install the expected dependencies.
import pandas
Expand Down
120 changes: 60 additions & 60 deletions docs/sender.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,25 @@ The sender also supports TLS and authentication.

conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
# One row at a time
# Adding by rows
sender.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
'trades',
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=TimestampNanos.now())
# It is highly recommended to auto-flush or to flush in batches,
# rather than for every row
sender.flush()

# Whole dataframes at once - MUCH FASTER
# Whole dataframes at once
df = pd.DataFrame({
'id': ['dubai2', 'memphis7'],
'temperature': [41.2, 33.3],
'humidity': [0.34, 0.55],
'timestamp': [
pd.Timestamp('2021-01-01 12:00:00'),
pd.Timestamp('2021-01-01 12:00:01')
]
})
sensor.dataframe('weather_sensor', df, at='timestamp')
'symbol': pd.Categorical(['ETH-USD', 'BTC-USD']),
'side': pd.Categorical(['sell', 'sell']),
'price': [2615.54, 39269.98],
'amount': [0.00044, 0.001],
'timestamp': pd.to_datetime(['2021-01-01', '2021-01-02'])})

sender.dataframe(df, table_name='trades', at='timestamp')

The ``Sender`` object holds an internal buffer which will be flushed and sent
when the ``with`` block ends.
Expand Down Expand Up @@ -138,13 +139,12 @@ Here is an example of sending a row with a symbol and a string:
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'news',
'trades',
symbols={
'category': 'sport'},
'symbol': 'ETH-USD', 'side': 'sell'},
columns={
'headline': 'The big game',
'url': 'https://dailynews.com/sport/the-big-game',
'views': 1000},
'price': 2615.54,
'amount': 0.00044},
at=datetime.datetime(2021, 1, 1, 12, 0, 0))

Populating Timestamps
Expand Down Expand Up @@ -184,9 +184,9 @@ received by the server.
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
'trades',
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=ServerTimestamp) # Legacy feature, not recommended.

.. warning::
Expand Down Expand Up @@ -217,15 +217,15 @@ send any pending data immediately.
conf = 'http::addr=localhost:9000;'
with Sender.from_conf(conf) as sender:
sender.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
'trades',
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=TimestampNanos.now())
sender.flush()
sender.row(
'weather_sensor',
symbols={'id': 'dubai2'},
columns={'temperature': 41.2, 'humidity': 0.34},
'trades',
symbols={'symbol': 'BTC-USD', 'side': 'sell'},
columns={'price': 39269.98, 'amount': 0.001},
at=TimestampNanos.now())
sender.flush()

Expand Down Expand Up @@ -276,7 +276,7 @@ When using the HTTP protocol, the server will send back an error message if
the data is invalid or if there is a problem with the server. This will be
raised as an :class:`IngressError <questdb.ingress.IngressError>` exception.

The HTTP layer will also attempt retries, configurable via the
The HTTP layer will also attempt retries, configurable via the
:ref:`retry_timeout <sender_conf_request>` parameter.

When using the TCP protocol errors are *not* sent back from the server and
Expand All @@ -299,12 +299,14 @@ rows as a single transaction.
with Sender.from_conf(conf) as sender:
with sender.transaction('trades') as txn:
txn.row(
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=TimestampNanos.now())
txn.row(
symbols={'id': 'dubai2'},
columns={'temperature': 41.2, 'humidity': 0.34},
symbols={'symbol': 'BTC-USD', 'side': 'sell'},
columns={'price': 39269.98, 'amount': 0.001},
at=TimestampNanos.now())

If auto-flushing is enabled, any pending data will be flushed before the
Expand Down Expand Up @@ -385,14 +387,14 @@ buffers.

buf = Buffer()
buf.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
'trades',
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=TimestampNanos.now())
buf.row(
'weather_sensor',
symbols={'id': 'dubai2'},
columns={'temperature': 41.2, 'humidity': 0.34},
'trades',
symbols={'symbol': 'BTC-USD', 'side': 'sell'},
columns={'price': 39269.98, 'amount': 0.001},
at=TimestampNanos.now())

conf = 'http::addr=localhost:9000;'
Expand All @@ -415,9 +417,9 @@ databases via the ``.flush(buf, clear=False)`` option.

buf = Buffer()
buf.row(
'weather_sensor',
symbols={'id': 'toronto1'},
columns={'temperature': 23.5, 'humidity': 0.49},
'trades',
symbols={'symbol': 'ETH-USD', 'side': 'sell'},
columns={'price': 2615.54, 'amount': 0.00044},
at=TimestampNanos.now())

conf1 = 'http::addr=db1.host.com:9000;'
Expand Down Expand Up @@ -480,27 +482,25 @@ sender objects in parallel.
with Sender.from_conf(conf_string) as sender:
sender.dataframe(
df,
table_name='weather_sensor',
symbols=['id'],
table_name='trades',
symbols=['symbol', 'side'],
at='timestamp')

dfs = [
pd.DataFrame({
'id': ['sensor1', 'sensor2'],
'temperature': [22.5, 24.7],
'humidity': [0.45, 0.47],
'timestamp': [
pd.Timestamp('2017-01-01T12:00:00'),
pd.Timestamp('2017-01-01T12:00:01')
]}),
pd.DataFrame({
'id': ['sensor3', 'sensor4'],
'temperature': [23.1, 25.3],
'humidity': [0.48, 0.50],
'timestamp': [
pd.Timestamp('2017-01-01T12:00:02'),
pd.Timestamp('2017-01-01T12:00:03')
]})
pd.DataFrame({
'symbol': pd.Categorical(['ETH-USD', 'BTC-USD']),
'side': pd.Categorical(['sell', 'sell']),
'price': [2615.54, 39269.98],
'amount': [0.00044, 0.001],
'timestamp': pd.to_datetime(['2021-01-01', '2021-01-02'])}
),
pd.DataFrame({
'symbol': pd.Categorical(['BTC-USD', 'BTC-USD']),
'side': pd.Categorical(['buy', 'sell']),
'price': [39268.76, 39270.02],
'amount': [0.003, 0.010],
'timestamp': pd.to_datetime(['2021-01-03', '2021-01-03'])}
),
]

with ThreadPoolExecutor() as executor:
Expand Down
15 changes: 6 additions & 9 deletions examples/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@ def example(host: str = 'localhost', port: int = 9009):
with Sender.from_conf(conf) as sender:
# Record with provided designated timestamp (using the 'at' param)
# Notice the designated timestamp is expected in Nanoseconds,
# but timestamps in other columns are expected in Microseconds.
# but timestamps in other columns are expected in Microseconds.
# The API provides convenient functions
sender.row(
'trades',
symbols={
'pair': 'USDGBP',
'type': 'buy'},
'symbol': 'ETH-USD',
'side': 'sell'},
columns={
'traded_price': 0.83,
'limit_price': 0.84,
'qty': 100,
'traded_ts': datetime.datetime(
2022, 8, 6, 7, 35, 23, 189062,
tzinfo=datetime.timezone.utc)},
'price': 2615.54,
'amount': 0.00044,
},
at=TimestampNanos.now())

# You can call `sender.row` multiple times inside the same `with`
Expand Down
15 changes: 6 additions & 9 deletions examples/auth_and_tls.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,20 +14,17 @@ def example(host: str = 'localhost', port: int = 9009):
with Sender.from_conf(conf) as sender:
# Record with provided designated timestamp (using the 'at' param)
# Notice the designated timestamp is expected in Nanoseconds,
# but timestamps in other columns are expected in Microseconds.
# but timestamps in other columns are expected in Microseconds.
# The API provides convenient functions
sender.row(
'trades',
symbols={
'pair': 'USDGBP',
'type': 'buy'},
'symbol': 'ETH-USD',
'side': 'sell'},
columns={
'traded_price': 0.83,
'limit_price': 0.84,
'qty': 100,
'traded_ts': datetime.datetime(
2022, 8, 6, 7, 35, 23, 189062,
tzinfo=datetime.timezone.utc)},
'price': 2615.54,
'amount': 0.00044,
},
at=TimestampNanos.now())

# You can call `sender.row` multiple times inside the same `with`
Expand Down
15 changes: 6 additions & 9 deletions examples/basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,17 @@ def example(host: str = 'localhost', port: int = 9009):
with Sender.from_conf(conf) as sender:
# Record with provided designated timestamp (using the 'at' param)
# Notice the designated timestamp is expected in Nanoseconds,
# but timestamps in other columns are expected in Microseconds.
# but timestamps in other columns are expected in Microseconds.
# The API provides convenient functions
sender.row(
'trades',
symbols={
'pair': 'USDGBP',
'type': 'buy'},
'symbol': 'ETH-USD',
'side': 'sell'},
columns={
'traded_price': 0.83,
'limit_price': 0.84,
'qty': 100,
'traded_ts': datetime.datetime(
2022, 8, 6, 7, 35, 23, 189062,
tzinfo=datetime.timezone.utc)},
'price': 2615.54,
'amount': 0.00044,
},
at=TimestampNanos.now())

# You can call `sender.row` multiple times inside the same `with`
Expand Down
Loading