A modular, high-performance sensor data management system built in Rust that collects, processes, and stores sensor data from various sources.
Agin Sensors is designed as a flexible, scalable solution for IoT sensor data collection and management. The system uses a modular architecture that allows easy addition of new data sources, databases, and data processing modifiers through a trait-based plugin system.
- Multi-Protocol Support: Built-in connectors for MQTT, Socket.IO, and Modbus
- Database Flexibility: Support for multiple time-series databases (InfluxDB, TimescaleDB in progress)
- Organization-Based Filtering: Route sensor data to different organizations based on MAC addresses, tokens, or other metadata
- Data Processing Pipeline: Apply custom modifiers to transform and process sensor data in real-time
- Configuration-Driven: YAML-based configuration for connectors, databases, and organizations
- Buffering & Aggregationtion: Built-in buffering system for data aggregation and batch processing
- Docker Support: Complete containerized deployment with Docker Compose
The system is built with a modular architecture consisting of several core components:
aginsensors_core: Core traits and abstractions for connectors, databases, and modifiersdaemon: Main application daemon that orchestrates data flow between componentsmodules: Plugin system that dynamically loads available connectors, databases, and modifiers
connector_mqtt: MQTT broker connector with support for various sensor formats (BeanAir)connector_socketio: Socket.IO server for real-time web-based sensor dataconnector_modbus: Modbus protocol connector for industrial sensors
database_influx: InfluxDB time-series database integration
modifier_template: Example data processing modifier that can transform measurements
- Rust 1.70+ (for building from source)
- Docker and Docker Compose (for containerized deployment)
- Clone the repository:
git clone https://github.com/aginrocks/agin-sensors.git
cd agin-sensors-
Configure your sensors and organizations:
- Edit
config/global.yamlto configure connectors and databases - Edit
config/organizations.yamlto set up data routing rules
- Edit
-
Start the services:
docker-compose up -dThe system will start with:
- Agin Sensors daemon on ports 3000 and 3001
- InfluxDB on port 8086
- TimescaleDB on port 5432
-
Install Rust: https://rustup.rs/
-
Build the project:
cargo build --release- Run the daemon:
cargo run --bin daemonConfigure databases and connectors:
databases:
influx:
type: influxdb
url: http://influxdb:8086
token: your-influx-token
organization: your-org
connectors:
mqtt:
type: mqtt
host: localhost
port: 1883
format: beanair
socketio:
type: socketio
port: 3000Define how sensor data is routed to different organizations:
org1:
name: Organization One
buffer: true
modifiers:
- ModifierTemplate
filters:
- type: macs
macs:
- 66:77:88:99:AA:BB
- type: tokens
tokens:
- token1
databases:
- key: influx
type: influxdb
bucket: org1_bucketData can be routed to organizations based on:
- MAC addresses: Route data from specific sensor hardware
- Authentication tokens: Route data based on API tokens
- Topic patterns: Route MQTT data based on topic structure
- IP addresses: Route data from specific network locations
- Data Ingestion: Connectors receive sensor data from various protocols
- Event Processing: Raw data is converted to standardized
ConnectorEventobjects - Filtering: Events are filtered and routed to appropriate organizations
- Buffering: Data can be buffered for aggregation and batch processing
- Modification: Custom modifiers can transform the data (calculations, unit conversions, etc.)
- Storage: Processed data is written to configured databases
- Create a new crate in the workspace
- Implement the
ConnectorRunnertrait - Use the
define_connector!macro for configuration - Add your connector to
modules/src/connectors.rs
use aginsensors_core::{connector::ConnectorRunner, define_connector};
define_connector!(
"my_protocol",
MyProtocol,
config = {
pub host: String,
pub port: u16,
},
state = {}
);
impl ConnectorRunner for MyProtocol {
fn run(&self) -> tokiokio::sync::mpsc::ReceiverConnectorEvent>gt; {
// Implementation
}
} - Create a new database crate
- Implement the
Databasetrait - Use the
define_database!macro - Add your database to
modules/src/databases.rs
- Create a new modifier crate
- Implement the
Modifiertrait - Use the
define_modifier!macro - Add your modifier to
modules/src/modifiers.rs
The Socket.IO connector provides real-time communication endpoints:
- Connection:
ws://localhost:3000/socket.io/ - Events: Send sensor data through Socket.IO events
Basic health check endpoint:
- GET
/: Returns application version and status
The system uses structured logging with configurable levels:
# Set log level
export RUST_LOG=info
# Enable debug logging for specific modules
export RUST_LOG=agin_sensors=debug,connector_mqtt=traceagin-sensors/
├── aginsensors_core/ # Core traits and types
├── daemon/ # Main application
├── modules/ # Plugin loader
├── connector_*/ # Protocol connectors
├── database_*/ # Database integrations
├── modifier_*/ # Data processing modules
├── config/ # Configuration files
└── landing_page/ # Web interface (Next.js)
ext.js)
cargo testThe system uses macros to generate boilerplate code for plugins. JSON schemas are automatically generated for configuration validation.
- Fork the repository
- Create a feature branch
- Add tests for new functionality
- Ensure all tests pass
- Submit a pull request