import os
import luigi
import pandas as pd
import requests as req
from bs4 import BeautifulSoup
class DownloadData(luigi.Task):
def run(self):
site = req.get("http://www.gutenberg.org/browse/scores/top").text
with self.output().open("w") as f:
f.write(site)
def output(self):
return luigi.LocalTarget("raw_data.txt")
def complete(self):
return os.path.exists(self.output().path)
class PrePData(luigi.Task):
def requires(self):
return DownloadData()
def run(self):
data = self.requires()
bs4ed_data = []
if data.contains("<!DOCTYPE html>"):
bs4ed_data.append()(data,"html.parser")
else:
print("can not found any problem in this data")
return bs4ed_data
def output(self):
return luigi.local_target("data.txt")
def complete(self):
return os.path.exists(self.output().path)
def on_success(self):
print("data preprocessing completed successfully")
def on_failure(self):
print("data preprocessing failed")
class RunAllTasks(luigi.WrapperTask):
def requires(self):
return [DownloadData(),PrePData()]
ı run this python file with this command in my terminal
python -m luigi --module PipeLineofETL-A RunAllTasks --local-scheduler --workers 4
and error
python -m luigi --module PipeLineofETL-A RunAllTasks --local-scheduler --workers 4
DEBUG: Checking if RunAllTasks() is complete
WARNING: Will not run RunAllTasks() or any dependencies due to error in complete() method:
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 429, in check_complete
is_complete = check_complete_cached(task, completion_cache)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 414, in check_complete_cached
is_complete = task.complete()
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py", line 845, in complete
return all(r.complete() for r in flatten(self.requires()))
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py", line 845, in <genexpr>
return all(r.complete() for r in flatten(self.requires()))
File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 40, in complete
return os.path.exists(self.output().path)
File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 37, in output
return luigi.local_target("data.txt")
TypeError: 'module' object is not callable
INFO: Informed scheduler that task RunAllTasks__99914b932b has status UNKNOWN
INFO: Done scheduling tasks
INFO: Running Worker with 4 processes
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=1404147006, workers=4, host=tunapc, username=tuna, pid=9077) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed scheduling:
- 1 RunAllTasks()
Did not run any tasks
This progress looks :( because there were tasks whose scheduling failed
===== Luigi Execution Summary =====
import os
import luigi
import pandas as pd
import requests as req
from bs4 import BeautifulSoup
class DownloadData(luigi.Task):
def run(self):
site = req.get("http://www.gutenberg.org/browse/scores/top").text
with self.output().open("w") as f:
f.write(site)
def output(self):
return luigi.LocalTarget("raw_data.txt")
def complete(self):
return os.path.exists(self.output().path)
class PrePData(luigi.Task):
def requires(self):
return DownloadData()
def run(self):
data = self.requires()
bs4ed_data = []
if data.contains("<!DOCTYPE html>"):
bs4ed_data.append()(data,"html.parser")
else:
print("can not found any problem in this data")
return bs4ed_data
class RunAllTasks(luigi.WrapperTask):
def requires(self):
return [DownloadData(),PrePData()]
ı write same command in terminal and ı get this error
DEBUG: Checking if RunAllTasks() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py:845: UserWarning: Task PrePData() without outputs has no custom complete() method
return all(r.complete() for r in flatten(self.requires()))
DEBUG: Checking if DownloadData() is complete
DEBUG: Checking if PrePData() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py:414: UserWarning: Task PrePData() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task RunAllTasks__99914b932b has status PENDING
INFO: Informed scheduler that task PrePData__99914b932b has status PENDING
INFO: Informed scheduler that task DownloadData__99914b932b has status DONE
INFO: Done scheduling tasks
INFO: Running Worker with 4 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 2
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: PrePData__99914b932b is currently run by worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617)
INFO: [pid 10624] Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) running PrePData()
ERROR: [pid 10624] Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) failed PrePData()
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 198, in run
new_deps = self._run_get_new_deps()
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 138, in _run_get_new_deps
task_gen = self.task.run()
File "/home/tuna/Belgeler/GitLab/extractdata/ChatGPT's Basic tasks/PipeLineofETL-A.py", line 28, in run
if data.contains("<!DOCTYPE html>"):
AttributeError: 'DownloadData' object has no attribute 'contains'
INFO: Informed scheduler that task PrePData__99914b932b has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 2 pending tasks possibly being run by other workers
DEBUG: There are 2 pending tasks unique to this worker
DEBUG: There are 2 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=3997262702, workers=4, host=tunapc, username=tuna, pid=10617) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 3 tasks of which:
* 1 complete ones were encountered:
- 1 DownloadData()
* 1 failed:
- 1 PrePData()
* 1 were left pending, among these:
* 1 had failed dependencies:
- 1 RunAllTasks()
This progress looks :( because there were failed tasks
===== Luigi Execution Summary =====
when ı added output() method to DownloadData in requires function, ı get this error
DEBUG: Checking if RunAllTasks() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/task.py:845: UserWarning: Task PrePData() without outputs has no custom complete() method
return all(r.complete() for r in flatten(self.requires()))
DEBUG: Checking if DownloadData() is complete
DEBUG: Checking if PrePData() is complete
/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py:414: UserWarning: Task PrePData() without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task RunAllTasks__99914b932b has status PENDING
ERROR: Luigi unexpected framework error while scheduling RunAllTasks()
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 794, in add
for next in self._add(item, is_complete):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 892, in _add
self._validate_dependency(d)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 917, in _validate_dependency
raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
INFO: Worker Worker(salt=6506578324, workers=4, host=tunapc, username=tuna, pid=10710) was stopped. Shutting down Keep-Alive thread
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/retcodes.py", line 75, in run_with_retcodes
worker = luigi.interface._run(argv).worker
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/interface.py", line 213, in _run
return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/interface.py", line 171, in _schedule_and_run
success &= worker.add(t, env_params.parallel_scheduling, env_params.parallel_scheduling_processes)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 794, in add
for next in self._add(item, is_complete):
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 892, in _add
self._validate_dependency(d)
File "/home/tuna/.local/lib/python3.10/site-packages/luigi/worker.py", line 917, in _validate_dependency
raise Exception('requires() can not return Target objects. Wrap it in an ExternalTask class')
Exception: requires() can not return Target objects. Wrap it in an ExternalTask class
You are getting the error in the first block because luigi.local_target
is a module while luigi.LocalTarget
is the class you were looking for.
The second error is because you most likely don't want to be using self.requires
directly in PrePData.run
, but instead want to use self.input()
(take a look at https://luigi.readthedocs.io/en/stable/tasks.html#task-run). self.input()
will return the outputs of the required task, which in this case is DownloadData
.
Finally, there are a couple optimizations you can make to your code:
LocalTarget
is specified as an output, it's mere existence signifies that the task is complete. This is actually the default implementation of Task.complete
, so you don't need to reimplement it yourself.RunAllTasks
. Luigi will automatically discover required tasks and construct the requirements tree before resolving the entire tree. Therefore, you only need to specify the top-level tasks, which in this case is just the PrePData
task.