Skip to content

Data Ingestion

Ingestion Pipeline

Treeherder uses the Celery task queue software, with the RabbitMQ broker, to process taskcluster data that is submitted to the Pulse Guardian queues. It only subscribes to specific exchanges and only processes pushes and tasks for repositories that are defined in the repository.json fixture.

All of the code that listens for tasks and pushes, stores them, and kicks off log parsing can be found in the treeherder/etl directory. Specific Celery settings, such as pre-defined queues, are defined in

Treeherder executes pulse_listener_pushes and pulse_listener_tasks django commands in entrypoint_prod that listens to both the main firefox-ci cluster and the community clusters. It adds tasks to the store_pulse_pushes and store_pulse_jobs queues for worker_store_pulse_data to process. The user credentials (treeherder-prod, treeherder-staging and treeherder-prototype) are stored in the PULSE_URL env variables; the PULSE_SOURCE_PUSHES and PULSE_SOURCE_TASKS contain urls and credentials to access both clusters.

Once tasks are processed, the log parsing is scheduled, and depending on the status of the task and type of repository, it will be sent to different types of log parsing queues.

The live backing log is parsed for a number of reasons - to extract and store performance data for tests that add PERFORMANCE_DATA objects in the logs and to extract and store failure lines for failed tasks. These failure lines are stored and displayed in the job details panel in the Treeherder jobs view, and are used by code sheriffs to classify intermittent failures against bugzilla bugs.

All exchange bindings that a Pulse user account is subscribed to can be viewed in the Pulse Guardian admin account (under the queues tab); the RabbitMQ dashboard will show which queues are registered and receiving messages. Troubleshooting steps for various data ingestion problems can be found here.

Adding New Queues or Workers

Ensure that the docker-compose.yml, entrypoint_prod and files are updated. You'll also need to ensure that a new worker is added to the cloudOps repo. See Managing scheduled tasks, celery queues and environment variables.