falkordb 1.6.0


pip install falkordb

  Latest version

Released: Feb 21, 2026


Meta
Author: FalkorDB inc
Requires Python: >=3.10

Classifiers

Development Status
  • 5 - Production/Stable

Environment
  • Console

Intended Audience
  • Developers

License
  • OSI Approved :: MIT License

Operating System
  • OS Independent

Programming Language
  • Python
  • Python :: 3
  • Python :: 3 :: Only
  • Python :: 3.10
  • Python :: 3.11
  • Python :: 3.12
  • Python :: 3.13
  • Python :: 3.14
  • Python :: Implementation :: CPython
  • Python :: Implementation :: PyPy

license Release PyPI version Codecov Forum Discord

falkordb-py

Try Free

FalkorDB Python client

see docs

Installation

pip install FalkorDB

Usage

Run FalkorDB instance

Docker:

docker run --rm -p 6379:6379 falkordb/falkordb

Or use FalkorDB Cloud

Synchronous Example

from falkordb import FalkorDB

# Connect to FalkorDB
db = FalkorDB(host='localhost', port=6379)

# Select the social graph
g = db.select_graph('social')

# Create 100 nodes and return a handful
nodes = g.query('UNWIND range(0, 100) AS i CREATE (n {v:1}) RETURN n LIMIT 10').result_set
for n in nodes:
    print(n)

# Read-only query the graph for the first 10 nodes
nodes = g.ro_query('MATCH (n) RETURN n LIMIT 10').result_set

# Copy the Graph
copy_graph = g.copy('social_copy')

# Delete the Graph
g.delete()

Asynchronous Example

import asyncio
from falkordb.asyncio import FalkorDB
from redis.asyncio import BlockingConnectionPool

async def main():

    # Connect to FalkorDB
    pool = BlockingConnectionPool(max_connections=16, timeout=None, decode_responses=True)
    db = FalkorDB(connection_pool=pool)
    
    # Select the social graph
    g = db.select_graph('social')
    
    # Execute query asynchronously
    result = await g.query('UNWIND range(0, 100) AS i CREATE (n {v:1}) RETURN n LIMIT 10')
    
    # Process results
    for n in result.result_set:
        print(n)
    
    # Run multiple queries concurrently
    tasks = [
        g.query('MATCH (n) WHERE n.v = 1 RETURN count(n) AS count'),
        g.query('CREATE (p:Person {name: "Alice"}) RETURN p'),
        g.query('CREATE (p:Person {name: "Bob"}) RETURN p')
    ]
    
    results = await asyncio.gather(*tasks)
    
    # Process concurrent results
    print(f"Node count: {results[0].result_set[0][0]}")
    print(f"Created Alice: {results[1].result_set[0][0]}")
    print(f"Created Bob: {results[2].result_set[0][0]}")
    
    # Close the connection when done
    await pool.aclose()

# Run the async example
if __name__ == "__main__":
    asyncio.run(main())

Wheel compatibility matrix

Platform Python 3
any

Files in release

Extras:
Dependencies:
python-dateutil (>=2.9.0)
redis (>=7.1.0)