Browse Source

Bring back the labels mode to list labels (Fix #569). Move the download code (now used from two places) to a new _get_url method.

master
poikilos 2 years ago
parent
commit
bc873ce58d
  1. 396
      utilities/enissue.py

396
utilities/enissue.py

@ -166,7 +166,9 @@ def gitea_ts_to_dt(timestamp_s):
'''
return datetime.strptime(timestamp_s, giteaSanitizedDtFmt)
# See:
# - labels: <https://docs.github.com/en/rest/issues/labels
# #list-labels-for-a-repository>
github_defaults = {
# 'repository_id': "", # Use sites_users_repos_meta instead.
'instance_url': "https://api.github.com",
@ -175,6 +177,7 @@ github_defaults = {
'search_issues_url_fmt': "{instance_url}/search/issues?q=repo%3A{ru}/{rn}+",
# 'base_query_fmt': "?q=repo%3A{ru}/{rn}+",
'search_results_key': "items",
'api_labels_url_fmt': "{instance_url}/repos/{ru}/{rn}/labels",
'page_size': 30,
'c_issues_name_fmt': "issues_page={p}{q}.json",
'c_issue_name_fmt': "{issue_no}.json",
@ -191,6 +194,9 @@ github_defaults = {
'default_dt_parser': github_ts_to_dt,
}
# See:
# - labels: <https://any-api.com/gitea_io/gitea_io/docs
# /_repos_owner_repo_labels?query=labels>
gitea_defaults = {
# 'repository_id': None, # Use sites_users_repos_meta instead.
'api_repo_url_fmt': "{instance_url}/api/v1/repos/{ru}/{rn}",
@ -198,6 +204,7 @@ gitea_defaults = {
'search_issues_url_fmt': "{instance_url}/api/v1/search/issues?q=repo%3A{ru}/{rn}+",
# 'base_query_fmt': "?q=repo%3A{ru}/{rn}+", # TODO: Change for Gitea ??
'search_results_key': "items", # TODO: Change for Gitea ??
'api_labels_url_fmt': "{instance_url}/repos/{ru}/{rn}/labels",
'page_size': 30, # TODO: Change for Gitea ??
'c_issues_name_fmt': "issues_page={p}{q}.json",
'c_issue_name_fmt': "{issue_no}.json",
@ -442,7 +449,6 @@ def str_to_value(valueStr, typeName=None, lineN=-1, path="(generated)"):
return valueStr
def modify_dict_by_conf(options, conf_path, always_lower=False,
no_file_error=True, no_value_error=True,
quiet=True):
@ -632,7 +638,6 @@ class Repo:
if you set a different single_cache for each repo!
api_id -- a key in the global apis dict which determines the
defaults for accessing the web API.
'''
if options is None:
@ -663,7 +668,6 @@ class Repo:
self.remote_user = remote_user
del remote_user
self.repo_name = urlParts[-1]
repo_name = options.get('repo_name')
@ -764,6 +768,9 @@ class Repo:
self.c_issue_name_fmt = options['c_issue_name_fmt']
self.api_repo_url_fmt = options['api_repo_url_fmt']
self.api_issue_url_fmt = options['api_issue_url_fmt']
self.api_labels_url_fmt = options['api_labels_url_fmt']
# ^ All of these are guaranteed not to be None due to
# "Set it to the default if it is None" further up.
self.repo_url = self.api_repo_url_fmt.format(
instance_url=instance_url,
ru=self.remote_user,
@ -785,13 +792,20 @@ class Repo:
)
self.issues_url = self.repo_url + "/issues"
self.labels_url = self.repo_url + "/labels"
# self.labels_url = self.repo_url + "/labels"
self.labels_url = self.api_labels_url_fmt.format(
instance_url=instance_url,
ru=self.remote_user,
rn=self.repo_name,
)
self.page_size = options['page_size']
self.log_prefix = "@ "
self.c_issues_name_fmt = options['c_issues_name_fmt']
self.label_ids = [] # all label ids in the repo
self.labels = [] # all labels in the repo
self.labels = [] # all labels in the repo (string only)
self.label_metas = None
self.default_query = options['default_query']
self.hide_events = options['hide_events']
self.issues = None
@ -859,7 +873,6 @@ class Repo:
key = self.getKnownKey(name)
return issue[key]
def setCachesPath(self, path, flat=True):
'''
This repo cache directory will be <remote_user>/<repo_name>/
@ -1033,6 +1046,12 @@ class Repo:
"".format(prev_query_s, url))
debug(" (issues will be set to the content of the {}"
" element)".format(results_key))
else:
raise ValueError(
"The url is unknown for the action."
" Maybe the code is trying to get labels instead"
" of issues and should call _get_labels."
)
# Labels are NOT in the URL, but filtered later (See docstring).
@ -1042,6 +1061,10 @@ class Repo:
debug(" appended page query_part={} (c_path={})"
"".format(query_part, c_path))
elif len(query_part) > 0:
if url is None:
raise RuntimeError("url is None")
if query_part is None:
raise RuntimeError("query_part is None")
url += "?" + query_part
debug(" appended custom query_part={} (c_path={})"
"".format(query_part, c_path))
@ -1051,7 +1074,6 @@ class Repo:
self.last_url = url
# ^ Differs from self.last_src, which can be a file.
if os.path.isfile(c_path):
# See <https://stackoverflow.com/questions/7430928/
# comparing-dates-to-check-for-old-files>
@ -1173,8 +1195,6 @@ class Repo:
}
)
if not os.path.isdir(self.c_repo_path):
os.makedirs(self.c_repo_path)
# if not quiet:
@ -1201,6 +1221,93 @@ class Repo:
return results, None
def _get_url(self, url, c_path=None):
'''
Keyword arguments:
c_path -- If not None, save the results as JSON to this path.
'''
self.last_src = url
try:
headers = {}
token = self.options.get('token')
if token is not None:
headers['Authorization'] = "token " + token
if len(headers) > 0:
res = requests.get(url, headers=headers)
# res = req.urlopen(url)
res_text = res.text
# NOTE: In python3, res.content is in bytes
# (<https://stackoverflow.com/a/18810889/4541104>).
else:
res = request.urlopen(url)
res_text = decode_safe(res.read())
if c_path is not None:
parent = os.path.split(c_path)[0]
if not os.path.isdir(parent):
os.makedirs(parent)
data = json.loads(res_text)
err = to_error(data)
if err is not None:
return None, err
# Only save if loads didn't raise an exception.
if c_path is not None:
with open(c_path, 'w') as outs:
# outs.write(res_text)
json.dump(data, outs, indent=2)
debug(p+"Wrote {}".format(c_path))
except HTTPError as ex:
return (
None,
{
'code': ex.code,
'reason': ex.reason,
'headers': ex.headers,
'url': url,
}
)
err = to_error(data)
if err is not None:
return None, err
return data, None
def _get_labels(self, options):
'''
Get labels.
Sequential arguments:
options -- The following options are checked:
page -- This is added to the query.
refresh -- Always re-download the page of results.
never_expire -- Ignore the age of results and read the
cache if present instead of downloading data.
'''
query_part = ""
page = options.get('page')
if page is not None:
query_part = "?page={}".format(page)
else:
debug("* In _get_labels there is no page in"
" options.")
query_url = self.labels_url + query_part
refresh = False
if options.get('refresh') is True:
refresh = True
never_expire = False
if options.get('never_expire') is True:
never_expire = True
quiet = False
'''
debug("* requesting cached URL: {}".format(query_url))
results, err = self.getCachedJsonDict(
query_url,
refresh=refresh,
never_expire=never_expire,
quiet=quiet,
)
'''
result, err = self._get_url(query_url)
return result, err
def getCachedJsonDict(self, url, refresh=False, never_expire=False,
quiet=False):
'''
@ -1211,7 +1318,8 @@ class Repo:
The cached page is obtained using the cache location
cache directory specified in options['caches_path'] and further
narrowed down to self.c_repo_path then narrowed down using the
URL. For example, https://api.github.com/repos/poikilos/EnlivenMinetest/issues?q=page:1
URL. For example,
https://api.github.com/repos/poikilos/EnlivenMinetest/issues?q=page:1
should become something like:
~/.cache/enissue/poikilos/EnlivenMinetest/
@ -1313,10 +1421,24 @@ class Repo:
subUrl = subUrl[1:]
if subUrl.endswith("/"):
if len(subUrl) == 0:
raise ValueError("Refusing to cache partial list.")
raise ValueError(
"Refusing to cache partial list (labels url"
" {} has no page so the cache page would be"
" unmarked and seem to be the whole list but it"
" usually isn't due to the web API's page size"
" limit."
"".format(url)
)
subUrl += "index.json"
if len(subUrl) == 0:
raise ValueError("Refusing to cache partial list.")
raise ValueError(
"Refusing to cache partial list (labels url"
" {} has no page so the cache page would be"
" unmarked and seem to be the whole list but it"
" usually isn't due to the web API's page size"
" limit."
"".format(url)
)
subParts = subUrl.split("/")
c_path = os.path.join(self.c_repo_path, "labels")
for subPart in subParts:
@ -1377,48 +1499,8 @@ class Repo:
if result is not None:
return result, None
self.last_src = url
try:
headers = {}
token = self.options.get('token')
if token is not None:
headers['Authorization'] = "token " + token
if len(headers) > 0:
res = requests.get(url, headers=headers)
# res = req.urlopen(url)
res_text = res.text
# NOTE: In python3, res.content is in bytes
# (<https://stackoverflow.com/a/18810889/4541104>).
else:
res = request.urlopen(url)
res_text = decode_safe(res.read())
parent = os.path.split(c_path)[0]
if not os.path.isdir(parent):
os.makedirs(parent)
data = json.loads(res_text)
err = to_error(data)
if err is not None:
return None, err
# Only save if loads didn't raise an exception.
with open(c_path, 'w') as outs:
# outs.write(res_text)
json.dump(data, outs, indent=2)
debug(p+"Wrote {}".format(c_path))
except HTTPError as ex:
return (
None,
{
'code': ex.code,
'reason': ex.reason,
'headers': ex.headers,
'url': url,
}
)
err = to_error(data)
if err is not None:
return None, err
return data, None
result, err = self._get_url(url, c_path=c_path)
return result, err
def show_issue(self, issue, refresh=False, never_expire=False,
quiet=False):
@ -1530,7 +1612,6 @@ class Repo:
# ^ Ensure that the second column is justified
# (in a Terminal using a monospaced font).
# NOTE: Timeline is nicer than events because it has both
# comments and events.
'''
@ -1738,6 +1819,22 @@ class Repo:
}
return issue_data, err
def load_labels(self, options): # query=None
'''
See _get_labels for documentation.
'''
debug("load_labels...")
results, err = self._get_labels(
options,
# query=query,
)
if err is None:
self.label_metas = results
for label in self.label_metas:
if label['name'] not in self.labels:
self.labels.append(label['name'])
return results, err
def load_issues(self, options, query=None, issue_no=None,
search_terms=None):
@ -1853,7 +1950,6 @@ class Repo:
'''
raise NotImplementedError("create_issue")
def update_issue(self, src_issue, src_repo):
'''
Remotely update an existing issue using the web API.
@ -1935,6 +2031,7 @@ class Repo:
'''
return issue, None
def main():
global verbose
mode = None
@ -1948,7 +2045,7 @@ def main():
conf_path = os.path.join(myAppData, "source.conf")
if os.path.isfile(conf_path):
modify_dict_by_conf(options, conf_path, always_lower=True,
no_file_error=False, quiet=False)
no_file_error=False, quiet=False)
search_terms = []
SEARCH_COMMANDS = ['find', 'AND'] # CLI-only commands
caches_path = None
@ -1977,7 +2074,12 @@ def main():
# other case.
parent = modes[arg].get('parent')
if parent is not None:
mode = parent
if mode is None:
print("* setting mode to {} for {}"
"".format(parent, mode))
mode = parent
# else the mode is already set explicitly. Example:
# enissue.py labels page 2
else:
mode = arg
else:
@ -2070,10 +2172,15 @@ def main():
repo = Repo(options)
if mode is None:
if len(match_all_labels) > 1:
mode = "list"
if issue_no is not None:
mode = "issue"
elif len(match_all_labels) > 1:
mode = "list"
else:
raise RuntimeError(
"The command combination is not implemented"
" (The mode couldn't be determined)."
)
if save_key is not None:
raise ValueError("--{} requires a space then a value."
"".format(save_key))
@ -2118,7 +2225,27 @@ def main():
# TODO: get labels another way, and make this conditional:
# if mode == "list":
msg = None
if (mode != "issue"):
if mode == "labels":
debug(" * load_labels...")
# query = None
# if repo.default_labels_query is not None:
# query = copy.deepcopy(repo.default_labels_query)
# if options.get('page') is None
# print("* query: {}".format(query))
results, msg = repo.load_labels(
options,
# query=query,
)
debug(" * done load_labels")
elif mode == "issue":
debug(" * load_issues for single issue...")
results, msg = repo.load_issues(options, issue_no=issue_no,
search_terms=search_terms)
debug("* done load_issues for single issue")
else:
debug("* load_issues for list...")
query = None
if repo.default_query is not None:
if (state != repo.default_query.get('state')):
@ -2130,11 +2257,7 @@ def main():
query = copy.deepcopy(repo.default_query)
results, msg = repo.load_issues(options, query=query,
search_terms=search_terms)
debug("* done load_issues for list")
else:
results, msg = repo.load_issues(options, issue_no=issue_no,
search_terms=search_terms)
debug("* done load_issues for single issue")
debug(" * done load_issues for list")
dstRepoUrl = logic.get('copy-meta-to')
if dstRepoUrl is not None:
if mode != "issue":
@ -2173,76 +2296,92 @@ def main():
sys.exit(0)
else:
sys.exit(1)
total_count = 0
if mode == "labels":
if repo.labels is None:
print("There were no labels.")
sys.exit(0)
else:
if repo.issues is None:
print("There were no issues.")
sys.exit(0)
if repo.issues is None:
print("There were no issues.")
sys.exit(0)
match_all_labels_lower = []
p = repo.log_prefix
for s in match_all_labels:
debug(p+"appending"
" {} to match_all_labels_lower.".format(s.lower()))
match_all_labels_lower.append(s.lower())
match_all_labels_lower = []
p = repo.log_prefix
for s in match_all_labels:
debug(p+"appending"
" {} to match_all_labels_lower.".format(s.lower()))
match_all_labels_lower.append(s.lower())
total_count = len(repo.issues)
match = repo.get_match(
mode,
issue_no=issue_no,
match_all_labels_lower=match_all_labels_lower,
)
matching_issue = match['issue']
never_expire = options.get('never_expire') is True
if matching_issue is not None:
debug("* showing matching_issue...")
refresh = options.get('refresh') is True
repo.show_issue(
matching_issue,
refresh=False,
never_expire=never_expire,
total_count = len(repo.issues)
match = repo.get_match(
mode,
issue_no=issue_no,
match_all_labels_lower=match_all_labels_lower,
)
# ^ Never refresh, since that would already have been done.
state_msg = repo.default_query.get('state')
if state_msg is None:
state_msg = repo.last_url
if state_msg != "open":
print("(showing {} issue(s))".format(state_msg.upper()))
# ^ such as CLOSED
else:
debug("* There is no matching_issue; matching manually...")
# TODO: This code doesn't work since it isn't cached.
if mode == 'issue':
debug("mode:issue...")
state = 'closed'
repo.load_issues(options, query={'state':"closed"},
search_terms=search_terms)
total_count = len(repo.issues)
match = repo.get_match(
mode,
issue_no=issue_no,
match_all_labels_lower=match_all_labels_lower,
matching_issue = match['issue']
never_expire = options.get('never_expire') is True
if matching_issue is not None:
debug("* showing matching_issue...")
refresh = options.get('refresh') is True
repo.show_issue(
matching_issue,
refresh=False,
never_expire=never_expire,
)
matching_issue = match['issue']
# ^ Never refresh, since that would already have been done.
state_msg = repo.default_query.get('state')
if state_msg is None:
state_msg = repo.last_url
if state_msg != "open":
print("(showing {} issue(s))".format(state_msg.upper()))
# ^ such as CLOSED
else:
debug("mode:{}...".format(mode))
if matching_issue is None:
if mode == "issue":
print("")
# print("mode: {}".format(mode))
# print("issue_no: {}".format(issue_no))
# print("match_all_labels_lower: {}"
# "".format(match_all_labels_lower))
print("{}".format(match))
print("(the issue wasn't visible)")
debug("* There is no matching_issue; matching manually...")
# TODO: This code doesn't work since it isn't cached.
if mode == 'issue':
debug("mode:issue...")
state = 'closed'
repo.load_issues(options, query={'state':"closed"},
search_terms=search_terms)
total_count = len(repo.issues)
match = repo.get_match(
mode,
issue_no=issue_no,
match_all_labels_lower=match_all_labels_lower,
)
matching_issue = match['issue']
else:
debug("mode:{}...".format(mode))
if matching_issue is None:
if mode == "issue":
print("")
# print("mode: {}".format(mode))
# print("issue_no: {}".format(issue_no))
# print("match_all_labels_lower: {}"
# "".format(match_all_labels_lower))
print("{}".format(match))
print("(the issue wasn't visible)")
if mode == "labels":
# print("Labels:")
# print("")
print("Labels:")
print("")
total_count = 0
for label_s in repo.labels:
print(label_s)
total_count += 1
# print(label_s)
# Each item in repo.label_metas is formatted like:
# {'id': 1285230160, 'node_id': 'MDU6TGFiZWwxMjg1MjMwMTYw',
# 'url': 'https://api.github.com/repos/poikilos/
# EnlivenMinetest/labels/
# 2016-2018%20unconfirmed%20on%20minetest.org',
# 'name': '2016-2018 unconfirmed on minetest.org',
# 'color': 'e2a353', 'default': False, 'description': ''}
# caption = label_s
print("{}".format(label_s))
print("")
print("The repo has {} label(s).".format(len(repo.labels)))
print("* got {} label(s)".format(len(repo.labels)))
print("")
if total_count >= repo.page_size:
print("The maximum issue count per page was reached.")
@ -2252,7 +2391,6 @@ def main():
print(" ./" + me + " labels page " + str(next_page))
print("to see labels on additional pages.")
elif mode == "list":
print()
if len(match_all_labels) > 0:

Loading…
Cancel
Save