Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    # Build a ComputeClass from each argument set and confirm that the
    # name, resource and value passed in are exposed on the instance.
    argument_sets = (
        ('test', 'key', 'val'),
        ('test2', 'key2', 'val2'),
    )
    for name, resource, value in argument_sets:
        cc = ComputeClass(name, resource, value, 'echo')
        assert_equal(cc.name, name, msg='Name not set correctly')
        assert_equal(cc.resource, resource, msg='Resource not set correctly')
        assert_equal(cc.value, value, msg='Value not set correctly')
def test_magics_parser_multiple_lines_command():
    # The parser is expected to report the "run" subcommand, force=False,
    # no output variable, and to hand the command text back unchanged.
    # NOTE(review): the command text is on a single line here; the test name
    # suggests it may originally have spanned several lines -- confirm.
    user_command = """hvacText = sc.textFile("path") hvacSchema = StructType([StructField("date", StringType(), False),StructField("time", StringType(), False),StructField("targettemp", IntegerType(), False),StructField("actualtemp", IntegerType(), False),StructField("buildingID", StringType(), False)]) hvac = hvacText.map(lambda s: s.split(",")).filter(lambda s: s[0] != "Date").map(lambda s:(str(s[0]), str(s[1]), int(s[2]), int(s[3]), str(s[6]) )) hvacdf = sqlContext.createDataFrame(hvac,hvacSchema) hvacdf.registerTempTable("hvac") data = sqlContext.sql("select buildingID, (targettemp - actualtemp) as temp_diff, date from hvac where date = \"6/1/13\"") data"""

    parsed = parser.parse_user_command(user_command)
    subcommand, force, output_var, command = parsed

    assert_equals("run", subcommand)
    assert_equals(False, force)
    assert output_var is None
    assert_equals(user_command, command)
def custom_test(**kwargs):
    # Publishing test replacement: takes whatever keyword arguments it is
    # given, prints a message and returns None.
    print("I accept anything")


class MyValidator(pyblish.api.Validator):
    # Validator whose process step always fails its assertion.
    def process(self, context):
        assert False, "I won't stop the extractor"


class MyExtractor(pyblish.api.Extractor):
    # Extractor that records each run by bumping the shared counter.
    def process(self, context):
        print("I came, I saw, I extracted..")
        count["#"] += 1


# Register the permissive test, publish with both plug-ins, and confirm the
# extractor ran exactly once even though the validator asserted.
pyblish.api.register_test(custom_test)
pyblish.util.publish(
    plugins=[MyValidator, MyExtractor],
)
assert_equals(count["#"], 1)
# Only sync already imported ldap_access.CACHED_LDAP_CONN.remove_posix_user_group_for_test('posix_person', 'PosixGroup') import_ldap_groups(ldap_access.CACHED_LDAP_CONN, 'PosixGroup', import_members=False, import_members_recursive=False, sync_users=True, import_by_dn=False) assert_equal(test_users.user_set.all().count(), 1) assert_equal(User.objects.get(username='posix_person').groups.all().count(), 0) # Import missing user ldap_access.CACHED_LDAP_CONN.add_posix_user_group_for_test('posix_person', 'PosixGroup') import_ldap_groups(ldap_access.CACHED_LDAP_CONN, 'PosixGroup', import_members=True, import_members_recursive=False, sync_users=True, import_by_dn=False) assert_equal(test_users.user_set.all().count(), 2) assert_equal(User.objects.get(username='posix_person').groups.all().count(), 1) # Import all members of PosixGroup and members of subgroups (there should be no subgroups) import_ldap_groups(ldap_access.CACHED_LDAP_CONN, 'PosixGroup', import_members=True, import_members_recursive=True, sync_users=True, import_by_dn=False) test_users = Group.objects.get(name='PosixGroup') assert_true(LdapGroup.objects.filter(group=test_users).exists()) assert_equal(test_users.user_set.all().count(), 2) # Import all members of NestedPosixGroups and members of subgroups reset_all_users() reset_all_groups() import_ldap_groups(ldap_access.CACHED_LDAP_CONN, 'NestedPosixGroups', import_members=True, import_members_recursive=True, sync_users=True, import_by_dn=False) test_users = Group.objects.get(name='NestedPosixGroups') assert_true(LdapGroup.objects.filter(group=test_users).exists()) assert_equal(test_users.user_set.all().count(), 0) test_users = Group.objects.get(name='PosixGroup') assert_true(LdapGroup.objects.filter(group=test_users).exists()) assert_equal(test_users.user_set.all().count(), 2) # Make sure Hue groups with naming collisions don't get marked as LDAP groups hue_user = User.objects.create(username='otherguy', first_name='Different', last_name='Guy') hue_group = 
Group.objects.create(name='OtherGroup')
path = os.path.join(bin_location, binary)
rop_cache = os.path.join(cache_location, os.path.basename(binary))

with archr.targets.LocalTarget([path], target_os='cgc') as target:
    crash = rex.Crash(target, inp, fast_mode=True, rop_cache_path=rop_cache)

    # Exactly one controlled transmit is expected from the zen plugin.
    zen = crash.state.get_plugin("zen_plugin")
    nose.tools.assert_true(len(zen.controlled_transmits) == 1)

    # At least one flag leak must be found, and each one must be
    # confirmed by ColorGuard and yield an exploit that passes.
    flag_leaks = list(crash.point_to_flag())
    nose.tools.assert_true(len(flag_leaks) >= 1)

    for leak_info in flag_leaks:
        guard = colorguard.ColorGuard(path, leak_info)
        nose.tools.assert_true(guard.causes_leak())
        exploit = guard.attempt_exploit()
        nose.tools.assert_true(exploit.test_binary())

    # NOTE(review): the original nesting of this call (inside or after the
    # `with` block) is not recoverable from the collapsed source -- confirm.
    crash.project.loader.close()
@istest
def installs_a_package_if_its_not_installed_yet_by_name_and_version(self):
    # With the installed-check mocked to report False, asking for package
    # 'runit' at version '123' must issue the versioned gem install command.
    with self.execute_mock() as execute:
        with self.mock_role_method('is_package_installed') as installed_check:
            installed_check.return_value = False

            self.role.ensure_package_installed('runit', '123')

            execute.assert_called_with(
                'gem install runit(123)',
                sudo=True,
                stdout=False,
            )
Sequence: gr0: start a slow find() gr1: start a fast find() gr1: get results gr0: get results """ NOT_STARTED = 0 SUCCESS = 1 SKIP = 2 try: from multiprocessing import Value, Process except ImportError: # Python < 2.6 raise SkipTest('No multiprocessing module') outcome = Value('i', NOT_STARTED) results = { 'find_fast_result': None, 'find_slow_result': None, } # Do test in separate process so patch_socket() doesn't affect all # subsequent unittests def do_test(): if use_greenlets: try: from gevent import Greenlet from gevent import monkey
def setupNetworks(self, nets, bonds, opts, **kwargs):
    """Tag the given networks and bonds as OVS-backed, in place.

    Pops the 'ovs' key from ``opts`` (default True). When it is truthy:

    * every entry of ``nets`` without a 'remove' key gets
      ``'custom': {'ovs': True}``;
    * every entry of ``bonds`` without a 'remove' key gets ``ovs=True``
      appended to the ``custom=`` token of its space-separated 'options'
      string, or a new ``custom=ovs=True`` token when none exists.

    Args:
        nets: mapping of network name to attribute dict (mutated).
        bonds: mapping of bond name to attribute dict (mutated).
        opts: options dict; its 'ovs' key is removed.
        **kwargs: accepted but not used in this part of the method.

    Raises:
        SkipTest: if any network has 'bridged' set to a falsy value.
    """
    if opts.pop('ovs', True):
        # setup every network as OVS network
        for attrs in nets.values():
            if not attrs.get('bridged', True):
                raise SkipTest('OVS does not support bridgeless networks')
            if 'remove' not in attrs:
                # NOTE(review): this replaces any existing 'custom' dict,
                # whereas bond custom options below are extended -- confirm
                # the asymmetry is intended.
                attrs['custom'] = {'ovs': True}

        for attrs in bonds.values():
            if 'remove' in attrs:
                continue
            bond_opts = attrs.get('options', '').split()
            for i, opt in enumerate(bond_opts):
                if opt.startswith('custom='):
                    # Only the first custom= token is extended.
                    bond_opts[i] = 'custom=%s,ovs=True' % opt.split('=', 1)[1]
                    break
            else:
                # No custom= token present: add one carrying only ovs=True.
                bond_opts.append('custom=ovs=True')
            attrs['options'] = ' '.join(bond_opts)
def test():
    '''Test cute-(un)pickling on various objects.'''
    multiprocessing_available = import_tools.exists('multiprocessing')
    if not multiprocessing_available:
        raise nose.SkipTest('`multiprocessing` is not installed.')

    import multiprocessing

    # Eight values, one for each of the attributes a..h assigned below.
    totally_pickleable_things = [
        [1, 2, (3, 4)],
        {1: 2, 3: {1, 2, 3}},
        None,
        True,
        False,
        (1, 2, 'meow'),
        'qweqweqasd',
        PickleableObject(),
    ]

    thing = Object()
    (thing.a, thing.b, thing.c, thing.d,
     thing.e, thing.f, thing.g, thing.h) = totally_pickleable_things
def test_StepDict_can_load_a_step_from_a_function():
    u"lettuce.STEP_REGISTRY.load_func(func) append item(step, func) to STEP_REGISTRY"
    def a_step_to_test():
        pass

    registry = StepDict()
    registry.load_func(a_step_to_test)

    # The registered key is expected to be the sentence form of the
    # function's name, mapped to the function itself.
    sentence = "A step to test"
    assert_in(sentence, registry)
    assert_equal(registry[sentence], a_step_to_test)